diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala index ed958768c..9ec21639a 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala @@ -14,7 +14,7 @@ import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol.RecipeFound import com.ing.baker.runtime.akka.internal.CachingInteractionManager import com.ing.baker.runtime.common.BakerException._ -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName import com.ing.baker.runtime.common.{InteractionExecutionFailureReason, RecipeRecord, SensoryEventStatus} import com.ing.baker.runtime.model.recipeinstance.RecipeInstanceState.getMetaDataFromIngredients import com.ing.baker.runtime.recipe_manager.{ActorBasedRecipeManager, DefaultRecipeManager, RecipeManager} @@ -184,9 +184,8 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker InteractionExecutionFailureReason.INTERACTION_NOT_FOUND, None, None)))) case Some(interactionInstance) => interactionInstance.execute( - ingredients.filter(ingredientInstance => ingredientInstance.name != RecipeInstanceMetaDataName), - getMetaDataFromIngredients(ingredients) - .getOrElse(Map.empty)) + ingredients.filter(ingredientInstance => ingredientInstance.name != RecipeInstanceMetadataName), + getMetaDataFromIngredients(ingredients).getOrElse(Map.empty)) .map(executionSuccess => InteractionExecutionResult(Right(InteractionExecutionResult.Success(executionSuccess)))) .recover { case e => InteractionExecutionResult(Left(InteractionExecutionResult.Failure( diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala index e11e6fd14..49ab1d855 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/logging/LogAndSendEvent.scala @@ -2,7 +2,7 @@ package com.ing.baker.runtime.akka.actor.logging import akka.event.EventStream import com.ing.baker.il.petrinet.Transition -import com.ing.baker.runtime.common.{EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated} +import com.ing.baker.runtime.common.{EventFired, EventReceived, EventRejected, InteractionCompleted, InteractionFailed, InteractionStarted, RecipeAdded, RecipeInstanceCreated} import com.ing.baker.runtime.model.BakerLogging object LogAndSendEvent { @@ -46,14 +46,9 @@ object LogAndSendEvent { bakerLogging.eventRejected(eventRejected) } - def firingEvent(recipeInstanceId: String, - recipeId: String, - recipeName: String, - executionId: Long, - transition: Transition, - timeStarted: Long): Unit = { - //TODO This does not have a corrosponding BakerEvent, this should be created - bakerLogging.firingEvent(recipeInstanceId, recipeId, recipeName, executionId, transition, timeStarted) + def eventFired(eventFired: EventFired, + eventStream: EventStream): Unit = { + eventStream.publish(eventFired) + bakerLogging.eventFired(eventFired) } - } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala index e1bf3a4f0..13967ca52 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala @@ -379,7 +379,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration], persistWithSnapshot(ActorCreated(recipeId, recipeInstanceId, createdTime)) { _ => // after that we actually create the ProcessInstance actor - val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty) + val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty) val initializeCmd = Initialize(compiledRecipe.initialMarking, processState) //TODO ensure the initialiseCMD is accepted before we add it ot the index @@ -405,7 +405,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration], //Temporary solution for the situation that the initializeCmd is not send in the original Bake getCompiledRecipe(recipeId) match { case Some(compiledRecipe) => - val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty) + val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty) val initializeCmd = Initialize(compiledRecipe.initialMarking, processState) createProcessActor(recipeInstanceId, compiledRecipe) ! initializeCmd sender() ! ProcessAlreadyExists(recipeInstanceId) @@ -417,7 +417,7 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration], //Temporary solution for the situation that the initializeCmd is not send in the original Bake getCompiledRecipe(recipeId) match { case Some(compiledRecipe) => - val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty) + val processState = RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty) val initializeCmd = Initialize(compiledRecipe.initialMarking, processState) actorRef ! initializeCmd sender() ! ProcessAlreadyExists(recipeInstanceId) diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala index 720df06d2..c729e059d 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala @@ -5,10 +5,12 @@ import akka.cluster.sharding.ShardRegion.Passivate import akka.event.{DiagnosticLoggingAdapter, Logging} import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess} import cats.effect.IO +import com.ing.baker.il.failurestrategy.{BlockInteraction, FireEventAfterFailure, RetryWithIncrementalBackoff} import com.ing.baker.il.petrinet.{EventTransition, InteractionTransition, Place, Transition} import com.ing.baker.il.{CompiledRecipe, EventDescriptor, checkpointEventInteractionPrefix} -import com.ing.baker.il.failurestrategy.{BlockInteraction, FireEventAfterFailure, InteractionFailureStrategy, RetryWithIncrementalBackoff} import com.ing.baker.petrinet.api._ +import com.ing.baker.runtime.akka.actor.delayed_transition_actor.DelayedTransitionActorProtocol.{FireDelayedTransitionAck, ScheduleDelayedTransition} +import com.ing.baker.runtime.akka.actor.logging.LogAndSendEvent import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.FireSensoryEventRejection import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstance._ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceEventSourcing._ @@ -17,10 +19,9 @@ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol import com.ing.baker.runtime.akka.actor.process_instance.internal.ExceptionStrategy.{Continue, RetryWithDelay} import com.ing.baker.runtime.akka.actor.process_instance.internal._ import com.ing.baker.runtime.akka.actor.process_instance.{ProcessInstanceProtocol => protocol} -import com.ing.baker.runtime.akka.actor.delayed_transition_actor.DelayedTransitionActorProtocol.{FireDelayedTransitionAck, ScheduleDelayedTransition} import com.ing.baker.runtime.akka.internal.{FatalInteractionException, RecipeRuntime} import com.ing.baker.runtime.model.BakerLogging -import com.ing.baker.runtime.scaladsl.{EventInstance, IngredientInstance, RecipeInstanceState} +import com.ing.baker.runtime.scaladsl.{EventFired, EventInstance, IngredientInstance, RecipeInstanceState} import com.ing.baker.runtime.serialization.Encryption import com.ing.baker.types.{FromValue, PrimitiveValue, Value} @@ -28,7 +29,6 @@ import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.existentials -import scala.reflect.runtime.universe import scala.util.Try object ProcessInstance { @@ -356,7 +356,10 @@ class ProcessInstance[S, E]( ).marshall val internalEvent = ProcessInstanceEventSourcing.DelayedTransitionFired(jobId, transitionId, produced, out) - log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition.asInstanceOf[Transition], jobId, System.currentTimeMillis(), System.currentTimeMillis()) + val timestamp = System.currentTimeMillis() + log.transitionFired(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, transition.asInstanceOf[Transition], jobId, timestamp, timestamp) + + LogAndSendEvent.eventFired(EventFired(timestamp, compiledRecipe.name, compiledRecipe.recipeId, recipeInstanceId, out), context.system.eventStream) persistEvent(instance, internalEvent)( eventSource.apply(instance) @@ -570,13 +573,17 @@ class ProcessInstance[S, E]( def executeJob(job: Job[S], originalSender: ActorRef): Unit = { log.fireTransition(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis()) job.transition match { - case _: EventTransition => + case eventTransition: EventTransition => BakerLogging.default.firingEvent(recipeInstanceId, compiledRecipe.recipeId, compiledRecipe.name, job.id, job.transition.asInstanceOf[Transition], System.currentTimeMillis()) executeJobViaExecutor(job, originalSender) case i: InteractionTransition if isDelayedInteraction(i) => startDelayedTransition(i, job, originalSender) case i: InteractionTransition if isCheckpointEventInteraction(i) => - val event = jobToFinishedInteraction(job, i.eventsToFire.head.name) + val event: TransitionFiredEvent = jobToFinishedInteraction(job, i.eventsToFire.head.name) + + val currentTime = System.currentTimeMillis() + LogAndSendEvent.eventFired(EventFired(currentTime, compiledRecipe.name, compiledRecipe.recipeId, recipeInstanceId, event.output.asInstanceOf[EventInstance]), context.system.eventStream) + self.tell(event, originalSender) case _ => executeJobViaExecutor(job, originalSender) } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala index 9571cd039..916dc9b55 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala @@ -11,7 +11,7 @@ import com.ing.baker.petrinet.api._ import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceEventSourcing.Event import com.ing.baker.runtime.akka.actor.process_instance.internal.{ExceptionState, ExceptionStrategy, Instance, Job} import com.ing.baker.runtime.akka.actor.serialization.AkkaSerializerProvider -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName import com.ing.baker.runtime.scaladsl.RecipeInstanceState import com.ing.baker.runtime.serialization.Encryption import com.ing.baker.types.{CharArray, MapType, Value} @@ -117,11 +117,9 @@ object ProcessInstanceEventSourcing extends LazyLogging { case e: TransitionFailedWithOutputEvent => val transition = instance.petriNet.transitions.getById(e.transitionId) val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E]) -// We only have the consumed markings for the job val consumed: Marking[Place] = e.consumed.unmarshall(instance.petriNet.places) val produced: Marking[Place] = e.produced.unmarshall(instance.petriNet.places) - //TODO determine what to do with the job not found situation val job = instance.jobs.getOrElse(e.jobId, { Job[S](e.jobId, e.correlationId, instance.state, transition, consumed, null, None) }) @@ -175,24 +173,11 @@ object ProcessInstanceEventSourcing extends LazyLogging { case e: MetaDataAdded => val newState: S = instance.state match { case state: RecipeInstanceState => - val newBakerMetaData: Map[String, String] = - state.ingredients.get(RecipeInstanceMetaDataName) match { - case Some(value) => - if (value.isInstanceOf(MapType(CharArray))) { - val oldMetaData: Map[String, String] = value.asMap[String, String](classOf[String], classOf[String]).asScala.toMap - oldMetaData ++ e.metaData - } - else { - //If the old metadata is not of Type[String, String] we overwrite it since this is not allowed. - logger.info("Old RecipeInstanceMetaData was not of type Map[String, String]") - e.metaData - } - case None => - e.metaData - } - val newIngredients: Map[String, Value] = - state.ingredients + (RecipeInstanceMetaDataName -> com.ing.baker.types.Converters.toValue(newBakerMetaData)) - state.copy(ingredients = newIngredients).asInstanceOf[S] + val newRecipeInstanceMetaData: Map[String, String] = state.recipeInstanceMetadata ++ e.metaData + //We still add an ingredient for the metaData since this makes it easier to use it during interaction execution + val newIngredients: Map[String, Value] = state.ingredients + + (RecipeInstanceMetadataName -> com.ing.baker.types.Converters.toValue(newRecipeInstanceMetaData)) + state.copy(ingredients = newIngredients, recipeInstanceMetadata = newRecipeInstanceMetaData).asInstanceOf[S] case state => state } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala index 44b95e86c..59370c86d 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/internal/RecipeRuntime.scala @@ -1,6 +1,5 @@ package com.ing.baker.runtime.akka.internal -import java.lang.reflect.InvocationTargetException import akka.event.EventStream import cats.effect.{ContextShift, IO} import com.ing.baker.il @@ -16,9 +15,9 @@ import com.ing.baker.runtime.akka.actor.process_instance.internal._ import com.ing.baker.runtime.akka.internal.RecipeRuntime._ import com.ing.baker.runtime.model.InteractionManager import com.ing.baker.runtime.scaladsl._ -import com.ing.baker.types.{PrimitiveValue, Value} -import org.slf4j.MDC +import com.ing.baker.types.{ListValue, PrimitiveValue, RecordValue, Value} +import java.lang.reflect.InvocationTargetException import scala.collection.immutable.{Map, Seq} import scala.concurrent.ExecutionContext @@ -95,11 +94,24 @@ object RecipeRuntime { def createInteractionInput(interaction: InteractionTransition, state: RecipeInstanceState): Seq[IngredientInstance] = { // the process id is a special ingredient that is always available - val recipeInstanceId: (String, Value) = il.recipeInstanceIdName -> PrimitiveValue(state.recipeInstanceId.toString) - val processId: (String, Value) = il.processIdName -> PrimitiveValue(state.recipeInstanceId.toString) + val recipeInstanceId: (String, Value) = il.recipeInstanceIdName -> PrimitiveValue(state.recipeInstanceId) + + // Only map the recipeInstanceEventList if is it required, otherwise give an empty list + val recipeInstanceEventList: (String, Value) = + if(interaction.requiredIngredients.exists(_.name == il.recipeInstanceEventListName)) + il.recipeInstanceEventListName -> ListValue(state.events.map(e => PrimitiveValue(e.name)).toList) + else + il.recipeInstanceEventListName -> ListValue(List()) + + val processId: (String, Value) = il.processIdName -> PrimitiveValue(state.recipeInstanceId) // a map of all ingredients, the order is important, the predefined parameters and recipeInstanceId have precedence over the state ingredients. - val allIngredients: Map[String, Value] = state.ingredients ++ interaction.predefinedParameters + recipeInstanceId + processId + val allIngredients: Map[String, Value] = + state.ingredients ++ + interaction.predefinedParameters + + recipeInstanceId + + processId + + recipeInstanceEventList // arranges the ingredients in the expected order interaction.requiredIngredients.map { @@ -124,6 +136,7 @@ object RecipeRuntime { class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManager[IO], eventStream: EventStream)(implicit ec: ExecutionContext) extends ProcessInstanceRuntime[RecipeInstanceState, EventInstance] { protected implicit lazy val contextShift: ContextShift[IO] = IO.contextShift(ec) + /** * All transitions except sensory event interactions are auto-fireable by the runtime */ @@ -179,67 +192,61 @@ class RecipeRuntime(recipe: CompiledRecipe, interactionManager: InteractionManag override def transitionTask(petriNet: PetriNet, t: Transition)(marking: Marking[Place], state: RecipeInstanceState, input: Any): IO[(Marking[Place], EventInstance)] = t match { case interaction: InteractionTransition => interactionTask(interaction, petriNet.outMarking(t), state) - case t: EventTransition => IO.pure(petriNet.outMarking(t).toMarking, input.asInstanceOf[EventInstance]) + case eventTransition: EventTransition => + if(input != null) { + // Send EventFired for SensoryEvents + LogAndSendEvent.eventFired(EventFired(System.currentTimeMillis(), recipe.name, recipe.recipeId, state.recipeInstanceId, input.asInstanceOf[EventInstance]), eventStream) + } + IO.pure(petriNet.outMarking(eventTransition).toMarking, input.asInstanceOf[EventInstance]) case t => IO.pure(petriNet.outMarking(t).toMarking, null.asInstanceOf[EventInstance]) } def interactionTask(interaction: InteractionTransition, outAdjacent: MultiSet[Place], processState: RecipeInstanceState): IO[(Marking[Place], EventInstance)] = { - // returns a delayed task that will get executed by the process instance - // add MDC values for logging - MDC.put("recipeInstanceId", processState.recipeInstanceId) - MDC.put("recipeId", recipe.recipeId) - MDC.put("recipeName", recipe.name) - - try { - // create the interaction input - val input = createInteractionInput(interaction, processState) + // create the interaction input + val input = createInteractionInput(interaction, processState) - val timeStarted = System.currentTimeMillis() + val timeStarted = System.currentTimeMillis() - // publish the fact that we started the interaction - LogAndSendEvent.interactionStarted(InteractionStarted(timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName), eventStream) + // publish the fact that we started the interaction + LogAndSendEvent.interactionStarted(InteractionStarted(timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName), eventStream) - // executes the interaction and obtain the (optional) output event - interactionManager.execute(interaction, input, - com.ing.baker.runtime.model.recipeinstance.RecipeInstanceState.getMetaDataFromIngredients(processState.ingredients)).map { interactionOutput => + // executes the interaction and obtain the (optional) output event + interactionManager.execute(interaction, input, Some(processState.recipeInstanceMetadata)).map { interactionOutput => - // validates the event, throws a FatalInteraction exception if invalid - RecipeRuntime.validateInteractionOutput(interaction, interactionOutput).foreach { validationError => - throw new FatalInteractionException(validationError) - } - - // transform the event if there is one - val outputEvent: Option[EventInstance] = interactionOutput - .map(e => transformInteractionEvent(interaction, e)) + // validates the event, throws a FatalInteraction exception if invalid + RecipeRuntime.validateInteractionOutput(interaction, interactionOutput).foreach { validationError => + throw new FatalInteractionException(validationError) + } - val timeCompleted = System.currentTimeMillis() + // transform the event if there is one + val outputEvent: Option[EventInstance] = interactionOutput + .map(e => transformInteractionEvent(interaction, e)) - // publish the fact that the interaction completed - LogAndSendEvent.interactionCompleted(InteractionCompleted(timeCompleted, timeCompleted - timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName, outputEvent), eventStream) + val timeCompleted = System.currentTimeMillis() - // create the output marking for the petri net - val outputMarking: Marking[Place] = RecipeRuntime.createProducedMarking(outAdjacent, outputEvent) + // publish the fact that the interaction completed + LogAndSendEvent.interactionCompleted(InteractionCompleted(timeCompleted, timeCompleted - timeStarted, recipe.name, recipe.recipeId, processState.recipeInstanceId, interaction.interactionName, outputEvent), eventStream) - val reproviderMarkings: Marking[Place] = if (interaction.isReprovider) { - outAdjacent.toMarking.filter((input: (Place, MultiSet[Any])) => input._1.placeType == IngredientPlace) - } else Map.empty + // create the output marking for the petri net + val outputMarking: Marking[Place] = RecipeRuntime.createProducedMarking(outAdjacent, outputEvent) - (outputMarking ++ reproviderMarkings, outputEvent.orNull) - } - - } finally { - // remove the MDC values - MDC.remove("recipeInstanceId") - MDC.remove("recipeId") - MDC.remove("recipeName") + outputEvent.foreach { event: EventInstance => + // Send EventFired for Interaction output events + LogAndSendEvent.eventFired(EventFired(timeCompleted, recipe.name, recipe.recipeId, processState.recipeInstanceId, event), eventStream) } - } handleErrorWith { - case e: InvocationTargetException => IO.raiseError(e.getCause) - case e: Throwable => IO.raiseError(e) + val reproviderMarkings: Marking[Place] = if (interaction.isReprovider) { + outAdjacent.toMarking.filter((input: (Place, MultiSet[Any])) => input._1.placeType == IngredientPlace) + } else Map.empty + + (outputMarking ++ reproviderMarkings, outputEvent.orNull) } + } handleErrorWith { + case e: InvocationTargetException => IO.raiseError(e.getCause) + case e: Throwable => IO.raiseError(e) + } } diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/BakerRuntimeTestBase.scala b/core/akka-runtime/src/test/scala/com/ing/baker/BakerRuntimeTestBase.scala index 440643758..234edc635 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/BakerRuntimeTestBase.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/BakerRuntimeTestBase.scala @@ -81,6 +81,8 @@ trait BakerRuntimeTestBase ) protected val testInteractionOneMock: InteractionOne = mock[InteractionOne] + protected val testInteractionOneWithMetaDataMock: InteractionOneWithMetaData = mock[InteractionOneWithMetaData] + protected val testInteractionOneWithEventListMock: InteractionOneWithEventList = mock[InteractionOneWithEventList] protected val testInteractionTwoMock: InteractionTwo = mock[InteractionTwo] protected val testInteractionThreeMock: InteractionThree = mock[InteractionThree] protected val testInteractionFourMock: InteractionFour = mock[InteractionFour] @@ -98,6 +100,8 @@ trait BakerRuntimeTestBase protected val mockImplementations: List[InteractionInstance] = List( testInteractionOneMock, + testInteractionOneWithMetaDataMock, + testInteractionOneWithEventListMock, testInteractionTwoMock, testInteractionThreeMock, testInteractionFourMock, diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerEventsSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerEventsSpec.scala index 87afe7af7..f0d14e548 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerEventsSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerEventsSpec.scala @@ -1,16 +1,17 @@ package com.ing.baker.runtime.akka import akka.actor.ActorRef -import akka.persistence.inmemory.extension.{ InMemoryJournalStorage, StorageExtension } +import akka.persistence.inmemory.extension.{InMemoryJournalStorage, StorageExtension} import akka.testkit.TestProbe import com.ing.baker._ import com.ing.baker.recipe.TestRecipe._ import com.ing.baker.recipe.common.InteractionFailureStrategy -import com.ing.baker.recipe.scaladsl.Recipe +import com.ing.baker.recipe.scaladsl.{CheckPointEvent, Event, Recipe} import com.ing.baker.runtime.common.RejectReason._ -import com.ing.baker.runtime.scaladsl.{ EventInstance, _ } +import com.ing.baker.runtime.scaladsl.{EventInstance, _} import com.ing.baker.types.PrimitiveValue import com.typesafe.scalalogging.LazyLogging + import java.util.UUID import scala.concurrent.Future import scala.concurrent.duration._ @@ -57,6 +58,8 @@ object BakerEventsSpec extends LazyLogging { providesNothingInteraction, sieveInteraction ) + .withCheckpointEvent(CheckPointEvent("CheckPointEvent") + .withRequiredEvents(Event[InteractionOneSuccessful], Event[EventFromInteractionTwo], Event[InteractionThreeSuccessful])) .withSensoryEvents( initialEvent.withMaxFiringLimit(1), initialEventExtendedName, @@ -97,18 +100,24 @@ class BakerEventsSpec extends BakerRuntimeTestBase { _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)), "someId") // TODO check the order of the timestamps later _ = expectMsgInAnyOrderPF(listenerProbe, - {case msg@RecipeInstanceCreated(_, `recipeId`, `recipeName`, `recipeInstanceId`) => msg}, - {case msg@EventReceived(_, _, _, `recipeInstanceId`, Some("someId"), EventInstance("InitialEvent", ingredients)) if ingredients == Map("initialIngredient" -> PrimitiveValue(`initialIngredientValue`)) => msg}, - {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "SieveInteraction") => msg}, - {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionOne") => msg}, - {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionTwo") => msg}, - {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionThree") => msg}, - {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "ProvidesNothingInteraction") => msg}, - {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionOne", Some(EventInstance("InteractionOneSuccessful", ingredients))) if ingredients == Map("interactionOneIngredient" -> PrimitiveValue("interactionOneIngredient")) => msg}, - {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionTwo", Some(EventInstance("EventFromInteractionTwo", ingredients))) if ingredients == Map("interactionTwoIngredient" -> PrimitiveValue("interactionTwoIngredient")) => msg}, - {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionThree", Some(EventInstance("InteractionThreeSuccessful", ingredients))) if ingredients == Map("interactionThreeIngredient" -> PrimitiveValue("interactionThreeIngredient")) => msg}, - {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "ProvidesNothingInteraction", None) => msg}, - {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "SieveInteraction", Some(EventInstance("SieveInteractionSuccessful", ingredients))) if ingredients == Map("sievedIngredient" -> PrimitiveValue("sievedIngredient")) => msg} + {case msg@RecipeInstanceCreated(_, `recipeId`, `recipeName`, `recipeInstanceId`) => msg}, + {case msg@EventReceived(_, _, _, `recipeInstanceId`, Some("someId"), EventInstance("InitialEvent", ingredients)) if ingredients == Map("initialIngredient" -> PrimitiveValue(`initialIngredientValue`)) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("InitialEvent", ingredients)) => msg}, + {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "SieveInteraction") => msg}, + {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionOne") => msg}, + {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionTwo") => msg}, + {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "InteractionThree") => msg}, + {case msg@InteractionStarted(_, _, _, `recipeInstanceId`, "ProvidesNothingInteraction") => msg}, + {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionOne", Some(EventInstance("InteractionOneSuccessful", ingredients))) if ingredients == Map("interactionOneIngredient" -> PrimitiveValue("interactionOneIngredient")) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("InteractionOneSuccessful", _)) => msg}, + {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionTwo", Some(EventInstance("EventFromInteractionTwo", ingredients))) if ingredients == Map("interactionTwoIngredient" -> PrimitiveValue("interactionTwoIngredient")) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("EventFromInteractionTwo", ingredients)) => msg}, + {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "InteractionThree", Some(EventInstance("InteractionThreeSuccessful", ingredients))) if ingredients == Map("interactionThreeIngredient" -> PrimitiveValue("interactionThreeIngredient")) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("InteractionThreeSuccessful", _)) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("CheckPointEvent", _)) => msg}, + {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "ProvidesNothingInteraction", None) => msg}, + {case msg@InteractionCompleted(_, _, _, _, `recipeInstanceId`, "SieveInteraction", Some(EventInstance("SieveInteractionSuccessful", ingredients))) if ingredients == Map("sievedIngredient" -> PrimitiveValue("sievedIngredient")) => msg}, + {case msg@EventFired(_, _, _, `recipeInstanceId`, EventInstance("SieveInteractionSuccessful", ingredients)) => msg} ) _ = listenerProbe.expectNoMessage(eventReceiveTimeout) } yield succeed diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala index bcf37ff86..b93e1aac3 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/BakerExecutionSpec.scala @@ -13,16 +13,16 @@ import com.ing.baker.compiler.RecipeCompiler import com.ing.baker.recipe.TestRecipe._ import com.ing.baker.recipe.common.InteractionFailureStrategy import com.ing.baker.recipe.common.InteractionFailureStrategy.FireEventAfterFailure -import com.ing.baker.recipe.scaladsl.{Event, Ingredient, Interaction, Recipe, CheckPointEvent} +import com.ing.baker.recipe.scaladsl.{CheckPointEvent, Event, Ingredient, Interaction, Recipe} import com.ing.baker.runtime.akka.internal.CachingInteractionManager import com.ing.baker.runtime.common.BakerException._ -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName import com.ing.baker.runtime.common._ import com.ing.baker.runtime.scaladsl.{Baker, EventInstance, InteractionInstance, InteractionInstanceInput, RecipeEventMetadata} import com.ing.baker.types.{CharArray, Int32, PrimitiveValue, Value} import com.typesafe.config.{Config, ConfigFactory} import io.prometheus.client.CollectorRegistry -import org.mockito.ArgumentMatchers.{any, anyString, argThat, eq => mockitoEq} +import org.mockito.ArgumentMatchers.{any, anyMap, anyString, argThat, eq => mockitoEq} import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer @@ -144,7 +144,7 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { id = UUID.randomUUID().toString _ <- baker.bake(recipeId, id, Map("key" -> "value")) ingredients: Map[String, Value] <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert(metaData.containsKey("key") && metaData.get("key") == "value") } @@ -179,7 +179,7 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value")) _ <- baker.addMetaData(id, Map.apply[String, String]("key2" -> "value2")) ingredients: Map[String, Value] <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.containsKey("key") && metaData.get("key") == "value" && metaData.containsKey("key2") && metaData.get("key2") == "value2") @@ -193,7 +193,7 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value")) _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value2")) ingredients: Map[String, Value] <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.containsKey("key") && metaData.get("key") == "value2") @@ -206,7 +206,7 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { _ <- baker.bake(recipeId, id) _ <- baker.addMetaData(id, Map.empty) ingredients: Map[String, Value] <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.size() == 0) @@ -218,7 +218,7 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { id = UUID.randomUUID().toString _ <- baker.bake(recipeId, id) ingredients: Map[String, Value] <- baker.getIngredients(id) - } yield assert(!ingredients.contains(RecipeInstanceMetaDataName)) + } yield assert(!ingredients.contains(RecipeInstanceMetadataName)) } "throw a NoSuchProcessException" when { @@ -279,6 +279,51 @@ class BakerExecutionSpec extends BakerRuntimeTestBase { "interactionOneOriginalIngredient" -> interactionOneIngredientValue) } + "execute an interaction when its ingredient is provided with MetaData requirement" in { + val recipe = + Recipe("IngredientProvidedRecipeWithSpecial") + .withInteraction(interactionOneWithMetaData) + .withSensoryEvent(initialEvent) + + for { + (baker, recipeId) <- setupBakerWithRecipe(recipe, mockImplementations) + _ = when(testInteractionOneWithMetaDataMock.apply(anyString(), anyString(), any())).thenReturn(Future.successful(InteractionOneSuccessful(interactionOneIngredientValue))) + recipeInstanceId = UUID.randomUUID().toString + metaData = Map("MetaDataKey" -> "MetaDataValue") + _ <- baker.bake(recipeId, recipeInstanceId, metaData) + _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)))) + _ = verify(testInteractionOneWithMetaDataMock).apply(recipeInstanceId.toString, "initialIngredient", metaData) + state <- baker.getRecipeInstanceState(recipeInstanceId) + } yield + state.ingredients shouldBe + ingredientMap( + "RecipeInstanceMetaData" -> metaData, + "initialIngredient" -> initialIngredientValue, + "interactionOneOriginalIngredient" -> interactionOneIngredientValue) + } + + "execute an interaction when its ingredient is provided with EventList requirement" in { + val recipe = + Recipe("IngredientProvidedRecipeWithSpecial") + .withInteraction(interactionOneWithEventList) + .withSensoryEvent(initialEvent) + + for { + (baker, recipeId) <- setupBakerWithRecipe(recipe, mockImplementations) + _ = when(testInteractionOneWithEventListMock.apply(anyString(), anyString(), any())).thenReturn(Future.successful(InteractionOneSuccessful(interactionOneIngredientValue))) + recipeInstanceId = UUID.randomUUID().toString + eventList = List("InitialEvent") + _ <- baker.bake(recipeId, recipeInstanceId) + _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(EventInstance.unsafeFrom(InitialEvent(initialIngredientValue)))) + _ = verify(testInteractionOneWithEventListMock).apply(recipeInstanceId.toString, "initialIngredient", eventList) + state <- baker.getRecipeInstanceState(recipeInstanceId) + } yield + state.ingredients shouldBe + ingredientMap( + "initialIngredient" -> initialIngredientValue, + "interactionOneOriginalIngredient" -> interactionOneIngredientValue) + } + "re-execute an interaction when set to reprovider and event is fired two times" in { val recipe = Recipe("IngredientProvidedRecipe") diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexSpec.scala index afde1bafb..dce38c269 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexSpec.scala @@ -98,7 +98,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn "create the PetriNetInstance actor when Initialize message is received" in { val recipeInstanceId = UUID.randomUUID().toString val initializeMsg = - Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) val petriNetActorProbe = TestProbe() val actorIndex = createActorIndex(petriNetActorProbe.ref, recipeManager) actorIndex ! CreateProcess(recipeId, recipeInstanceId) @@ -143,7 +143,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn "not create the PetriNetInstance actor if already created" in { val recipeInstanceId = UUID.randomUUID().toString - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) val petriNetActorProbe = TestProbe() val actorIndex = createActorIndex(petriNetActorProbe.ref, recipeManager) actorIndex ! CreateProcess(recipeId, recipeInstanceId) @@ -171,7 +171,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn actorIndex ! CreateProcess(recipeId, recipeInstanceId) - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) processProbe.expectMsg(initializeMsg) Thread.sleep(recipeRetentionPeriod.toMillis) // inform the index to check for processes to be cleaned up @@ -203,7 +203,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn actorIndex ! CreateProcess(recipeId, recipeInstanceId) - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) processProbe.expectMsg(initializeMsg) Thread.sleep(recipeRetentionPeriod.toMillis) // inform the index to check for processes to be cleaned up @@ -242,7 +242,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn val recipeInstanceId = UUID.randomUUID().toString - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) actorIndex ! CreateProcess(recipeId, recipeInstanceId) @@ -290,7 +290,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn val recipeInstanceId = UUID.randomUUID().toString - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) actorIndex ! CreateProcess(recipeId, recipeInstanceId) @@ -322,7 +322,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn val recipeInstanceId = UUID.randomUUID().toString - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) actorIndex ! CreateProcess(recipeId, recipeInstanceId) @@ -354,7 +354,7 @@ class ProcessIndexSpec extends TestKit(ActorSystem("ProcessIndexSpec", ProcessIn val recipeInstanceId = UUID.randomUUID().toString - val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], List.empty)) + val initializeMsg = Initialize(Marking.empty[Place], RecipeInstanceState(recipeId, recipeInstanceId, Map.empty[String, Value], Map.empty[String, String], List.empty)) diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSpec.scala index 034461ad8..cb0d48e28 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSpec.scala @@ -851,7 +851,7 @@ class ProcessInstanceSpec extends AkkaTestBase("ProcessInstanceSpec") with Scala BlockInteraction, Map.empty, false) val ingredients = Map[String, Value]("waitTime" -> Converters.toValue(Duration.ofMillis(60000L))) - val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Seq.empty)) + val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Map.empty[String, String], Seq.empty)) assert(output == 60000L) } @@ -877,7 +877,7 @@ class ProcessInstanceSpec extends AkkaTestBase("ProcessInstanceSpec") with Scala val ingredients = Map[String, Value]("waitTime" -> Converters.toValue(FiniteDuration.apply(60000L, TimeUnit.MILLISECONDS))) - val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Seq.empty)) + val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Map.empty[String, String], Seq.empty)) assert(output == 60000L) } @@ -903,7 +903,7 @@ class ProcessInstanceSpec extends AkkaTestBase("ProcessInstanceSpec") with Scala val ingredients = Map[String, Value]("waitTime" -> Converters.toValue(Duration.ofMillis(1200000L))) - val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Seq.empty)) + val output: Long = ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Map.empty[String, String], Seq.empty)) assert(output == 60000L) } @@ -932,7 +932,7 @@ class ProcessInstanceSpec extends AkkaTestBase("ProcessInstanceSpec") with Scala var exceptionThrown = false try { - ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Seq.empty)) + ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Map.empty[String, String], Seq.empty)) } catch { case _: FatalInteractionException => exceptionThrown = true } @@ -961,7 +961,7 @@ class ProcessInstanceSpec extends AkkaTestBase("ProcessInstanceSpec") with Scala val ingredients = Map[String, Value]("waitTime" -> Converters.toValue(false)) var exceptionThrown = false try { - ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Seq.empty)) + ProcessInstance.getWaitTimeInMillis(interactionTransition, RecipeInstanceState("id", "id", ingredients, Map.empty[String, String], Seq.empty)) } catch { case _: FatalInteractionException => exceptionThrown = true } diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/serialization/SerializationSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/serialization/SerializationSpec.scala index b9f9d6d7f..a1ebd67db 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/serialization/SerializationSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/serialization/SerializationSpec.scala @@ -268,6 +268,7 @@ object SerializationSpec { implicit val eventNameGen: Gen[String] = Gen.alphaStr implicit val ingredientNameGen: Gen[String] = Gen.alphaStr implicit val ingredientsGen: Gen[(String, Value)] = GenUtil.tuple(ingredientNameGen, Types.anyValueGen) + implicit val metaDataGen: Gen[(String, String)] = GenUtil.tuple(ingredientNameGen, ingredientNameGen) implicit val runtimeEventGen: Gen[EventInstance] = for { eventName <- eventNameGen @@ -283,8 +284,9 @@ object SerializationSpec { recipeInstanceId <- recipeInstanceIdGen recipeId <- recipeIdGen ingredients <- Gen.mapOf(ingredientsGen) + recipeInstanceMetadata <- Gen.mapOf(metaDataGen) events <- Gen.listOf(eventMomentsGen) - } yield RecipeInstanceState(recipeId, recipeInstanceId, ingredients, events) + } yield RecipeInstanceState(recipeId, recipeInstanceId, ingredients, recipeInstanceMetadata, events) implicit val messagesGen: Gen[AnyRef] = Gen.oneOf(runtimeEventGen, processStateGen) diff --git a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/internal/RecipeRuntimeSpec.scala b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/internal/RecipeRuntimeSpec.scala index 876eea37a..679cfbf8d 100644 --- a/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/internal/RecipeRuntimeSpec.scala +++ b/core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/internal/RecipeRuntimeSpec.scala @@ -29,6 +29,10 @@ class RecipeRuntimeSpec extends AnyWordSpecLike with Matchers with MockitoSugar //in V3, process id from V1 and V2 is now called a recipe instance id when(mockState.recipeInstanceId).thenReturn(processId) + when(mockState.recipeInstanceMetadata).thenReturn(Map()) + + when(mockState.events).thenReturn(List()) + //this call would fail without the fix RecipeRuntime.createInteractionInput(mockTransition, mockState) } diff --git a/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceEventList.java b/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceEventList.java new file mode 100644 index 000000000..2e011537b --- /dev/null +++ b/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceEventList.java @@ -0,0 +1,15 @@ +package com.ing.baker.recipe.annotations; + +import javax.inject.Qualifier; +import java.lang.annotation.*; + +/** + * An annotation to be added to an argument of an action indicating that the Event List should be injected + * there. + */ +@Qualifier +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.PARAMETER) +public @interface RecipeInstanceEventList { +} diff --git a/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceMetadata.java b/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceMetadata.java new file mode 100644 index 000000000..90fb10320 --- /dev/null +++ b/core/baker-annotations/src/main/java/com/ing/baker/recipe/annotations/RecipeInstanceMetadata.java @@ -0,0 +1,15 @@ +package com.ing.baker.recipe.annotations; + +import javax.inject.Qualifier; +import java.lang.annotation.*; + +/** + * An annotation to be added to an argument of an action indicating that the Metadata should be injected + * there. + */ +@Qualifier +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.PARAMETER) +public @interface RecipeInstanceMetadata { +} diff --git a/core/baker-interface/src/main/protobuf/common.proto b/core/baker-interface/src/main/protobuf/common.proto index c228398bd..abc3e7cdf 100644 --- a/core/baker-interface/src/main/protobuf/common.proto +++ b/core/baker-interface/src/main/protobuf/common.proto @@ -158,6 +158,7 @@ message ProcessState { optional string recipeId = 5; optional string recipeInstanceId = 1; repeated Ingredient ingredients = 2; + map recipeInstanceMetadata = 6; repeated EventMoment events = 4; } @@ -352,6 +353,14 @@ message EventRejectedBakerEvent { optional RejectReason reason = 5; } +message EventFiredBakerEvent { + optional int64 timeStamp = 1; + optional string recipeName = 2; + optional string recipeId = 3; + optional string recipeInstanceId = 4; + optional RuntimeEvent event = 6; +} + message InteractionFailedBakerEvent { optional int64 timeStamp = 1; optional int64 duration = 2; @@ -400,6 +409,7 @@ message BakerEvent { oneof oneof_baker_event { EventReceivedBakerEvent eventReceived = 1; EventRejectedBakerEvent eventRejected = 2; + EventFiredBakerEvent eventFired = 8; InteractionFailedBakerEvent interactionFailed = 3; InteractionStartedBakerEvent interactionStarted = 4; InteractionCompletedBakerEvent interactionCompleted = 5; diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/BakerEvent.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/BakerEvent.scala index f98132191..4e28f8308 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/BakerEvent.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/BakerEvent.scala @@ -11,7 +11,7 @@ trait BakerEvent extends LanguageApi { } /** - * Event describing the fact that an event was received for a process. + * Event describing the fact that a sensory event was received for a process. */ trait EventReceived extends BakerEvent { val timeStamp: Long @@ -23,7 +23,7 @@ trait EventReceived extends BakerEvent { } /** - * Event describing the fact that an event was received but rejected for a process + * Event describing the fact that an sensory event was received but rejected for a process */ trait EventRejected extends BakerEvent { val timeStamp: Long @@ -33,6 +33,17 @@ trait EventRejected extends BakerEvent { val reason: RejectReason } +/** + * Event describing the fact that an interaction outcome event was fired for a process + */ +trait EventFired extends BakerEvent { + val timeStamp: Long + val recipeName: String + val recipeId: String + val recipeInstanceId: String + val event: Event +} + /** * Event describing the fact that an interaction failed during execution */ diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/RecipeInstanceState.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/RecipeInstanceState.scala index a8e6c523e..330cd3b4d 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/RecipeInstanceState.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/common/RecipeInstanceState.scala @@ -1,11 +1,12 @@ package com.ing.baker.runtime.common +import com.ing.baker.il.recipeInstanceMetadataName import com.ing.baker.runtime.common.LanguageDataStructures.LanguageApi import com.ing.baker.types.Value object RecipeInstanceState { //The name used for the RecipeInstanceMetaData ingredient - val RecipeInstanceMetaDataName = "RecipeInstanceMetaData" + val RecipeInstanceMetadataName = recipeInstanceMetadataName } /** @@ -21,5 +22,7 @@ trait RecipeInstanceState extends LanguageApi { self => def ingredients: language.Map[String, Value] + def recipeInstanceMetadata: language.Map[String, String] + def events: language.Seq[EventType] } \ No newline at end of file diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/BakerEvent.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/BakerEvent.scala index f24d352a7..0ebed081f 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/BakerEvent.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/BakerEvent.scala @@ -71,6 +71,34 @@ case class EventRejected(timeStamp: Long, def getReason: RejectReason = reason } + +/** + * Event describing the fact that an interaction outcome event was fired for a process + * + * @param timeStamp The time that the event was received + * @param recipeName The name of the recipe that interaction is part of + * @param recipeId The recipe id + * @param recipeInstanceId The id of the process + * @param correlationId The (optional) correlation id of the event + * @param event The event + */ +case class EventFired(timeStamp: Long, + recipeName: String, + recipeId: String, + recipeInstanceId: String, + event: EventInstance) extends BakerEvent with common.EventFired { + + def getTimeStamp: Long = timeStamp + + def getRecipeName: String = recipeName + + def getRecipeId: String = recipeId + + def getRecipeInstanceId: String = recipeInstanceId + + def getEvent: EventInstance = event +} + /** * Event describing the fact that an interaction failed during execution * diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/RecipeInstanceState.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/RecipeInstanceState.scala index 5ae49302d..2fc3ad95f 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/RecipeInstanceState.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/javadsl/RecipeInstanceState.scala @@ -19,6 +19,7 @@ case class RecipeInstanceState( recipeId: String, recipeInstanceId: String, ingredients: java.util.Map[String, Value], + recipeInstanceMetadata: java.util.Map[String, String], events: java.util.List[EventMoment] ) extends common.RecipeInstanceState with JavaApi { @@ -31,6 +32,13 @@ case class RecipeInstanceState( */ def getIngredients: java.util.Map[String, Value] = ingredients + /** + * Returns the accumulated ingredients. + * + * @return The accumulated ingredients + */ + def getRecipeInstanceMetadata: java.util.Map[String, String] = recipeInstanceMetadata + /** * Returns the RuntimeEvents * @@ -54,5 +62,5 @@ case class RecipeInstanceState( @nowarn def asScala: scaladsl.RecipeInstanceState = - scaladsl.RecipeInstanceState(recipeId, recipeInstanceId, ingredients.asScala.toMap, events.asScala.map(_.asScala).toIndexedSeq) + scaladsl.RecipeInstanceState(recipeId, recipeInstanceId, ingredients.asScala.toMap, recipeInstanceMetadata.asScala.toMap, events.asScala.map(_.asScala).toIndexedSeq) } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala index 29e1cc3db..61910c364 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/BakerLogging.scala @@ -114,6 +114,18 @@ case class BakerLogging(logger: Logger = BakerLogging.defaultLogger) { withMDC(mdc, _.info(msg)) } + def eventFired(eventFired: EventFired): Unit = { + val msg = s"Firing event '${eventFired.event.name}'" + val mdc = Map( + "recipeInstanceId" -> eventFired.recipeInstanceId, + "recipeId" -> eventFired.recipeId, + "recipeName" -> eventFired.recipeName, + "eventName" -> eventFired.event.name, + "runtimeTimestamp" -> eventFired.timeStamp.toString, + ) + withMDC(mdc, _.info(msg)) + } + def eventReceived(eventReceived: EventReceived): Unit = { val msg = s"Event received '${eventReceived.event.name}'" val mdc = Map( diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala index 5635a3285..70ef6caff 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/InteractionManager.scala @@ -43,7 +43,7 @@ trait InteractionManager[F[_]] { def execute(interaction: InteractionTransition, input: Seq[IngredientInstance], metadata: Option[Map[String, String]])(implicit sync: Sync[F], effect: MonadError[F, Throwable]): F[Option[EventInstance]] = { if(interaction.interactionName.startsWith(checkpointEventInteractionPrefix)){ effect.pure(Some(EventInstance(interaction.interactionName.stripPrefix(checkpointEventInteractionPrefix)))) - }else{ + } else{ findFor(interaction) .flatMap { case Some(implementation) => implementation.execute(input, metadata.getOrElse(Map())) diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeInstanceManager.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeInstanceManager.scala index bb5e6e1f6..94b031306 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeInstanceManager.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeInstanceManager.scala @@ -6,7 +6,7 @@ import cats.effect.{ConcurrentEffect, Effect, Resource, Timer} import cats.implicits._ import com.ing.baker.il.{RecipeVisualStyle, RecipeVisualizer} import com.ing.baker.runtime.common.BakerException.{ProcessAlreadyExistsException, ProcessDeletedException} -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName import com.ing.baker.runtime.common.{BakerException, SensoryEventStatus} import com.ing.baker.runtime.model.RecipeInstanceManager.RecipeInstanceStatus import com.ing.baker.runtime.model.recipeinstance.RecipeInstance @@ -79,6 +79,7 @@ trait RecipeInstanceManager[F[_]] { currentState.recipe.recipeId, recipeInstanceId, currentState.ingredients, + currentState.recipeInstanceMetadata, currentState.events ) }) @@ -151,16 +152,10 @@ trait RecipeInstanceManager[F[_]] { def addMetaData(recipeInstanceId: String, metadata: Map[String, String])(implicit components: BakerComponents[F], effect: ConcurrentEffect[F], timer: Timer[F]): F[Unit] = { getExistent(recipeInstanceId).map((recipeInstance: RecipeInstance[F]) => { recipeInstance.state.update(currentState => { - val newBakerMetaData = currentState.ingredients.get(RecipeInstanceMetaDataName) match { - case Some(value) => - if (value.isInstanceOf(MapType(com.ing.baker.types.CharArray))) { - val oldMetaData: Map[String, String] = value.asMap[String, String](classOf[String], classOf[String]).asScala.toMap - oldMetaData ++ metadata - } - else metadata - case None => metadata - } - currentState.copy(ingredients = currentState.ingredients + (RecipeInstanceMetaDataName -> com.ing.baker.types.Converters.toValue(newBakerMetaData))) + val newRecipeInstanceMetaData = currentState.recipeInstanceMetadata ++ metadata + currentState.copy( + ingredients = currentState.ingredients + (RecipeInstanceMetadataName -> com.ing.baker.types.Converters.toValue(newRecipeInstanceMetaData)), + recipeInstanceMetadata = newRecipeInstanceMetaData) }) }).flatten } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala index 002dd6e68..4c54ad94e 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/RecipeManager.scala @@ -38,9 +38,9 @@ trait RecipeManager[F[_]] extends LazyLogging { for { timestamp <- timer.clock.realTime(duration.MILLISECONDS) _ <- store(compiledRecipe, timestamp) - event = RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe) - _ <- effect.delay(components.logging.addedRecipe(event)) - _ <- components.eventStream.publish(event) + recipeAdded = RecipeAdded(compiledRecipe.name, compiledRecipe.recipeId, timestamp, compiledRecipe) + _ <- effect.delay(components.logging.addedRecipe(recipeAdded)) + _ <- components.eventStream.publish(recipeAdded) } yield () } yield compiledRecipe.recipeId diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala index d39f3e7a7..23b551c34 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstance.scala @@ -8,7 +8,7 @@ import com.ing.baker.il.CompiledRecipe import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome import com.ing.baker.runtime.model.{BakerComponents, FireSensoryEventRejection} import com.ing.baker.runtime.model.recipeinstance.RecipeInstance.FatalInteractionException -import com.ing.baker.runtime.scaladsl.{EventInstance, EventReceived, EventRejected, RecipeInstanceCreated} +import com.ing.baker.runtime.scaladsl.{EventFired, EventInstance, EventReceived, EventRejected, RecipeInstanceCreated} import com.typesafe.scalalogging.LazyLogging import fs2.Stream @@ -41,12 +41,13 @@ case class RecipeInstance[F[_]](recipeInstanceId: String, config: RecipeInstance initialExecution <- EitherT.fromEither[F](currentState.validateExecution(input, correlationId, currentTime)) .leftSemiflatMap { case (rejection, reason) => for { - event <- effect.delay(EventRejected(currentTime, recipeInstanceId, correlationId, input, rejection.asReason)) - _ <- effect.delay(components.logging.eventRejected(event)) - _ <- components.eventStream.publish(event) + eventRejected <- effect.delay(EventRejected(currentTime, recipeInstanceId, correlationId, input, rejection.asReason)) + _ <- effect.delay(components.logging.eventRejected(eventRejected)) + _ <- components.eventStream.publish(eventRejected) } yield rejection } _ <- EitherT.liftF(components.eventStream.publish(EventReceived(currentTime, currentState.recipe.name, currentState.recipe.recipeId, recipeInstanceId, correlationId, input))) + _ <- EitherT.liftF(components.eventStream.publish(EventFired(currentTime, currentState.recipe.name, currentState.recipe.recipeId, recipeInstanceId, input))) } yield baseCase(initialExecution) .collect { case Some(output) => output.filterNot(config.ingredientsFilter) } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceEventValidation.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceEventValidation.scala index c361d79ca..58e6be034 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceEventValidation.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceEventValidation.scala @@ -33,6 +33,8 @@ trait RecipeInstanceEventValidation { recipeInstance: RecipeInstanceState => consume = params.head, input = Some(input), ingredients = recipeInstance.ingredients, + recipeInstanceMetadata = recipeInstance.recipeInstanceMetadata, + eventList = events, correlationId = correlationId, isReprovider = false ) diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala index 431846c1d..91cf01d9a 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/RecipeInstanceState.scala @@ -1,23 +1,19 @@ package com.ing.baker.runtime.model.recipeinstance +import com.ing.baker.il.CompiledRecipe import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome -import com.ing.baker.il.{CompiledRecipe, EventDescriptor} -import com.ing.baker.il.petrinet.Place import com.ing.baker.il.petrinet.Place.IngredientPlace -import com.ing.baker.petrinet.api.{Marking, MultiSet} -import com.ing.baker.runtime.scaladsl.{EventInstance, EventMoment} import com.ing.baker.il.petrinet._ import com.ing.baker.petrinet.api._ import com.ing.baker.runtime.common.IngredientInstance -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName +import com.ing.baker.runtime.scaladsl.{EventInstance, EventMoment} import com.ing.baker.types.{CharArray, MapType, Value} -import scala.collection.immutable - object RecipeInstanceState { def getMetaDataFromIngredients(ingredients: Map[String, Value]): Option[Map[String, String]] = { - ingredients.get(RecipeInstanceMetaDataName).flatMap(value => { + ingredients.get(RecipeInstanceMetadataName).flatMap(value => { if (value.isInstanceOf(MapType(CharArray))) Some(value.as[Map[String, String]]) else @@ -36,6 +32,7 @@ object RecipeInstanceState { sequenceNumber = 0, marking = recipe.initialMarking, ingredients = Map.empty, + recipeInstanceMetadata = Map.empty, events = List.empty, completedCorrelationIds = Set.empty, executions = Map.empty, @@ -50,6 +47,7 @@ case class RecipeInstanceState( sequenceNumber: Long, marking: Marking[Place], ingredients: Map[String, Value], + recipeInstanceMetadata: Map[String, String], events: List[EventMoment], completedCorrelationIds: Set[String], executions: Map[Long, TransitionExecution], @@ -118,6 +116,8 @@ case class RecipeInstanceState( consume = markings.head, input = None, ingredients = ingredients, + recipeInstanceMetadata = recipeInstanceMetadata, + eventList = events, correlationId = None, isReprovider = transition.isReprovider ) @@ -129,6 +129,8 @@ case class RecipeInstanceState( consume = markings.head, input = None, ingredients = ingredients, + recipeInstanceMetadata = recipeInstanceMetadata, + eventList = events, correlationId = None, isReprovider = false ) diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala index 6e05df749..a5223cc21 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/model/recipeinstance/TransitionExecution.scala @@ -1,8 +1,7 @@ package com.ing.baker.runtime.model.recipeinstance import java.lang.reflect.InvocationTargetException - -import cats.effect.{Effect, Timer} +import cats.effect.{Effect, IO, Timer} import cats.implicits._ import com.ing.baker.il import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome @@ -12,11 +11,11 @@ import com.ing.baker.petrinet.api._ import com.ing.baker.runtime.model.BakerComponents import com.ing.baker.runtime.model.recipeinstance.RecipeInstance.FatalInteractionException import com.ing.baker.runtime.scaladsl._ -import com.ing.baker.types.{PrimitiveValue, Value} +import com.ing.baker.types.{ListValue, PrimitiveValue, RecordValue, Value} import com.typesafe.scalalogging.LazyLogging import org.slf4j.MDC -import scala.collection.immutable.Seq +import scala.collection.immutable.{List, Seq} import scala.concurrent.duration.MILLISECONDS import scala.util.Random @@ -59,6 +58,8 @@ private[recipeinstance] case class TransitionExecution( consume: Marking[Place], input: Option[EventInstance], ingredients: Map[String, Value], + recipeInstanceMetadata: Map[String, String], + eventList: List[EventMoment], correlationId: Option[String], state: TransitionExecution.State = TransitionExecution.State.Active, isReprovider: Boolean @@ -128,7 +129,21 @@ private[recipeinstance] case class TransitionExecution( def buildInteractionInput: Seq[IngredientInstance] = { val recipeInstanceIdIngredient: (String, Value) = il.recipeInstanceIdName -> PrimitiveValue(recipeInstanceId) val processIdIngredient: (String, Value) = il.processIdName -> PrimitiveValue(recipeInstanceId) - val allIngredients: Map[String, Value] = ingredients ++ interactionTransition.predefinedParameters + recipeInstanceIdIngredient + processIdIngredient + + // Only map the recipeInstanceEventList if is it required, otherwise give an empty list + val recipeInstanceEventList: (String, Value) = + if(interactionTransition.requiredIngredients.exists(_.name == il.recipeInstanceEventListName)) + il.recipeInstanceEventListName -> ListValue(eventList.map(e => PrimitiveValue(e.name))) + else + il.recipeInstanceEventListName -> ListValue(List()) + + val allIngredients: Map[String, Value] = + ingredients ++ + interactionTransition.predefinedParameters + + recipeInstanceIdIngredient + + processIdIngredient + + recipeInstanceEventList + interactionTransition.requiredIngredients.map { case IngredientDescriptor(name, _) => IngredientInstance(name, allIngredients.getOrElse(name, throw new FatalInteractionException(s"Missing parameter '$name'"))) @@ -149,27 +164,37 @@ private[recipeinstance] case class TransitionExecution( components.interactions.execute( interactionTransition, buildInteractionInput, - com.ing.baker.runtime.model.recipeinstance.RecipeInstanceState.getMetaDataFromIngredients(ingredients)) + Some(recipeInstanceMetadata)) + for { startTime <- timer.clock.realTime(MILLISECONDS) outcome <- { for { - event <- effect.delay(InteractionStarted(startTime, recipe.name, recipe.recipeId, recipeInstanceId, interactionTransition.interactionName)) - _ <- effect.delay(components.logging.interactionStarted(event)) - _ <- components.eventStream.publish(event) + interactionStarted <- effect.delay(InteractionStarted(startTime, recipe.name, recipe.recipeId, recipeInstanceId, interactionTransition.interactionName)) + _ <- effect.delay(components.logging.interactionStarted(interactionStarted)) + _ <- components.eventStream.publish(interactionStarted) interactionOutput <- effect.bracket(setupMdc)(_ => execute)(_ => cleanMdc) _ <- validateInteractionOutput(interactionTransition, interactionOutput) transformedOutput = interactionOutput.map(_.transformWith(interactionTransition)) endTime <- timer.clock.realTime(MILLISECONDS) - event = InteractionCompleted( + + interactionCompleted = InteractionCompleted( endTime, endTime - startTime, recipe.name, recipe.recipeId, recipeInstanceId, interactionTransition.interactionName, transformedOutput) - _ <- effect.delay(components.logging.interactionFinished(event)) - _ <- components.eventStream.publish(event) + _ <- effect.delay(components.logging.interactionFinished(interactionCompleted)) + _ <- components.eventStream.publish(interactionCompleted) + + _ <- transformedOutput match { + case Some(event) => + val eventFired = EventFired(endTime, recipe.name, recipe.recipeId, recipeInstanceId, event) + components.logging.eventFired(eventFired) + components.eventStream.publish(eventFired) + case None => effect.pure() + } } yield transformedOutput }.onError { case e: Throwable => @@ -180,11 +205,11 @@ private[recipeinstance] case class TransitionExecution( } for { endTime <- timer.clock.realTime(MILLISECONDS) - event = InteractionFailed( + interactionFailed = InteractionFailed( endTime, endTime - startTime, recipe.name, recipe.recipeId, recipeInstanceId, transition.label, failureCount, throwable, interactionTransition.failureStrategy.apply(failureCount + 1)) - _ <- effect.delay(components.logging.interactionFailed(event)) - _ <- components.eventStream.publish(event) + _ <- effect.delay(components.logging.interactionFailed(interactionFailed)) + _ <- components.eventStream.publish(interactionFailed) } yield () } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/BakerEvent.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/BakerEvent.scala index f2422f326..bc67b5ef3 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/BakerEvent.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/BakerEvent.scala @@ -17,6 +17,8 @@ sealed trait BakerEvent extends common.BakerEvent with ScalaApi { javadsl.EventReceived(timeStamp, recipeName, recipeId, recipeInstanceId, Optional.ofNullable(correlationId.orNull), event.asJava) case EventRejected(timeStamp, recipeInstanceId, correlationId, event, reason) => javadsl.EventRejected(timeStamp, recipeInstanceId, Optional.ofNullable(correlationId.orNull), event.asJava, reason) + case EventFired(timeStamp, recipeName, recipeId, recipeInstanceId, event) => + javadsl.EventFired(timeStamp, recipeName, recipeId, recipeInstanceId, event.asJava) case InteractionFailed(timeStamp, duration, recipeName, recipeId, recipeInstanceId, interactionName, failureCount, throwable, exceptionStrategyOutcome) => javadsl.InteractionFailed(timeStamp, duration, recipeName, recipeId, recipeInstanceId, interactionName, failureCount, throwable, exceptionStrategyOutcome) case InteractionStarted(timeStamp, recipeName, recipeId, recipeInstanceId, interactionName) => @@ -61,6 +63,23 @@ case class EventRejected(timeStamp: Long, correlationId: Option[String], event: EventInstance, reason: RejectReason) extends BakerEvent with common.EventRejected + +/** + * Event describing the fact that an interaction outcome event was fired for a process + * + * @param timeStamp The time that the event was received + * @param recipeName The name of the recipe that interaction is part of + * @param recipeId The recipe id + * @param recipeInstanceId The id of the process + * @param correlationId The (optional) correlation id of the event + * @param event The event + */ +case class EventFired(timeStamp: Long, + recipeName: String, + recipeId: String, + recipeInstanceId: String, + event: EventInstance) extends BakerEvent with common.EventFired + /** * Event describing the fact that an interaction failed during execution * diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/RecipeInstanceState.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/RecipeInstanceState.scala index 6724eaf72..dc763ba54 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/RecipeInstanceState.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/scaladsl/RecipeInstanceState.scala @@ -19,6 +19,7 @@ case class RecipeInstanceState( recipeId: String, recipeInstanceId: String, ingredients: Map[String, Value], + recipeInstanceMetadata: Map[String, String], events: Seq[EventMoment]) extends common.RecipeInstanceState with ScalaApi { @@ -28,5 +29,5 @@ case class RecipeInstanceState( @nowarn def asJava: javadsl.RecipeInstanceState = - new javadsl.RecipeInstanceState(recipeId, recipeInstanceId, ingredients.asJava, events.map(_.asJava()).asJava) + new javadsl.RecipeInstanceState(recipeId, recipeInstanceId, ingredients.asJava, recipeInstanceMetadata.asJava, events.map(_.asJava()).asJava) } diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/BakerEventMapping.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/BakerEventMapping.scala index d36b92e83..26a9d4c66 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/BakerEventMapping.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/BakerEventMapping.scala @@ -20,6 +20,7 @@ class BakerEventMapping extends ProtoMap[BakerEvent, protobuf.BakerEvent] { protobuf.BakerEvent(a match { case event: EventReceived => protobuf.BakerEvent.OneofBakerEvent.EventReceived(ctxToProto(event)(EventReceivedMapping)) case event: EventRejected => protobuf.BakerEvent.OneofBakerEvent.EventRejected(ctxToProto(event)(EventRejectedMapping)) + case event: EventFired => protobuf.BakerEvent.OneofBakerEvent.EventFired(ctxToProto(event)(EventFiredMapping)) case event: InteractionCompleted => protobuf.BakerEvent.OneofBakerEvent.InteractionCompleted(ctxToProto(event)(InteractionCompletedMapping)) case event: InteractionFailed => protobuf.BakerEvent.OneofBakerEvent.InteractionFailed(ctxToProto(event)(InteractionFailedMapping)) case event: InteractionStarted => protobuf.BakerEvent.OneofBakerEvent.InteractionStarted(ctxToProto(event)(InteractionStartedMapping)) @@ -31,6 +32,7 @@ class BakerEventMapping extends ProtoMap[BakerEvent, protobuf.BakerEvent] { message.oneofBakerEvent match { case event: protobuf.BakerEvent.OneofBakerEvent.EventReceived => ctxFromProto(event.value)(EventReceivedMapping) case event: protobuf.BakerEvent.OneofBakerEvent.EventRejected=> ctxFromProto(event.value)(EventRejectedMapping) + case event: protobuf.BakerEvent.OneofBakerEvent.EventFired => ctxFromProto(event.value)(EventFiredMapping) case event: protobuf.BakerEvent.OneofBakerEvent.InteractionCompleted=> ctxFromProto(event.value)(InteractionCompletedMapping) case event: protobuf.BakerEvent.OneofBakerEvent.InteractionFailed => ctxFromProto(event.value)(InteractionFailedMapping) case event: protobuf.BakerEvent.OneofBakerEvent.InteractionStarted => ctxFromProto(event.value)(InteractionStartedMapping) @@ -121,6 +123,36 @@ object BakerEventMapping { ) } + object EventFiredMapping extends ProtoMap[EventFired, protobuf.EventFiredBakerEvent] { + + override def companion: GeneratedMessageCompanion[protobuf.EventFiredBakerEvent] = protobuf.EventFiredBakerEvent + + override def toProto(a: EventFired): protobuf.EventFiredBakerEvent = + protobuf.EventFiredBakerEvent( + timeStamp = Some(a.timeStamp), + recipeName = Some(a.recipeName), + recipeId = Some(a.recipeId), + recipeInstanceId = Some(a.recipeInstanceId), + event = Some(ctxToProto(a.event)) + ) + + override def fromProto(message: protobuf.EventFiredBakerEvent): Try[EventFired] = + for { + timeStamp <- versioned(message.timeStamp, "timeStamp") + recipeName <- versioned(message.recipeName, "recipeName") + recipeId <- versioned(message.recipeId, "recipeId") + recipeInstanceId <- versioned(message.recipeInstanceId, "recipeInstanceId") + eventProto <- versioned(message.event, "event") + event <- ctxFromProto(eventProto) + } yield EventFired( + timeStamp = timeStamp, + recipeName = recipeName, + recipeId = recipeId, + recipeInstanceId = recipeInstanceId, + event = event + ) + } + object InteractionFailedMapping extends ProtoMap[InteractionFailed, protobuf.InteractionFailedBakerEvent] { override def companion: GeneratedMessageCompanion[protobuf.InteractionFailedBakerEvent] = protobuf.InteractionFailedBakerEvent diff --git a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/ProcessStateMapping.scala b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/ProcessStateMapping.scala index 61499209b..ecc70c7c8 100644 --- a/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/ProcessStateMapping.scala +++ b/core/baker-interface/src/main/scala/com/ing/baker/runtime/serialization/protomappings/ProcessStateMapping.scala @@ -18,7 +18,7 @@ class ProcessStateMapping extends ProtoMap[RecipeInstanceState, proto.ProcessSta val protoIngredients = a.ingredients.toSeq.map { case (name, value) => proto.Ingredient(Some(name), None, Some(ctxToProto(value))) } - proto.ProcessState(Some(a.recipeId), Some(a.recipeInstanceId), protoIngredients, a.events.map(ctxToProto(_))) + proto.ProcessState(Some(a.recipeId), Some(a.recipeInstanceId), protoIngredients, a.recipeInstanceMetadata, a.events.map(ctxToProto(_))) } def fromProto(message: proto.ProcessState): Try[RecipeInstanceState] = @@ -32,7 +32,8 @@ class ProcessStateMapping extends ProtoMap[RecipeInstanceState, proto.ProcessSta value <- ctxFromProto(protoValue) } yield (name, value) } + recipeInstanceMetaData = message.recipeInstanceMetadata events <- message.events.toList.traverse (ctxFromProto(_)) - } yield RecipeInstanceState(recipeId, recipeInstanceId, ingredients.toMap, events) + } yield RecipeInstanceState(recipeId, recipeInstanceId, ingredients.toMap, recipeInstanceMetaData, events) } diff --git a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelFixtures.scala b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelFixtures.scala index 024b946e1..318666d18 100644 --- a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelFixtures.scala +++ b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelFixtures.scala @@ -50,6 +50,8 @@ trait BakerModelFixtures[F[_]] extends TestRecipe[F] with MockitoSugar { ) val testInteractionOneMock: InteractionOne = mock[InteractionOne] + val testInteractionOneWithMetaDataMock: InteractionOneWithMetaData = mock[InteractionOneWithMetaData] + val testInteractionOneWithEventListMock: InteractionOneWithEventList = mock[InteractionOneWithEventList] val testInteractionTwoMock: InteractionTwo = mock[InteractionTwo] val testInteractionThreeMock: InteractionThree = mock[InteractionThree] val testInteractionFourMock: InteractionFour = mock[InteractionFour] @@ -67,6 +69,8 @@ trait BakerModelFixtures[F[_]] extends TestRecipe[F] with MockitoSugar { def mockImplementations(implicit effect: Applicative[F], classTag: ClassTag[F[Any]]): List[InteractionInstance[F]] = List( testInteractionOneMock, + testInteractionOneWithMetaDataMock, + testInteractionOneWithEventListMock, testInteractionTwoMock, testInteractionThreeMock, testInteractionFourMock, diff --git a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelSpecExecutionSemanticsTests.scala b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelSpecExecutionSemanticsTests.scala index 2552da8f7..f71a2baab 100644 --- a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelSpecExecutionSemanticsTests.scala +++ b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/BakerModelSpecExecutionSemanticsTests.scala @@ -4,7 +4,7 @@ import cats.effect.ConcurrentEffect import cats.implicits._ import com.ing.baker.recipe.scaladsl.{Event, Ingredient, Interaction, Recipe} import com.ing.baker.runtime.common.BakerException.{IllegalEventException, NoSuchProcessException, ProcessAlreadyExistsException} -import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetaDataName +import com.ing.baker.runtime.common.RecipeInstanceState.RecipeInstanceMetadataName import com.ing.baker.runtime.common.SensoryEventStatus import com.ing.baker.runtime.scaladsl.{EventInstance, InteractionInstanceInput, RecipeEventMetadata} import com.ing.baker.types.{CharArray, Int32, PrimitiveValue, Value} @@ -37,7 +37,7 @@ trait BakerModelSpecExecutionSemanticsTests[F[_]] { self: BakerModelSpec[F] => _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value")) _ <- baker.addMetaData(id, Map.apply[String, String]("key2" -> "value2")) ingredients <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.containsKey("key") && metaData.get("key") == "value" && metaData.containsKey("key2") && metaData.get("key2") == "value2") @@ -53,7 +53,7 @@ trait BakerModelSpecExecutionSemanticsTests[F[_]] { self: BakerModelSpec[F] => _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value")) _ <- baker.addMetaData(id, Map.apply[String, String]("key" -> "value2")) ingredients <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.containsKey("key") && metaData.get("key") == "value2") @@ -67,7 +67,7 @@ trait BakerModelSpecExecutionSemanticsTests[F[_]] { self: BakerModelSpec[F] => _ <- baker.bake(recipeId, id) _ <- baker.addMetaData(id, Map.empty) ingredients <- baker.getIngredients(id) - metaData = ingredients(RecipeInstanceMetaDataName).asMap(classOf[String], classOf[String]) + metaData = ingredients(RecipeInstanceMetadataName).asMap(classOf[String], classOf[String]) } yield assert( metaData.size() == 0) @@ -80,7 +80,7 @@ trait BakerModelSpecExecutionSemanticsTests[F[_]] { self: BakerModelSpec[F] => id = UUID.randomUUID().toString _ <- baker.bake(recipeId, id) ingredients <- baker.getIngredients(id) - } yield assert(!ingredients.contains(RecipeInstanceMetaDataName)) + } yield assert(!ingredients.contains(RecipeInstanceMetadataName)) } test("throw an ProcessAlreadyExistsException if baking a process with the same identifier twice") { context => @@ -160,6 +160,53 @@ trait BakerModelSpecExecutionSemanticsTests[F[_]] { self: BakerModelSpec[F] => "interactionOneOriginalIngredient" -> interactionOneIngredientValue) } + test("execute an interaction when its ingredient is provided with MetaData requirement") { context => + val recipe = + Recipe("IngredientProvidedRecipeWithMetaData") + .withInteraction(interactionOneWithMetaData) + .withSensoryEvent(initialEvent) + + for { + bakerWithRecipe <- context.setupBakerWithRecipe(recipe, mockImplementations) + (baker, recipeId) = bakerWithRecipe + _ = when(testInteractionOneWithMetaDataMock.apply(anyString(), anyString(), any())).thenReturn(effect.pure(InteractionOneSuccessful(interactionOneIngredientValue))) + recipeInstanceId = UUID.randomUUID().toString + metaData = Map("MetaDataKey" -> "MetaDataValue") + _ <- baker.bake(recipeId, recipeInstanceId, metaData) + _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue))) + _ = verify(testInteractionOneWithMetaDataMock).apply(recipeInstanceId, "initialIngredient", metaData) + state <- baker.getRecipeInstanceState(recipeInstanceId) + } yield + state.ingredients shouldBe + ingredientMap( + "RecipeInstanceMetaData" -> metaData, + "initialIngredient" -> initialIngredientValue, + "interactionOneOriginalIngredient" -> interactionOneIngredientValue) + } + + test("execute an interaction when its ingredient is provided with EventList requirement") { context => + val recipe = + Recipe("IngredientProvidedRecipeWithEventList") + .withInteraction(interactionOneWithEventList) + .withSensoryEvent(initialEvent) + + for { + bakerWithRecipe <- context.setupBakerWithRecipe(recipe, mockImplementations) + (baker, recipeId) = bakerWithRecipe + _ = when(testInteractionOneWithEventListMock.apply(anyString(), anyString(), any())).thenReturn(effect.pure(InteractionOneSuccessful(interactionOneIngredientValue))) + recipeInstanceId = UUID.randomUUID().toString + eventList = List("InitialEvent") + _ <- baker.bake(recipeId, recipeInstanceId) + _ <- baker.fireEventAndResolveWhenCompleted(recipeInstanceId, EventInstance.unsafeFrom(InitialEvent(initialIngredientValue))) + _ = verify(testInteractionOneWithEventListMock).apply(recipeInstanceId, "initialIngredient", eventList) + state <- baker.getRecipeInstanceState(recipeInstanceId) + } yield + state.ingredients shouldBe + ingredientMap( + "initialIngredient" -> initialIngredientValue, + "interactionOneOriginalIngredient" -> interactionOneIngredientValue) + } + test("Correctly notify on event") { context => val sensoryEvent = Event( diff --git a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/TestRecipe.scala b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/TestRecipe.scala index 0ac667318..cbbbba88b 100644 --- a/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/TestRecipe.scala +++ b/core/baker-interface/src/test/scala/com/ing/baker/runtime/model/TestRecipe.scala @@ -123,6 +123,30 @@ trait TestRecipe[F[_]] { def apply(recipeInstanceId: String, initialIngredient: String): F[InteractionOneSuccessful] } + val interactionOneWithMetaData = + Interaction( + name = "InteractionOneWithMetaData", + inputIngredients = Seq(recipeInstanceId, initialIngredient, recipeInstanceMetaData), + output = Seq(interactionOneSuccessful)) + + trait InteractionOneWithMetaData { + def name: String = "InteractionOneWithMetaData" + + def apply(recipeInstanceId: String, initialIngredient: String, bakerMetaData: Map[String, String]): F[InteractionOneSuccessful] + } + + val interactionOneWithEventList = + Interaction( + name = "InteractionOneWithEventList", + inputIngredients = Seq(recipeInstanceId, initialIngredient, recipeInstanceEventList), + output = Seq(interactionOneSuccessful)) + + trait InteractionOneWithEventList { + def name: String = "InteractionOneWithEventList" + + def apply(recipeInstanceId: String, initialIngredient: String, recipeInstanceEventList: List[String]): F[InteractionOneSuccessful] + } + val interactionTwo = Interaction( name = "InteractionTwo", diff --git a/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeValidations.scala b/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeValidations.scala index db9cb5557..1b5bb2913 100644 --- a/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeValidations.scala +++ b/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeValidations.scala @@ -14,11 +14,31 @@ object RecipeValidations { // check if the process id argument type is correct val processIdArgumentTypeValidation : Seq[String] = - interactionTransition.requiredIngredients.filter(id => id.name.equals(recipeInstanceIdName)).flatMap { + interactionTransition.requiredIngredients.filter(id => + id.name.equals(recipeInstanceIdName) + ).flatMap { case IngredientDescriptor(_, types.CharArray) => None case IngredientDescriptor(_, incompatibleType) => Some(s"Non supported process id type: ${incompatibleType} on interaction: '${interactionTransition.interactionName}'") } + //Check if MetaData is correct type + val bakerMetaDataTypeValidation: Seq[String] = + interactionTransition.requiredIngredients.filter(id => + id.name.equals(recipeInstanceMetadataName) + ).flatMap { + case IngredientDescriptor(_, types.MapType(types.CharArray)) => None + case IngredientDescriptor(_, incompatibleType) => Some(s"Non supported MetaData type: ${incompatibleType} on interaction: '${interactionTransition.interactionName}'") + } + + //Check if BakerEventList is correct type + val bakerEventListTypeValidation: Seq[String] = + interactionTransition.requiredIngredients.filter(id => + id.name.equals(recipeInstanceEventListName) + ).flatMap { + case IngredientDescriptor(_, types.ListType(types.CharArray)) => None + case IngredientDescriptor(_, incompatibleType) => Some(s"Non supported EventList type: ${incompatibleType} on interaction: '${interactionTransition.interactionName}'") + } + // check if the predefined ingredient is of the expected type val predefinedIngredientOfExpectedTypeValidation : Iterable[String] = interactionTransition.predefinedParameters.flatMap { @@ -33,7 +53,11 @@ object RecipeValidations { } } - interactionWithNoRequirementsValidation ++ processIdArgumentTypeValidation ++ predefinedIngredientOfExpectedTypeValidation + interactionWithNoRequirementsValidation ++ + processIdArgumentTypeValidation ++ + bakerMetaDataTypeValidation ++ + bakerEventListTypeValidation ++ + predefinedIngredientOfExpectedTypeValidation } def validateInteractions(compiledRecipe: CompiledRecipe): Seq[String] = { @@ -61,6 +85,15 @@ object RecipeValidations { } } + /** + * Validates that provided ingredients do not contain reserved names for Baker + */ + def validateSpecialIngredientsNotProvided(compiledRecipe: CompiledRecipe): Seq[String] = { + compiledRecipe.allIngredients.filter(i => + i.name == recipeInstanceIdName || i.name == recipeInstanceMetadataName || i.name == recipeInstanceEventListName + ).map(i => s"Ingredient '${i.name}' is provided and this is a reserved name for internal use in Baker") + }.toSeq + def validateNoCycles(compiledRecipe: CompiledRecipe): Seq[String] = { val cycle: Option[compiledRecipe.petriNet.innerGraph.Cycle] = compiledRecipe.petriNet.innerGraph.findCycle cycle.map(c => s"The petrinet topology contains a cycle: $c").toList @@ -90,6 +123,7 @@ object RecipeValidations { val postCompileValidationErrors : Seq[String] = Seq( validateInteractionIngredients(compiledRecipe), + validateSpecialIngredientsNotProvided(compiledRecipe), validateInteractions(compiledRecipe), if (!validationSettings.allowCycles) validateNoCycles(compiledRecipe) else Seq(), if (!validationSettings.allowDisconnectedness && !compiledRecipe.petriNet.innerGraph.isConnected) Seq("The petrinet topology is not completely connected") else Seq(), diff --git a/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeVisualizer.scala b/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeVisualizer.scala index 46b904608..cb6474a72 100644 --- a/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeVisualizer.scala +++ b/core/intermediate-language/src/main/scala/com/ing/baker/il/RecipeVisualizer.scala @@ -102,7 +102,10 @@ object RecipeVisualizer { // specifies which transitions to compact (remove) val transitionsToCompact = (node: RecipePetriNetGraph#NodeT) => node.value match { - case Right(transition: Transition) => transition.isInstanceOf[IntermediateTransition] || transition.isInstanceOf[MultiFacilitatorTransition] + case Right(transition: Transition) => + transition.isInstanceOf[IntermediateTransition] || + transition.isInstanceOf[MultiFacilitatorTransition] || + transition.label.startsWith(checkpointEventInteractionPrefix) case _ => false } diff --git a/core/intermediate-language/src/main/scala/com/ing/baker/il/package.scala b/core/intermediate-language/src/main/scala/com/ing/baker/il/package.scala index 4c6b4bdba..be00ae961 100644 --- a/core/intermediate-language/src/main/scala/com/ing/baker/il/package.scala +++ b/core/intermediate-language/src/main/scala/com/ing/baker/il/package.scala @@ -10,6 +10,8 @@ import scala.collection.immutable.Seq package object il { val recipeInstanceIdName = "recipeInstanceId" + val recipeInstanceMetadataName = "RecipeInstanceMetaData" //Cannot rename to RecipeInstanceMetadata since this will break backwards compatibility + val recipeInstanceEventListName = "RecipeInstanceEventList" val processIdName = "$ProcessID$" //needed for backwards compatibility with V1 and V2 val exhaustedEventAppend = "RetryExhausted" val checkpointEventInteractionPrefix = "$CheckpointEventInteraction$" diff --git a/core/intermediate-language/src/main/scala/com/ing/baker/il/petrinet/InteractionTransition.scala b/core/intermediate-language/src/main/scala/com/ing/baker/il/petrinet/InteractionTransition.scala index 8fa941fd4..162a77d90 100644 --- a/core/intermediate-language/src/main/scala/com/ing/baker/il/petrinet/InteractionTransition.scala +++ b/core/intermediate-language/src/main/scala/com/ing/baker/il/petrinet/InteractionTransition.scala @@ -33,7 +33,11 @@ case class InteractionTransition(eventsToFire: Seq[EventDescriptor], * These are the ingredients that are not pre-defined or recipeInstanceId */ val nonProvidedIngredients: Seq[IngredientDescriptor] = - requiredIngredients.filterNot(i => i.name == recipeInstanceIdName || predefinedParameters.keySet.contains(i.name)) + requiredIngredients.filterNot(i => + i.name == recipeInstanceIdName || + i.name == recipeInstanceMetadataName || + i.name == recipeInstanceEventListName || + predefinedParameters.keySet.contains(i.name)) override def toStringForRecipeId(recipeIdVariant: RecipeIdVariant): String = { val originalId = diff --git a/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/RecipeCompiler.scala b/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/RecipeCompiler.scala index bac51cf3d..edec23a05 100644 --- a/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/RecipeCompiler.scala +++ b/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/RecipeCompiler.scala @@ -8,7 +8,8 @@ import com.ing.baker.il.petrinet._ import com.ing.baker.il.{CompiledRecipe, EventDescriptor, ValidationSettings, checkpointEventInteractionPrefix} import com.ing.baker.petrinet.api._ import com.ing.baker.recipe.common._ -import com.ing.baker.recipe.scaladsl.{Interaction, Event} +import com.ing.baker.recipe.{javadsl, kotlindsl} +import com.ing.baker.recipe.scaladsl.{Event, Interaction} import scalax.collection.edge.WLDiEdge import scalax.collection.immutable.Graph @@ -238,7 +239,7 @@ object RecipeCompiler { val allEventTransitions: Seq[EventTransition] = sensoryEventTransitions ++ interactionEventTransitions // Given the event classes, it is creating the ingredient places and - // connecting a transition to a ingredient place.ยง + // connecting a transition to a ingredient place. val internalEventArcs: Seq[Arc] = allInteractionTransitions.flatMap { t => t.eventsToFire.flatMap { event => event.ingredients.map { ingredient => @@ -330,9 +331,11 @@ object RecipeCompiler { val errors = preconditionORErrors ++ preconditionANDErrors ++ precompileErrors val oldRecipeIdVariant : OldRecipeIdVariant = - if (recipe.isInstanceOf[com.ing.baker.recipe.javadsl.Recipe]) Scala212CompatibleJava - else if (recipe.isInstanceOf[com.ing.baker.recipe.kotlindsl.Recipe]) Scala212CompatibleKotlin - else Scala212CompatibleScala + recipe match { + case _: javadsl.Recipe => Scala212CompatibleJava + case _: kotlindsl.Recipe => Scala212CompatibleKotlin + case _ => Scala212CompatibleScala + } val compiledRecipe = CompiledRecipe.build( name = recipe.name, diff --git a/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/package.scala b/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/package.scala index a8fc23435..7794e41dc 100644 --- a/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/package.scala +++ b/core/recipe-compiler/src/main/scala/com/ing/baker/compiler/package.scala @@ -47,11 +47,15 @@ package object compiler { event.providedIngredients.map(ingredientToCompiledIngredient)) } - //Replace RecipeInstanceId to recipeInstanceIdName tag as know in compiledRecipe- - //Replace ingredient tags with overridden tags + // Replace RecipeInstanceId to recipeInstanceIdName tag as know in compiledRecipe + // Replace BakerMetaData to BakerMetaData tag as know in compiledRecipe + // Replace BakerEventList to BakerEventList tag as know in compiledRecipe + // Replace ingredient tags with overridden tags val inputFields: Seq[(String, Type)] = interactionDescriptor.inputIngredients .map { ingredient => if (ingredient.name == common.recipeInstanceIdName) il.recipeInstanceIdName -> ingredient.ingredientType + else if(ingredient.name == common.recipeInstanceMetadataName) il.recipeInstanceMetadataName -> ingredient.ingredientType + else if(ingredient.name == common.recipeInstanceEventListName) il.recipeInstanceEventListName -> ingredient.ingredientType else interactionDescriptor.overriddenIngredientNames.getOrElse(ingredient.name, ingredient.name) -> ingredient.ingredientType } diff --git a/core/recipe-compiler/src/test/scala/com/ing/baker/compiler/RecipeCompilerSpec.scala b/core/recipe-compiler/src/test/scala/com/ing/baker/compiler/RecipeCompilerSpec.scala index 861c7f02e..37c934dad 100644 --- a/core/recipe-compiler/src/test/scala/com/ing/baker/compiler/RecipeCompilerSpec.scala +++ b/core/recipe-compiler/src/test/scala/com/ing/baker/compiler/RecipeCompilerSpec.scala @@ -87,6 +87,50 @@ class RecipeCompilerSpec extends AnyWordSpecLike with Matchers { compiledRecipe.validationErrors should contain("Non supported process id type: Int32 on interaction: 'wrongrecipeInstanceIdInteraction'") } + "give an error if the MetaData is required and is not of the Map type" in { + val wrongMetaDataInteraction = + Interaction( + name = "wrongMetaDataInteraction", + inputIngredients = Seq(new Ingredient[Int](common.recipeInstanceMetadataName), initialIngredient), + output = Seq.empty) + + val recipe = Recipe("NonProvidedIngredient") + .withSensoryEvent(initialEvent) + .withInteractions(wrongMetaDataInteraction) + + val compiledRecipe: CompiledRecipe = RecipeCompiler.compileRecipe(recipe) + compiledRecipe.validationErrors should contain("Non supported MetaData type: Int32 on interaction: 'wrongMetaDataInteraction'") + } + + "give an error if the baker internal ingredients are provided" in { + val wrongDateEvent = Event("WrongDataEvent", + Seq( + Ingredient[String]("recipeInstanceId"), + Ingredient[String]("RecipeInstanceMetaData")), + maxFiringLimit = None) + + val wrongDateEvent2 = Event("WrongDataEvent2", + Seq(Ingredient[String]("RecipeInstanceEventList")), + maxFiringLimit = None) + + val wrongMetaDataInteraction = + Interaction( + name = "wrongDataProvidedInteraction", + inputIngredients = Seq(new Ingredient[String](common.recipeInstanceIdName), initialIngredient), + output = Seq(wrongDateEvent2)) + + val recipe = Recipe("WrongDataRecipe") + .withSensoryEvents(initialEvent, wrongDateEvent) + .withInteractions(wrongMetaDataInteraction) + + val compiledRecipe: CompiledRecipe = RecipeCompiler.compileRecipe(recipe) + compiledRecipe.validationErrors shouldBe List( + "Ingredient 'recipeInstanceId' is provided and this is a reserved name for internal use in Baker", + "Ingredient 'RecipeInstanceMetaData' is provided and this is a reserved name for internal use in Baker", + "Ingredient 'RecipeInstanceEventList' is provided and this is a reserved name for internal use in Baker" + ) + } + "give a list of wrong ingredients" when { "an ingredient is of the wrong type" in { val initialIngredientInt = new common.Ingredient("initialIngredient", RecordType(Seq(RecordField("data", Int32)))) diff --git a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/common/package.scala b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/common/package.scala index 30f8e7b21..e0b13666e 100644 --- a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/common/package.scala +++ b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/common/package.scala @@ -4,4 +4,6 @@ package object common { val recipeInstanceIdName = "RecipeInstanceId" val exhaustedEventAppend = "RetryExhausted" + val recipeInstanceMetadataName = "RecipeInstanceMetadata" + val recipeInstanceEventListName = "RecipeInstanceEventList" } diff --git a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/javadsl/ReflectionHelpers.scala b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/javadsl/ReflectionHelpers.scala index 30ec9476c..12c5a4e24 100644 --- a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/javadsl/ReflectionHelpers.scala +++ b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/javadsl/ReflectionHelpers.scala @@ -2,8 +2,7 @@ package com.ing.baker.recipe.javadsl import java.lang.annotation.Annotation import java.lang.reflect.{Method, Type} - -import com.ing.baker.recipe.annotations.{ProcessId, RecipeInstanceId} +import com.ing.baker.recipe.annotations.{RecipeInstanceEventList, RecipeInstanceMetadata, ProcessId, RecipeInstanceId} import com.ing.baker.recipe.{annotations, common} import com.thoughtworks.paranamer.AnnotationParanamer @@ -33,6 +32,10 @@ object ReflectionHelpers { common.recipeInstanceIdName else if (annotationType.equals(classOf[ProcessId])) common.recipeInstanceIdName + else if (annotationType.equals(classOf[RecipeInstanceMetadata])) + common.recipeInstanceMetadataName + else if (annotationType.equals(classOf[RecipeInstanceEventList])) + common.recipeInstanceEventListName else annotationType.getSimpleName } diff --git a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/scaladsl/package.scala b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/scaladsl/package.scala index 2186d2ba5..2b9901a99 100644 --- a/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/scaladsl/package.scala +++ b/core/recipe-dsl/src/main/scala/com/ing/baker/recipe/scaladsl/package.scala @@ -3,6 +3,8 @@ package com.ing.baker.recipe package object scaladsl { val recipeInstanceId: Ingredient[String] = new Ingredient[String](common.recipeInstanceIdName) + val recipeInstanceMetaData: Ingredient[Map[String, String]] = new Ingredient[Map[String, String]](common.recipeInstanceMetadataName) + val recipeInstanceEventList: Ingredient[List[String]] = new Ingredient[List[String]](common.recipeInstanceEventListName) } diff --git a/core/recipe-dsl/src/test/scala/com/ing/baker/recipe/TestRecipe.scala b/core/recipe-dsl/src/test/scala/com/ing/baker/recipe/TestRecipe.scala index 7a0fcfde0..adf18c8ea 100644 --- a/core/recipe-dsl/src/test/scala/com/ing/baker/recipe/TestRecipe.scala +++ b/core/recipe-dsl/src/test/scala/com/ing/baker/recipe/TestRecipe.scala @@ -102,6 +102,30 @@ object TestRecipe { def apply(recipeInstanceId: String, initialIngredient: String): Future[InteractionOneSuccessful] } + val interactionOneWithMetaData = + Interaction( + name = "InteractionOneWithMetaData", + inputIngredients = Seq(recipeInstanceId, initialIngredient, recipeInstanceMetaData), + output = Seq(interactionOneSuccessful)) + + trait InteractionOneWithMetaData { + def name: String = "InteractionOneWithMetaData" + + def apply(recipeInstanceId: String, initialIngredient: String, bakerMetaData: Map[String, String]): Future[InteractionOneSuccessful] + } + + val interactionOneWithEventList = + Interaction( + name = "InteractionOneWithEventList", + inputIngredients = Seq(recipeInstanceId, initialIngredient, recipeInstanceEventList), + output = Seq(interactionOneSuccessful)) + + trait InteractionOneWithEventList { + def name: String = "InteractionOneWithEventList" + + def apply(recipeInstanceId: String, initialIngredient: String, recipeInstanceEventList: List[String]): Future[InteractionOneSuccessful] + } + val interactionTwo = Interaction( name = "InteractionTwo", diff --git a/version.sbt b/version.sbt index c8753cfe8..1f34dcfa5 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "4.0.2-SNAPSHOT" +ThisBuild / version := "4.0.4-SNAPSHOT"