Skip to content

Commit

Permalink
Re-organize FormulaRuntime.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laimiux committed Jun 30, 2023
1 parent 4cdcb4c commit 1b46c3d
Showing 1 changed file with 46 additions and 57 deletions.
103 changes: 46 additions & 57 deletions formula/src/main/java/com/instacart/formula/FormulaRuntime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@ class FormulaRuntime<Input : Any, Output : Any>(
forceRun()
hasInitialFinished = true

lastOutput?.let {
onOutput(it)
}
emitOutputIfNeeded(isInitialRun = true)
} else {
forceRun()
}
Expand All @@ -71,59 +69,60 @@ class FormulaRuntime<Input : Any, Output : Any>(
effectQueue.addLast(it)
}

run(shouldEvaluate = evaluate)
run(evaluate = evaluate)
}

override fun onPostponedTransition(event: Event) {
event.dispatch()
}

private fun forceRun() = run(shouldEvaluate = true)
private fun forceRun() = run(evaluate = true)

/**
* Performs the evaluation and execution phases.
*
* @param shouldEvaluate Determines if evaluation needs to be run.
* @param evaluate Determines if evaluation needs to be run.
*/
private fun run(shouldEvaluate: Boolean) {
private fun run(evaluate: Boolean) {
if (isEvaluating) return

try {
val freshRun = !isExecuting
if (freshRun) {
inspector?.onRunStarted(shouldEvaluate)
}
runFormula(evaluate)
emitOutputIfNeeded(isInitialRun = false)
} catch (e: Throwable) {
isEvaluating = false

manager?.markAsTerminated()
onError(e)
manager?.performTerminationSideEffects()
}
}

val manager = checkNotNull(manager)
val currentInput = checkNotNull(input)
private fun runFormula(evaluate: Boolean) {
val freshRun = !isExecuting
if (freshRun) {
inspector?.onRunStarted(evaluate)
}

val manager = checkNotNull(manager)
val currentInput = checkNotNull(input)

if (evaluate && !manager.terminated) {
isEvaluating = true
if (shouldEvaluate && !manager.terminated) {
evaluationPhase(manager, currentInput)
}
evaluationPhase(manager, currentInput)
isEvaluating = false
}

if (shouldEvaluate || effectQueue.isNotEmpty()) {
executionRequested = true
}
if (isExecuting) return
if (evaluate || effectQueue.isNotEmpty()) {
executionRequested = true
}

effectPhase(manager)
if (isExecuting) return

if (freshRun) {
inspector?.onRunFinished()
}
executeEffects()

if (hasInitialFinished && emitOutput) {
emitOutput = false
onOutput(checkNotNull(lastOutput))
}
} catch (e: Throwable) {
isEvaluating = false

manager?.markAsTerminated()
onError(e)
manager?.performTerminationSideEffects()
if (freshRun) {
inspector?.onRunFinished()
}
}

Expand All @@ -149,38 +148,28 @@ class FormulaRuntime<Input : Any, Output : Any>(
}
}

/**
* Executes operations containing side-effects such as starting/terminating streams.
*/
private fun effectPhase(manager: FormulaManagerImpl<Input, *, Output>) {
isExecuting = true
while (executionRequested) {
executionRequested = false

val transitionId = manager.transitionID
// We execute pending side-effects even after termination
if (executeEffects(manager, transitionId)) {
continue
}
}
isExecuting = false
}

/**
* Executes effects from the [effectQueue].
*/
private fun executeEffects(manager: FormulaManagerImpl<*, *, *>, transitionId: Long): Boolean {
private fun executeEffects() {
isExecuting = true
// Walk through the effect queue and execute them
while (effectQueue.isNotEmpty()) {
val effects = effectQueue.pollFirst()
if (effects != null) {
effects.execute()
inspector?.onEffectExecuted()

if (manager.hasTransitioned(transitionId)) {
return true
}
}
}
return false
isExecuting = false
}

private fun emitOutputIfNeeded(isInitialRun: Boolean,) {
if (isInitialRun) {
lastOutput?.let(onOutput)
} else if (hasInitialFinished && emitOutput) {
emitOutput = false
onOutput(checkNotNull(lastOutput))
}
}
}

0 comments on commit 1b46c3d

Please sign in to comment.