Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.github.pylonmc.rebar.async

import io.github.pylonmc.rebar.async.schedulers.PriorityQueueScheduler
import io.github.pylonmc.rebar.async.schedulers.Scheduler
import io.github.pylonmc.rebar.async.schedulers.TimingWheel
import kotlinx.coroutines.*
import org.bukkit.plugin.Plugin
import java.lang.Runnable
import java.util.PriorityQueue
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -17,7 +19,13 @@ import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRate: Long) : CoroutineDispatcher(), Runnable, Delay {

private val taskQueue = PriorityQueue<Task>()
// while TimingWheel would work for other tickRates (mainly tickRate % 2 != 0),
// it would delay their executions, also most tasks use tickRate 1
private val scheduler: Scheduler = if (tickRate == 1L) {
TimingWheel(11)
} else {
PriorityQueueScheduler()
}

private var tick = 0L

Expand All @@ -28,10 +36,7 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat
override fun run() {
if (!plugin.isEnabled) return
tick += tickRate
while (taskQueue.isNotEmpty() && taskQueue.peek().executeAt <= tick) {
val task = taskQueue.poll()
task.runnable.run()
}
scheduler.getValid(tick).forEach { it.runnable.run() }
}

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
Expand All @@ -40,23 +45,17 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat

override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!plugin.isEnabled) return
taskQueue.add(Task(tick, block))
scheduler.schedule(tick, block)
}

@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
if (!plugin.isEnabled) return
val ticks = timeMillis / 50
taskQueue.add(Task(tick + ticks) {
val ticks = timeMillis / 50L
scheduler.schedule(tick + ticks) {
if (continuation.isActive) {
with(continuation) { resumeUndispatched(Unit) }
}
})
}

private data class Task(val executeAt: Long, val runnable: Runnable) : Comparable<Task> {
override fun compareTo(other: Task): Int {
return executeAt.compareTo(other.executeAt)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.pylonmc.rebar.async

data class RebarScheduledTask(val executeTick: Long, val runnable: Runnable) : Comparable<RebarScheduledTask> {
override fun compareTo(other: RebarScheduledTask): Int {
return executeTick.compareTo(other.executeTick)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.github.pylonmc.rebar.async.schedulers

import com.google.common.collect.Queues
import io.github.pylonmc.rebar.async.RebarScheduledTask
import java.util.*

/**
* Scheduler using a [PriorityQueue] as a delegate
*
* O(log n) insertions and evictions
*/
class PriorityQueueScheduler : Scheduler {
private val taskQueue = Queues.synchronizedQueue(PriorityQueue<RebarScheduledTask>())

override fun schedule(executeAt: Long, runnable: Runnable) {
taskQueue.add(RebarScheduledTask(executeAt, runnable))
}

override fun getValid(currentTick: Long): List<RebarScheduledTask> {
val list = mutableListOf<RebarScheduledTask>()
while (taskQueue.isNotEmpty() && taskQueue.peek().executeTick <= currentTick) {
val task = taskQueue.poll()
list.add(task)
}

return list
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.pylonmc.rebar.async.schedulers

import io.github.pylonmc.rebar.async.RebarScheduledTask

interface Scheduler {

/**
* Adds a task to the scheduler
*/
fun schedule(executeAt: Long, runnable: Runnable)

/**
* Gets and evicts valid tasks
*/
fun getValid(currentTick: Long) : List<RebarScheduledTask>
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I reading the docs right that you can only schedule tasks for tick 2^exponent? A little confused as to how this works, does it actually behave any differently from a priority queue in terms of when it runs task? The docs seem to imply so or maybe I am reading them wrong?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, view it as an hashmap based on the execution tick

It runs and check the tasks for a specific tick so the tasks are ran the same for tickSpeed = 1, otherwise it would make some buckets useless and it could break if tasks fall in said buckets

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example if it does tickspeed 2 it would skip all the even position buckets, and if you run a delay and falls into said positions it would break said task

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.github.pylonmc.rebar.async.schedulers

import io.github.pylonmc.rebar.async.RebarScheduledTask
import java.util.concurrent.ConcurrentLinkedQueue


/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo (even though I doubt there would be much improvement)
*
* @param exponent wheel size (wheelSize = 2 ^ exponent)
*/
internal class TimingWheel(exponent: Int) : Scheduler {
// note: if we are going to use a lot of tasks with a long delay,
// maybe add round support
private val wheelSize = 1 shl exponent
private val mask = wheelSize - 1L
private val wheel = Array(wheelSize) { ArrayDeque<RebarScheduledTask>() }
// use thread safe queue
private val incoming = ConcurrentLinkedQueue<RebarScheduledTask>()

override fun schedule(executeAt: Long, runnable: Runnable) {
incoming.add(RebarScheduledTask(executeAt, runnable))
}

override fun getValid(currentTick: Long) : List<RebarScheduledTask> {
while (true) {
val task = incoming.poll() ?: break
val slot = (task.executeTick and mask).toInt()
wheel[slot].add(task)
}

val slot = (currentTick and mask).toInt()
val bucket = wheel[slot]
if (bucket.isEmpty()) {
return emptyList()
}

val iter = bucket.iterator()
val list = mutableListOf<RebarScheduledTask>()
while (iter.hasNext()) {
val task = iter.next()

if (task.executeTick <= currentTick) {
list.add(task)
iter.remove()
}
}

return list
}
}
Loading