Skip to content

Commit

Permalink
Fire event retry strategy can be retried (#1635)
Browse files Browse the repository at this point in the history
* Added extra logging if a recipe is not found during deletion of a process.

* Set the default retention period to 14 days for deletion if the recipe is not found.

* Changed the behaviour of the FireEvent retry strategy. It now blocks the interaction instead of resolving it. This allows the interaction to be retried with the retyInteraction method.

* Re-added removed tests.
  • Loading branch information
Tim-Linschoten committed Mar 19, 2024
1 parent 7919f78 commit 3982239
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 11 deletions.
13 changes: 13 additions & 0 deletions core/akka-runtime/src/main/protobuf/process_instance.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ message TransitionFired {
optional SerializedData data = 8;
}

message TransitionFailedWithOutput {
option (scalapb.message).extends = "com.ing.baker.runtime.akka.actor.serialization.BakerSerializable";

optional int64 job_id = 1;
optional string correlation_id = 9;
optional int64 transition_id = 3;
optional int64 time_started = 4;
optional int64 time_completed = 5;
repeated ConsumedToken consumed = 6;
repeated ProducedToken produced = 7;
optional SerializedData data = 8;
}

message TransitionDelayed {
option (scalapb.message).extends = "com.ing.baker.runtime.akka.actor.serialization.BakerSerializable";
optional int64 job_id = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,15 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
}

def shouldDelete(meta: ActorMetadata): Boolean = {
meta.processStatus != Deleted &&
getCompiledRecipe(meta.recipeId)
.flatMap(_.retentionPeriod)
.exists { p => meta.createdDateTime + p.toMillis < System.currentTimeMillis() }
if(meta.processStatus != Deleted)
getCompiledRecipe(meta.recipeId) match {
case Some(recipe) =>
recipe.retentionPeriod.exists { p => meta.createdDateTime + p.toMillis < System.currentTimeMillis() }
case None =>
log.error(s"Could not find recipe: ${meta.recipeId} during deletion for recipeInstanceId: ${meta.recipeInstanceId} using default 14 days")
meta.createdDateTime + (14 days).toMillis < System.currentTimeMillis()
}
else false
}

private def deleteProcess(meta: ActorMetadata): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,23 @@ class ProcessInstance[S, E](
context become running(instance, scheduledRetries)})

case event@TransitionFiredEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>
val transition = instance.petriNet.transitions.getById(transitionId)
log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition, jobId, timeStarted, timeCompleted)
// persist the success event
persistEvent(instance, event)(
eventSource.apply(instance)
.andThen(step)
.andThen {
case (updatedInstance, newJobs) =>
// the sender is notified of the transition having fired
sender() ! TransitionFired(jobId, transitionId, correlationId, consumed, produced, newJobs.map(_.id), filterIngredientValuesFromEventInstance(output))

// the job is removed from the state since it completed
context become running(updatedInstance, scheduledRetries - jobId)
}
)

case event@TransitionFailedWithOutputEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>
val transition = instance.petriNet.transitions.getById(transitionId)
log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition, jobId, timeStarted, timeCompleted)
// persist the success event
Expand Down Expand Up @@ -397,10 +413,10 @@ class ProcessInstance[S, E](
)

case Continue(produced, out) =>
val transitionFiredEvent = TransitionFiredEvent(
val TransitionFailedWithOutput = TransitionFailedWithOutputEvent(
jobId, transitionId, correlationId, timeStarted, timeFailed, consume, marshallMarking(produced), out)

persistEvent(instance, transitionFiredEvent)(
persistEvent(instance, TransitionFailedWithOutput)(
eventSource.apply(instance)
.andThen(step)
.andThen { case (updatedInstance, newJobs) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ object ProcessInstanceEventSourcing extends LazyLogging {
produced: Marking[Id],
output: Any) extends TransitionEvent

/**
* An event describing the fact that a transition failed but was continued with a given event
* This does not consume the input and puts the transition in a blocked state but does create the output.
*/
case class TransitionFailedWithOutputEvent(override val jobId: Long,
override val transitionId: Id,
correlationId: Option[String],
timeStarted: Long,
timeCompleted: Long,
consumed: Marking[Id],
produced: Marking[Id],
output: Any) extends TransitionEvent

/**
* An event describing the fact that a transition failed to fire.
*/
Expand Down Expand Up @@ -86,6 +99,7 @@ object ProcessInstanceEventSourcing extends LazyLogging {
val initialMarking: Marking[Place] = initial.unmarshall(instance.petriNet.places)

Instance[S](instance.petriNet, 1, initialMarking, Map.empty, initialState.asInstanceOf[S], Map.empty, Set.empty)

case e: TransitionFiredEvent =>
val transition = instance.petriNet.transitions.getById(e.transitionId)
val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
Expand All @@ -99,6 +113,28 @@ object ProcessInstanceEventSourcing extends LazyLogging {
state = newState,
jobs = instance.jobs - e.jobId
)

case e: TransitionFailedWithOutputEvent =>
val transition = instance.petriNet.transitions.getById(e.transitionId)
val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
// We only have the consumed markings for the job
val consumed: Marking[Place] = e.consumed.unmarshall(instance.petriNet.places)
val produced: Marking[Place] = e.produced.unmarshall(instance.petriNet.places)

//TODO determine what to do with the job not found situation
val job = instance.jobs.getOrElse(e.jobId, {
Job[S](e.jobId, e.correlationId, instance.state, transition, consumed, null, None)
})
val updatedJob: Job[S] = job.copy(failure = Some(ExceptionState(0, 1, "Blocked after FireEvent retry strategy", ExceptionStrategy.BlockTransition)))

instance.copy[S](
sequenceNr = instance.sequenceNr + 1,
marking = instance.marking |+| produced,
receivedCorrelationIds = instance.receivedCorrelationIds ++ e.correlationId,
state = newState,
jobs = instance.jobs + (job.id -> updatedJob)
)

case e: TransitionFailedEvent =>
val transition = instance.petriNet.transitions.getById(e.transitionId)

Expand Down Expand Up @@ -142,7 +178,7 @@ object ProcessInstanceEventSourcing extends LazyLogging {
val newBakerMetaData: Map[String, String] =
state.ingredients.get(RecipeInstanceMetaDataName) match {
case Some(value) =>
if(value.isInstanceOf(MapType(CharArray))) {
if (value.isInstanceOf(MapType(CharArray))) {
val oldMetaData: Map[String, String] = value.asMap[String, String](classOf[String], classOf[String]).asScala.toMap
oldMetaData ++ e.metaData
}
Expand Down Expand Up @@ -217,6 +253,7 @@ abstract class ProcessInstanceEventSourcing[S, E](
override def receiveRecover: Receive = {
case e: protobuf.Initialized => applyToRecoveringState(e)
case e: protobuf.TransitionFired => applyToRecoveringState(e)
case e: protobuf.TransitionFailedWithOutput => applyToRecoveringState(e)
case e: protobuf.TransitionFailed => applyToRecoveringState(e)
case e: protobuf.TransitionDelayed => applyToRecoveringState(e)
case e: protobuf.DelayedTransitionFired => applyToRecoveringState(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
def deserializeEvent(event: AnyRef): Instance[S] => ProcessInstanceEventSourcing.Event = event match {
case e: protobuf.Initialized => deserializeInitialized(e)
case e: protobuf.TransitionFired => deserializeTransitionFired(e)
case e: protobuf.TransitionFailedWithOutput => deserializeTransitionFailedWithOutput(e)
case e: protobuf.TransitionDelayed => deserializeTransitionDelayed(e)
case e: protobuf.DelayedTransitionFired => deserializeDelayedTransitionFired(e)
case e: protobuf.TransitionFailed => deserializeTransitionFailed(e)
Expand All @@ -46,6 +47,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
_ => e match {
case e: InitializedEvent => serializeInitialized(e)
case e: TransitionFiredEvent => serializeTransitionFired(e)
case e: TransitionFailedWithOutputEvent => serializeTransitionFailedWithOutput(e)
case e: TransitionDelayed => serializeTransitionDelayed(e)
case e: DelayedTransitionFired => serializeDelayedTransitionFired(e)
case e: TransitionFailedEvent => serializeTransitionFailed(e)
Expand Down Expand Up @@ -166,7 +168,6 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
}

private def serializeTransitionFired(e: TransitionFiredEvent): protobuf.TransitionFired = {

val consumedTokens = serializeConsumedMarking(e.consumed)
val producedTokens = serializeProducedMarking(e.produced)

Expand All @@ -181,6 +182,21 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
)
}

private def serializeTransitionFailedWithOutput(e: TransitionFailedWithOutputEvent): protobuf.TransitionFailedWithOutput = {
val consumedTokens = serializeConsumedMarking(e.consumed)
val producedTokens = serializeProducedMarking(e.produced)

protobuf.TransitionFailedWithOutput(
jobId = Some(e.jobId),
transitionId = Some(e.transitionId),
timeStarted = Some(e.timeStarted),
timeCompleted = Some(e.timeCompleted),
consumed = consumedTokens,
produced = producedTokens,
data = serializeObject(e.output)
)
}

private def serializeTransitionDelayed(e: TransitionDelayed): protobuf.TransitionDelayed = {
val consumedTokens = serializeConsumedMarking(e.consumed)
protobuf.TransitionDelayed(
Expand All @@ -203,7 +219,6 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
}

private def deserializeTransitionFired(e: protobuf.TransitionFired): Instance[S] => TransitionFiredEvent = instance => {

val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
val produced: Marking[Id] = deserializeProducedMarking(instance, e.produced)

Expand All @@ -217,6 +232,20 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
TransitionFiredEvent(jobId, transitionId, e.correlationId, timeStarted, timeCompleted, consumed, produced, output)
}

private def deserializeTransitionFailedWithOutput(e: protobuf.TransitionFailedWithOutput): Instance[S] => TransitionFailedWithOutputEvent = instance => {
val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
val produced: Marking[Id] = deserializeProducedMarking(instance, e.produced)

val output = e.data.map(deserializeObject).orNull

val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id"))
val jobId = e.jobId.getOrElse(missingFieldException("job_id"))
val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started"))
val timeCompleted = e.timeCompleted.getOrElse(missingFieldException("time_completed"))

TransitionFailedWithOutputEvent(jobId, transitionId, e.correlationId, timeStarted, timeCompleted, consumed, produced, output)
}

private def deserializeTransitionDelayed(e: protobuf.TransitionDelayed): Instance[S] => TransitionDelayed = instance => {

val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ing.baker.runtime.akka.actor.process_instance.internal

import com.ing.baker.il.petrinet.Place
import com.ing.baker.petrinet.api.Marking

object ExceptionStrategy {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ object BakerTypedProtobufSerializer {
.register("ProcessInstanceProtocol.MetaDataAdded"),
forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFired]
.register("TransitionFired")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFired)),
forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailedWithOutput]
.register("TransitionFailedWithOutput")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailedWithOutput)),
forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailed]
.register("TransitionFailed")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailed)),
forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.Initialized]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,34 @@ class BakerExecutionSpec extends BakerRuntimeTestBase {
"interactionOneOriginalIngredient" -> "success!")
}


"retry a blocked interaction after it had the FireEvent retry strategy" in {
val recipe =
Recipe("RetryBlockedInteractionRecipe")
.withInteraction(interactionOne
.withFailureStrategy(InteractionFailureStrategy.FireEventAfterFailure(Some("interactionOneSuccessful"))))
.withSensoryEvent(initialEvent)

for {
(baker, recipeId) <- setupBakerWithRecipe(recipe, mockImplementations)
_ = when(testInteractionOneMock.apply(anyString(), anyString()))
.thenThrow(new RuntimeException("Expected test failure"))
.thenReturn(Future.successful(InteractionOneSuccessful("success!")))
recipeInstanceId = UUID.randomUUID().toString
_ <- baker.bake(recipeId, recipeInstanceId)
_ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)))
state0 <- baker.getRecipeInstanceState(recipeInstanceId)
_ = state0.ingredients shouldBe
ingredientMap(
"initialIngredient" -> initialIngredientValue)
_ <- baker.retryInteraction(recipeInstanceId, interactionOne.name)
state <- baker.getRecipeInstanceState(recipeInstanceId)
} yield state.ingredients shouldBe
ingredientMap(
"initialIngredient" -> initialIngredientValue,
"interactionOneOriginalIngredient" -> "success!")
}

"be able to return" when {
"all occurred events" in {
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance
} yield output -> enabledExecutions

case Left(ExceptionStrategyOutcome.Continue(eventName)) =>
handleExecutionOutcome(finishedExecution)(Right(Some(EventInstance(eventName, Map.empty))))
val output: EventInstance = EventInstance(eventName, Map.empty)
for {
enabledExecutions <- state.modify(_.recordFailedWithOutputExecution(finishedExecution, Some(output)))
_ <- scheduleIdleStop
} yield Some(output) -> enabledExecutions

case Left(strategy @ ExceptionStrategyOutcome.BlockTransition) =>
state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ case class RecipeInstanceState(
def recordFailedExecution(transitionExecution: TransitionExecution, exceptionStrategy: ExceptionStrategyOutcome): RecipeInstanceState =
addExecution(transitionExecution.toFailedState(exceptionStrategy))

def recordFailedWithOutputExecution(transitionExecution: TransitionExecution, output: Option[EventInstance]): (RecipeInstanceState, Set[TransitionExecution]) =
aggregateOutputEvent(output)
.increaseSequenceNumber
.aggregatePetriNetChanges(transitionExecution, output)
.addCompletedCorrelationId(transitionExecution)
.addExecution(transitionExecution.copy(state = TransitionExecution.State.Failed(transitionExecution.failureCount, ExceptionStrategyOutcome.BlockTransition)))
.allEnabledExecutions

def recordCompletedExecution(transitionExecution: TransitionExecution, output: Option[EventInstance]): (RecipeInstanceState, Set[TransitionExecution]) =
aggregateOutputEvent(output)
.increaseSequenceNumber
Expand Down

0 comments on commit 3982239

Please sign in to comment.