Skip to content

Commit

Permalink
Add BIOAsync.fromFuture, .fromFutureJava (#730)
Browse files Browse the repository at this point in the history
  • Loading branch information
neko-kai authored Nov 17, 2019
1 parent 8bb50c1 commit d6a1734
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package izumi.functional.bio.impl

import java.util.concurrent.{CompletionException, CompletionStage}

import izumi.functional.bio.BIOExit.ZIOExit
import izumi.functional.bio.{BIO, BIOAsync, BIOExit, BIOFiber}
import zio.clock.Clock
import zio.duration.Duration.fromScala
import zio.{ZIO, ZSchedule}
import zio.{Task, ZIO, ZSchedule}

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionException, Future}
import scala.util.Try

object BIOZio extends BIOZio[Any]
Expand All @@ -28,7 +31,6 @@ class BIOZio[R] extends BIO[ZIO[R, +?, +?]] {
@inline override final def map[E, A, B](r: IO[E, A])(f: A => B): IO[E, B] = r.map(f)
@inline override final def as[E, A, B](r: IO[E, A])(v: => B): IO[E, B] = r.as(v)


@inline override final def tapError[E, A, E1 >: E](r: IO[E, A])(f: E => IO[E1, Unit]): IO[E1, A] = r.tapError(f)
@inline override final def leftMap[E, A, E2](r: IO[E, A])(f: E => E2): IO[E2, A] = r.mapError(f)
@inline override final def leftFlatMap[E, A, E2](r: IO[E, A])(f: E => IO[Nothing, E2]): IO[E2, A] = r.flatMapError(f)
Expand Down Expand Up @@ -118,6 +120,44 @@ class BIOAsyncZio[R](clockService: Clock) extends BIOZio[R] with BIOAsync[ZIO[R,
}
}

@inline override final def fromFuture[A](future: => Future[A]): IO[Throwable, A] = {
ZIO.fromFuture(_ => future)
}

@inline override final def fromFutureJava[A](javaFuture: => CompletionStage[A]): IO[Throwable, A] = {
def unwrapDone[T](isFatal: Throwable => Boolean)(f: java.util.concurrent.Future[T]): Task[T] = {
try Task.succeed(f.get()) catch catchFromGet(isFatal)
}
def catchFromGet(isFatal: Throwable => Boolean): PartialFunction[Throwable, Task[Nothing]] = {
case e: CompletionException =>
Task.fail(e.getCause)
case e: ExecutionException =>
Task.fail(e.getCause)
case _: InterruptedException =>
Task.interrupt
case e if !isFatal(e) =>
Task.fail(e)
}
ZIO.effect(javaFuture).flatMap[R, Throwable, A](javaFuture => Task.effectSuspendTotalWith { p =>
val cf = javaFuture.toCompletableFuture
if (cf.isDone) {
unwrapDone(p.fatal)(cf)
} else {
Task.effectAsync {
cb =>
val _ = javaFuture.handle[Unit] {
(v: A, t: Throwable) =>
val io = Option(t).fold[Task[A]](Task.succeed(v)) {
t =>
catchFromGet(p.fatal).lift(t).getOrElse(Task.die(t))
}
cb(io)
}
}
}
})
}

@inline override final def uninterruptible[E, A](r: IO[E, A]): IO[E, A] = {
r.uninterruptible
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package izumi.functional

import java.util.concurrent.CompletionStage

import cats.~>
import izumi.functional.bio.impl.{BIOAsyncZio, BIOZio}
import izumi.functional.mono.{Clock, Entropy, SyncSafe}
import zio.ZIO

import scala.concurrent.Future
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.language.implicitConversions
import scala.util.Try
Expand Down Expand Up @@ -191,6 +194,9 @@ package object bio extends BIOSyntax {
@inline def asyncF[E, A](register: (Either[E, A] => Unit) => F[E, Unit]): F[E, A]
@inline def asyncCancelable[E, A](register: (Either[E, A] => Unit) => Canceler): F[E, A]

@inline def fromFuture[A](future: => Future[A]): F[Throwable, A]
@inline def fromFutureJava[A](javaFuture: => CompletionStage[A]): F[Throwable, A]

@inline def yieldNow: F[Nothing, Unit]
@inline def never: F[Nothing, Nothing] = async(_ => ())

Expand Down

0 comments on commit d6a1734

Please sign in to comment.