Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Threading usov #424

Merged
merged 7 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ interface IScheduler {
}
}

//Provides better performance but loose event consistency.
val outOfOrderExecution : Boolean get() = false

val executionOrder: ExecutionOrder

fun invokeOrQueue(action: () -> Unit) {
if (isActive) action()
Expand All @@ -39,4 +37,35 @@ interface ISchedulerWithBackground : IScheduler {
* An associated scheduler which executes all tasks outside of the main thread
*/
val backgroundScheduler: IScheduler
}
}

//Provides better performance but loose event consistency.
val IScheduler.outOfOrderExecution : Boolean get() = executionOrder == ExecutionOrder.OutOfOrder

/**
* Represents the execution order guarantee of a scheduler.
*/
enum class ExecutionOrder {
/**
* The scheduler guarantees a sequential execution order.
* Tasks are executed in the order they were received.
*/
Sequential,

/**
* The scheduler does not guarantee a sequential execution order.
* Tasks may be executed concurrently or in a different order than received.
*/
OutOfOrder,

/**
* The execution order of the scheduler is unknown.
* This is typically used when the scheduler is a wrapper around another service or dispatcher
* where the execution order cannot be directly determined.
* It is important to note that 'Unknown' should not be treated as 'OutOfOrder'. It may represent
* a sequential scheduler and any optimization assuming an 'OutOfOrder' execution may potentially
* disrupt the actual execution order.
*/
Unknown
}

Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package com.jetbrains.rd.util.threading

import com.jetbrains.rd.util.Logger
import com.jetbrains.rd.util.catch
import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reflection.threadLocal
import com.jetbrains.rd.util.spinUntil
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asExecutor
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger

val Executor.asRdScheduler: IScheduler
get() {
get() = asRdScheduler(ExecutionOrder.Unknown)

fun Executor.asRdScheduler(executionOrder: ExecutionOrder): IScheduler {
val executor = this
return executor as? IScheduler ?: object : IScheduler {

Expand All @@ -20,6 +25,9 @@ val Executor.asRdScheduler: IScheduler

override val isActive get() = active > 0

override val executionOrder: ExecutionOrder
get() = executionOrder

override fun queue(action: () -> Unit) {
tasksInQueue.incrementAndGet()
executor.execute {
Expand All @@ -42,5 +50,131 @@ val Executor.asRdScheduler: IScheduler
}
}


val CoroutineDispatcher.asRdScheduler: IScheduler
get() = (this as? IScheduler) ?: asExecutor().asRdScheduler

fun CoroutineDispatcher.asRdScheduler(dispatcherExecutionOrder: ExecutionOrder): IScheduler = (this as? IScheduler)
?: asExecutor().asRdScheduler(dispatcherExecutionOrder)

/**
* Transforms the current scheduler into a sequential one.
*
* If the current scheduler already guarantees sequential execution, it is returned as is.
* Otherwise, a new sequential scheduler is created that wraps the original scheduler.
*
* The returned sequential scheduler attempts to keep the original execution order
* until concurrent execution is detected. When concurrency is detected (i.e. when a new task is
* scheduled while a previous one hasn't finished yet), it stops preserving the original order and
* queues tasks internally to ensure sequential execution.
*
* @return a scheduler that guarantees sequential execution.
*/
fun IScheduler.asSequentialScheduler(): IScheduler {
if (executionOrder == ExecutionOrder.Sequential)
return this

return SequentialScheduler(this)
}

private class SequentialScheduler(private val realScheduler: IScheduler) : IScheduler {
private val queue = ConcurrentLinkedQueue<() -> Unit>()
private var thread: Thread? = null

private var state = when (realScheduler.executionOrder) {
ExecutionOrder.Sequential, ExecutionOrder.Unknown -> ActionKind.DelegateAsIs
ExecutionOrder.OutOfOrder -> ActionKind.Repost
}

override val executionOrder: ExecutionOrder get() = ExecutionOrder.Sequential

override val isActive: Boolean
get() = thread == Thread.currentThread()

override fun queue(action: () -> Unit) {
queue.add(action)

val delegateAsIs = synchronized(queue) {
when (state) {
ActionKind.Nothing -> return

ActionKind.Repost -> {
state = ActionKind.Nothing
false
}

ActionKind.DelegateAsIs -> true
}
}

if (delegateAsIs) {
delegateAsIs()
} else {
repost()
}
}

private fun delegateAsIs() {
realScheduler.queue {

synchronized(queue) {
if (state != ActionKind.DelegateAsIs)
return@queue

if (thread == null) {
thread = Thread.currentThread()
} else {
// concurrent behavior detected
state = ActionKind.Nothing
Iliya-usov marked this conversation as resolved.
Show resolved Hide resolved
return@queue
}
}

val action = queue.poll()
Logger.root.catch(action)

synchronized(queue) {
thread = null

if (state == ActionKind.DelegateAsIs)
return@queue
}

repost()
}
}

private fun repost() {
assert(state == ActionKind.Nothing)

realScheduler.queue {
thread = Thread.currentThread()
for (i in 0 until 16) {
val action = queue.poll() ?: break

Logger.root.catch(action)
}
thread = null

synchronized(queue) {
if (queue.peek() == null) {
state = ActionKind.Repost
return@queue
}
}

repost()
}
}

override fun flush() {
require(!isActive) { "Can't flush this scheduler in a reentrant way: we are inside queued item's execution" }
spinUntil { queue.isEmpty() }
}

private enum class ActionKind {
Nothing,
Repost,
DelegateAsIs
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.getLogger
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.lifetime.plusAssign
import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reflection.threadLocal
import java.util.*
Expand Down Expand Up @@ -33,6 +34,9 @@ abstract class SingleThreadSchedulerBase(val name: String) : IScheduler {
}
val tasksInQueue = AtomicInteger(0)

override val executionOrder: ExecutionOrder
get() = ExecutionOrder.Sequential

override fun queue(action: () -> Unit) {
tasksInQueue.incrementAndGet()
executor.execute {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.jetbrains.rd.util.threading

import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reactive.outOfOrderExecution
import com.jetbrains.rd.util.threadLocalWithInitial


Expand All @@ -23,6 +25,8 @@ object SynchronousScheduler : IScheduler {
override val isActive: Boolean
get() = active.get() > 0

override val executionOrder: ExecutionOrder
get() = ExecutionOrder.OutOfOrder


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.jetbrains.rd.util.threading.coroutines

import com.jetbrains.rd.util.reactive.IScheduler
import kotlinx.coroutines.CoroutineDispatcher
import kotlin.coroutines.CoroutineContext

private class SchedulerCoroutineDispatcher(private val scheduler: IScheduler, private val allowInlining: Boolean) : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) = scheduler.queue { block.run() }
override fun isDispatchNeeded(context: CoroutineContext) = !allowInlining || !scheduler.isActive
}

val IScheduler.asCoroutineDispatcher get() = (this as? CoroutineDispatcher) ?: asCoroutineDispatcher(false)
fun IScheduler.asCoroutineDispatcher(allowInlining: Boolean): CoroutineDispatcher = SchedulerCoroutineDispatcher(this, allowInlining)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.jetbrains.rd.util.threading.coroutines

import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reactive.ISource
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext

fun <T> ISource<T>.nextValueAsync(
lifetime: Lifetime,
condition: (T) -> Boolean = { true }
): Deferred<T> = CompletableDeferred<T>().also { d ->
val nestedDef = lifetime.createNested().apply {
synchronizeWith(d)
}

advise(nestedDef.lifetime) {
if (condition(it)) {
d.complete(it)
}
}
}

fun <T : Any> ISource<T?>.nextNotNullValueAsync(lifetime: Lifetime): Deferred<T> = CompletableDeferred<T>().also { d ->
val nestedDef = lifetime.createNested().apply {
synchronizeWith(d)
}

advise(nestedDef.lifetime) {
if (it != null) {
d.complete(it)
}
}
}

suspend fun <T> ISource<T>.nextValue(
condition: (T) -> Boolean = { true }
): T = Lifetime.using { // unsubscribe if coroutine was cancelled
nextValueAsync(it, condition).await()
}

fun ISource<Boolean>.nextTrueValueAsync(lifetime: Lifetime) = nextValueAsync(lifetime) { it }
fun ISource<Boolean>.nextFalseValueAsync(lifetime: Lifetime) = nextValueAsync(lifetime) { !it }

suspend fun ISource<Boolean>.nextTrueValue() = nextValue { it }
suspend fun ISource<Boolean>.nextFalseValue() = nextValue { !it }

suspend fun <T : Any> ISource<T?>.nextNotNullValue(): T =
Lifetime.using { // unsubscribe if coroutine was cancelled
nextNotNullValueAsync(it).await()
}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, scheduler: IScheduler, handler: suspend (T) -> Unit) {
adviseSuspend(lifetime, scheduler.asCoroutineDispatcher(allowInlining = true), handler)
}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, context: CoroutineContext, handler: suspend (T) -> Unit) {
advise(lifetime) {
lifetime.launch(context) {
handler(it)
}
}
}
Loading
Loading