Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.github.pylonmc.rebar.async

import io.github.pylonmc.rebar.async.schedulers.PriorityQueueScheduler
import io.github.pylonmc.rebar.async.schedulers.Scheduler
import io.github.pylonmc.rebar.async.schedulers.TimingWheel
import kotlinx.coroutines.*
import org.bukkit.plugin.Plugin
import java.lang.Runnable
import java.util.PriorityQueue
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -17,7 +19,13 @@ import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRate: Long) : CoroutineDispatcher(), Runnable, Delay {

private val taskQueue = PriorityQueue<Task>()
// while TimingWheel would work for other tickRates (mainly tickRate % 2 != 0),
// it would delay their executions, also most tasks use tickRate 1
private val scheduler: Scheduler = if (tickRate == 1L) {
TimingWheel(11)
} else {
PriorityQueueScheduler()
}

private var tick = 0L

Expand All @@ -28,10 +36,7 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat
override fun run() {
if (!plugin.isEnabled) return
tick += tickRate
while (taskQueue.isNotEmpty() && taskQueue.peek().executeAt <= tick) {
val task = taskQueue.poll()
task.runnable.run()
}
scheduler.getValid(tick).forEach { it.runnable.run() }
}

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
Expand All @@ -40,23 +45,17 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat

override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!plugin.isEnabled) return
taskQueue.add(Task(tick, block))
scheduler.schedule(tick, block)
}

@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
if (!plugin.isEnabled) return
val ticks = timeMillis / 50
taskQueue.add(Task(tick + ticks) {
val ticks = timeMillis / 50L
scheduler.schedule(tick +ticks) {
if (continuation.isActive) {
with(continuation) { resumeUndispatched(Unit) }
}
})
}

private data class Task(val executeAt: Long, val runnable: Runnable) : Comparable<Task> {
override fun compareTo(other: Task): Int {
return executeAt.compareTo(other.executeAt)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.pylonmc.rebar.async

data class ScheduledTask(val executeTick: Long, val runnable: Runnable) : Comparable<ScheduledTask> {
Comment thread
Intybyte marked this conversation as resolved.
Outdated
Comment thread
Intybyte marked this conversation as resolved.
Outdated
override fun compareTo(other: ScheduledTask): Int {
return executeTick.compareTo(other.executeTick)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.github.pylonmc.rebar.async.schedulers

import io.github.pylonmc.rebar.async.ScheduledTask
import java.util.*
import java.util.concurrent.PriorityBlockingQueue

/**
* Scheduler using a [PriorityQueue] as a delegate
*
* O(log n) insertions and evictions
*/
class PriorityQueueScheduler : Scheduler {
private val taskQueue = PriorityBlockingQueue<ScheduledTask>()
Comment thread
Intybyte marked this conversation as resolved.
Outdated

override fun schedule(executeAt: Long, runnable: Runnable) {
taskQueue.add(ScheduledTask(executeAt, runnable))
}

override fun getValid(currentTick: Long): List<ScheduledTask> {
val list = mutableListOf<ScheduledTask>()
while (taskQueue.isNotEmpty() && taskQueue.peek().executeTick <= currentTick) {
val task = taskQueue.poll()
list.add(task)
}

return list
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.pylonmc.rebar.async.schedulers

import io.github.pylonmc.rebar.async.ScheduledTask

interface Scheduler {

/**
* Adds a task to the scheduler
*/
fun schedule(executeAt: Long, runnable: Runnable)

/**
* Gets and evicts valid tasks
*/
fun getValid(currentTick: Long) : List<ScheduledTask>
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading the docs right that you can only schedule tasks for tick 2^exponent? A little confused as to how this works, does it actually behave any differently from a priority queue in terms of when it runs task? The docs seem to imply so or maybe I am reading them wrong?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, view it as an hashmap based on the execution tick

It runs and check the tasks for a specific tick so the tasks are ran the same for tickSpeed = 1, otherwise it would make some buckets useless and it could break if tasks fall in said buckets

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example if it does tickspeed 2 it would skip all the even position buckets, and if you run a delay and falls into said positions it would break said task

Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.github.pylonmc.rebar.async.schedulers

import io.github.pylonmc.rebar.async.ScheduledTask
import java.util.concurrent.ConcurrentLinkedQueue


/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo (even though I doubt there would be much improvement)
*
* @param exponent wheel size (wheelSize = 2 ^ exponent)
*/
class TimingWheel(exponent: Int) : Scheduler {
Comment thread
Intybyte marked this conversation as resolved.
Outdated
private val wheelSize = 1 shl exponent
private val mask = wheelSize - 1
private val wheel = Array(wheelSize) { ArrayDeque<ScheduledTask>() }
// use thread safe queue
private val incoming = ConcurrentLinkedQueue<ScheduledTask>()

override fun schedule(executeAt: Long, runnable: Runnable) {
incoming.add(ScheduledTask(executeAt, runnable))
}

override fun getValid(currentTick: Long) : List<ScheduledTask> {
while (true) {
val task = incoming.poll() ?: break
val slot = task.executeTick.toInt() and mask
wheel[slot].add(task)
}

val slot = currentTick.toInt() and mask
val bucket = wheel[slot]
if (bucket.isEmpty()) {
return emptyList()
}

val iter = bucket.iterator()
val list = mutableListOf<ScheduledTask>()
while (iter.hasNext()) {
val task = iter.next()

if (task.executeTick <= currentTick) {
list.add(task)
iter.remove()
}
}

return list
}
}
Loading