Skip to content

Commit

Permalink
Bring Promise and withResolver together
Browse files Browse the repository at this point in the history
  • Loading branch information
m8nmueller committed Dec 18, 2023
1 parent 26132c9 commit 50bea78
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 101 deletions.
68 changes: 34 additions & 34 deletions jvm/src/main/scala/PosixLikeIO/PIO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@ import scala.Tuple.Union
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}

object File:
extension (resolver: Future.Resolver[Int])
def toCompletionHandler = new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit = resolver.resolve(result)
override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e)
}

class File(val path: String) {
import File._

private var channel: Option[AsynchronousFileChannel] = None

def isOpened: Boolean = channel.isDefined && channel.get.isOpen
Expand All @@ -32,50 +41,41 @@ class File(val path: String) {
def read(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

val p = Promise[Int]()
channel.get.read(
buffer,
0,
buffer,
new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit = p.complete(Success(result))
override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e))
}
)
p.future
Future.withResolver[Int]: resolver =>
channel.get.read(
buffer,
0,
buffer,
resolver.toCompletionHandler
)

def readString(size: Int, charset: Charset = StandardCharsets.UTF_8): Future[String] =
assert(channel.isDefined)
assert(size >= 0)

val buffer = ByteBuffer.allocate(size)
val p = Promise[String]()
channel.get.read(
buffer,
0,
buffer,
new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit =
p.complete(Success(charset.decode(attachment.slice(0, result)).toString()))
override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e))
}
)
p.future
Future.withResolver[String]: resolver =>
channel.get.read(
buffer,
0,
buffer,
new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit =
resolver.resolve(charset.decode(attachment.slice(0, result)).toString())
override def failed(e: Throwable, attachment: ByteBuffer): Unit = resolver.reject(e)
}
)

def write(buffer: ByteBuffer): Future[Int] =
assert(channel.isDefined)

val p = Promise[Int]()
channel.get.write(
buffer,
0,
buffer,
new CompletionHandler[Integer, ByteBuffer] {
override def completed(result: Integer, attachment: ByteBuffer): Unit = p.complete(Success(result))
override def failed(e: Throwable, attachment: ByteBuffer): Unit = p.complete(Failure(e))
}
)
p.future
Future.withResolver[Int]: resolver =>
channel.get.write(
buffer,
0,
buffer,
resolver.toCompletionHandler
)

def writeString(s: String, charset: Charset = StandardCharsets.UTF_8): Future[Int] =
write(ByteBuffer.wrap(s.getBytes(charset)))
Expand Down
2 changes: 1 addition & 1 deletion shared/src/main/scala/async/CompletionGroup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class CompletionGroup extends Cancellable.Tracking:
private[async] def waitCompletion()(using Async): Unit =
synchronized:
if members.nonEmpty && cancelWait.isEmpty then cancelWait = Some(Promise())
cancelWait.foreach(cWait => cWait.future.await)
cancelWait.foreach(cWait => cWait.await)
unlink()

/** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */
Expand Down
114 changes: 59 additions & 55 deletions shared/src/main/scala/async/futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ trait Future[+T] extends Async.OriginalSource[Try[T]], Cancellable

object Future:

/** A future that is completed explicitly by calling its `complete` method. There are two public implementations
/** A future that is completed explicitly by calling its `complete` method. There are three public implementations
*
* - RunnableFuture: Completion is done by running a block of code
* - Promise.future: Completion is done by external request.
* - Promise.apply: Completion is done by external request.
* - withResolver: Completion is done by external request set up from a block of code.
*/
private class CoreFuture[+T] extends Future[T]:

Expand Down Expand Up @@ -156,47 +157,6 @@ object Future:
def apply[T](body: Async ?=> T)(using Async): Future[T] =
RunnableFuture(body)

/** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of
* resolve/reject/complete may be used and only once.
*/
trait Resolver[-T]:
/** Complete the future with a data item successfully */
def resolve(item: T): Unit = complete(Success(item))

/** Complete the future with a failure */
def reject(exc: Throwable): Unit = complete(Failure(exc))

/** Complete the future with the result, be it Success or Failure */
def complete(result: Try[T]): Unit

/** Register a cancellation handler to be called when the created future is cancelled. Note that only one handler
* may be used.
*/
def onCancel(handler: () => Unit): Unit
end Resolver

/** Create a future that may be completed asynchronously using external means.
*
* The body is run synchronously on the callers thread to setup an external asynchronous operation whose
* success/failure it communicates using the [[Resolver]] to complete the future.
*
* If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]].
*/
def withResolver[T](body: Resolver[T] => Unit): Future[T] =
val future = new CoreFuture[T] with Resolver[T] {
@volatile var cancelHandle = () => ()
override def onCancel(handler: () => Unit): Unit = cancelHandle = handler
override def complete(result: Try[T]): Unit = super.complete(result)

override def cancel(): Unit =
if setCancelled() then
cancelHandle()
reject(CancellationException())
}
body(future)
future
end withResolver

/** A future that immediately terminates with the given result */
def now[T](result: Try[T]): Future[T] =
val f = CoreFuture[T]()
Expand Down Expand Up @@ -261,23 +221,67 @@ object Future:

end extension

/** A promise defines a future that is be completed via the promise's `complete` method.
/** A promise defines a future that is be completed via the `complete` method.
*/
class Promise[T]:
private val myFuture = new CoreFuture[T]:
fut =>
override def cancel(): Unit =
if setCancelled() then fut.complete(Failure(new CancellationException()))
trait Promise[T] extends Future[T]:
inline def asFuture: Future[T] = this

/** Define the result value of `future`. */
def complete(result: Try[T]): Unit

/** The future defined by this promise */
val future: Future[T] = myFuture
object Promise:
def apply[T](): Promise[T] =
new CoreFuture[T] with Promise[T]:
override def cancel(): Unit =
if setCancelled() then complete(Failure(new CancellationException()))

/** Define the result value of `future`. However, if `future` was cancelled in the meantime complete with a
* `CancellationException` failure instead.
*/
def complete(result: Try[T]): Unit = myFuture.complete(result)
/** Define the result value of `future`. However, if `future` was cancelled in the meantime complete with a
* `CancellationException` failure instead.
*/
override def complete(result: Try[T]): Unit = super[CoreFuture].complete(result)
end Promise

/** The group of handlers to be used in [[withResolver]]. As a Future is completed only once, only one of
* resolve/reject/complete may be used and only once.
*/
trait Resolver[-T]:
/** Complete the future with a data item successfully */
def resolve(item: T): Unit = complete(Success(item))

/** Complete the future with a failure */
def reject(exc: Throwable): Unit = complete(Failure(exc))

/** Complete the future with the result, be it Success or Failure */
def complete(result: Try[T]): Unit

/** Register a cancellation handler to be called when the created future is cancelled. Note that only one handler
* may be used.
*/
def onCancel(handler: () => Unit): Unit
end Resolver

/** Create a promise that may be completed asynchronously using external means.
*
* The body is run synchronously on the callers thread to setup an external asynchronous operation whose
* success/failure it communicates using the [[Resolver]] to complete the future.
*
* If the external operation supports cancellation, the body can register one handler using [[Resolver.onCancel]].
*/
def withResolver[T](body: Resolver[T] => Unit): Promise[T] =
val future = new CoreFuture[T] with Resolver[T] with Promise[T] {
@volatile var cancelHandle = () => ()
override def onCancel(handler: () => Unit): Unit = cancelHandle = handler
override def complete(result: Try[T]): Unit = super.complete(result)

override def cancel(): Unit =
if setCancelled() then
cancelHandle()
reject(CancellationException())
}
body(future)
future
end withResolver

/** Collects a list of futures into a channel of futures, arriving as they finish. */
class Collector[T](futures: Future[T]*):
private val ch = UnboundedChannel[Future[T]]()
Expand Down
8 changes: 4 additions & 4 deletions shared/src/test/scala/CancellationBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class CancellationBehavior extends munit.FunSuite:
val promise = Future.Promise[Unit]()
Async.group:
startFuture(info, promise.complete(Success(())))
promise.future.await
promise.await
info.assertCancelled()

test("nested link group"):
Expand All @@ -89,13 +89,13 @@ class CancellationBehavior extends munit.FunSuite:
info1, {
Async.group:
startFuture(info2, promise2.complete(Success(())))
promise2.future.await
promise2.await
info2.assertCancelled()
Future.now(Success(())).await // check cancellation
promise1.complete(Success(()))
}
)
promise1.future.await
promise1.await
info1.assertCancelled()
info2.assertCancelled()

Expand Down Expand Up @@ -123,6 +123,6 @@ class CancellationBehavior extends munit.FunSuite:
Async.group:
Async.current.group.cancel() // cancel now
val f = startFuture(info, promise.complete(Success(())))
promise.future.awaitResult
promise.awaitResult
f.awaitResult
info.assertCancelled()
4 changes: 2 additions & 2 deletions shared/src/test/scala/FutureBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class FutureBehavior extends munit.FunSuite {
test("Promise can be cancelled") {
Async.blocking:
val p = Promise[Int]()
val f = p.future
val f = p.asFuture
f.cancel()
p.complete(Success(10))
f.awaitResult match
Expand All @@ -314,7 +314,7 @@ class FutureBehavior extends munit.FunSuite {
Async.blocking:
val p = Promise[Int]()
p.complete(Success(10))
val f = p.future
val f = p.asFuture
f.cancel()
assertEquals(f.await, 10)
}
Expand Down
4 changes: 2 additions & 2 deletions shared/src/test/scala/ListenerBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ListenerBehavior extends munit.FunSuite:
val prom1 = Promise[Unit]()
val prom2 = Promise[Unit]()
Async.blocking:
val raced = race(Future { prom1.future.await; 10 }, Future { prom2.future.await; 20 })
val raced = race(Future { prom1.await; 10 }, Future { prom2.await; 20 })
assert(!raced.poll(Listener.acceptingListener((x, _) => fail(s"race uncomplete $x"))))
prom1.complete(Success(()))
assertEquals(raced.await, 10)
Expand Down Expand Up @@ -330,7 +330,7 @@ private class NumberedTestListener private (sleep: AtomicBoolean, fail: Boolean,
if sleep.getAndSet(false) then
Async.blocking:
waiter = Some(Promise())
waiter.get.future.await
waiter.get.await
waiter.foreach: promise =>
promise.complete(Success(()))
waiter = None
Expand Down
6 changes: 3 additions & 3 deletions shared/src/test/scala/SourceBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ class SourceBehavior extends munit.FunSuite {
val g = f.transformValuesWith(identity)
f.onComplete(Listener.acceptingListener { (_, _) => aRan.complete(Success(())) })
g.onComplete(Listener.acceptingListener { (_, _) => bRan.complete(Success(())) })
assertEquals(aRan.future.poll(), None)
assertEquals(bRan.future.poll(), None)
assertEquals(aRan.poll(), None)
assertEquals(bRan.poll(), None)
f.await
Thread.sleep(100) // onComplete of await and manual may be scheduled
aRan.future.zip(bRan.future).alt(Future(sleep(600))).await
aRan.zip(bRan).alt(Future(sleep(600))).await
}

test("either") {
Expand Down

0 comments on commit 50bea78

Please sign in to comment.