Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse memory usage by ProcessIndex Actors via reusing cache between s… #1173

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions bakery/state/src/main/scala/com/ing/bakery/baker/Bakery.scala
Original file line number Diff line number Diff line change
@@ -1,30 +1,29 @@
package com.ing.bakery.baker

import java.io.File

import akka.actor.ActorSystem
import akka.cluster.Cluster
import cats.effect.{ContextShift, IO, Resource, Timer}
import com.ing.baker.runtime.akka.{AkkaBaker, AkkaBakerConfig}
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.recipe_manager.{ActorBasedRecipeManager, DefaultRecipeManager, RecipeManager}
import com.ing.baker.runtime.recipe_manager.{ActorBasedRecipeManager, RecipeManager}
import com.ing.baker.runtime.scaladsl.Baker
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import io.prometheus.client.CollectorRegistry
import org.http4s.metrics.prometheus.Prometheus

import java.io.File
import scala.concurrent.ExecutionContext

case class Bakery(baker: Baker,
executionContext: ExecutionContext,
system: ActorSystem)
executionContext: ExecutionContext,
system: ActorSystem)

object Bakery extends LazyLogging {

def resource(externalContext: Option[Any] = None,
interactionManager: Option[InteractionManager[IO]] = None,
recipeManager: Option[RecipeManager] = None) : Resource[IO, Bakery] = {
recipeManager: Option[RecipeManager] = None): Resource[IO, Bakery] = {
val configPath = sys.env.getOrElse("CONFIG_DIRECTORY", "/opt/docker/conf")
val config = ConfigFactory.load(ConfigFactory.parseFile(new File(s"$configPath/application.conf")))
val bakerConfig = config.getConfig("baker")
Expand All @@ -49,13 +48,16 @@ object Bakery extends LazyLogging {
interactions <- InteractionRegistry.resource(externalContext, config, system)
baker = AkkaBaker.apply(
AkkaBakerConfig(
externalContext = externalContext,
interactions = interactionManager.getOrElse(interactions),
recipeManager = recipeManager.getOrElse(ActorBasedRecipeManager.getRecipeManagerActor(system, config)),
bakerActorProvider = AkkaBakerConfig.bakerProviderFrom(config),
timeouts = AkkaBakerConfig.Timeouts.apply(config),
bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings.from(config))(system))
_ <- Resource.make(IO{baker})(baker => IO.fromFuture(IO(baker.gracefulShutdown)))
externalContext = externalContext,
interactions = interactionManager.getOrElse(interactions),
recipeManager = recipeManager.getOrElse(ActorBasedRecipeManager.getRecipeManagerActor(system, config)),
bakerActorProvider = AkkaBakerConfig.bakerProviderFrom(config),
timeouts = AkkaBakerConfig.Timeouts.apply(config),
bakerValidationSettings = AkkaBakerConfig.BakerValidationSettings.from(config),
recipeCacheSize = config.getInt("baker.recipe-cache-size"))(system))
_ <- Resource.make(IO {
baker
})(baker => IO.fromFuture(IO(baker.gracefulShutdown)))
_ <- Resource.eval(eventSink.attach(baker))
_ <- Resource.eval(IO.async[Unit] { callback =>
//If using local Baker the registerOnMemberUp is never called, should onl be used during local testing.
Expand Down
6 changes: 4 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ val dependencyOverrideSettings: Seq[Setting[_]] = Seq(
snakeYaml,
jacksonDatabind,
bouncyCastleBcprov,
bouncyCastleBcpkix
bouncyCastleBcpkix,
scalaJava8Compat
)
)

Expand Down Expand Up @@ -126,7 +127,8 @@ lazy val `baker-interface`: Project = project.in(file("core/baker-interface"))
fs2Io,
scalaJava8Compat,
javaxInject,
guava
guava,
scaffeine
) ++ providedDeps(findbugs) ++ testDeps(
scalaTest,
scalaCheckPlusMockito,
Expand Down
3 changes: 3 additions & 0 deletions core/akka-runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ baker {
# the timeout for refreshing the local recipe cache
process-index-update-cache-timeout = 5 seconds

# Maximum size for caffeine cache used for recipes
recipe-cache-size = 1000

# the default timeout for Baker.processEvent(..)
process-event-timeout = 10 seconds

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.ing.baker.runtime.akka

import java.util.{List => JavaList}

import akka.actor.{Actor, ActorRef, ActorSystem, Address, Props}
import akka.pattern.{FutureRef, ask}
import akka.util.Timeout
Expand All @@ -11,6 +9,7 @@ import com.ing.baker.il._
import com.ing.baker.il.failurestrategy.ExceptionStrategyOutcome
import com.ing.baker.runtime.akka.actor._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_index.RecipeCache
import com.ing.baker.runtime.akka.actor.process_instance.ProcessInstanceProtocol.{Initialized, InstanceState, Uninitialized}
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol
import com.ing.baker.runtime.akka.actor.recipe_manager.RecipeManagerProtocol.RecipeFound
Expand All @@ -25,6 +24,7 @@ import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._

import java.util.{List => JavaList}
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand Down Expand Up @@ -74,15 +74,19 @@ object AkkaBaker {
* For each recipe a new instance can be baked, sensory events can be send and state can be inquired upon
*/
class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker with LazyLogging {

import config.system

config.bakerActorProvider.initialize(system)

private val recipeManager: RecipeManager =
config.recipeManager

private val recipeCache: RecipeCache =
new RecipeCache(recipeManager, config.timeouts.updateCacheTimeout, config.recipeCacheSize)

private val processIndexActor: ActorRef =
config.bakerActorProvider.createProcessIndexActor(config.interactions, recipeManager)
config.bakerActorProvider.createProcessIndexActor(config.interactions, recipeCache)

/**
* Adds a recipe to baker and returns a recipeId for the recipe.
Expand Down Expand Up @@ -137,7 +141,7 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
// here we ask the RecipeManager actor to return us the recipe for the given id
recipeManager.get(recipeId).flatMap {
case Some(r: RecipeRecord) =>
getImplementationErrors(r.recipe).map(errors => RecipeInformation(r.recipe, r.updated, errors, r.validate ))
getImplementationErrors(r.recipe).map(errors => RecipeInformation(r.recipe, r.updated, errors, r.validate))
case None =>
Future.failed(NoSuchRecipeException(recipeId))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import com.ing.baker.runtime.serialization.Encryption
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ValueReader

import scala.concurrent.duration._

Expand All @@ -22,7 +21,8 @@ case class AkkaBakerConfig(
interactions: InteractionManager[IO],
recipeManager: RecipeManager,
timeouts: AkkaBakerConfig.Timeouts,
bakerValidationSettings: BakerValidationSettings
bakerValidationSettings: BakerValidationSettings,
recipeCacheSize: Int
)(implicit val system: ActorSystem)

object AkkaBakerConfig extends LazyLogging {
Expand All @@ -39,10 +39,11 @@ object AkkaBakerConfig extends LazyLogging {
}

case class Timeouts(defaultBakeTimeout: FiniteDuration,
defaultProcessEventTimeout: FiniteDuration,
defaultInquireTimeout: FiniteDuration,
defaultShutdownTimeout: FiniteDuration,
defaultAddRecipeTimeout: FiniteDuration)
defaultProcessEventTimeout: FiniteDuration,
defaultInquireTimeout: FiniteDuration,
defaultShutdownTimeout: FiniteDuration,
defaultAddRecipeTimeout: FiniteDuration,
updateCacheTimeout: FiniteDuration)

object Timeouts {

Expand All @@ -53,6 +54,7 @@ object AkkaBakerConfig extends LazyLogging {
defaultInquireTimeout = 10.seconds,
defaultShutdownTimeout = 30.seconds,
defaultAddRecipeTimeout = 10.seconds,
updateCacheTimeout = 10.seconds
)

def apply(config: Config): Timeouts =
Expand All @@ -61,7 +63,8 @@ object AkkaBakerConfig extends LazyLogging {
defaultProcessEventTimeout = config.as[FiniteDuration]("baker.process-event-timeout"),
defaultInquireTimeout = config.as[FiniteDuration]("baker.process-inquire-timeout"),
defaultShutdownTimeout = config.as[FiniteDuration]("baker.shutdown-timeout"),
defaultAddRecipeTimeout = config.as[FiniteDuration]("baker.add-recipe-timeout")
defaultAddRecipeTimeout = config.as[FiniteDuration]("baker.add-recipe-timeout"),
updateCacheTimeout = config.as[FiniteDuration]("baker.process-index-update-cache-timeout")
)
}

Expand All @@ -87,7 +90,8 @@ object AkkaBakerConfig extends LazyLogging {
bakerValidationSettings = BakerValidationSettings.default,
bakerActorProvider = localProvider,
interactions = interactions,
recipeManager = DefaultRecipeManager.pollingAware(actorSystem.dispatcher)
recipeManager = DefaultRecipeManager.pollingAware(actorSystem.dispatcher),
recipeCacheSize = 10000
)(actorSystem)
}

Expand All @@ -114,7 +118,8 @@ object AkkaBakerConfig extends LazyLogging {
bakerValidationSettings = BakerValidationSettings.default,
bakerActorProvider = clusterProvider,
interactions = interactions,
recipeManager = ActorBasedRecipeManager.clusterBasedRecipeManagerActor(actorSystem, Timeouts.default)
recipeManager = ActorBasedRecipeManager.clusterBasedRecipeManagerActor(actorSystem, Timeouts.default),
recipeCacheSize = 10000
)(actorSystem)
}

Expand All @@ -123,12 +128,13 @@ object AkkaBakerConfig extends LazyLogging {
throw new IllegalStateException("You must 'include baker.conf' in your application.conf")

AkkaBakerConfig(
None,
externalContext = None,
timeouts = Timeouts.apply(config),
bakerValidationSettings = BakerValidationSettings.from(config),
bakerActorProvider = bakerProviderFrom(config),
interactions = interactions,
recipeManager = recipeManager
recipeManager = recipeManager,
recipeCacheSize = config.getInt("baker.recipe-cache-size")
)(actorSystem)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package com.ing.baker.runtime.akka.actor

import akka.actor.{ActorRef, ActorSystem}
import cats.effect.IO
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexActor.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.RecipeCache
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.recipe_manager.RecipeManager

Expand All @@ -12,7 +13,7 @@ trait BakerActorProvider extends {

def initialize(implicit system: ActorSystem): Unit

def createProcessIndexActor(interactionManager: InteractionManager[IO], recipeManager: RecipeManager)(implicit actorSystem: ActorSystem) : ActorRef
def createProcessIndexActor(interactionManager: InteractionManager[IO], recipeCache: RecipeCache)(implicit actorSystem: ActorSystem) : ActorRef

def getAllProcessesMetadata(actorRef: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration): Seq[ActorMetadata]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import cats.effect.IO
import com.ing.baker.il.sha256HashCode
import com.ing.baker.runtime.akka.AkkaBakerConfig
import com.ing.baker.runtime.akka.actor.ClusterBakerActorProvider._
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexActor.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol._
import com.ing.baker.runtime.akka.actor.process_index._
import com.ing.baker.runtime.akka.actor.serialization.BakerSerializable
Expand Down Expand Up @@ -95,11 +95,11 @@ class ClusterBakerActorProvider(


override def createProcessIndexActor(interactionManager: InteractionManager[IO],
recipeManager: RecipeManager)(implicit actorSystem: ActorSystem): ActorRef = {
recipeCache: RecipeCache)(implicit actorSystem: ActorSystem): ActorRef = {
val roles = Cluster(actorSystem).selfRoles
ClusterSharding(actorSystem).start(
typeName = "ProcessIndexActor",
entityProps = ProcessIndex.props(actorIdleTimeout, Some(retentionCheckInterval), configuredEncryption, interactionManager, recipeManager, ingredientsFilter),
entityProps = ProcessIndexActor.props(recipeCache, actorIdleTimeout, Some(retentionCheckInterval), configuredEncryption, interactionManager, ingredientsFilter),
settings = {
if (roles.contains("state-node"))
ClusterShardingSettings(actorSystem).withRole("state-node")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package com.ing.baker.runtime.akka.actor
import akka.actor.{ActorRef, ActorSystem}
import cats.effect.IO
import com.ing.baker.runtime.akka.AkkaBakerConfig
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndex.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.{ProcessIndexActor, RecipeCache}
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexActor.ActorMetadata
import com.ing.baker.runtime.akka.actor.process_index.ProcessIndexProtocol.{GetIndex, Index}
import com.ing.baker.runtime.model.InteractionManager
import com.ing.baker.runtime.recipe_manager.RecipeManager
Expand All @@ -22,17 +22,16 @@ class LocalBakerActorProvider(
) extends BakerActorProvider {
override def initialize(implicit system: ActorSystem): Unit = Unit

override def createProcessIndexActor(interactionManager: InteractionManager[IO], recipeManager: RecipeManager)(
override def createProcessIndexActor(interactionManager: InteractionManager[IO], recipeCache: RecipeCache)(
implicit actorSystem: ActorSystem): ActorRef = {
actorSystem.actorOf(
ProcessIndex.props(
ProcessIndexActor.props(
recipeCache,
actorIdleTimeout,
Some(retentionCheckInterval),
configuredEncryption,
interactionManager,
recipeManager,
ingredientsFilter
))
ingredientsFilter))
}

override def getAllProcessesMetadata(actorRef: ActorRef)(implicit system: ActorSystem, timeout: FiniteDuration): Seq[ActorMetadata] = {
Expand Down
Loading