From 39822392c40a8d9190e536f1437b8cf0d5a831cd Mon Sep 17 00:00:00 2001
From: Tim Linschoten
Date: Tue, 19 Mar 2024 13:06:37 +0100
Subject: [PATCH] Fire event retry strategy can be retried (#1635)

* Added extra logging if a recipe is not found during deletion of a process.

* Set the default retention period to 14 days for deletion if the recipe is
  not found.

* Changed the behaviour of the FireEvent retry strategy. It now blocks the
  interaction instead of resolving it. This allows the interaction to be
  retried with the retryInteraction method.

* Re-added removed tests.
---
 .../src/main/protobuf/process_instance.proto  | 13 +++++++
 .../actor/process_index/ProcessIndex.scala    | 13 +++++--
 .../process_instance/ProcessInstance.scala    | 20 +++++++++-
 .../ProcessInstanceEventSourcing.scala        | 39 ++++++++++++++++++-
 .../ProcessInstanceSerialization.scala        | 33 +++++++++++++++-
 .../internal/ExceptionStrategy.scala          |  1 -
 .../BakerTypedProtobufSerializer.scala        |  2 +
 .../runtime/akka/BakerExecutionSpec.scala     | 28 +++++++++++++
 .../model/recipeinstance/RecipeInstance.scala |  6 ++-
 .../recipeinstance/RecipeInstanceState.scala  |  8 ++++
 10 files changed, 152 insertions(+), 11 deletions(-)
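
A minimal sketch of the behaviour this patch enables, condensed from the test added to
BakerExecutionSpec further down. It assumes that spec's fixtures and imports
(interactionOne, initialEvent, initialIngredientValue, mockImplementations,
setupBakerWithRecipe and the Mockito helpers). Before this change the FireEvent strategy
resolved the interaction, so there was nothing for retryInteraction to retry; the
interaction is now left blocked and can be retried:

    // Recipe whose interaction fires "interactionOneSuccessful" when it fails.
    // With this patch the failing interaction is blocked instead of resolved.
    val recipe = Recipe("RetryBlockedInteractionRecipe")
      .withInteraction(interactionOne
        .withFailureStrategy(InteractionFailureStrategy.FireEventAfterFailure(Some("interactionOneSuccessful"))))
      .withSensoryEvent(initialEvent)

    for {
      (baker, recipeId) <- setupBakerWithRecipe(recipe, mockImplementations)
      // first invocation fails, second one succeeds
      _ = when(testInteractionOneMock.apply(anyString(), anyString()))
        .thenThrow(new RuntimeException("Expected test failure"))
        .thenReturn(Future.successful(InteractionOneSuccessful("success!")))
      recipeInstanceId = UUID.randomUUID().toString
      _ <- baker.bake(recipeId, recipeInstanceId)
      // the interaction fails here: the fallback event is fired and the job stays blocked
      _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)))
      // new behaviour: the blocked interaction can now be retried explicitly
      _ <- baker.retryInteraction(recipeInstanceId, interactionOne.name)
      state <- baker.getRecipeInstanceState(recipeInstanceId)
    } yield state.ingredients // now also contains "interactionOneOriginalIngredient"
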
diff --git a/core/akka-runtime/src/main/protobuf/process_instance.proto b/core/akka-runtime/src/main/protobuf/process_instance.proto
index 5fb1fdeb7..a2ca802cc 100644
--- a/core/akka-runtime/src/main/protobuf/process_instance.proto
+++ b/core/akka-runtime/src/main/protobuf/process_instance.proto
@@ -49,6 +49,19 @@ message TransitionFired {
     optional SerializedData data = 8;
 }
 
+message TransitionFailedWithOutput {
+    option (scalapb.message).extends = "com.ing.baker.runtime.akka.actor.serialization.BakerSerializable";
+
+    optional int64 job_id = 1;
+    optional string correlation_id = 9;
+    optional int64 transition_id = 3;
+    optional int64 time_started = 4;
+    optional int64 time_completed = 5;
+    repeated ConsumedToken consumed = 6;
+    repeated ProducedToken produced = 7;
+    optional SerializedData data = 8;
+}
+
 message TransitionDelayed {
     option (scalapb.message).extends = "com.ing.baker.runtime.akka.actor.serialization.BakerSerializable";
     optional int64 job_id = 1;
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala
index 8c18c0b05..e1bf3a4f0 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala
@@ -238,10 +238,15 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
   }
 
   def shouldDelete(meta: ActorMetadata): Boolean = {
-    meta.processStatus != Deleted &&
-      getCompiledRecipe(meta.recipeId)
-        .flatMap(_.retentionPeriod)
-        .exists { p => meta.createdDateTime + p.toMillis < System.currentTimeMillis() }
+    if(meta.processStatus != Deleted)
+      getCompiledRecipe(meta.recipeId) match {
+        case Some(recipe) =>
+          recipe.retentionPeriod.exists { p => meta.createdDateTime + p.toMillis < System.currentTimeMillis() }
+        case None =>
+          log.error(s"Could not find recipe: ${meta.recipeId} during deletion for recipeInstanceId: ${meta.recipeInstanceId} using default 14 days")
+          meta.createdDateTime + (14 days).toMillis < System.currentTimeMillis()
+      }
+    else false
   }
 
   private def deleteProcess(meta: ActorMetadata): Unit = {
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala
index ec2a5a93e..720df06d2 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala
@@ -299,7 +299,23 @@ class ProcessInstance[S, E](
           context become running(instance, scheduledRetries)})
 
     case event@TransitionFiredEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>
+      val transition = instance.petriNet.transitions.getById(transitionId)
+      log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition, jobId, timeStarted, timeCompleted)
+      // persist the success event
+      persistEvent(instance, event)(
+        eventSource.apply(instance)
+          .andThen(step)
+          .andThen {
+            case (updatedInstance, newJobs) =>
+              // the sender is notified of the transition having fired
+              sender() ! TransitionFired(jobId, transitionId, correlationId, consumed, produced, newJobs.map(_.id), filterIngredientValuesFromEventInstance(output))
+
+              // the job is removed from the state since it completed
+              context become running(updatedInstance, scheduledRetries - jobId)
+          }
+      )
+    case event@TransitionFailedWithOutputEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>
       val transition = instance.petriNet.transitions.getById(transitionId)
       log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition, jobId, timeStarted, timeCompleted)
       // persist the success event
@@ -397,10 +413,10 @@ class ProcessInstance[S, E](
               )
 
             case Continue(produced, out) =>
-              val transitionFiredEvent = TransitionFiredEvent(
+              val TransitionFailedWithOutput = TransitionFailedWithOutputEvent(
                 jobId, transitionId, correlationId, timeStarted, timeFailed, consume, marshallMarking(produced), out)
 
-              persistEvent(instance, transitionFiredEvent)(
+              persistEvent(instance, TransitionFailedWithOutput)(
                 eventSource.apply(instance)
                   .andThen(step)
                   .andThen { case (updatedInstance, newJobs) =>
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala
index 03bea38ba..9571cd039 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala
@@ -40,6 +40,19 @@ object ProcessInstanceEventSourcing extends LazyLogging {
                                   produced: Marking[Id],
                                   output: Any) extends TransitionEvent
 
+  /**
+    * An event describing the fact that a transition failed but was continued with a given event
+    * This does not consume the input and puts the transition in a blocked state but does create the output.
+    */
+  case class TransitionFailedWithOutputEvent(override val jobId: Long,
+                                             override val transitionId: Id,
+                                             correlationId: Option[String],
+                                             timeStarted: Long,
+                                             timeCompleted: Long,
+                                             consumed: Marking[Id],
+                                             produced: Marking[Id],
+                                             output: Any) extends TransitionEvent
+
   /**
     * An event describing the fact that a transition failed to fire.
     */
@@ -86,6 +99,7 @@ object ProcessInstanceEventSourcing extends LazyLogging {
       val initialMarking: Marking[Place] = initial.unmarshall(instance.petriNet.places)
       Instance[S](instance.petriNet, 1, initialMarking, Map.empty, initialState.asInstanceOf[S], Map.empty, Set.empty)
+
     case e: TransitionFiredEvent =>
       val transition = instance.petriNet.transitions.getById(e.transitionId)
       val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
@@ -99,6 +113,28 @@ object ProcessInstanceEventSourcing extends LazyLogging {
         state = newState,
         jobs = instance.jobs - e.jobId
       )
+
+    case e: TransitionFailedWithOutputEvent =>
+      val transition = instance.petriNet.transitions.getById(e.transitionId)
+      val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
+      // We only have the consumed markings for the job
+      val consumed: Marking[Place] = e.consumed.unmarshall(instance.petriNet.places)
+      val produced: Marking[Place] = e.produced.unmarshall(instance.petriNet.places)
+
+      //TODO determine what to do with the job not found situation
+      val job = instance.jobs.getOrElse(e.jobId, {
+        Job[S](e.jobId, e.correlationId, instance.state, transition, consumed, null, None)
+      })
+      val updatedJob: Job[S] = job.copy(failure = Some(ExceptionState(0, 1, "Blocked after FireEvent retry strategy", ExceptionStrategy.BlockTransition)))
+
+      instance.copy[S](
+        sequenceNr = instance.sequenceNr + 1,
+        marking = instance.marking |+| produced,
+        receivedCorrelationIds = instance.receivedCorrelationIds ++ e.correlationId,
+        state = newState,
+        jobs = instance.jobs + (job.id -> updatedJob)
+      )
+
     case e: TransitionFailedEvent =>
       val transition = instance.petriNet.transitions.getById(e.transitionId)
@@ -142,7 +178,7 @@ object ProcessInstanceEventSourcing extends LazyLogging {
           val newBakerMetaData: Map[String, String] = state.ingredients.get(RecipeInstanceMetaDataName) match {
             case Some(value) =>
-              if(value.isInstanceOf(MapType(CharArray))) {
+              if (value.isInstanceOf(MapType(CharArray))) {
                 val oldMetaData: Map[String, String] = value.asMap[String, String](classOf[String], classOf[String]).asScala.toMap
                 oldMetaData ++ e.metaData
               }
@@ -217,6 +253,7 @@ abstract class ProcessInstanceEventSourcing[S, E](
   override def receiveRecover: Receive = {
     case e: protobuf.Initialized => applyToRecoveringState(e)
     case e: protobuf.TransitionFired => applyToRecoveringState(e)
+    case e: protobuf.TransitionFailedWithOutput => applyToRecoveringState(e)
     case e: protobuf.TransitionFailed => applyToRecoveringState(e)
    case e: protobuf.TransitionDelayed => applyToRecoveringState(e)
     case e: protobuf.DelayedTransitionFired => applyToRecoveringState(e)
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala
index b4c087fc6..845cb9955 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala
@@ -32,6 +32,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
   def deserializeEvent(event: AnyRef): Instance[S] => ProcessInstanceEventSourcing.Event = event match {
     case e: protobuf.Initialized => deserializeInitialized(e)
     case e: protobuf.TransitionFired => deserializeTransitionFired(e)
+    case e: protobuf.TransitionFailedWithOutput => deserializeTransitionFailedWithOutput(e)
     case e: protobuf.TransitionDelayed => deserializeTransitionDelayed(e)
     case e: protobuf.DelayedTransitionFired => deserializeDelayedTransitionFired(e)
     case e: protobuf.TransitionFailed => deserializeTransitionFailed(e)
@@ -46,6 +47,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
     _ => e match {
       case e: InitializedEvent => serializeInitialized(e)
       case e: TransitionFiredEvent => serializeTransitionFired(e)
+      case e: TransitionFailedWithOutputEvent => serializeTransitionFailedWithOutput(e)
       case e: TransitionDelayed => serializeTransitionDelayed(e)
       case e: DelayedTransitionFired => serializeDelayedTransitionFired(e)
       case e: TransitionFailedEvent => serializeTransitionFailed(e)
@@ -166,7 +168,6 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
   }
 
   private def serializeTransitionFired(e: TransitionFiredEvent): protobuf.TransitionFired = {
-
     val consumedTokens = serializeConsumedMarking(e.consumed)
     val producedTokens = serializeProducedMarking(e.produced)
 
@@ -181,6 +182,21 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
     )
   }
 
+  private def serializeTransitionFailedWithOutput(e: TransitionFailedWithOutputEvent): protobuf.TransitionFailedWithOutput = {
+    val consumedTokens = serializeConsumedMarking(e.consumed)
+    val producedTokens = serializeProducedMarking(e.produced)
+
+    protobuf.TransitionFailedWithOutput(
+      jobId = Some(e.jobId),
+      transitionId = Some(e.transitionId),
+      timeStarted = Some(e.timeStarted),
+      timeCompleted = Some(e.timeCompleted),
+      consumed = consumedTokens,
+      produced = producedTokens,
+      data = serializeObject(e.output)
+    )
+  }
+
   private def serializeTransitionDelayed(e: TransitionDelayed): protobuf.TransitionDelayed = {
     val consumedTokens = serializeConsumedMarking(e.consumed)
     protobuf.TransitionDelayed(
@@ -203,7 +219,6 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
   }
 
   private def deserializeTransitionFired(e: protobuf.TransitionFired): Instance[S] => TransitionFiredEvent = instance => {
-
     val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
     val produced: Marking[Id] = deserializeProducedMarking(instance, e.produced)
 
@@ -217,6 +232,20 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
     TransitionFiredEvent(jobId, transitionId, e.correlationId, timeStarted, timeCompleted, consumed, produced, output)
   }
 
+  private def deserializeTransitionFailedWithOutput(e: protobuf.TransitionFailedWithOutput): Instance[S] => TransitionFailedWithOutputEvent = instance => {
+    val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
+    val produced: Marking[Id] = deserializeProducedMarking(instance, e.produced)
+
+    val output = e.data.map(deserializeObject).orNull
+
+    val transitionId = e.transitionId.getOrElse(missingFieldException("transition_id"))
+    val jobId = e.jobId.getOrElse(missingFieldException("job_id"))
+    val timeStarted = e.timeStarted.getOrElse(missingFieldException("time_started"))
+    val timeCompleted = e.timeCompleted.getOrElse(missingFieldException("time_completed"))
+
+    TransitionFailedWithOutputEvent(jobId, transitionId, e.correlationId, timeStarted, timeCompleted, consumed, produced, output)
+  }
+
   private def deserializeTransitionDelayed(e: protobuf.TransitionDelayed): Instance[S] => TransitionDelayed = instance => {
     val consumed: Marking[Id] = deserializeConsumedMarking(instance, e.consumed)
 
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/internal/ExceptionStrategy.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/internal/ExceptionStrategy.scala
index 5bebec05c..23320d641 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/internal/ExceptionStrategy.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/internal/ExceptionStrategy.scala
@@ -1,6 +1,5 @@
 package com.ing.baker.runtime.akka.actor.process_instance.internal
 
-import com.ing.baker.il.petrinet.Place
 import com.ing.baker.petrinet.api.Marking
 
 object ExceptionStrategy {
diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/serialization/BakerTypedProtobufSerializer.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/serialization/BakerTypedProtobufSerializer.scala
index 73c6e438d..1f75c0f4e 100644
--- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/serialization/BakerTypedProtobufSerializer.scala
+++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/serialization/BakerTypedProtobufSerializer.scala
@@ -147,6 +147,8 @@ object BakerTypedProtobufSerializer {
       .register("ProcessInstanceProtocol.MetaDataAdded"),
     forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFired]
       .register("TransitionFired")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFired)),
+    forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailedWithOutput]
+      .register("TransitionFailedWithOutput")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailedWithOutput)),
     forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailed]
       .register("TransitionFailed")(ProtoMap.identityProtoMap(com.ing.baker.runtime.akka.actor.process_instance.protobuf.TransitionFailed)),
     forType[com.ing.baker.runtime.akka.actor.process_instance.protobuf.Initialized]
diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala
index eca0c9309..bcf37ff86 100644
--- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala
+++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala
@@ -1182,6 +1182,34 @@ class BakerExecutionSpec extends BakerRuntimeTestBase {
             "interactionOneOriginalIngredient" -> "success!")
     }
 
+    "retry a blocked interaction after it had the FireEvent retry strategy" in {
+      val recipe =
+        Recipe("RetryBlockedInteractionRecipe")
+          .withInteraction(interactionOne
+            .withFailureStrategy(InteractionFailureStrategy.FireEventAfterFailure(Some("interactionOneSuccessful"))))
+          .withSensoryEvent(initialEvent)
+
+      for {
+        (baker, recipeId) <- setupBakerWithRecipe(recipe, mockImplementations)
+        _ = when(testInteractionOneMock.apply(anyString(), anyString()))
+          .thenThrow(new RuntimeException("Expected test failure"))
+          .thenReturn(Future.successful(InteractionOneSuccessful("success!")))
+        recipeInstanceId = UUID.randomUUID().toString
+        _ <- baker.bake(recipeId, recipeInstanceId)
+        _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)))
+        state0 <- baker.getRecipeInstanceState(recipeInstanceId)
+        _ = state0.ingredients shouldBe
+          ingredientMap(
+            "initialIngredient" -> initialIngredientValue)
+        _ <- baker.retryInteraction(recipeInstanceId, interactionOne.name)
+        state <- baker.getRecipeInstanceState(recipeInstanceId)
+      } yield state.ingredients shouldBe
+        ingredientMap(
+          "initialIngredient" -> initialIngredientValue,
+          "interactionOneOriginalIngredient" -> "success!")
+    }
+
     "be able to return" when {
       "all occurred events" in {
         for {
diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala
index 915502b91..d39f3e7a7 100644
--- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala
+++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala
@@ -106,7 +106,11 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance
         } yield output -> enabledExecutions
 
       case Left(ExceptionStrategyOutcome.Continue(eventName)) =>
-        handleExecutionOutcome(finishedExecution)(Right(Some(EventInstance(eventName, Map.empty))))
+        val output: EventInstance = EventInstance(eventName, Map.empty)
+        for {
+          enabledExecutions <- state.modify(_.recordFailedWithOutputExecution(finishedExecution, Some(output)))
+          _ <- scheduleIdleStop
+        } yield Some(output) -> enabledExecutions
 
       case Left(strategy @ ExceptionStrategyOutcome.BlockTransition) =>
         state
diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala
index d59794487..431846c1d 100644
--- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala
+++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala
@@ -75,6 +75,14 @@ case class RecipeInstanceState(
   def recordFailedExecution(transitionExecution: TransitionExecution, exceptionStrategy: ExceptionStrategyOutcome): RecipeInstanceState =
     addExecution(transitionExecution.toFailedState(exceptionStrategy))
 
+  def recordFailedWithOutputExecution(transitionExecution: TransitionExecution, output: Option[EventInstance]): (RecipeInstanceState, Set[TransitionExecution]) =
+    aggregateOutputEvent(output)
+      .increaseSequenceNumber
+      .aggregatePetriNetChanges(transitionExecution, output)
+      .addCompletedCorrelationId(transitionExecution)
+      .addExecution(transitionExecution.copy(state = TransitionExecution.State.Failed(transitionExecution.failureCount, ExceptionStrategyOutcome.BlockTransition)))
+      .allEnabledExecutions
+
   def recordCompletedExecution(transitionExecution: TransitionExecution, output: Option[EventInstance]): (RecipeInstanceState, Set[TransitionExecution]) =
     aggregateOutputEvent(output)
       .increaseSequenceNumber
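
The deletion fallback from the ProcessIndex.shouldDelete hunk above, restated as a
standalone sketch for readability. The simplified Meta record and the explicit now
parameter are illustrative stand-ins, not the real ActorMetadata API, which also logs
an error in the missing-recipe branch and uses System.currentTimeMillis():

    import scala.concurrent.duration._

    // Illustrative stand-in for the fields shouldDelete actually inspects.
    final case class Meta(deleted: Boolean,
                          createdDateTime: Long,
                          retentionPeriod: Option[FiniteDuration],
                          recipeFound: Boolean)

    def shouldDelete(meta: Meta, now: Long): Boolean =
      if (meta.deleted) false
      else if (meta.recipeFound)
        // recipe known: delete only once its own retention period has expired
        meta.retentionPeriod.exists(p => meta.createdDateTime + p.toMillis < now)
      else
        // recipe missing: fall back to the new 14 day default retention
        meta.createdDateTime + 14.days.toMillis < now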