Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.github.pylonmc.rebar.async

import io.github.pylonmc.rebar.collections.tasks.PriorityQueueScheduler
import io.github.pylonmc.rebar.collections.tasks.Scheduler
import io.github.pylonmc.rebar.collections.tasks.TimingWheel
import kotlinx.coroutines.*
import org.bukkit.plugin.Plugin
import java.lang.Runnable
import java.util.PriorityQueue
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -17,7 +19,13 @@ import kotlin.coroutines.CoroutineContext
@OptIn(InternalCoroutinesApi::class)
class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRate: Long) : CoroutineDispatcher(), Runnable, Delay {

private val taskQueue = PriorityQueue<Task>()
// while TimingWheel would work for other tickRates (mainly tickRate % 2 != 0),
// it would delay their executions, also most tasks use tickRate 1
private val scheduler: Scheduler = if (tickRate == 1L) {
TimingWheel(11)
} else {
PriorityQueueScheduler()
}

private var tick = 0L

Expand All @@ -28,10 +36,7 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat
override fun run() {
if (!plugin.isEnabled) return
tick += tickRate
while (taskQueue.isNotEmpty() && taskQueue.peek().executeAt <= tick) {
val task = taskQueue.poll()
task.runnable.run()
}
scheduler.getValid(tick).forEach { it.runnable.run() }
}

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
Expand All @@ -40,23 +45,17 @@ class BukkitMainThreadDispatcher(private val plugin: Plugin, private val tickRat

override fun dispatch(context: CoroutineContext, block: Runnable) {
if (!plugin.isEnabled) return
taskQueue.add(Task(tick, block))
scheduler.schedule(0, tick, block)
Comment thread
Intybyte marked this conversation as resolved.
Outdated
}

@OptIn(ExperimentalCoroutinesApi::class)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
if (!plugin.isEnabled) return
val ticks = timeMillis / 50
taskQueue.add(Task(tick + ticks) {
val ticks = timeMillis / 50L
scheduler.schedule(tick, ticks) {
if (continuation.isActive) {
with(continuation) { resumeUndispatched(Unit) }
}
})
}

private data class Task(val executeAt: Long, val runnable: Runnable) : Comparable<Task> {
override fun compareTo(other: Task): Int {
return executeAt.compareTo(other.executeAt)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.github.pylonmc.rebar.collections.tasks

import java.util.*

/**
* Scheduler using a [PriorityQueue] as a delegate
*
* O(log n) insertions and evictions
*/
class PriorityQueueScheduler : Scheduler {
private val taskQueue = PriorityQueue<ScheduledTask>() // this is not really thread safe, should it be changed?
Comment thread
Intybyte marked this conversation as resolved.
Outdated

override fun schedule(tick: Long, delayTicks: Long, runnable: Runnable) {
taskQueue.add(ScheduledTask(tick + delayTicks, runnable))
}

override fun getValid(tick: Long): List<ScheduledTask> {
val list = mutableListOf<ScheduledTask>()
while (taskQueue.isNotEmpty() && taskQueue.peek().executeTick <= tick) {
val task = taskQueue.poll()
list.add(task)
}

return list
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.pylonmc.rebar.collections.tasks

data class ScheduledTask(val executeTick: Long, val runnable: Runnable) : Comparable<ScheduledTask> {
override fun compareTo(other: ScheduledTask): Int {
return executeTick.compareTo(other.executeTick)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.github.pylonmc.rebar.collections.tasks

interface Scheduler {

/**
* Adds a task to the scheduler
*/
fun schedule(tick: Long, delayTicks: Long, runnable: Runnable)
Comment thread
Intybyte marked this conversation as resolved.
Outdated

/**
* Gets and evicts valid tasks
*/
fun getValid(tick: Long) : List<ScheduledTask>
Comment thread
Intybyte marked this conversation as resolved.
Outdated
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.github.pylonmc.rebar.collections.tasks

import java.util.concurrent.ConcurrentLinkedQueue


/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo (even though I doubt there would be much improvement)
*
* @param exponent wheel size (wheelSize = 2 ^ exponent)
*/
class TimingWheel(exponent: Int) : Scheduler {
private val wheelSize = 1 shl exponent
private val mask = wheelSize - 1
private val wheel = Array(wheelSize) { ArrayDeque<ScheduledTask>() }
// use thread safe queue
private val incoming = ConcurrentLinkedQueue<ScheduledTask>()

override fun schedule(tick: Long, delayTicks: Long, runnable: Runnable) {
val executeTick = tick + delayTicks
incoming.add(ScheduledTask(executeTick, runnable))
}

override fun getValid(tick: Long) : List<ScheduledTask> {
while (true) {
val task = incoming.poll() ?: break
val slot = task.executeTick.toInt() and mask
wheel[slot].add(task)
}

val slot = tick.toInt() and mask
val bucket = wheel[slot]
if (bucket.isEmpty()) {
return emptyList()
}

val iter = bucket.iterator()
val list = mutableListOf<ScheduledTask>()
while (iter.hasNext()) {
val task = iter.next()

if (task.executeTick <= tick) {
list.add(task)
iter.remove()
}
}

return list
}
}
Loading