Skip to content

Commit

Permalink
Merge branch 'master' into optimise/memory
Browse files Browse the repository at this point in the history
# Conflicts:
#	core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala
#	core/akka-runtime/src/test/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexActorSpec.scala
  • Loading branch information
yk24na committed Feb 16, 2022
2 parents 737dbb9 + 587005a commit 9ca269e
Show file tree
Hide file tree
Showing 109 changed files with 719 additions and 422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.ing.bakery.scaladsl.EndpointConfig
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.{Request, Uri}

import scala.collection.immutable.Seq
import scala.collection.JavaConverters._
import scala.compat.java8.FunctionConverters._
import scala.compat.java8.FutureConverters
Expand Down Expand Up @@ -54,7 +55,7 @@ object BakerClient {
EndpointConfig(hosts.asScala.map(Uri.unsafeFromString).toIndexedSeq, apiUrlPrefix, apiLoggingEnabled),
if (fallbackHosts.size == 0) None
else Some(EndpointConfig(fallbackHosts.asScala.map(Uri.unsafeFromString).toIndexedSeq, fallbackApiUrlPrefix, apiLoggingEnabled)),
filters = filters.asScala.map(_.asScala))
filters = filters.asScala.map(_.asScala).toIndexedSeq)
}
.allocated
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.http4s.client.Client
import org.http4s.client.blaze._
import org.http4s.client.dsl.io._

import scala.collection.immutable.Seq
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NoStackTrace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.http4s.blaze.pipeline.Command
import org.scalatest.ConfigMap
import org.scalatest.compatible.Assertion

import scala.collection.immutable.Seq
import scala.concurrent.Future

class RemoteInteractionSpec extends BakeryFunSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatest.matchers.should.Matchers

import java.net.InetSocketAddress
import java.util.UUID
import scala.collection.immutable.Seq
import scala.concurrent.Future

class StateRuntimeSpec extends BakeryFunSpec with Matchers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.ing.baker.recipe.common.InteractionFailureStrategy.{BlockInteraction,
import com.ing.baker.recipe.scaladsl.{Event, Ingredient, Interaction, Recipe}
import com.ing.bakery.recipe.Events.{ItemsReserved, OrderHadUnavailableItems, OrderPlaced, ReserveItemsOutput}

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.concurrent.duration._

Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ lazy val `bakery-state`: Project = project.in(file("bakery/state"))
akkaClusterMetrics,
akkaDiscovery,
akkaDiscoveryKube,
akkaPki,
http4s,
http4sDsl,
http4sCirce,
Expand Down Expand Up @@ -428,6 +429,7 @@ lazy val `baker-example`: Project = project
) ++ testDeps(
scalaTest,
scalaCheck,
mockitoScala,
junitInterface,
slf4jApi
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._

import java.util.{List => JavaList}
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.language.postfixOps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
def handleRejections: Receive = {
case rejection: ProcessInstanceProtocol.TransitionNotEnabled =>
throw new IllegalArgumentException(s"IMMINENT BUG: $rejection should be transformed into a FiringLimitMet rejection")
case rejection: ProcessInstanceProtocol.AlreadyReceived
case rejection: ProcessInstanceProtocol.AlreadyReceived =>
throw new IllegalArgumentException(s"IMMINENT BUG: $rejection should be transformed into a AlreadyReceived rejection")
case rejection: ProcessInstanceProtocol.Uninitialized
case rejection: ProcessInstanceProtocol.Uninitialized =>
throw new IllegalArgumentException(s"IMMINENT BUG: $rejection should be transformed into a NoSuchProcess rejection")
case rejection: FireSensoryEventRejection =>
rejectWith(rejection)
Expand All @@ -127,23 +127,23 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing
handleRejections orElse {
case recipe: CompiledRecipe =>
context.become(waitForFirstEvent(recipe))
case ReceiveTimeout
case ReceiveTimeout =>
log.debug("Timeout on SensoryEventResponseHandler when expecting a compiled recipe")
stopActor()
case message
case message =>
log.debug(s"Unexpected message $message on SensoryEventResponseHandler when expecting a compiled recipe")
stopActor()
}

def waitForFirstEvent(recipe: CompiledRecipe): Receive =
handleRejections orElse {
case firstEvent: TransitionFired
case firstEvent: TransitionFired =>
notifyReceive(recipe)
context.become(streaming(firstEvent.newJobsIds - firstEvent.jobId, List(firstEvent)))
case ReceiveTimeout
case ReceiveTimeout =>
log.debug("Timeout on SensoryEventResponseHandler when expecting the first transition fired event")
stopActor()
case message
case message =>
log.debug(s"Unexpected message $message on SensoryEventResponseHandler when expecting the first transition fired event")
stopActor()
}
Expand All @@ -165,16 +165,16 @@ class SensoryEventResponseHandler(receiver: ActorRef, command: ProcessEvent, ing

case _ =>
val pf: PartialFunction[Any, Unit] = {
case event: TransitionFired
case event: TransitionFired =>
context.become(streaming(runningJobs ++ event.newJobsIds - event.jobId, event :: cache))
case event: TransitionFailed if event.strategy.isRetry && waitForRetries
case event: TransitionFailed if event.strategy.isRetry && waitForRetries =>
context.become(streaming(runningJobs, event :: cache))
case event: TransitionFailed
case event: TransitionFailed =>
context.become(streaming(runningJobs - event.jobId, event :: cache))
case ReceiveTimeout
case ReceiveTimeout =>
log.debug("Timeout on SensoryEventResponseHandler when streaming")
stopActor()
case message
case message =>
log.debug(s"Unexpected message $message on SensoryEventResponseHandler when streaming")
stopActor()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
filterIngredientValues(state, settings.ingredientsFilter)
case _ => instance.state
},
instance.jobs.mapValues(mapJobsToProtocol).map(identity))
instance.jobs.view.map { case (key, value) => (key, mapJobsToProtocol(value))}.toMap
)
}

private def mapJobsToProtocol(job: internal.Job[P, T, S]): protocol.JobState =
Expand All @@ -105,7 +106,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}

def uninitialized: Receive = {
case Initialize(initialMarking, state)
case Initialize(initialMarking, state) =>

val uninitialized = Instance.uninitialized[P, T, S](petriNet)
val event = InitializedEvent(initialMarking, state)
Expand All @@ -115,7 +116,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
eventSource.apply(uninitialized)
.andThen(step)
.andThen {
case (updatedInstance, _)
case (updatedInstance, _) =>

// notifies the sender that initialization was successful
sender() ! Initialized(initialMarking, state)
Expand All @@ -125,10 +126,10 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}
}

case Stop(_)
case Stop(_) =>
context.stop(context.self)

case c: Command
case c: Command =>
log.warning(s"Received unexpected command in uninitialized state: ${c.getClass.getName}")
sender() ! Uninitialized(recipeInstanceId)
context.stop(context.self)
Expand Down Expand Up @@ -164,7 +165,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
def running(instance: Instance[P, T, S],
scheduledRetries: Map[Long, Cancellable]): Receive = {

case Stop(deleteHistory)
case Stop(deleteHistory) =>
scheduledRetries.values.foreach(_.cancel())
if (deleteHistory) {
log.debug("Deleting recipe instance history")
Expand All @@ -187,7 +188,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
log.warning("Receive timeout happened but jobs are still active: will wait for another receive timeout")
}

case GetState
case GetState =>
val instanceState: InstanceState = mapStateToProtocol(instance)
instanceState.state match {
case state: RecipeInstanceState =>
Expand All @@ -197,7 +198,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}


case event@TransitionFiredEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output)
case event@TransitionFiredEvent(jobId, transitionId, correlationId, timeStarted, timeCompleted, consumed, produced, output) =>

val transition = instance.petriNet.transitions.getById(transitionId)

Expand All @@ -208,7 +209,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
eventSource.apply(instance)
.andThen(step)
.andThen {
case (updatedInstance, newJobs)
case (updatedInstance, newJobs) =>

// the sender is notified of the transition having fired
sender() ! TransitionFired(jobId, transitionId, correlationId, consumed, produced, newJobs.map(_.id), output)
Expand All @@ -218,14 +219,14 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}
)

case event@TransitionFailedEvent(jobId, transitionId, correlationId, timeStarted, timeFailed, consume, input, reason, strategy)
case event@TransitionFailedEvent(jobId, transitionId, correlationId, timeStarted, timeFailed, consume, input, reason, strategy) =>

val transition = instance.petriNet.transitions.getById(transitionId)

log.transitionFailed(recipeInstanceId, transition.asInstanceOf[Transition], jobId, timeStarted, timeFailed, reason)

strategy match {
case RetryWithDelay(delay)
case RetryWithDelay(delay) =>

log.scheduleRetry(recipeInstanceId, transition.asInstanceOf[Transition], delay)

Expand All @@ -234,7 +235,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
// persist the failure event
persistEvent(instance, event)(
eventSource.apply(instance)
.andThen { updatedInstance
.andThen { updatedInstance =>

// a retry is scheduled on the scheduler of the actor system
val retry = system.scheduler.scheduleOnce(delay milliseconds) {
Expand All @@ -256,21 +257,21 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
persistEvent(instance, transitionFiredEvent)(
eventSource.apply(instance)
.andThen(step)
.andThen { case (updatedInstance, newJobs)
.andThen { case (updatedInstance, newJobs) =>
sender() ! TransitionFired(jobId, transitionId, correlationId, consume, marshallMarking(produced), newJobs.map(_.id), out)
context become running(updatedInstance, scheduledRetries - jobId)
})

case _
case _ =>
persistEvent(instance, event)(
eventSource.apply(instance)
.andThen { updatedInstance
.andThen { updatedInstance =>
sender() ! TransitionFailed(jobId, transitionId, correlationId, consume, input, reason, mapExceptionStrategyToProtocol(strategy))
context become running(updatedInstance, scheduledRetries - jobId)
})
}

case FireTransition(transitionId, input, correlationIdOption)
case FireTransition(transitionId, input, correlationIdOption) =>

/**
* TODO
Expand All @@ -285,18 +286,18 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
sender() ! FireSensoryEventRejection.AlreadyReceived(recipeInstanceId, correlationId)
case _ =>
runtime.createJob(transition, input, correlationIdOption).run(instance).value match {
case (updatedInstance, Right(job))
case (updatedInstance, Right(job)) =>
executeJob(job, sender())
context become running(updatedInstance, scheduledRetries)
case (_, Left(reason))
case (_, Left(reason)) =>

log.fireTransitionRejected(recipeInstanceId, transition.asInstanceOf[Transition], reason)

sender() ! FireSensoryEventRejection.FiringLimitMet(recipeInstanceId)
}
}

case Initialize(_, _)
case Initialize(_, _) =>
sender() ! AlreadyInitialized(recipeInstanceId)

case OverrideExceptionStrategy(jobId, protocol.ExceptionStrategy.RetryWithDelay(timeout)) =>
Expand Down Expand Up @@ -391,14 +392,14 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
def step(instance: Instance[P, T, S]): (Instance[P, T, S], Set[Job[P, T, S]]) = {

runtime.allEnabledJobs.run(instance).value match {
case (updatedInstance, jobs)
case (updatedInstance, jobs) =>

if (jobs.isEmpty && updatedInstance.activeJobs.isEmpty)
settings.idleTTL.foreach { ttl
settings.idleTTL.foreach { ttl =>
system.scheduler.scheduleOnce(ttl, context.self, IdleStop(updatedInstance.sequenceNr))
}

jobs.foreach(job executeJob(job, sender()))
jobs.foreach(job => executeJob(job, sender()))
(updatedInstance, jobs)
}
}
Expand All @@ -424,7 +425,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](

def scheduleFailedJobsForRetry(instance: Instance[P, T, S]): Map[Long, Cancellable] = {
instance.jobs.values.foldLeft(Map.empty[Long, Cancellable]) {
case (map, j@Job(_, _, _, _, _, _, Some(internal.ExceptionState(failureTime, _, _, RetryWithDelay(delay)))))
case (map, j@Job(_, _, _, _, _, _, Some(internal.ExceptionState(failureTime, _, _, RetryWithDelay(delay))))) =>
val newDelay = failureTime + delay - System.currentTimeMillis()
if (newDelay < 0) {
executeJob(j, sender())
Expand All @@ -436,7 +437,7 @@ class ProcessInstance[P: Identifiable, T: Identifiable, S, E](
}
map + (j.id -> cancellable)
}
case (acc, _) acc
case (acc, _) => acc
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ object ProcessInstanceEventSourcing {
case class InitializedEvent(marking: Marking[Id],
state: Any) extends Event

def apply[P : Identifiable, T : Identifiable, S, E](sourceFn: T (S E S)): Instance[P, T, S] Event Instance[P, T, S] = instance {
case InitializedEvent(initial, initialState)
def apply[P : Identifiable, T : Identifiable, S, E](sourceFn: T => (S => E => S)): Instance[P, T, S] => Event => Instance[P, T, S] = instance => {
case InitializedEvent(initial, initialState) =>

val initialMarking: Marking[P] = initial.unmarshall(instance.petriNet.places)

Instance[P, T, S](instance.petriNet, 1, initialMarking, initialState.asInstanceOf[S], Map.empty, Set.empty)
case e: TransitionFiredEvent
case e: TransitionFiredEvent =>

val transition = instance.petriNet.transitions.getById(e.transitionId)
val newState = sourceFn(transition)(instance.state)(e.output.asInstanceOf[E])
Expand All @@ -72,7 +72,7 @@ object ProcessInstanceEventSourcing {
state = newState,
jobs = instance.jobs - e.jobId
)
case e: TransitionFailedEvent
case e: TransitionFailedEvent =>
val transition = instance.petriNet.transitions.getById(e.transitionId)

val consumed: Marking[P] = e.consume.unmarshall(instance.petriNet.places)
Expand All @@ -91,7 +91,7 @@ object ProcessInstanceEventSourcing {
topology: PetriNet[P, T],
encryption: Encryption,
readJournal: CurrentEventsByPersistenceIdQuery,
eventSourceFn: T (S E S))(implicit actorSystem: ActorSystem): Source[(Instance[P, T, S], Event), NotUsed] = {
eventSourceFn: T => (S => E => S))(implicit actorSystem: ActorSystem): Source[(Instance[P, T, S], Event), NotUsed] = {

val serializer = new ProcessInstanceSerialization[P, T, S, E](AkkaSerializerProvider(actorSystem, encryption))

Expand All @@ -101,7 +101,7 @@ object ProcessInstanceEventSourcing {

// TODO: remove null value
src.scan[(Instance[P, T, S], Event)]((Instance.uninitialized[P, T, S](topology), null.asInstanceOf[Event])) {
case ((instance, _), e)
case ((instance, _), e) =>
val serializedEvent = e.event.asInstanceOf[AnyRef]
val deserializedEvent = serializer.deserializeEvent(serializedEvent)(instance)
val updatedInstance = eventSource.apply(instance)(deserializedEvent)
Expand Down Expand Up @@ -137,10 +137,10 @@ abstract class ProcessInstanceEventSourcing[P : Identifiable, T : Identifiable,
}

override def receiveRecover: Receive = {
case e: protobuf.Initialized applyToRecoveringState(e)
case e: protobuf.TransitionFired applyToRecoveringState(e)
case e: protobuf.TransitionFailed applyToRecoveringState(e)
case RecoveryCompleted
case e: protobuf.Initialized => applyToRecoveringState(e)
case e: protobuf.TransitionFired => applyToRecoveringState(e)
case e: protobuf.TransitionFailed => applyToRecoveringState(e)
case RecoveryCompleted =>
if (recoveringState.sequenceNr > 0)
onRecoveryCompleted(recoveringState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ object ProcessInstanceProtocol {
exceptionState: Option[ExceptionState]) {

def isActive: Boolean = exceptionState match {
case Some(ExceptionState(_, _, ExceptionStrategy.RetryWithDelay(_))) true
case None true
case _ false
case Some(ExceptionState(_, _, ExceptionStrategy.RetryWithDelay(_))) => true
case None => true
case _ => false
}
}

Expand Down
Loading

0 comments on commit 9ca269e

Please sign in to comment.