From d6a17341f7d944ecac2bb5edba0356e532980c13 Mon Sep 17 00:00:00 2001 From: Kai <450507+neko-kai@users.noreply.github.com> Date: Sun, 17 Nov 2019 01:51:54 +0000 Subject: [PATCH] Add BIOAsync.fromFuture, .fromFutureJava (#730) --- .../izumi/functional/bio/impl/BIOZio.scala | 44 ++++++++++++++++++- .../scala/izumi/functional/bio/package.scala | 6 +++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/impl/BIOZio.scala b/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/impl/BIOZio.scala index f50136381b..34297bfd38 100644 --- a/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/impl/BIOZio.scala +++ b/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/impl/BIOZio.scala @@ -1,12 +1,15 @@ package izumi.functional.bio.impl +import java.util.concurrent.{CompletionException, CompletionStage} + import izumi.functional.bio.BIOExit.ZIOExit import izumi.functional.bio.{BIO, BIOAsync, BIOExit, BIOFiber} import zio.clock.Clock import zio.duration.Duration.fromScala -import zio.{ZIO, ZSchedule} +import zio.{Task, ZIO, ZSchedule} import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{ExecutionException, Future} import scala.util.Try object BIOZio extends BIOZio[Any] @@ -28,7 +31,6 @@ class BIOZio[R] extends BIO[ZIO[R, +?, +?]] { @inline override final def map[E, A, B](r: IO[E, A])(f: A => B): IO[E, B] = r.map(f) @inline override final def as[E, A, B](r: IO[E, A])(v: => B): IO[E, B] = r.as(v) - @inline override final def tapError[E, A, E1 >: E](r: IO[E, A])(f: E => IO[E1, Unit]): IO[E1, A] = r.tapError(f) @inline override final def leftMap[E, A, E2](r: IO[E, A])(f: E => E2): IO[E2, A] = r.mapError(f) @inline override final def leftFlatMap[E, A, E2](r: IO[E, A])(f: E => IO[Nothing, E2]): IO[E2, A] = r.flatMapError(f) @@ -118,6 +120,44 @@ class BIOAsyncZio[R](clockService: Clock) extends BIOZio[R] with BIOAsync[ZIO[R, } } + @inline override final def fromFuture[A](future: => Future[A]): IO[Throwable, A] = { + ZIO.fromFuture(_ => future) + } + + @inline override final def fromFutureJava[A](javaFuture: => CompletionStage[A]): IO[Throwable, A] = { + def unwrapDone[T](isFatal: Throwable => Boolean)(f: java.util.concurrent.Future[T]): Task[T] = { + try Task.succeed(f.get()) catch catchFromGet(isFatal) + } + def catchFromGet(isFatal: Throwable => Boolean): PartialFunction[Throwable, Task[Nothing]] = { + case e: CompletionException => + Task.fail(e.getCause) + case e: ExecutionException => + Task.fail(e.getCause) + case _: InterruptedException => + Task.interrupt + case e if !isFatal(e) => + Task.fail(e) + } + ZIO.effect(javaFuture).flatMap[R, Throwable, A](javaFuture => Task.effectSuspendTotalWith { p => + val cf = javaFuture.toCompletableFuture + if (cf.isDone) { + unwrapDone(p.fatal)(cf) + } else { + Task.effectAsync { + cb => + val _ = javaFuture.handle[Unit] { + (v: A, t: Throwable) => + val io = Option(t).fold[Task[A]](Task.succeed(v)) { + t => + catchFromGet(p.fatal).lift(t).getOrElse(Task.die(t)) + } + cb(io) + } + } + } + }) + } + @inline override final def uninterruptible[E, A](r: IO[E, A]): IO[E, A] = { r.uninterruptible } diff --git a/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/package.scala b/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/package.scala index 4cea460aea..35163369a7 100644 --- a/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/package.scala +++ b/fundamentals/fundamentals-bio/src/main/scala/izumi/functional/bio/package.scala @@ -1,10 +1,13 @@ package izumi.functional +import java.util.concurrent.CompletionStage + import cats.~> import izumi.functional.bio.impl.{BIOAsyncZio, BIOZio} import izumi.functional.mono.{Clock, Entropy, SyncSafe} import zio.ZIO +import scala.concurrent.Future import scala.concurrent.duration.{Duration, FiniteDuration} import scala.language.implicitConversions import scala.util.Try @@ -191,6 +194,9 @@ package object bio extends BIOSyntax { @inline def asyncF[E, A](register: (Either[E, A] => Unit) => F[E, Unit]): F[E, A] @inline def asyncCancelable[E, A](register: (Either[E, A] => Unit) => Canceler): F[E, A] + @inline def fromFuture[A](future: => Future[A]): F[Throwable, A] + @inline def fromFutureJava[A](javaFuture: => CompletionStage[A]): F[Throwable, A] + @inline def yieldNow: F[Nothing, Unit] @inline def never: F[Nothing, Nothing] = async(_ => ())