diff --git a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/BukkitMainThreadDispatcher.kt b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/BukkitMainThreadDispatcher.kt index 4a55a9a0d..c83d96bc5 100644 --- a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/BukkitMainThreadDispatcher.kt +++ b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/BukkitMainThreadDispatcher.kt @@ -1,9 +1,11 @@ package io.github.pylonmc.rebar.async +import io.github.pylonmc.rebar.async.schedulers.PriorityQueueScheduler +import io.github.pylonmc.rebar.async.schedulers.Scheduler +import io.github.pylonmc.rebar.async.schedulers.TimingWheel import kotlinx.coroutines.* import org.bukkit.plugin.Plugin import java.lang.Runnable -import java.util.PriorityQueue import kotlin.coroutines.CoroutineContext /** @@ -17,7 +19,13 @@ import kotlin.coroutines.CoroutineContext @OptIn(InternalCoroutinesApi::class) class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRate: Long) : CoroutineDispatcher(), Runnable, Delay { - private val taskQueue = PriorityQueue() + // while TimingWheel would work for other tickRates (mainly tickRate % 2 != 0), + // it would delay their executions, also most tasks use tickRate 1 + private val scheduler: Scheduler = if (tickRate == 1L) { + TimingWheel(11) + } else { + PriorityQueueScheduler() + } private var tick = 0L @@ -28,10 +36,7 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat override fun run() { if (!plugin.isEnabled) return tick += tickRate - while (taskQueue.isNotEmpty() && taskQueue.peek().executeAt <= tick) { - val task = taskQueue.poll() - task.runnable.run() - } + scheduler.getValid(tick).forEach { it.runnable.run() } } override fun isDispatchNeeded(context: CoroutineContext): Boolean { @@ -40,23 +45,17 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat override fun dispatch(context: CoroutineContext, block: Runnable) { if (!plugin.isEnabled) return - taskQueue.add(Task(tick, block)) + scheduler.schedule(tick, block) } @OptIn(ExperimentalCoroutinesApi::class) override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { if (!plugin.isEnabled) return - val ticks = timeMillis / 50 - taskQueue.add(Task(tick + ticks) { + val ticks = timeMillis / 50L + scheduler.schedule(tick + ticks) { if (continuation.isActive) { with(continuation) { resumeUndispatched(Unit) } } - }) - } - - private data class Task(val executeAt: Long, val runnable: Runnable) : Comparable { - override fun compareTo(other: Task): Int { - return executeAt.compareTo(other.executeAt) } } -} \ No newline at end of file +} diff --git a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/RebarScheduledTask.kt b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/RebarScheduledTask.kt new file mode 100644 index 000000000..3f8c61b59 --- /dev/null +++ b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/RebarScheduledTask.kt @@ -0,0 +1,7 @@ +package io.github.pylonmc.rebar.async + +data class RebarScheduledTask(val executeTick: Long, val runnable: Runnable) : Comparable { + override fun compareTo(other: RebarScheduledTask): Int { + return executeTick.compareTo(other.executeTick) + } +} \ No newline at end of file diff --git a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/PriorityQueueScheduler.kt b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/PriorityQueueScheduler.kt new file mode 100644 index 000000000..4604ee9cb --- /dev/null +++ b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/PriorityQueueScheduler.kt @@ -0,0 +1,28 @@ +package io.github.pylonmc.rebar.async.schedulers + +import com.google.common.collect.Queues +import io.github.pylonmc.rebar.async.RebarScheduledTask +import java.util.* + +/** + * Scheduler using a [PriorityQueue] as a delegate + * + * O(log n) insertions and evictions + */ +class PriorityQueueScheduler : Scheduler { + private val taskQueue = Queues.synchronizedQueue(PriorityQueue()) + + override fun schedule(executeAt: Long, runnable: Runnable) { + taskQueue.add(RebarScheduledTask(executeAt, runnable)) + } + + override fun getValid(currentTick: Long): List { + val list = mutableListOf() + while (taskQueue.isNotEmpty() && taskQueue.peek().executeTick <= currentTick) { + val task = taskQueue.poll() + list.add(task) + } + + return list + } +} \ No newline at end of file diff --git a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/Scheduler.kt b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/Scheduler.kt new file mode 100644 index 000000000..fccf51e0f --- /dev/null +++ b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/Scheduler.kt @@ -0,0 +1,16 @@ +package io.github.pylonmc.rebar.async.schedulers + +import io.github.pylonmc.rebar.async.RebarScheduledTask + +interface Scheduler { + + /** + * Adds a task to the scheduler + */ + fun schedule(executeAt: Long, runnable: Runnable) + + /** + * Gets and evicts valid tasks + */ + fun getValid(currentTick: Long) : List +} \ No newline at end of file diff --git a/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/TimingWheel.kt b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/TimingWheel.kt new file mode 100644 index 000000000..48dc0b9f8 --- /dev/null +++ b/rebar/src/main/kotlin/io/github/pylonmc/rebar/async/schedulers/TimingWheel.kt @@ -0,0 +1,57 @@ +package io.github.pylonmc.rebar.async.schedulers + +import io.github.pylonmc.rebar.async.RebarScheduledTask +import java.util.concurrent.ConcurrentLinkedQueue + + +/** + * This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel). + * Each slot in the wheel represents a specific tick modulo the wheel size. + * Tasks are placed into slots based on their target execution tick. + * On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached. + * + * O(1) task scheduling and retrieval within a single wheel rotation. + * We are using power of 2 for faster operations than modulo (even though I doubt there would be much improvement) + * + * @param exponent wheel size (wheelSize = 2 ^ exponent) + */ +internal class TimingWheel(exponent: Int) : Scheduler { + // note: if we are going to use a lot of tasks with a long delay, + // maybe add round support + private val wheelSize = 1 shl exponent + private val mask = wheelSize - 1L + private val wheel = Array(wheelSize) { ArrayDeque() } + // use thread safe queue + private val incoming = ConcurrentLinkedQueue() + + override fun schedule(executeAt: Long, runnable: Runnable) { + incoming.add(RebarScheduledTask(executeAt, runnable)) + } + + override fun getValid(currentTick: Long) : List { + while (true) { + val task = incoming.poll() ?: break + val slot = (task.executeTick and mask).toInt() + wheel[slot].add(task) + } + + val slot = (currentTick and mask).toInt() + val bucket = wheel[slot] + if (bucket.isEmpty()) { + return emptyList() + } + + val iter = bucket.iterator() + val list = mutableListOf() + while (iter.hasNext()) { + val task = iter.next() + + if (task.executeTick <= currentTick) { + list.add(task) + iter.remove() + } + } + + return list + } +} \ No newline at end of file