Skip to content

Commit

Permalink
Merge pull request #424 from JetBrains/threading-usov
Browse files Browse the repository at this point in the history
Threading usov
  • Loading branch information
Iliya-usov authored Jul 27, 2023
2 parents beaf0ee + 20cb738 commit 98ba9d4
Show file tree
Hide file tree
Showing 24 changed files with 566 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ interface IScheduler {
}
}

//Provides better performance but loose event consistency.
val outOfOrderExecution : Boolean get() = false

val executionOrder: ExecutionOrder

fun invokeOrQueue(action: () -> Unit) {
if (isActive) action()
Expand All @@ -39,4 +37,35 @@ interface ISchedulerWithBackground : IScheduler {
* An associated scheduler which executes all tasks outside of the main thread
*/
val backgroundScheduler: IScheduler
}
}

//Provides better performance but loose event consistency.
val IScheduler.outOfOrderExecution : Boolean get() = executionOrder == ExecutionOrder.OutOfOrder

/**
* Represents the execution order guarantee of a scheduler.
*/
enum class ExecutionOrder {
/**
* The scheduler guarantees a sequential execution order.
* Tasks are executed in the order they were received.
*/
Sequential,

/**
* The scheduler does not guarantee a sequential execution order.
* Tasks may be executed concurrently or in a different order than received.
*/
OutOfOrder,

/**
* The execution order of the scheduler is unknown.
* This is typically used when the scheduler is a wrapper around another service or dispatcher
* where the execution order cannot be directly determined.
* It is important to note that 'Unknown' should not be treated as 'OutOfOrder'. It may represent
* a sequential scheduler and any optimization assuming an 'OutOfOrder' execution may potentially
* disrupt the actual execution order.
*/
Unknown
}

Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package com.jetbrains.rd.util.threading

import com.jetbrains.rd.util.Logger
import com.jetbrains.rd.util.catch
import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reflection.threadLocal
import com.jetbrains.rd.util.spinUntil
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.asExecutor
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executor
import java.util.concurrent.atomic.AtomicInteger

val Executor.asRdScheduler: IScheduler
get() {
get() = asRdScheduler(ExecutionOrder.Unknown)

fun Executor.asRdScheduler(executionOrder: ExecutionOrder): IScheduler {
val executor = this
return executor as? IScheduler ?: object : IScheduler {

Expand All @@ -20,6 +25,9 @@ val Executor.asRdScheduler: IScheduler

override val isActive get() = active > 0

override val executionOrder: ExecutionOrder
get() = executionOrder

override fun queue(action: () -> Unit) {
tasksInQueue.incrementAndGet()
executor.execute {
Expand All @@ -42,5 +50,131 @@ val Executor.asRdScheduler: IScheduler
}
}


val CoroutineDispatcher.asRdScheduler: IScheduler
get() = (this as? IScheduler) ?: asExecutor().asRdScheduler

fun CoroutineDispatcher.asRdScheduler(dispatcherExecutionOrder: ExecutionOrder): IScheduler = (this as? IScheduler)
?: asExecutor().asRdScheduler(dispatcherExecutionOrder)

/**
* Transforms the current scheduler into a sequential one.
*
* If the current scheduler already guarantees sequential execution, it is returned as is.
* Otherwise, a new sequential scheduler is created that wraps the original scheduler.
*
* The returned sequential scheduler attempts to keep the original execution order
* until concurrent execution is detected. When concurrency is detected (i.e. when a new task is
* scheduled while a previous one hasn't finished yet), it stops preserving the original order and
* queues tasks internally to ensure sequential execution.
*
* @return a scheduler that guarantees sequential execution.
*/
fun IScheduler.asSequentialScheduler(): IScheduler {
if (executionOrder == ExecutionOrder.Sequential)
return this

return SequentialScheduler(this)
}

private class SequentialScheduler(private val realScheduler: IScheduler) : IScheduler {
private val queue = ConcurrentLinkedQueue<() -> Unit>()
private var thread: Thread? = null

private var state = when (realScheduler.executionOrder) {
ExecutionOrder.Sequential, ExecutionOrder.Unknown -> ActionKind.DelegateAsIs
ExecutionOrder.OutOfOrder -> ActionKind.Repost
}

override val executionOrder: ExecutionOrder get() = ExecutionOrder.Sequential

override val isActive: Boolean
get() = thread == Thread.currentThread()

override fun queue(action: () -> Unit) {
queue.add(action)

val delegateAsIs = synchronized(queue) {
when (state) {
ActionKind.Nothing -> return

ActionKind.Repost -> {
state = ActionKind.Nothing
false
}

ActionKind.DelegateAsIs -> true
}
}

if (delegateAsIs) {
delegateAsIs()
} else {
repost()
}
}

private fun delegateAsIs() {
realScheduler.queue {

synchronized(queue) {
if (state != ActionKind.DelegateAsIs)
return@queue

if (thread == null) {
thread = Thread.currentThread()
} else {
// concurrent behavior detected
state = ActionKind.Nothing
return@queue
}
}

val action = queue.poll()
Logger.root.catch(action)

synchronized(queue) {
thread = null

if (state == ActionKind.DelegateAsIs)
return@queue
}

repost()
}
}

private fun repost() {
assert(state == ActionKind.Nothing)

realScheduler.queue {
thread = Thread.currentThread()
for (i in 0 until 16) {
val action = queue.poll() ?: break

Logger.root.catch(action)
}
thread = null

synchronized(queue) {
if (queue.peek() == null) {
state = ActionKind.Repost
return@queue
}
}

repost()
}
}

override fun flush() {
require(!isActive) { "Can't flush this scheduler in a reentrant way: we are inside queued item's execution" }
spinUntil { queue.isEmpty() }
}

private enum class ActionKind {
Nothing,
Repost,
DelegateAsIs
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.jetbrains.rd.util.error
import com.jetbrains.rd.util.getLogger
import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.lifetime.plusAssign
import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reflection.threadLocal
import java.util.*
Expand Down Expand Up @@ -33,6 +34,9 @@ abstract class SingleThreadSchedulerBase(val name: String) : IScheduler {
}
val tasksInQueue = AtomicInteger(0)

override val executionOrder: ExecutionOrder
get() = ExecutionOrder.Sequential

override fun queue(action: () -> Unit) {
tasksInQueue.incrementAndGet()
executor.execute {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.jetbrains.rd.util.threading

import com.jetbrains.rd.util.reactive.ExecutionOrder
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reactive.outOfOrderExecution
import com.jetbrains.rd.util.threadLocalWithInitial


Expand All @@ -23,6 +25,8 @@ object SynchronousScheduler : IScheduler {
override val isActive: Boolean
get() = active.get() > 0

override val executionOrder: ExecutionOrder
get() = ExecutionOrder.OutOfOrder


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.jetbrains.rd.util.threading.coroutines

import com.jetbrains.rd.util.reactive.IScheduler
import kotlinx.coroutines.CoroutineDispatcher
import kotlin.coroutines.CoroutineContext

private class SchedulerCoroutineDispatcher(private val scheduler: IScheduler, private val allowInlining: Boolean) : CoroutineDispatcher() {
override fun dispatch(context: CoroutineContext, block: Runnable) = scheduler.queue { block.run() }
override fun isDispatchNeeded(context: CoroutineContext) = !allowInlining || !scheduler.isActive
}

val IScheduler.asCoroutineDispatcher get() = (this as? CoroutineDispatcher) ?: asCoroutineDispatcher(false)
fun IScheduler.asCoroutineDispatcher(allowInlining: Boolean): CoroutineDispatcher = SchedulerCoroutineDispatcher(this, allowInlining)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.jetbrains.rd.util.threading.coroutines

import com.jetbrains.rd.util.lifetime.Lifetime
import com.jetbrains.rd.util.reactive.IScheduler
import com.jetbrains.rd.util.reactive.ISource
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlin.coroutines.CoroutineContext

fun <T> ISource<T>.nextValueAsync(
lifetime: Lifetime,
condition: (T) -> Boolean = { true }
): Deferred<T> = CompletableDeferred<T>().also { d ->
val nestedDef = lifetime.createNested().apply {
synchronizeWith(d)
}

advise(nestedDef.lifetime) {
if (condition(it)) {
d.complete(it)
}
}
}

fun <T : Any> ISource<T?>.nextNotNullValueAsync(lifetime: Lifetime): Deferred<T> = CompletableDeferred<T>().also { d ->
val nestedDef = lifetime.createNested().apply {
synchronizeWith(d)
}

advise(nestedDef.lifetime) {
if (it != null) {
d.complete(it)
}
}
}

suspend fun <T> ISource<T>.nextValue(
condition: (T) -> Boolean = { true }
): T = Lifetime.using { // unsubscribe if coroutine was cancelled
nextValueAsync(it, condition).await()
}

fun ISource<Boolean>.nextTrueValueAsync(lifetime: Lifetime) = nextValueAsync(lifetime) { it }
fun ISource<Boolean>.nextFalseValueAsync(lifetime: Lifetime) = nextValueAsync(lifetime) { !it }

suspend fun ISource<Boolean>.nextTrueValue() = nextValue { it }
suspend fun ISource<Boolean>.nextFalseValue() = nextValue { !it }

suspend fun <T : Any> ISource<T?>.nextNotNullValue(): T =
Lifetime.using { // unsubscribe if coroutine was cancelled
nextNotNullValueAsync(it).await()
}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, scheduler: IScheduler, handler: suspend (T) -> Unit) {
adviseSuspend(lifetime, scheduler.asCoroutineDispatcher(allowInlining = true), handler)
}

fun<T> ISource<T>.adviseSuspend(lifetime: Lifetime, context: CoroutineContext, handler: suspend (T) -> Unit) {
advise(lifetime) {
lifetime.launch(context) {
handler(it)
}
}
}
Loading

0 comments on commit 98ba9d4

Please sign in to comment.