Skip to content

Commit

Permalink
Fix #727 Add RoleAppLauncher implementation for BIO, independent of c…
Browse files Browse the repository at this point in the history
…ats.LiftIO (#731)

* fix role services starting in opposite order
* change BIOAsync.fromFuture to supply execution context
  • Loading branch information
neko-kai authored Nov 17, 2019
1 parent d6a1734 commit 9b47e54
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ import java.util.concurrent.CountDownLatch

import cats.effect.{ContextShift, IO, LiftIO}
import izumi.distage.model.monadic.DIEffect
import izumi.functional.bio.{BIOAsync, F}
import izumi.fundamentals.platform.functional.Identity
import izumi.logstage.api.IzLogger

import scala.concurrent.{ExecutionContext, Promise}

trait AppShutdownStrategy[F[_]] {
def await(logger: IzLogger): F[Unit]

def release(): Unit
}

class JvmExitHookLatchShutdownStrategy extends AppShutdownStrategy[Identity] {
private val latch = new CountDownLatch(1)

private val mainLatch: CountDownLatch = new CountDownLatch(1)
private val mainLatch = new CountDownLatch(1)

override def release(): Unit = mainLatch.countDown()

Expand All @@ -27,22 +26,20 @@ class JvmExitHookLatchShutdownStrategy extends AppShutdownStrategy[Identity] {
mainLatch.await() // we need to let main thread to finish everything
}

def await(logger: IzLogger): Identity[Unit] = {
def await(logger: IzLogger): Unit = {
val shutdownHook = new Thread(() => {
stop()
}, "termination-hook-latch")

DIEffect[Identity].maybeSuspend {
logger.info("Waiting on latch...")
Runtime.getRuntime.addShutdownHook(shutdownHook)
latch.await()
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case _: IllegalStateException =>
}
logger.info("Going to shut down...")
logger.info("Waiting on latch...")
Runtime.getRuntime.addShutdownHook(shutdownHook)
latch.await()
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case _: IllegalStateException =>
}
logger.info("Going to shut down...")
}
}

Expand All @@ -56,7 +53,7 @@ class ImmediateExitShutdownStrategy[F[_] : DIEffect] extends AppShutdownStrategy
override def release(): Unit = {}
}

class CatsEffectIOShutdownStrategy[F[_] : LiftIO](executionContext: ExecutionContext) extends AppShutdownStrategy[F] {
class CatsEffectIOShutdownStrategy[F[_]: LiftIO](executionContext: ExecutionContext) extends AppShutdownStrategy[F] {
private val shutdownPromise: Promise[Unit] = Promise[Unit]()
private val mainLatch: CountDownLatch = new CountDownLatch(1)

Expand Down Expand Up @@ -95,3 +92,40 @@ class CatsEffectIOShutdownStrategy[F[_] : LiftIO](executionContext: ExecutionCon
LiftIO[F].liftIO(fio)
}
}

class BIOShutdownStrategy[F[+_, +_]: BIOAsync] extends AppShutdownStrategy[F[Throwable, ?]] {
private val shutdownPromise: Promise[Unit] = Promise[Unit]()
private val mainLatch: CountDownLatch = new CountDownLatch(1)

override def release(): Unit = {
mainLatch.countDown()
}

def stop(): Unit = {
shutdownPromise.success(())
mainLatch.await() // we need to let main thread to finish everything
}

def await(logger: IzLogger): F[Throwable, Unit] = {
val shutdownHook = new Thread(() => {
stop()
}, "termination-hook-promise")

logger.info("Waiting on latch...")
Runtime.getRuntime.addShutdownHook(shutdownHook)

val f = shutdownPromise.future

F.fromFuture { implicit ec =>
f.map[Unit] {
_ =>
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case _: IllegalStateException =>
}
logger.info("Going to shut down...")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import izumi.distage.roles.model.{AppActivation, DiAppBootstrapException}
import izumi.distage.roles.services.PluginSource.AllLoadedPlugins
import izumi.distage.roles.services.ResourceRewriter.RewriteRules
import izumi.distage.roles.services._
import izumi.functional.bio.{BIOAsync, BIOPrimitives}
import izumi.fundamentals.platform.cli.model.raw.RawAppArgs
import izumi.fundamentals.platform.cli.model.schema.ParserDef
import izumi.fundamentals.platform.functional.Identity
Expand Down Expand Up @@ -243,6 +244,10 @@ object RoleAppLauncher {
override protected val hook: AppShutdownStrategy[F] = new CatsEffectIOShutdownStrategy(executionContext)
}

abstract class LauncherBIO[F[+_, +_]: BIOAsync: BIOPrimitives](implicit tagK: TagK[F[Throwable, ?]]) extends RoleAppLauncher[F[Throwable, ?]] {
override protected val hook: AppShutdownStrategy[F[Throwable, ?]] = new BIOShutdownStrategy[F]
}

abstract class LauncherIdentity extends RoleAppLauncher[Identity] {
override protected val hook: AppShutdownStrategy[Identity] = new JvmExitHookLatchShutdownStrategy
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ object RoleAppExecutor {
} yield ()
}

protected def runRoles(index: Map[String, AbstractRoleF[F]])(implicit effect: DIEffect[F]): F[Unit] = {
protected def runRoles(index: Map[String, AbstractRoleF[F]])(implicit F: DIEffect[F]): F[Unit] = {
val rolesToRun = parameters.roles.flatMap {
r =>
index.get(r.role) match {
Expand All @@ -65,39 +65,37 @@ object RoleAppExecutor {
if (rolesToRun.nonEmpty) {
lateLogger.info(s"Going to run: ${rolesToRun.size -> "roles"}")

val tt = rolesToRun.map {
val roleServices = rolesToRun.map {
case (task, cfg) =>

task.start(cfg.roleParameters, cfg.freeArgs)
task -> task.start(cfg.roleParameters, cfg.freeArgs)
}

val finalizer = (_: Unit) => {
hook.await(lateLogger)
}
val f = tt.foldLeft(finalizer) {
case (acc, role) =>
val f = roleServices.foldRight(finalizer) {
case ((role, res), acc) =>
_ =>
val loggedTask = for {
_ <- effect.maybeSuspend(lateLogger.info(s"Role is about to initialize: $role"))
_ <- role.use(acc)
_ <- effect.maybeSuspend(lateLogger.info(s"Role initialized: $role"))
_ <- F.maybeSuspend(lateLogger.info(s"Role is about to initialize: $role"))
_ <- res.use(acc)
_ <- F.maybeSuspend(lateLogger.info(s"Role initialized: $role"))
} yield ()

effect.definitelyRecover(loggedTask) {
F.definitelyRecover(loggedTask) {
t =>
for {
_ <- effect.maybeSuspend(lateLogger.error(s"Role $role failed: $t"))
_ <- effect.fail[Unit](t)
} yield ()
F.maybeSuspend(lateLogger.error(s"Role $role failed: $t"))
.flatMap(_ => F.fail[Unit](t))
}
}
f(())
} else {
effect.maybeSuspend(lateLogger.info("No services to run, exiting..."))
F.maybeSuspend(lateLogger.info("No services to run, exiting..."))
}
}

protected def runTasks(index: Map[String, Object])(implicit effect: DIEffect[F]): F[Unit] = {
protected def runTasks(index: Map[String, Object])(implicit F: DIEffect[F]): F[Unit] = {
val tasksToRun = parameters.roles.flatMap {
r =>
index.get(r.role) match {
Expand All @@ -114,19 +112,19 @@ object RoleAppExecutor {

lateLogger.info(s"Going to run: ${tasksToRun.size -> "tasks"}")

effect.traverse_(tasksToRun) {
F.traverse_(tasksToRun) {
case (task, cfg) =>
val loggedTask = for {
_ <- effect.maybeSuspend(lateLogger.info(s"Task is about to start: $task"))
_ <- F.maybeSuspend(lateLogger.info(s"Task is about to start: $task"))
_ <- task.start(cfg.roleParameters, cfg.freeArgs)
_ <- effect.maybeSuspend(lateLogger.info(s"Task finished: $task"))
_ <- F.maybeSuspend(lateLogger.info(s"Task finished: $task"))
} yield ()

effect.definitelyRecover(loggedTask) {
F.definitelyRecover(loggedTask) {
error =>
for {
_ <- effect.maybeSuspend(lateLogger.error(s"Task failed: $task, $error"))
_ <- effect.fail[Unit](error)
_ <- F.maybeSuspend(lateLogger.error(s"Task failed: $task, $error"))
_ <- F.fail[Unit](error)
} yield ()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,21 @@ class DistageTestRunner[F[_] : TagK](
}
}

private def check(testplans: Seq[DistageTest[F]], plans: SplittedPlan, effect: DIEffect[F], integLocator: Locator)(f: => F[Unit]): F[Unit] = {
private def check(testplans: Seq[DistageTest[F]], plans: SplittedPlan, F: DIEffect[F], integLocator: Locator)(f: => F[Unit]): F[Unit] = {
integrationChecker.check(plans.subRoots, integLocator) match {
case Some(value) =>
effect.traverse_(testplans) {
F.traverse_(testplans) {
test =>

effect.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Cancelled(value)))
F.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Cancelled(value)))
}

case None =>
f
}
}

private def proceed(checker: PlanCircularDependencyCheck, testplans: Seq[(DistageTest[F], OrderedPlan)], shared: SplittedPlan, sharedIntegrationLocator: Locator)(implicit effect: DIEffect[F]): F[Unit] = {
private def proceed(checker: PlanCircularDependencyCheck, testplans: Seq[(DistageTest[F], OrderedPlan)], shared: SplittedPlan, sharedIntegrationLocator: Locator)(implicit F: DIEffect[F]): F[Unit] = {
// here we produce our shared plan
checker.verify(shared.primary)
Injector.inherit(sharedIntegrationLocator).produceF[F](shared.primary).use {
Expand All @@ -129,15 +129,15 @@ class DistageTestRunner[F[_] : TagK](

// now we are ready to run each individual test
// note: scheduling here is custom also and tests may automatically run in parallel for any non-trivial monad
effect.traverse_(testplans.groupBy {
F.traverse_(testplans.groupBy {
t =>
val id = t._1.meta.id
SuiteData(id.suiteName, id.suiteId, id.suiteClassName)
}) {
case (id, plans) =>
for {
_ <- effect.maybeSuspend(reporter.beginSuite(id))
_ <- effect.traverse_(plans) {
_ <- F.maybeSuspend(reporter.beginSuite(id))
_ <- F.traverse_(plans) {
case (test, testplan) =>
val allSharedKeys = sharedLocator.allInstances.map(_.key).toSet

Expand All @@ -150,27 +150,27 @@ class DistageTestRunner[F[_] : TagK](
// we are ready to run the test, finally
testInjector.produceF[F](newtestplan.subplan).use {
integLocator =>
check(Seq(test), newtestplan, effect, integLocator) {
check(Seq(test), newtestplan, F, integLocator) {
proceedIndividual(test, newtestplan, integLocator)
}
}
}
_ <- effect.maybeSuspend(reporter.endSuite(id))
_ <- F.maybeSuspend(reporter.endSuite(id))
} yield {
}

}
}
}

private def proceedIndividual(test: DistageTest[F], newtestplan: SplittedPlan, integLocator: Locator)(implicit effect: DIEffect[F]): F[Unit] = {
private def proceedIndividual(test: DistageTest[F], newtestplan: SplittedPlan, integLocator: Locator)(implicit F: DIEffect[F]): F[Unit] = {
Injector.inherit(integLocator).produceF[F](newtestplan.primary).use {
testLocator =>
def doRun(before: Long): F[Unit] = for {

_ <- testLocator.run(test.test)
after <- effect.maybeSuspend(System.nanoTime())
_ <- effect.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Succeed(FiniteDuration(after - before, TimeUnit.NANOSECONDS))))
after <- F.maybeSuspend(System.nanoTime())
_ <- F.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Succeed(FiniteDuration(after - before, TimeUnit.NANOSECONDS))))
} yield {
}

Expand All @@ -179,13 +179,13 @@ class DistageTestRunner[F[_] : TagK](
case t: Throwable =>
val after = System.nanoTime()
reporter.testStatus(test.meta, TestStatus.Failed(t, FiniteDuration(after - before, TimeUnit.NANOSECONDS)))
effect.pure(())
F.pure(())
}

for {
before <- effect.maybeSuspend(System.nanoTime())
_ <- effect.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Running))
_ <- effect.definitelyRecoverCause(doRun(before))(doRecover(before))
before <- F.maybeSuspend(System.nanoTime())
_ <- F.maybeSuspend(reporter.testStatus(test.meta, TestStatus.Running))
_ <- F.definitelyRecoverCause(doRun(before))(doRecover(before))
} yield ()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,20 +175,20 @@ object ExternalResourceProvider {
eff <- locator.instances.find(_.key.tpe == effectTag).map(_.value.asInstanceOf[DIEffect[FakeF]])
ru <- locator.instances.find(_.key.tpe == runnerTag).map(_.value.asInstanceOf[DIEffectRunner[FakeF]])
} yield {
implicit val effect: DIEffect[FakeF] = eff
implicit val F: DIEffect[FakeF] = eff
implicit val runner: DIEffectRunner[FakeF] = ru
runner.run {
for {
_ <- effects.foldLeft(effect.maybeSuspend(logger.log(s"Running finalizers in effect type ${rt.fType}..."))) {
_ <- effects.foldLeft(F.maybeSuspend(logger.log(s"Running finalizers in effect type ${rt.fType}..."))) {
case (acc, f) =>
acc.guarantee {
for {
_ <- effect.maybeSuspend(logger.log(s"Closing ${f.key}..."))
_ <- effect.suspendF(f.effect())
_ <- F.maybeSuspend(logger.log(s"Closing ${f.key}..."))
_ <- F.suspendF(f.effect())
} yield ()
}
}
_ <- effect.maybeSuspend(logger.log(s"Finished finalizers in effect type ${rt.fType}!"))
_ <- F.maybeSuspend(logger.log(s"Finished finalizers in effect type ${rt.fType}!"))
} yield ()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import zio.duration.Duration.fromScala
import zio.{Task, ZIO, ZSchedule}

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionException, Future}
import scala.concurrent.{ExecutionContext, ExecutionException, Future}
import scala.util.Try

object BIOZio extends BIOZio[Any]
Expand Down Expand Up @@ -120,8 +120,8 @@ class BIOAsyncZio[R](clockService: Clock) extends BIOZio[R] with BIOAsync[ZIO[R,
}
}

@inline override final def fromFuture[A](future: => Future[A]): IO[Throwable, A] = {
ZIO.fromFuture(_ => future)
@inline override final def fromFuture[A](mkFuture: ExecutionContext => Future[A]): IO[Throwable, A] = {
ZIO.fromFuture(mkFuture)
}

@inline override final def fromFutureJava[A](javaFuture: => CompletionStage[A]): IO[Throwable, A] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import izumi.functional.bio.impl.{BIOAsyncZio, BIOZio}
import izumi.functional.mono.{Clock, Entropy, SyncSafe}
import zio.ZIO

import scala.concurrent.Future
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.language.implicitConversions
import scala.util.Try
Expand Down Expand Up @@ -194,7 +194,7 @@ package object bio extends BIOSyntax {
@inline def asyncF[E, A](register: (Either[E, A] => Unit) => F[E, Unit]): F[E, A]
@inline def asyncCancelable[E, A](register: (Either[E, A] => Unit) => Canceler): F[E, A]

@inline def fromFuture[A](future: => Future[A]): F[Throwable, A]
@inline def fromFuture[A](mkFuture: ExecutionContext => Future[A]): F[Throwable, A]
@inline def fromFutureJava[A](javaFuture: => CompletionStage[A]): F[Throwable, A]

@inline def yieldNow: F[Nothing, Unit]
Expand Down

0 comments on commit 9b47e54

Please sign in to comment.