Skip to content

Commit

Permalink
RecipeInstanceMetadata and RecipeInstanceEventList as ingredient inpu…
Browse files Browse the repository at this point in the history
…t for interactions (#1643)

* Added extra logging if a recipe is not found during deletion of a process.

* Set the default retention period to 14 days for deletion if the recipe is not found.

* Changed the behaviour of the FireEvent retry strategy. It now blocks the interaction instead of resolving it. This allows the interaction to be retried with the retyInteraction method.

* Re-added removed tests.

* Added the RecipeInstanceMetadata as input ingredient.

* Added a new Baker Event to represent internal interaction output events, checkpoint events and delayed transition events.

* Added the RecipeInstanceEventList as a input ingredient.

* Made the RecipeInstanceMetaData an internal concept instead of putting it in the ingredients and needing to transform it every time.

* Added validations that the Baker internal ingredients are not provided in the recipe.

* Filter out the CheckPoint event interactions from the visualisation
  • Loading branch information
Tim-Linschoten committed Mar 19, 2024
1 parent 3982239 commit 209b5ec
Show file tree
Hide file tree
Showing 47 changed files with 636 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol.RecipeFound
import com.ing.baker.runtime.akka.internal.CachingInteractionManager
import com.ing.baker.runtime.common.BakerException._
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName
import com.ing.baker.runtime.common.{InteractionExecutionFailureReason, RecipeRecord, SensoryEventStatus}
import com.ing.baker.runtime.model.recipeinstance.RecipeInstanceState.getMetaDataFromIngredients
import com.ing.baker.runtime.recipe_manager.{ActorBasedRecipeManager, DefaultRecipeManager, RecipeManager}
Expand Down Expand Up @@ -184,9 +184,8 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None))))
case Some(interactionInstance) =>
interactionInstance.execute(
ingredients.filter(ingredientInstance => ingredientInstance.name != RecipeInstanceMetaDataName),
getMetaDataFromIngredients(ingredients)
.getOrElse(Map.empty))
ingredients.filter(ingredientInstance => ingredientInstance.name != RecipeInstanceMetadataName),
getMetaDataFromIngredients(ingredients).getOrElse(Map.empty))
.map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess))))
.recover {
case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.ing.baker.runtime.akka.actor.logging

import akka.event.EventStream
import com.ing.baker.il.petrinet.Transition
import com.ing.baker.runtime.common.{EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated}
import com.ing.baker.runtime.common.{EventFired, EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated}
import com.ing.baker.runtime.model.BakerLogging

object LogAndSendEvent {
Expand Down Expand Up @@ -46,14 +46,9 @@ object LogAndSendEvent {
bakerLogging.eventRejected(eventRejected)
}

def firingEvent(recipeInstanceId: String,
recipeId: String,
recipeName: String,
executionId: Long,
transition: Transition,
timeStarted: Long): Unit = {
//TODO This does not have a corrosponding BakerEvent, this should be created
bakerLogging.firingEvent(recipeInstanceId, recipeId, recipeName, executionId, transition, timeStarted)
def eventFired(eventFired: EventFired,
eventStream: EventStream): Unit = {
eventStream.publish(eventFired)
bakerLogging.eventFired(eventFired)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
persistWithSnapshot(ActorCreated(recipeId, recipeInstanceId, createdTime)) { _ =>

// after that we actually create the ProcessInstance actor
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)
val initializeCmd = Initialize(compiledRecipe.initialMarking, processState)

//TODO ensure the initialiseCMD is accepted before we add it ot the index
Expand All @@ -405,7 +405,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
//Temporary solution for the situation that the initializeCmd is not send in the original Bake
getCompiledRecipe(recipeId) match {
case Some(compiledRecipe) =>
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)
val initializeCmd = Initialize(compiledRecipe.initialMarking, processState)
createProcessActor(recipeInstanceId, compiledRecipe) ! initializeCmd
sender() ! ProcessAlreadyExists(recipeInstanceId)
Expand All @@ -417,7 +417,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
//Temporary solution for the situation that the initializeCmd is not send in the original Bake
getCompiledRecipe(recipeId) match {
case Some(compiledRecipe) =>
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)
val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)
val initializeCmd = Initialize(compiledRecipe.initialMarking, processState)
actorRef ! initializeCmd
sender() ! ProcessAlreadyExists(recipeInstanceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import akka.cluster.sharding.ShardRegion.Passivate
import akka.event.{DiagnosticLoggingAdapter, Logging}
import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess}
import cats.effect.IO
import com.ing.baker.il.failurestrategy.{BlockInteraction, FireEventAfterFailure, RetryWithIncrementalBackoff}
import com.ing.baker.il.petrinet.{EventTransition, InteractionTransition, Place, Transition}
import com.ing.baker.il.{CompiledRecipe, EventDescriptor, checkpointEventInteractionPrefix}
import com.ing.baker.il.failurestrategy.{BlockInteraction, FireEventAfterFailure, InteractionFailureStrategy, RetryWithIncrementalBackoff}
import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka.actor.delayed_transition_actor.DelayedTransitionActorProtocol.{FireDelayedTransitionAck, ScheduleDelayedTransition}
import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.FireSensoryEventRejection
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstance._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceEventSourcing._
Expand All @@ -17,18 +19,16 @@ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol
import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{Continue, RetryWithDelay}
import com.ing.baker.runtime.akka.actor.process_instance.internal._
import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstanceProtocol => protocol}
import com.ing.baker.runtime.akka.actor.delayed_transition_actor.DelayedTransitionActorProtocol.{FireDelayedTransitionAck, ScheduleDelayedTransition}
import com.ing.baker.runtime.akka.internal.{FatalInteractionException, RecipeRuntime}
import com.ing.baker.runtime.model.BakerLogging
import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, RecipeInstanceState}
import com.ing.baker.runtime.scaladsl.{EventFired, EventInstance, IngredientInstance, RecipeInstanceState}
import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.{FromValue, PrimitiveValue, Value}

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.existentials
import scala.reflect.runtime.universe
import scala.util.Try

object ProcessInstance {
Expand Down Expand Up @@ -356,7 +356,10 @@ class ProcessInstance[S, E](
).marshall
val internalEvent = ProcessInstanceEventSourcing.DelayedTransitionFired(jobId, transitionId, produced, out)

log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition.asInstanceOf[Transition], jobId, System.currentTimeMillis(), System.currentTimeMillis())
val timestamp = System.currentTimeMillis()
log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition.asInstanceOf[Transition], jobId, timestamp, timestamp)

LogAndSendEvent.eventFired(EventFired(timestamp, compiledRecipe.name, compiledRecipe.recipeId, recipeInstanceId, out), context.system.eventStream)

persistEvent(instance, internalEvent)(
eventSource.apply(instance)
Expand Down Expand Up @@ -570,13 +573,17 @@ class ProcessInstance[S, E](
def executeJob(job: Job[S], originalSender: ActorRef): Unit = {
log.fireTransition(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
job.transition match {
case _: EventTransition =>
case eventTransition: EventTransition =>
BakerLogging.default.firingEvent(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis())
executeJobViaExecutor(job, originalSender)
case i: InteractionTransition if isDelayedInteraction(i) =>
startDelayedTransition(i, job, originalSender)
case i: InteractionTransition if isCheckpointEventInteraction(i) =>
val event = jobToFinishedInteraction(job, i.eventsToFire.head.name)
val event: TransitionFiredEvent = jobToFinishedInteraction(job, i.eventsToFire.head.name)

val currentTime = System.currentTimeMillis()
LogAndSendEvent.eventFired(EventFired(currentTime, compiledRecipe.name, compiledRecipe.recipeId, recipeInstanceId, event.output.asInstanceOf[EventInstance]), context.system.eventStream)

self.tell(event, originalSender)
case _ => executeJobViaExecutor(job, originalSender)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import com.ing.baker.petrinet.api._
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceEventSourcing.Event
import com.ing.baker.runtime.akka.actor.process_instance.internal.{ExceptionState, ExceptionStrategy, Instance, Job}
import com.ing.baker.runtime.akka.actor.serialization.AkkaSerializerProvider
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName
import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName
import com.ing.baker.runtime.scaladsl.RecipeInstanceState
import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.{CharArray, MapType, Value}
Expand Down Expand Up @@ -117,11 +117,9 @@ object ProcessInstanceEventSourcing extends LazyLogging {
case e: TransitionFailedWithOutputEvent =>
val transition = instance.petriNet.transitions.getById(e.transitionId)
val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
// We only have the consumed markings for the job
val consumed: Marking[Place] = e.consumed.unmarshall(instance.petriNet.places)
val produced: Marking[Place] = e.produced.unmarshall(instance.petriNet.places)

//TODO determine what to do with the job not found situation
val job = instance.jobs.getOrElse(e.jobId, {
Job[S](e.jobId, e.correlationId, instance.state, transition, consumed, null, None)
})
Expand Down Expand Up @@ -175,24 +173,11 @@ object ProcessInstanceEventSourcing extends LazyLogging {
case e: MetaDataAdded =>
val newState: S = instance.state match {
case state: RecipeInstanceState =>
val newBakerMetaData: Map[String, String] =
state.ingredients.get(RecipeInstanceMetaDataName) match {
case Some(value) =>
if (value.isInstanceOf(MapType(CharArray))) {
val oldMetaData: Map[String, String] = value.asMap[String, String](classOf[String], classOf[String]).asScala.toMap
oldMetaData ++ e.metaData
}
else {
//If the old metadata is not of Type[String, String] we overwrite it since this is not allowed.
logger.info("Old RecipeInstanceMetaData was not of type Map[String, String]")
e.metaData
}
case None =>
e.metaData
}
val newIngredients: Map[String, Value] =
state.ingredients + (RecipeInstanceMetaDataName -> com.ing.baker.types.Converters.toValue(newBakerMetaData))
state.copy(ingredients = newIngredients).asInstanceOf[S]
val newRecipeInstanceMetaData: Map[String, String] = state.recipeInstanceMetadata ++ e.metaData
//We still add an ingredient for the metaData since this makes it easier to use it during interaction execution
val newIngredients: Map[String, Value] = state.ingredients +
(RecipeInstanceMetadataName -> com.ing.baker.types.Converters.toValue(newRecipeInstanceMetaData))
state.copy(ingredients = newIngredients, recipeInstanceMetadata = newRecipeInstanceMetaData).asInstanceOf[S]
case state =>
state
}
Expand Down
Loading

0 comments on commit 209b5ec

Please sign in to comment.