Skip to content

Commit

Permalink
Update FlowAction API. (#373)
Browse files Browse the repository at this point in the history
* Update FlowAction API.

* Fix test issues.
  • Loading branch information
Laimiux authored Aug 28, 2024
1 parent ff581b2 commit 1b03419
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package com.instacart.formula.coroutines

import com.instacart.formula.Action
import com.instacart.formula.Cancelable
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

/**
* Adapter which allows creating Formula [Action] from Kotlin coroutine's. Take a
Expand All @@ -24,20 +27,10 @@ interface FlowAction<Event> : Action<Event> {
* }
* ```
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
crossinline create: () -> Flow<Event>
fun <Event> fromFlow(
create: () -> Flow<Event>
): Action<Event> {
return object : FlowAction<Event> {

override val scope: CoroutineScope = scope

override fun flow(): Flow<Event> {
return create()
}

override fun key(): Any = Unit
}
return FlowActionImpl(null, create)
}

/**
Expand All @@ -51,31 +44,32 @@ interface FlowAction<Event> : Action<Event> {
*
* @param key Used to distinguish this [Action] from other actions.
*/
inline fun <Event> fromFlow(
scope: CoroutineScope = MainScope(),
fun <Event> fromFlow(
key: Any?,
crossinline create: () -> Flow<Event>
create: () -> Flow<Event>
): Action<Event> {
return object : FlowAction<Event> {
override val scope: CoroutineScope = scope

override fun flow(): Flow<Event> {
return create()
}

override fun key(): Any? = key
}
return FlowActionImpl(key, create)
}
}

fun flow(): Flow<Event>

val scope: CoroutineScope

@OptIn(DelicateCoroutinesApi::class)
override fun start(send: (Event) -> Unit): Cancelable? {
val job = flow()
.onEach { send(it) }
.launchIn(scope)
val job = GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
withContext(Dispatchers.Unconfined) {
flow().collect { send(it) }
}
}
return Cancelable(job::cancel)
}
}

private data class FlowActionImpl<Event>(
private val key: Any?,
private val factory: () -> Flow<Event>
) : FlowAction<Event> {
override fun flow(): Flow<Event> = factory()

override fun key(): Any? = key
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ import com.instacart.formula.coroutines.toFlow
import com.instacart.formula.plugin.Dispatcher
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestCoroutineDispatcher
import kotlinx.coroutines.test.TestCoroutineScope
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.test.setMain
import kotlinx.coroutines.withContext
import org.junit.rules.TestRule
import org.junit.rules.TestWatcher
import org.junit.runner.Description
Expand Down Expand Up @@ -70,20 +76,30 @@ object CoroutinesTestableRuntime : TestableRuntime {
}

private class FlowRelay : Relay {
private val sharedFlow = MutableSharedFlow<Unit>(0, 1)
private val sharedFlow = MutableSharedFlow<Unit>(0, 0)

override fun action(): Action<Unit> = FlowAction.fromFlow { sharedFlow }

@OptIn(DelicateCoroutinesApi::class)
override fun triggerEvent() {
sharedFlow.tryEmit(Unit)
GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
withContext(Dispatchers.Unconfined) {
sharedFlow.emit(Unit)
}
}
}
}

private class FlowStreamFormulaSubject : FlowFormula<String, Int>(), StreamFormulaSubject {
private val sharedFlow = MutableSharedFlow<Int>(0, extraBufferCapacity = 1, BufferOverflow.DROP_OLDEST)
private val sharedFlow = MutableSharedFlow<Int>(0, extraBufferCapacity = 0)

@OptIn(DelicateCoroutinesApi::class)
override fun emitEvent(event: Int) {
sharedFlow.tryEmit(event)
GlobalScope.launch(start = CoroutineStart.UNDISPATCHED) {
withContext(Dispatchers.Unconfined) {
sharedFlow.emit(event)
}
}
}

override fun initialValue(input: String): Int = 0
Expand Down Expand Up @@ -133,28 +149,13 @@ private class CoroutineTestDelegate<Input : Any, Output : Any, FormulaT : IFormu
}
}

@OptIn(ExperimentalStdlibApi::class)
@OptIn(ExperimentalCoroutinesApi::class)
private class CoroutineTestRule(
val testCoroutineScope: TestCoroutineScope = TestCoroutineScope(TestCoroutineDispatcher())
) : TestWatcher() {

override fun starting(description: Description) {
Dispatchers.resetMain()
Dispatchers.setMain(testCoroutineScope.coroutineContext[CoroutineDispatcher]!!)
super.starting(description)
}

override fun apply(base: Statement, description: Description): Statement {
var result: Statement? = null
testCoroutineScope.runBlockingTest {
result = super.apply(base, description)
}
return result!!
}

override fun finished(description: Description?) {
override fun finished(description: Description) {
super.finished(description)
Dispatchers.resetMain()
testCoroutineScope.cleanupTestCoroutines()
}
}

0 comments on commit 1b03419

Please sign in to comment.