From 16ee5a9e8863ee38da21cff5706dd2d2d25b249f Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Wed, 11 Oct 2023 15:58:51 +0200 Subject: [PATCH 01/16] Restructure gears into cross-platform with sbt-crossproject --- build.sbt | 16 ++++- .../src}/main/scala/PosixLikeIO/PIO.scala | 0 .../examples/clientAndServerUDP.scala | 0 .../examples/readAndWriteFile.scala | 0 .../PosixLikeIO/examples/readWholeFile.scala | 0 {src => jvm/src}/main/scala/async/Timer.scala | 0 .../main/scala/async/VThreadSupport.scala | 0 .../scala/measurements/measureTimes.scala | 0 project/plugins.sbt | 2 + .../src}/main/scala/async/Async.scala | 0 .../main/scala/async/AsyncOperations.scala | 0 .../src}/main/scala/async/AsyncSupport.scala | 0 .../src}/main/scala/async/Cancellable.scala | 0 .../main/scala/async/CompletionGroup.scala | 0 .../main/scala/async/JvmAsyncOperations.scala | 0 .../src}/main/scala/async/Listener.scala | 0 .../src}/main/scala/async/channels.scala | 63 ++++++++++--------- .../src}/main/scala/async/futures.scala | 0 .../main/scala/async/listeners/locking.scala | 0 .../src}/main/scala/async/package.scala | 0 .../test/scala/CancellationBehavior.scala | 0 .../test/scala/ChannelBehavior.scala | 0 .../test/scala/FutureBehavior.scala | 0 .../test/scala/ListenerBehavior.scala | 2 +- .../test/scala/SourceBehavior.scala | 0 .../test/scala/TaskScheduleBehavior.scala | 0 26 files changed, 48 insertions(+), 35 deletions(-) rename {src => jvm/src}/main/scala/PosixLikeIO/PIO.scala (100%) rename {src => jvm/src}/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala (100%) rename {src => jvm/src}/main/scala/PosixLikeIO/examples/readAndWriteFile.scala (100%) rename {src => jvm/src}/main/scala/PosixLikeIO/examples/readWholeFile.scala (100%) rename {src => jvm/src}/main/scala/async/Timer.scala (100%) rename {src => jvm/src}/main/scala/async/VThreadSupport.scala (100%) rename {src => jvm/src}/main/scala/measurements/measureTimes.scala (100%) create mode 100644 project/plugins.sbt rename {src => shared/src}/main/scala/async/Async.scala (100%) rename {src => shared/src}/main/scala/async/AsyncOperations.scala (100%) rename {src => shared/src}/main/scala/async/AsyncSupport.scala (100%) rename {src => shared/src}/main/scala/async/Cancellable.scala (100%) rename {src => shared/src}/main/scala/async/CompletionGroup.scala (100%) rename {src => shared/src}/main/scala/async/JvmAsyncOperations.scala (100%) rename {src => shared/src}/main/scala/async/Listener.scala (100%) rename {src => shared/src}/main/scala/async/channels.scala (94%) rename {src => shared/src}/main/scala/async/futures.scala (100%) rename {src => shared/src}/main/scala/async/listeners/locking.scala (100%) rename {src => shared/src}/main/scala/async/package.scala (100%) rename {src => shared}/test/scala/CancellationBehavior.scala (100%) rename {src => shared}/test/scala/ChannelBehavior.scala (100%) rename {src => shared}/test/scala/FutureBehavior.scala (100%) rename {src => shared}/test/scala/ListenerBehavior.scala (99%) rename {src => shared}/test/scala/SourceBehavior.scala (100%) rename {src => shared}/test/scala/TaskScheduleBehavior.scala (100%) diff --git a/build.sbt b/build.sbt index cd19b5ea..93712e37 100644 --- a/build.sbt +++ b/build.sbt @@ -1,10 +1,20 @@ +import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType} + ThisBuild / scalaVersion := "3.3.1" -lazy val root = project +lazy val root = + crossProject(JVMPlatform, NativePlatform) + .crossType(CrossType.Full) .in(file(".")) - .settings( + .settings(Seq( name := "Gears", organization := "ch.epfl.lamp", version := "0.1.0-SNAPSHOT", libraryDependencies += "org.scalameta" %% "munit" % "0.7.29" % Test - ) + )) + .jvmSettings(Seq( + javaOptions += "--version 21", + )) + +lazy val rootJVM = root.jvm +lazy val rootNative = root.native diff --git a/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala similarity index 100% rename from src/main/scala/PosixLikeIO/PIO.scala rename to jvm/src/main/scala/PosixLikeIO/PIO.scala diff --git a/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala similarity index 100% rename from src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala rename to jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala diff --git a/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala similarity index 100% rename from src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala rename to jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala diff --git a/src/main/scala/PosixLikeIO/examples/readWholeFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala similarity index 100% rename from src/main/scala/PosixLikeIO/examples/readWholeFile.scala rename to jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala diff --git a/src/main/scala/async/Timer.scala b/jvm/src/main/scala/async/Timer.scala similarity index 100% rename from src/main/scala/async/Timer.scala rename to jvm/src/main/scala/async/Timer.scala diff --git a/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala similarity index 100% rename from src/main/scala/async/VThreadSupport.scala rename to jvm/src/main/scala/async/VThreadSupport.scala diff --git a/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala similarity index 100% rename from src/main/scala/measurements/measureTimes.scala rename to jvm/src/main/scala/measurements/measureTimes.scala diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 00000000..4233a121 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,2 @@ +addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0") +addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.0-SNAPSHOT") diff --git a/src/main/scala/async/Async.scala b/shared/src/main/scala/async/Async.scala similarity index 100% rename from src/main/scala/async/Async.scala rename to shared/src/main/scala/async/Async.scala diff --git a/src/main/scala/async/AsyncOperations.scala b/shared/src/main/scala/async/AsyncOperations.scala similarity index 100% rename from src/main/scala/async/AsyncOperations.scala rename to shared/src/main/scala/async/AsyncOperations.scala diff --git a/src/main/scala/async/AsyncSupport.scala b/shared/src/main/scala/async/AsyncSupport.scala similarity index 100% rename from src/main/scala/async/AsyncSupport.scala rename to shared/src/main/scala/async/AsyncSupport.scala diff --git a/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala similarity index 100% rename from src/main/scala/async/Cancellable.scala rename to shared/src/main/scala/async/Cancellable.scala diff --git a/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala similarity index 100% rename from src/main/scala/async/CompletionGroup.scala rename to shared/src/main/scala/async/CompletionGroup.scala diff --git a/src/main/scala/async/JvmAsyncOperations.scala b/shared/src/main/scala/async/JvmAsyncOperations.scala similarity index 100% rename from src/main/scala/async/JvmAsyncOperations.scala rename to shared/src/main/scala/async/JvmAsyncOperations.scala diff --git a/src/main/scala/async/Listener.scala b/shared/src/main/scala/async/Listener.scala similarity index 100% rename from src/main/scala/async/Listener.scala rename to shared/src/main/scala/async/Listener.scala diff --git a/src/main/scala/async/channels.scala b/shared/src/main/scala/async/channels.scala similarity index 94% rename from src/main/scala/async/channels.scala rename to shared/src/main/scala/async/channels.scala index e9e980c9..1d53d025 100644 --- a/src/main/scala/async/channels.scala +++ b/shared/src/main/scala/async/channels.scala @@ -344,34 +344,35 @@ object ChannelMultiplexer: end ChannelMultiplexer -@main def channelsMultipleSendersOneReader(): Unit = - Async.blocking: - var aa = false - var ab = false - var ac = false - var b = false - val c = SyncChannel[Int]() - val f13 = Future: - for (i <- 1 to 10000) - c.send(i) - ac = true - val f11 = Future: - for (i <- 1 to 10000) - c.send(i) - aa = true - val f12 = Future: - for (i <- 1 to 10000) - c.send(i) - ab = true - val f2 = Future: - var r = 0 - for (i <- 1 to 30000) - c.read() - r += 1 - b = true - - f11.result - f2.result - f12.result - f13.result - println("all done? " + (aa && ab && ac && b)) +// TODO: make this work +// @main def channelsMultipleSendersOneReader(): Unit = +// Async.blocking: +// var aa = false +// var ab = false +// var ac = false +// var b = false +// val c = SyncChannel[Int]() +// val f13 = Future: +// for (i <- 1 to 10000) +// c.send(i) +// ac = true +// val f11 = Future: +// for (i <- 1 to 10000) +// c.send(i) +// aa = true +// val f12 = Future: +// for (i <- 1 to 10000) +// c.send(i) +// ab = true +// val f2 = Future: +// var r = 0 +// for (i <- 1 to 30000) +// c.read() +// r += 1 +// b = true + +// f11.result +// f2.result +// f12.result +// f13.result +// println("all done? " + (aa && ab && ac && b)) diff --git a/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala similarity index 100% rename from src/main/scala/async/futures.scala rename to shared/src/main/scala/async/futures.scala diff --git a/src/main/scala/async/listeners/locking.scala b/shared/src/main/scala/async/listeners/locking.scala similarity index 100% rename from src/main/scala/async/listeners/locking.scala rename to shared/src/main/scala/async/listeners/locking.scala diff --git a/src/main/scala/async/package.scala b/shared/src/main/scala/async/package.scala similarity index 100% rename from src/main/scala/async/package.scala rename to shared/src/main/scala/async/package.scala diff --git a/src/test/scala/CancellationBehavior.scala b/shared/test/scala/CancellationBehavior.scala similarity index 100% rename from src/test/scala/CancellationBehavior.scala rename to shared/test/scala/CancellationBehavior.scala diff --git a/src/test/scala/ChannelBehavior.scala b/shared/test/scala/ChannelBehavior.scala similarity index 100% rename from src/test/scala/ChannelBehavior.scala rename to shared/test/scala/ChannelBehavior.scala diff --git a/src/test/scala/FutureBehavior.scala b/shared/test/scala/FutureBehavior.scala similarity index 100% rename from src/test/scala/FutureBehavior.scala rename to shared/test/scala/FutureBehavior.scala diff --git a/src/test/scala/ListenerBehavior.scala b/shared/test/scala/ListenerBehavior.scala similarity index 99% rename from src/test/scala/ListenerBehavior.scala rename to shared/test/scala/ListenerBehavior.scala index 440a047d..43685147 100644 --- a/src/test/scala/ListenerBehavior.scala +++ b/shared/test/scala/ListenerBehavior.scala @@ -250,7 +250,7 @@ class ListenerBehavior extends munit.FunSuite: Async.race(source1).onComplete(NumberedTestListener(false, false, 1)) val wrapped = source1.listener.get - Thread.startVirtualThread: () => + Thread.ofPlatform().start: () => val result = wrapped.lock.lockSelf(source1).asInstanceOf[Listener.PartialLock] wrapped.releaseLock(result) .join() diff --git a/src/test/scala/SourceBehavior.scala b/shared/test/scala/SourceBehavior.scala similarity index 100% rename from src/test/scala/SourceBehavior.scala rename to shared/test/scala/SourceBehavior.scala diff --git a/src/test/scala/TaskScheduleBehavior.scala b/shared/test/scala/TaskScheduleBehavior.scala similarity index 100% rename from src/test/scala/TaskScheduleBehavior.scala rename to shared/test/scala/TaskScheduleBehavior.scala From c32c311c07717c69b1629b75927f893c3962ac0d Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Tue, 24 Oct 2023 12:25:21 +0200 Subject: [PATCH 02/16] Document and include build dependencies for Native --- .gitmodules | 7 +++ CONTRIBUTING.md | 7 +++ README.md | 3 -- build.sbt | 14 ++++-- dependencies/README.md | 22 +++++++++ dependencies/munit | 1 + dependencies/publish-deps.sh | 8 ++++ dependencies/scala-native | 1 + .../src/main/scala/async/Timer.scala | 0 shared/test/scala/Timer.scala | 46 +++++++++++++++++++ 10 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 .gitmodules create mode 100644 CONTRIBUTING.md create mode 100644 dependencies/README.md create mode 160000 dependencies/munit create mode 100755 dependencies/publish-deps.sh create mode 160000 dependencies/scala-native rename {jvm => shared}/src/main/scala/async/Timer.scala (100%) create mode 100644 shared/test/scala/Timer.scala diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..93ab1273 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,7 @@ +[submodule "dependencies/scala-native"] + path = dependencies/scala-native + url = git@github.com:scala-native/scala-native.git +[submodule "dependencies/munit"] + path = dependencies/munit + url = git@github.com:natsukagami/munit.git + branch = use-0.5-snapshot-sn diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 00000000..2fd5f97b --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,7 @@ +# Contributing + +## Building `gears` + +`gears` currently require: +- **On the JVM**: JVM with support for virtual threads. This usually means JVM 21+, or 19+ with `--enable-preview`. +- **On Scala Native**: Scala Native with delimited continuations support. See the pinned versions in [`dependencies`](./dependencies/README.md). diff --git a/README.md b/README.md index 88812023..c3efc5f8 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,3 @@ - - - This is a proof of concept for a base library for asynchronous computing in direct style. The library needs either fibers or virtual threads as a basis. It is at present highly experimental, incomplete and provisional. It is not yet extensively tested and not optimized at all. The concepts and code here should be regarded as a strawman, in the sense of "meant to be knocked down". diff --git a/build.sbt b/build.sbt index 93712e37..3a152947 100644 --- a/build.sbt +++ b/build.sbt @@ -10,11 +10,15 @@ lazy val root = name := "Gears", organization := "ch.epfl.lamp", version := "0.1.0-SNAPSHOT", - libraryDependencies += "org.scalameta" %% "munit" % "0.7.29" % Test + testFrameworks += new TestFramework("munit.Framework") )) .jvmSettings(Seq( - javaOptions += "--version 21", + javaOptions += "--version 21", + libraryDependencies += "org.scalameta" %% "munit" % "1.0.0-M10" % Test + )) + .nativeSettings(Seq( + nativeConfig ~= { c => + c.withMultithreadingSupport(true) + }, + libraryDependencies += "org.scalameta" %%% "munit" % "1.0.0-M10+15-3940023e-SNAPSHOT" % Test )) - -lazy val rootJVM = root.jvm -lazy val rootNative = root.native diff --git a/dependencies/README.md b/dependencies/README.md new file mode 100644 index 00000000..c242c621 --- /dev/null +++ b/dependencies/README.md @@ -0,0 +1,22 @@ +## Custom Dependencies for Scala Native + +Scala Native requires some libraries to be compiled from source and `publishLocal`'d. + +### TL; DR + +```bash +./publish-deps.sh +``` + +### What are included? + +- The current snapshot version of Scala Native, pinned in `scala-native`: for the delimited continuation support. + This needs to be published for both `3.3.1` (for `gears`) and `3.1.2` (for `munit`): + ```bash + sbt "publish-local-dev 3; ++3.1.2 publishLocal" + ``` +- A fork of `munit` that uses the above snapshot, with a simple fix (https://github.com/scalameta/munit/pull/714) to make it compile. + Pinned in `munit`. + ```bash + sbt "munitNative/publishLocal" + ``` diff --git a/dependencies/munit b/dependencies/munit new file mode 160000 index 00000000..3940023e --- /dev/null +++ b/dependencies/munit @@ -0,0 +1 @@ +Subproject commit 3940023e5a3c07673611f561b57e043fb9ac51ec diff --git a/dependencies/publish-deps.sh b/dependencies/publish-deps.sh new file mode 100755 index 00000000..64257391 --- /dev/null +++ b/dependencies/publish-deps.sh @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +set -e + +cd "$(dirname "${BASH_SOURCE[0]}")" # change to this directory + +cd scala-native && sbt "publish-local-dev 3; ++3.1.2 publishLocal" && cd .. +cd munit && sbt "++3.1.2 munitNative/publishLocal" diff --git a/dependencies/scala-native b/dependencies/scala-native new file mode 160000 index 00000000..f4b9078c --- /dev/null +++ b/dependencies/scala-native @@ -0,0 +1 @@ +Subproject commit f4b9078c6c96dade366518f037d8f47048d2a4f3 diff --git a/jvm/src/main/scala/async/Timer.scala b/shared/src/main/scala/async/Timer.scala similarity index 100% rename from jvm/src/main/scala/async/Timer.scala rename to shared/src/main/scala/async/Timer.scala diff --git a/shared/test/scala/Timer.scala b/shared/test/scala/Timer.scala new file mode 100644 index 00000000..e201dff5 --- /dev/null +++ b/shared/test/scala/Timer.scala @@ -0,0 +1,46 @@ +import gears.async._ +import gears.async.AsyncOperations._ +import scala.concurrent.duration._ +import scala.util.{Success, Failure} +import java.util.concurrent.TimeoutException +import java.util.concurrent.CancellationException + +class TimerTest extends munit.FunSuite { + import gears.async.default.given + + test("TimerSleep1Second") { + Async.blocking: + println("start of 1 second") + val timer = Timer(1.second) + Future { timer.start() } + assert(Async.await(timer.src) == timer.TimerEvent.Tick) + println("end of 1 second") + } + + def timeoutCancellableFuture[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Future[T] = + val t = Future { sleep(d.toMillis) } + Future: + val g = Async.await(Async.either(t, f)) + g match + case Left(_) => + f.cancel() + throw TimeoutException() + case Right(v) => + t.cancel() + v.get + + test("testTimeoutFuture") { + var touched = false + Async.blocking: + val t = timeoutCancellableFuture( + 250.millis, + Future: + sleep(1000) + touched = true + ) + Async.await(t) + assert(!touched) + sleep(2000) + assert(!touched) + } +} From ee00354259cb52753d39e2e854d5cd9d17e3e324 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Thu, 19 Oct 2023 14:40:12 +0200 Subject: [PATCH 03/16] Implement a ForkJoin-executor-based AsyncSupport instance --- jvm/src/main/scala/async/VThreadSupport.scala | 4 +- .../src/main/scala/async/DefaultSupport.scala | 10 ++ .../main/scala/async/ForkJoinSupport.scala | 91 +++++++++++++++++++ .../main/scala/async/AsyncOperations.scala | 2 +- shared/src/main/scala/async/Cancellable.scala | 2 +- shared/src/main/scala/async/futures.scala | 2 +- 6 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 native/src/main/scala/async/DefaultSupport.scala create mode 100644 native/src/main/scala/async/ForkJoinSupport.scala diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 9128e2ce..7be03b7f 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -13,8 +13,8 @@ object VThreadScheduler extends Scheduler: override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = val th = Thread.startVirtualThread: () => Thread.sleep(delay.toMillis) - body.run() - () => th.interrupt() // TODO this may interrupt the body after sleeping + execute(body) + () => th.interrupt() object VThreadSupport extends AsyncSupport: diff --git a/native/src/main/scala/async/DefaultSupport.scala b/native/src/main/scala/async/DefaultSupport.scala new file mode 100644 index 00000000..ca4808ef --- /dev/null +++ b/native/src/main/scala/async/DefaultSupport.scala @@ -0,0 +1,10 @@ +package gears.async.default + +import gears.async._ +import gears.async.native.ForkJoinSupport + +object DefaultSupport extends ForkJoinSupport + +given AsyncSupport = DefaultSupport +given DefaultSupport.Scheduler = DefaultSupport +given AsyncOperations = DefaultSupport diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala new file mode 100644 index 00000000..8a83d082 --- /dev/null +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -0,0 +1,91 @@ +package gears.async.native + +import gears.async._ +import scala.scalanative.runtime.{Continuations => native} +import java.util.concurrent.ForkJoinPool +import scala.concurrent.ExecutionContext +import scala.concurrent.JavaConversions._ +import scala.concurrent.duration._ +import java.util.concurrent.atomic.AtomicBoolean +import gears.async.Future.Promise +import java.util.concurrent.CancellationException + +class NativeContinuation[-T, +R] private[native] (val cont: T => R) extends Suspension[T, R]: + def resume(arg: T): R = cont(arg) + +trait NativeSuspend extends SuspendSupport: + type Label[R] = native.BoundaryLabel[R] + type Suspension[T, R] = NativeContinuation[T, R] + + override def boundary[R](body: (Label[R]) ?=> R): R = + native.boundary(body) + + override def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T = native.suspend[T, R](f => body(NativeContinuation(f))) +end NativeSuspend + +/** Spawns a single thread that does all the sleeping. */ +class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionContext with Scheduler { + import scala.collection.mutable + private case class Sleeper(wakeTime: Deadline, toRun: Runnable): + var isCancelled = false + private given Ordering[Sleeper] = new Ordering[Sleeper] { + import math.Ordered.orderingToOrdered + override def compare(x: Sleeper, y: Sleeper): Int = y.wakeTime.compare(x.wakeTime) + } + private val sleepers = mutable.PriorityQueue.empty[Sleeper] + private var sleepingUntil: Option[Deadline] = None + + override def execute(body: Runnable): Unit = exec.execute(body) + override def reportFailure(cause: Throwable): Unit = exec.reportFailure(cause) + override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = { + val sleeper = Sleeper(delay.fromNow, body) + // push to the sleeping priority queue + this.synchronized { + sleepers += sleeper + val shouldWake = sleepingUntil.map(sleeper.wakeTime < _).getOrElse(true) + if shouldWake then this.notifyAll() + } + () => { sleeper.isCancelled = true } + } + + private def consideredOverdue(d: Deadline): Boolean = d.timeLeft <= 0.milli + + // Sleep until the first timer is due, or .wait() otherwise + private def sleepLoop(): Unit = this.synchronized { + while (true) { + sleepingUntil = sleepers.headOption.map(_.wakeTime) + sleepingUntil match + case None => this.wait() + case Some(value) => + if value.hasTimeLeft() then + this.wait(value.timeLeft.max(10.millis).toMillis) + // Pop sleepers until no more available + while (sleepers.headOption.map(_.wakeTime.isOverdue()) == Some(true)) { + val task = sleepers.dequeue() + if !task.isCancelled then execute(task.toRun) + } + } + } + + val sleeperThread = Thread(() => sleepLoop()) + sleeperThread.start() +} + +class SuspendExecutorWithSleep(exec: ExecutionContext) extends ExecutorWithSleepThread(exec) + with AsyncSupport + with AsyncOperations + with NativeSuspend { + type Scheduler = this.type + override def sleep(millis: Long)(using ac: Async): Unit = { + val sleepingFut = Promise[Unit]() + val innerCancellable = schedule(millis.millis, () => sleepingFut.complete(scala.util.Success(()))) + cancellationScope(() => + // we need to wrap the cancellable so that we mark the cancellation as well + sleepingFut.complete(scala.util.Failure(CancellationException())) + innerCancellable.cancel() + ): + sleepingFut.future.value + } +} + +class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool()) diff --git a/shared/src/main/scala/async/AsyncOperations.scala b/shared/src/main/scala/async/AsyncOperations.scala index 58681db1..91074f8c 100644 --- a/shared/src/main/scala/async/AsyncOperations.scala +++ b/shared/src/main/scala/async/AsyncOperations.scala @@ -4,5 +4,5 @@ trait AsyncOperations: def sleep(millis: Long)(using Async): Unit object AsyncOperations: - inline def sleep(millis: Long)(using AsyncOperations, Async): Unit = + inline def sleep(millis: Long)(using AsyncOperations, Async): Unit = summon[AsyncOperations].sleep(millis) diff --git a/shared/src/main/scala/async/Cancellable.scala b/shared/src/main/scala/async/Cancellable.scala index 9835d55e..ad06a7ad 100644 --- a/shared/src/main/scala/async/Cancellable.scala +++ b/shared/src/main/scala/async/Cancellable.scala @@ -11,7 +11,7 @@ trait Cancellable: /** Add this cancellable to the given group after removing * it from the previous group in which it was. */ - def link(group: CompletionGroup): this.type = + def link(group: CompletionGroup): this.type = synchronized: this.group.drop(this) this.group = group this.group.add(this) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index b2aa9db2..df9edf4f 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -136,8 +136,8 @@ object Future: ac.support.resumeAsync(k)(Success(x)) cancellable.suspension = k cancellable.listener = listener - src.onComplete(listener) cancellable.link(group) // may resume + remove listener immediately + src.onComplete(listener) ) cancellable.unlink() res.get From 8036c25e76acf6bf2e91e730db1213a0fe4eaed5 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 20 Oct 2023 11:40:45 +0200 Subject: [PATCH 04/16] Implement Timer and some efforts to make tests run --- jvm/src/main/scala/async/DefaultSupport.scala | 5 + .../main/scala/async/JvmAsyncOperations.scala | 2 - .../main/scala/async/CompletionGroup.scala | 2 +- shared/src/main/scala/async/Timer.scala | 159 ++++----- shared/src/main/scala/async/futures.scala | 303 +++++++++--------- .../test/scala/CancellationBehavior.scala | 0 .../test/scala/ChannelBehavior.scala | 0 .../{ => src}/test/scala/FutureBehavior.scala | 0 .../test/scala/ListenerBehavior.scala | 0 .../{ => src}/test/scala/SourceBehavior.scala | 0 .../test/scala/TaskScheduleBehavior.scala | 0 shared/{ => src}/test/scala/Timer.scala | 0 12 files changed, 217 insertions(+), 254 deletions(-) create mode 100644 jvm/src/main/scala/async/DefaultSupport.scala rename {shared => jvm}/src/main/scala/async/JvmAsyncOperations.scala (89%) rename shared/{ => src}/test/scala/CancellationBehavior.scala (100%) rename shared/{ => src}/test/scala/ChannelBehavior.scala (100%) rename shared/{ => src}/test/scala/FutureBehavior.scala (100%) rename shared/{ => src}/test/scala/ListenerBehavior.scala (100%) rename shared/{ => src}/test/scala/SourceBehavior.scala (100%) rename shared/{ => src}/test/scala/TaskScheduleBehavior.scala (100%) rename shared/{ => src}/test/scala/Timer.scala (100%) diff --git a/jvm/src/main/scala/async/DefaultSupport.scala b/jvm/src/main/scala/async/DefaultSupport.scala new file mode 100644 index 00000000..de258e46 --- /dev/null +++ b/jvm/src/main/scala/async/DefaultSupport.scala @@ -0,0 +1,5 @@ +package gears.async.default + +import gears.async._ + +given JvmAsyncOperations.type = JvmAsyncOperations diff --git a/shared/src/main/scala/async/JvmAsyncOperations.scala b/jvm/src/main/scala/async/JvmAsyncOperations.scala similarity index 89% rename from shared/src/main/scala/async/JvmAsyncOperations.scala rename to jvm/src/main/scala/async/JvmAsyncOperations.scala index 83c5cbb2..0a70fe1d 100644 --- a/shared/src/main/scala/async/JvmAsyncOperations.scala +++ b/jvm/src/main/scala/async/JvmAsyncOperations.scala @@ -1,7 +1,5 @@ package gears.async -given JvmAsyncOperations.type = JvmAsyncOperations - object JvmAsyncOperations extends AsyncOperations: private def jvmInterruptible[T](fn: => T)(using Async): T = diff --git a/shared/src/main/scala/async/CompletionGroup.scala b/shared/src/main/scala/async/CompletionGroup.scala index 4567f865..d9b19635 100644 --- a/shared/src/main/scala/async/CompletionGroup.scala +++ b/shared/src/main/scala/async/CompletionGroup.scala @@ -32,7 +32,7 @@ class CompletionGroup(val handleCompletion: Cancellable => Async ?=> Unit = _ => /** Add given member to the members set. If the group has already been cancelled, cancels that member immediately. */ def add(member: Cancellable): Unit = val alreadyCancelled = synchronized: - members += member + members += member // Add this member no matter what since we'll wait for it still canceled if alreadyCancelled then member.cancel() diff --git a/shared/src/main/scala/async/Timer.scala b/shared/src/main/scala/async/Timer.scala index 2eeca64c..0079251b 100644 --- a/shared/src/main/scala/async/Timer.scala +++ b/shared/src/main/scala/async/Timer.scala @@ -7,106 +7,65 @@ import java.util.concurrent.TimeoutException import scala.collection.mutable import scala.concurrent.TimeoutException import scala.util.{Failure, Success, Try} - -type TimerRang = Boolean - -/** A timer that has to be explicitly started via `start()` to begin counting time. - * Can be used only once per instance. - */ -class StartableTimer(val millis: Long) extends Async.OriginalSource[TimerRang], Cancellable { - private enum TimerState(val future: Option[Future[Unit]]): - case Ready extends TimerState(None) - case Ticking(val f: Future[Unit]) extends TimerState(Some(f)) - case RangAlready extends TimerState(None) - case Cancelled extends TimerState(None) - - private val waiting: mutable.Set[Listener[TimerRang]] = mutable.Set() - @volatile private var state = TimerState.Ready - - - def start(): Unit = - state match - case TimerState.Cancelled => throw new IllegalStateException("Timers cannot be started after being cancelled.") - case TimerState.RangAlready => throw new IllegalStateException("Timers cannot be started after they rang already.") - case TimerState.Ticking(_) => throw new IllegalStateException("Timers cannot be started once they have already been started.") - case TimerState.Ready => - Async.blocking: - val f = Future: - sleep(millis) - var toNotify = List[Listener[TimerRang]]() - synchronized: - toNotify = waiting.toList - waiting.clear() - state match - case TimerState.Ticking(_) => - state = TimerState.RangAlready - case _ => - toNotify = List() - for listener <- toNotify do listener.completeNow(true, this) - state = TimerState.Ticking(f) - - def cancel(): Unit = - state match - case TimerState.Cancelled | TimerState.Ready | TimerState.RangAlready => () - case TimerState.Ticking(f: Future[Unit]) => - f.cancel() - val toNotify = synchronized: - val ws = waiting.toList - waiting.clear() - ws - for listener <- toNotify do listener.completeNow(false, this) - state = TimerState.Cancelled - - def poll(k: Listener[TimerRang]): Boolean = - state match - case TimerState.Ready | TimerState.Ticking(_) => false - case TimerState.RangAlready => k.completeNow(true, this) ; true - case TimerState.Cancelled => k.completeNow(false, this) ; true - - def addListener(k: Listener[TimerRang]): Unit = synchronized: - waiting += k - - def dropListener(k: Listener[TimerRang]): Unit = synchronized: - waiting -= k +import gears.async.Listener +import scala.concurrent.duration._ +import scala.annotation.tailrec +import java.util.concurrent.CancellationException + +/** + * Timer exposes a steady Async.Source of ticks that happens every `tickDuration` milliseconds. + * Note that the timer does not start ticking until `start` is called (which is a blocking operation, until the timer is cancelled). + * + * You might want to manually `cancel` the timer, so that it gets garbage collected (before the enclosing `Async` scope ends). +*/ +class Timer(tickDuration: Duration) extends Cancellable { + var isCancelled = false + + private class Source extends Async.OriginalSource[this.TimerEvent] { + def tick() = synchronized { + listeners.foreach(_.completeNow(TimerEvent.Tick, this)) + } + val listeners = mutable.Set[Listener[TimerEvent]]() + override def poll(k: Listener[TimerEvent]): Boolean = + if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else false // subscribing to a timer always takes you to the next tick + override def dropListener(k: Listener[TimerEvent]): Unit = listeners -= k + override protected def addListener(k: Listener[TimerEvent]): Unit = + if isCancelled then k.completeNow(TimerEvent.Cancelled, this) + else + Timer.this.synchronized: + if isCancelled then k.completeNow(TimerEvent.Cancelled, this) + else listeners += k } - -/** Exactly like `StartableTimer` except it starts immediately upon instance creation. - */ -class Timer(millis: Long) extends StartableTimer(millis) { - this.start() + private val _src = Source() + + /** Ticks of the timer are delivered through this source. Note that ticks are ephemeral. */ + val src: Async.Source[this.TimerEvent] = _src + + def start()(using Async, AsyncOperations): Unit = + cancellationScope(this): + loop() + + @tailrec private def loop()(using Async, AsyncOperations): Unit = + if !isCancelled then + try + // println(s"Sleeping at ${new java.util.Date()}, ${isCancelled}, ${this}") + sleep(tickDuration.toMillis) + catch + case _: CancellationException => cancel() + if !isCancelled then + _src.tick() + loop() + + + override def cancel(): Unit = + synchronized { isCancelled = true } + src.synchronized { + _src.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src)) + _src.listeners.clear() + } + + enum TimerEvent: + case Tick + case Cancelled } - -@main def TimerSleep1Second(): Unit = - Async.blocking: - println("start of 1 second") - assert(Async.await(Timer(1000))) - println("end of 1 second") - - -def timeoutCancellableFuture[T](millis: Long, f: Future[T]): Future[T] = - val p = Promise[T]() - val t = Timer(millis) - Async.blocking: - val g = Async.await(Async.either(t, f)) - g match - case Left(_) => - f.cancel() - p.complete(Failure(TimeoutException())) - case Right(v) => - t.cancel() - p.complete(v) - p.future - - -@main def testTimeoutFuture(): Unit = - var touched = false - Async.blocking: - val t = timeoutCancellableFuture(250, Future: - Async.await(Timer(1000)) - touched = true) - Async.await(t) - assert(!touched) - sleep(2000) - assert(!touched) - diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index df9edf4f..2acc1db1 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -263,164 +263,165 @@ enum TaskSchedule: * Composing tasks can be referentially transparent. * Tasks can be also ran on a specified schedule. */ -class Task[+T](val body: Async ?=> T): - - /** Start a future computed from the `body` of this task */ - def run(using Async) = Future(body) - - def schedule(s: TaskSchedule)(using async: Async): Task[T] = - s match { - case TaskSchedule.Every(millis, maxRepetitions) => - assert(millis >= 1) - assert(maxRepetitions >= 0) - Task { - var repetitions = 0 - var ret: T = body - repetitions += 1 - if (maxRepetitions == 1) ret - else { - while (maxRepetitions == 0 || repetitions < maxRepetitions) { - sleep(millis) - ret = body - repetitions += 1 - } - ret - } - } - case TaskSchedule.ExponentialBackoff(millis, exponentialBase, maxRepetitions) => - assert(millis >= 1) - assert(exponentialBase >= 2) - assert(maxRepetitions >= 0) - Task { - var repetitions = 0 - var ret: T = body - repetitions += 1 - if (maxRepetitions == 1) ret - else { - var timeToSleep = millis - while (maxRepetitions == 0 || repetitions < maxRepetitions) { - sleep(timeToSleep) - timeToSleep *= exponentialBase - ret = body - repetitions += 1 - } - ret - } - } - case TaskSchedule.FibonacciBackoff(millis, maxRepetitions) => - assert(millis >= 1) - assert(maxRepetitions >= 0) - Task { - var repetitions = 0 - var a: Long = 0 - var b: Long = 1 - var ret: T = body - repetitions += 1 - if (maxRepetitions == 1) ret - else { - sleep(millis) - ret = body - repetitions += 1 - if (maxRepetitions == 2) ret - else { - while (maxRepetitions == 0 || repetitions < maxRepetitions) { - val aOld = a - a = b - b = aOld + b - sleep(b * millis) - ret = body - repetitions += 1 - } - ret - } - } - } - case TaskSchedule.RepeatUntilFailure(millis, maxRepetitions) => - assert(millis >= 0) - assert(maxRepetitions >= 0) - Task { - @tailrec - def helper(repetitions: Long = 0): T = - if (repetitions > 0 && millis > 0) - sleep(millis) - val ret: T = body - ret match { - case Failure(_) => ret - case _ if (repetitions+1) == maxRepetitions && maxRepetitions != 0 => ret - case _ => helper(repetitions + 2) - } - helper() - } - case TaskSchedule.RepeatUntilSuccess(millis, maxRepetitions) => - assert(millis >= 0) - assert(maxRepetitions >= 0) - Task { - @tailrec - def helper(repetitions: Long = 0): T = - if (repetitions > 0 && millis > 0) - sleep(millis) - val ret: T = body - ret match { - case Success(_) => ret - case _ if (repetitions + 1) == maxRepetitions && maxRepetitions != 0 => ret - case _ => helper(repetitions + 2) - } - helper() - } - } - -end Task - -private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: - val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => - Future: - try - (Success(f.value), i) - catch case e => (Failure(e), i) - }) - - @tailrec - def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = - Async.await(Async.race( fs.map(_._1)* )) match - case Success((Success(x), i)) => - if (shouldCancel) { - for ((f, j) <- futures.zipWithIndex) { - if (j != i) f.cancel() - } - } - Success(x) - case Success((Failure(e), i)) => - if (failed + 1 == futures.length) - Failure(e) - else - helper(failed + 1, fs.filter({ case (_, j) => j != i })) - case _ => assert(false) - - helper(0, fs.zipWithIndex).get - -/** `alt` defined for multiple futures, not only two. - * If either task succeeds, succeed with the success that was returned first. - * Otherwise, fail with the failure that was returned last. - */ -def alt[T](futures: Future[T]*)(using Async): Future[T] = - altAndAltCImplementation(false, futures*) - -/** `altC` defined for multiple futures, not only two. - * If either task succeeds, succeed with the success that was returned first and cancel all other tasks. - * Otherwise, fail with the failure that was returned last. - */ -def altC[T](futures: Future[T]*)(using Async): Future[T] = - altAndAltCImplementation(true, futures*) - -def uninterruptible[T](body: Async ?=> T)(using ac: Async) = +// class Task[+T](val body: Async ?=> T): + +// /** Start a future computed from the `body` of this task */ +// def run(using Async) = Future(body) + +// def schedule(s: TaskSchedule)(using async: Async): Task[T] = +// s match { +// case TaskSchedule.Every(millis, maxRepetitions) => +// assert(millis >= 1) +// assert(maxRepetitions >= 0) +// Task { +// var repetitions = 0 +// var ret: T = body +// repetitions += 1 +// if (maxRepetitions == 1) ret +// else { +// while (maxRepetitions == 0 || repetitions < maxRepetitions) { +// sleep(millis) +// ret = body +// repetitions += 1 +// } +// ret +// } +// } +// case TaskSchedule.ExponentialBackoff(millis, exponentialBase, maxRepetitions) => +// assert(millis >= 1) +// assert(exponentialBase >= 2) +// assert(maxRepetitions >= 0) +// Task { +// var repetitions = 0 +// var ret: T = body +// repetitions += 1 +// if (maxRepetitions == 1) ret +// else { +// var timeToSleep = millis +// while (maxRepetitions == 0 || repetitions < maxRepetitions) { +// sleep(timeToSleep) +// timeToSleep *= exponentialBase +// ret = body +// repetitions += 1 +// } +// ret +// } +// } +// case TaskSchedule.FibonacciBackoff(millis, maxRepetitions) => +// assert(millis >= 1) +// assert(maxRepetitions >= 0) +// Task { +// var repetitions = 0 +// var a: Long = 0 +// var b: Long = 1 +// var ret: T = body +// repetitions += 1 +// if (maxRepetitions == 1) ret +// else { +// sleep(millis) +// ret = body +// repetitions += 1 +// if (maxRepetitions == 2) ret +// else { +// while (maxRepetitions == 0 || repetitions < maxRepetitions) { +// val aOld = a +// a = b +// b = aOld + b +// sleep(b * millis) +// ret = body +// repetitions += 1 +// } +// ret +// } +// } +// } +// case TaskSchedule.RepeatUntilFailure(millis, maxRepetitions) => +// assert(millis >= 0) +// assert(maxRepetitions >= 0) +// Task { +// @tailrec +// def helper(repetitions: Long = 0): T = +// if (repetitions > 0 && millis > 0) +// sleep(millis) +// val ret: T = body +// ret match { +// case Failure(_) => ret +// case _ if (repetitions+1) == maxRepetitions && maxRepetitions != 0 => ret +// case _ => helper(repetitions + 2) +// } +// helper() +// } +// case TaskSchedule.RepeatUntilSuccess(millis, maxRepetitions) => +// assert(millis >= 0) +// assert(maxRepetitions >= 0) +// Task { +// @tailrec +// def helper(repetitions: Long = 0): T = +// if (repetitions > 0 && millis > 0) +// sleep(millis) +// val ret: T = body +// ret match { +// case Success(_) => ret +// case _ if (repetitions + 1) == maxRepetitions && maxRepetitions != 0 => ret +// case _ => helper(repetitions + 2) +// } +// helper() +// } +// } + +// end Task + +// private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: +// val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => +// Future: +// try +// (Success(f.value), i) +// catch case e => (Failure(e), i) +// }) + +// @tailrec +// def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = +// Async.await(Async.race( fs.map(_._1)* )) match +// case Success((Success(x), i)) => +// if (shouldCancel) { +// for ((f, j) <- futures.zipWithIndex) { +// if (j != i) f.cancel() +// } +// } +// Success(x) +// case Success((Failure(e), i)) => +// if (failed + 1 == futures.length) +// Failure(e) +// else +// helper(failed + 1, fs.filter({ case (_, j) => j != i })) +// case _ => assert(false) + +// helper(0, fs.zipWithIndex).get + +// /** `alt` defined for multiple futures, not only two. +// * If either task succeeds, succeed with the success that was returned first. +// * Otherwise, fail with the failure that was returned last. +// */ +// def alt[T](futures: Future[T]*)(using Async): Future[T] = +// altAndAltCImplementation(false, futures*) + +// /** `altC` defined for multiple futures, not only two. +// * If either task succeeds, succeed with the success that was returned first and cancel all other tasks. +// * Otherwise, fail with the failure that was returned last. +// */ +// def altC[T](futures: Future[T]*)(using Async): Future[T] = +// altAndAltCImplementation(true, futures*) + +def uninterruptible[T](body: Async ?=> T)(using ac: Async): T = val tracker = Cancellable.Tracking().link() - try + val r = try val group = CompletionGroup() Async.withNewCompletionGroup(group)(body) finally tracker.unlink() if tracker.isCancelled then throw new CancellationException() + r def cancellationScope[T](cancel: Cancellable)(fn: => T)(using a: Async): T = cancel.link() diff --git a/shared/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala similarity index 100% rename from shared/test/scala/CancellationBehavior.scala rename to shared/src/test/scala/CancellationBehavior.scala diff --git a/shared/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala similarity index 100% rename from shared/test/scala/ChannelBehavior.scala rename to shared/src/test/scala/ChannelBehavior.scala diff --git a/shared/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala similarity index 100% rename from shared/test/scala/FutureBehavior.scala rename to shared/src/test/scala/FutureBehavior.scala diff --git a/shared/test/scala/ListenerBehavior.scala b/shared/src/test/scala/ListenerBehavior.scala similarity index 100% rename from shared/test/scala/ListenerBehavior.scala rename to shared/src/test/scala/ListenerBehavior.scala diff --git a/shared/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala similarity index 100% rename from shared/test/scala/SourceBehavior.scala rename to shared/src/test/scala/SourceBehavior.scala diff --git a/shared/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala similarity index 100% rename from shared/test/scala/TaskScheduleBehavior.scala rename to shared/src/test/scala/TaskScheduleBehavior.scala diff --git a/shared/test/scala/Timer.scala b/shared/src/test/scala/Timer.scala similarity index 100% rename from shared/test/scala/Timer.scala rename to shared/src/test/scala/Timer.scala From cca561f0e6ec24e0368422250be40eaa064669c6 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 20 Oct 2023 13:17:46 +0200 Subject: [PATCH 05/16] Get other tests to run on all platforms --- jvm/src/main/scala/PosixLikeIO/PIO.scala | 1 + .../examples/clientAndServerUDP.scala | 5 +- .../examples/readAndWriteFile.scala | 1 + .../PosixLikeIO/examples/readWholeFile.scala | 1 + jvm/src/main/scala/async/DefaultSupport.scala | 4 +- jvm/src/main/scala/async/VThreadSupport.scala | 4 +- .../scala/measurements/measureTimes.scala | 1 + shared/src/main/scala/async/futures.scala | 297 +++++++++--------- .../src/test/scala/CancellationBehavior.scala | 1 + shared/src/test/scala/ChannelBehavior.scala | 3 + shared/src/test/scala/FutureBehavior.scala | 3 +- shared/src/test/scala/ListenerBehavior.scala | 2 +- shared/src/test/scala/SourceBehavior.scala | 1 + .../src/test/scala/TaskScheduleBehavior.scala | 3 +- 14 files changed, 169 insertions(+), 158 deletions(-) diff --git a/jvm/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala index a0f1b136..fa529556 100644 --- a/jvm/src/main/scala/PosixLikeIO/PIO.scala +++ b/jvm/src/main/scala/PosixLikeIO/PIO.scala @@ -1,6 +1,7 @@ package PosixLikeIO import gears.async.{Async, Future, given} +import gears.async.default.given import Future.Promise import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, ServerSocket, Socket} diff --git a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala index 3bcdcf8d..5e0989f9 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala @@ -1,5 +1,6 @@ package PosixLikeIO.examples +import gears.async.default.given import gears.async.{Async, Future, given} import gears.async.AsyncOperations.* import PosixLikeIO.{PIOHelper, SocketUDP} @@ -9,7 +10,6 @@ import java.nio.ByteBuffer import java.nio.file.StandardOpenOption import scala.concurrent.ExecutionContext - @main def clientAndServerUDP(): Unit = given ExecutionContext = ExecutionContext.global Async.blocking: @@ -30,6 +30,5 @@ import scala.concurrent.ExecutionContext val messageReceived = String(responseDatagram.getData.slice(0, responseDatagram.getLength), "UTF-8").toInt println("Sent " + value.toString + " and got " + messageReceived.toString + " in return.") - Async.await(client(100)) - Async.await(server) \ No newline at end of file + Async.await(server) diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala index f8d1a927..144c02ce 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala @@ -1,6 +1,7 @@ package PosixLikeIO.examples import gears.async.{Async, given} +import gears.async.default.given import PosixLikeIO.PIOHelper import java.nio.ByteBuffer diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala index 0628b6de..4e936cf8 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala @@ -1,6 +1,7 @@ package PosixLikeIO.examples import gears.async.{Async, given} +import gears.async.default.given import PosixLikeIO.PIOHelper import java.nio.ByteBuffer diff --git a/jvm/src/main/scala/async/DefaultSupport.scala b/jvm/src/main/scala/async/DefaultSupport.scala index de258e46..7f12d2d4 100644 --- a/jvm/src/main/scala/async/DefaultSupport.scala +++ b/jvm/src/main/scala/async/DefaultSupport.scala @@ -2,4 +2,6 @@ package gears.async.default import gears.async._ -given JvmAsyncOperations.type = JvmAsyncOperations +given AsyncOperations = JvmAsyncOperations +given VThreadSupport.type = VThreadSupport +given VThreadSupport.Scheduler = VThreadScheduler diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 7be03b7f..67836761 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -4,8 +4,8 @@ import scala.annotation.unchecked.uncheckedVariance import java.util.concurrent.locks.ReentrantLock import scala.concurrent.duration.FiniteDuration -given VThreadScheduler.type = VThreadScheduler -given VThreadSupport.type = VThreadSupport +// given VThreadScheduler.type = VThreadScheduler +// given VThreadSupport.type = VThreadSupport object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = Thread.startVirtualThread(body) diff --git a/jvm/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala index b8bf2f8f..ff6a0382 100644 --- a/jvm/src/main/scala/measurements/measureTimes.scala +++ b/jvm/src/main/scala/measurements/measureTimes.scala @@ -2,6 +2,7 @@ package measurements import PosixLikeIO.PIOHelper import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel, given} +import gears.async.default.given import java.io.{FileReader, FileWriter} import java.nio.file.{Files, NoSuchFileException, Paths, StandardOpenOption} diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index 2acc1db1..ea624b43 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -103,7 +103,6 @@ object Future: if cancelRequest then throw new CancellationException() private class FutureAsync(val group: CompletionGroup)(using label: ac.support.Label[Unit]) extends Async(using ac.support, ac.scheduler): - /** Await a source first by polling it, and, if that fails, by suspending * in a onComplete call. */ @@ -263,154 +262,154 @@ enum TaskSchedule: * Composing tasks can be referentially transparent. * Tasks can be also ran on a specified schedule. */ -// class Task[+T](val body: Async ?=> T): - -// /** Start a future computed from the `body` of this task */ -// def run(using Async) = Future(body) - -// def schedule(s: TaskSchedule)(using async: Async): Task[T] = -// s match { -// case TaskSchedule.Every(millis, maxRepetitions) => -// assert(millis >= 1) -// assert(maxRepetitions >= 0) -// Task { -// var repetitions = 0 -// var ret: T = body -// repetitions += 1 -// if (maxRepetitions == 1) ret -// else { -// while (maxRepetitions == 0 || repetitions < maxRepetitions) { -// sleep(millis) -// ret = body -// repetitions += 1 -// } -// ret -// } -// } -// case TaskSchedule.ExponentialBackoff(millis, exponentialBase, maxRepetitions) => -// assert(millis >= 1) -// assert(exponentialBase >= 2) -// assert(maxRepetitions >= 0) -// Task { -// var repetitions = 0 -// var ret: T = body -// repetitions += 1 -// if (maxRepetitions == 1) ret -// else { -// var timeToSleep = millis -// while (maxRepetitions == 0 || repetitions < maxRepetitions) { -// sleep(timeToSleep) -// timeToSleep *= exponentialBase -// ret = body -// repetitions += 1 -// } -// ret -// } -// } -// case TaskSchedule.FibonacciBackoff(millis, maxRepetitions) => -// assert(millis >= 1) -// assert(maxRepetitions >= 0) -// Task { -// var repetitions = 0 -// var a: Long = 0 -// var b: Long = 1 -// var ret: T = body -// repetitions += 1 -// if (maxRepetitions == 1) ret -// else { -// sleep(millis) -// ret = body -// repetitions += 1 -// if (maxRepetitions == 2) ret -// else { -// while (maxRepetitions == 0 || repetitions < maxRepetitions) { -// val aOld = a -// a = b -// b = aOld + b -// sleep(b * millis) -// ret = body -// repetitions += 1 -// } -// ret -// } -// } -// } -// case TaskSchedule.RepeatUntilFailure(millis, maxRepetitions) => -// assert(millis >= 0) -// assert(maxRepetitions >= 0) -// Task { -// @tailrec -// def helper(repetitions: Long = 0): T = -// if (repetitions > 0 && millis > 0) -// sleep(millis) -// val ret: T = body -// ret match { -// case Failure(_) => ret -// case _ if (repetitions+1) == maxRepetitions && maxRepetitions != 0 => ret -// case _ => helper(repetitions + 2) -// } -// helper() -// } -// case TaskSchedule.RepeatUntilSuccess(millis, maxRepetitions) => -// assert(millis >= 0) -// assert(maxRepetitions >= 0) -// Task { -// @tailrec -// def helper(repetitions: Long = 0): T = -// if (repetitions > 0 && millis > 0) -// sleep(millis) -// val ret: T = body -// ret match { -// case Success(_) => ret -// case _ if (repetitions + 1) == maxRepetitions && maxRepetitions != 0 => ret -// case _ => helper(repetitions + 2) -// } -// helper() -// } -// } - -// end Task - -// private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: -// val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => -// Future: -// try -// (Success(f.value), i) -// catch case e => (Failure(e), i) -// }) - -// @tailrec -// def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = -// Async.await(Async.race( fs.map(_._1)* )) match -// case Success((Success(x), i)) => -// if (shouldCancel) { -// for ((f, j) <- futures.zipWithIndex) { -// if (j != i) f.cancel() -// } -// } -// Success(x) -// case Success((Failure(e), i)) => -// if (failed + 1 == futures.length) -// Failure(e) -// else -// helper(failed + 1, fs.filter({ case (_, j) => j != i })) -// case _ => assert(false) - -// helper(0, fs.zipWithIndex).get - -// /** `alt` defined for multiple futures, not only two. -// * If either task succeeds, succeed with the success that was returned first. -// * Otherwise, fail with the failure that was returned last. -// */ -// def alt[T](futures: Future[T]*)(using Async): Future[T] = -// altAndAltCImplementation(false, futures*) - -// /** `altC` defined for multiple futures, not only two. -// * If either task succeeds, succeed with the success that was returned first and cancel all other tasks. -// * Otherwise, fail with the failure that was returned last. -// */ -// def altC[T](futures: Future[T]*)(using Async): Future[T] = -// altAndAltCImplementation(true, futures*) +class Task[+T](val body: (Async, AsyncOperations) ?=> T): + + /** Start a future computed from the `body` of this task */ + def run(using Async, AsyncOperations) = Future(body) + + def schedule(s: TaskSchedule): Task[T] = + s match { + case TaskSchedule.Every(millis, maxRepetitions) => + assert(millis >= 1) + assert(maxRepetitions >= 0) + Task { + var repetitions = 0 + var ret: T = body + repetitions += 1 + if (maxRepetitions == 1) ret + else { + while (maxRepetitions == 0 || repetitions < maxRepetitions) { + sleep(millis) + ret = body + repetitions += 1 + } + ret + } + } + case TaskSchedule.ExponentialBackoff(millis, exponentialBase, maxRepetitions) => + assert(millis >= 1) + assert(exponentialBase >= 2) + assert(maxRepetitions >= 0) + Task { + var repetitions = 0 + var ret: T = body + repetitions += 1 + if (maxRepetitions == 1) ret + else { + var timeToSleep = millis + while (maxRepetitions == 0 || repetitions < maxRepetitions) { + sleep(timeToSleep) + timeToSleep *= exponentialBase + ret = body + repetitions += 1 + } + ret + } + } + case TaskSchedule.FibonacciBackoff(millis, maxRepetitions) => + assert(millis >= 1) + assert(maxRepetitions >= 0) + Task { + var repetitions = 0 + var a: Long = 0 + var b: Long = 1 + var ret: T = body + repetitions += 1 + if (maxRepetitions == 1) ret + else { + sleep(millis) + ret = body + repetitions += 1 + if (maxRepetitions == 2) ret + else { + while (maxRepetitions == 0 || repetitions < maxRepetitions) { + val aOld = a + a = b + b = aOld + b + sleep(b * millis) + ret = body + repetitions += 1 + } + ret + } + } + } + case TaskSchedule.RepeatUntilFailure(millis, maxRepetitions) => + assert(millis >= 0) + assert(maxRepetitions >= 0) + Task { + @tailrec + def helper(repetitions: Long = 0): T = + if (repetitions > 0 && millis > 0) + sleep(millis) + val ret: T = body + ret match { + case Failure(_) => ret + case _ if (repetitions+1) == maxRepetitions && maxRepetitions != 0 => ret + case _ => helper(repetitions + 2) + } + helper() + } + case TaskSchedule.RepeatUntilSuccess(millis, maxRepetitions) => + assert(millis >= 0) + assert(maxRepetitions >= 0) + Task { + @tailrec + def helper(repetitions: Long = 0): T = + if (repetitions > 0 && millis > 0) + sleep(millis) + val ret: T = body + ret match { + case Success(_) => ret + case _ if (repetitions + 1) == maxRepetitions && maxRepetitions != 0 => ret + case _ => helper(repetitions + 2) + } + helper() + } + } + +end Task + +private def altAndAltCImplementation[T](shouldCancel: Boolean, futures: Future[T]*)(using Async): Future[T] = Future[T]: + val fs: Seq[Future[(Try[T], Int)]] = futures.zipWithIndex.map({ (f, i) => + Future: + try + (Success(f.value), i) + catch case e => (Failure(e), i) + }) + + @tailrec + def helper(failed: Int, fs: Seq[(Future[(Try[T], Int)], Int)]): Try[T] = + Async.await(Async.race( fs.map(_._1)* )) match + case Success((Success(x), i)) => + if (shouldCancel) { + for ((f, j) <- futures.zipWithIndex) { + if (j != i) f.cancel() + } + } + Success(x) + case Success((Failure(e), i)) => + if (failed + 1 == futures.length) + Failure(e) + else + helper(failed + 1, fs.filter({ case (_, j) => j != i })) + case _ => assert(false) + + helper(0, fs.zipWithIndex).get + +/** `alt` defined for multiple futures, not only two. + * If either task succeeds, succeed with the success that was returned first. + * Otherwise, fail with the failure that was returned last. + */ +def alt[T](futures: Future[T]*)(using Async): Future[T] = + altAndAltCImplementation(false, futures*) + +/** `altC` defined for multiple futures, not only two. + * If either task succeeds, succeed with the success that was returned first and cancel all other tasks. + * Otherwise, fail with the failure that was returned last. + */ +def altC[T](futures: Future[T]*)(using Async): Future[T] = + altAndAltCImplementation(true, futures*) def uninterruptible[T](body: Async ?=> T)(using ac: Async): T = val tracker = Cancellable.Tracking().link() diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index 2969bce8..a3930b02 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -1,5 +1,6 @@ import gears.async.{Async, Future, AsyncSupport, uninterruptible, given} import gears.async.AsyncOperations.* +import gears.async.default.given import scala.util.boundary import boundary.break import scala.concurrent.duration.{Duration, DurationInt} diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index 13b79f4e..04436f2a 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -1,4 +1,5 @@ import gears.async.{Async, BufferedChannel, ChannelClosedException, ChannelMultiplexer, Future, SyncChannel, Task, TaskSchedule, alt, altC, given} +import gears.async.default.given import gears.async.AsyncOperations.* import Future.{*:, zip} @@ -10,7 +11,9 @@ import scala.util.Random import scala.collection.mutable.{ArrayBuffer, Set} import java.util.concurrent.atomic.AtomicInteger +@munit.IgnoreSuite class ChannelBehavior extends munit.FunSuite { + given ExecutionContext = ExecutionContext.global test("sending is blocking in SyncChannel") { diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index 5d1ab69d..8856b953 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -1,4 +1,5 @@ import gears.async.{Async, Future, Task, TaskSchedule, alt, altC, uninterruptible, given} +import gears.async.default.given import gears.async.Future.{*:, Promise, zip} import gears.async.AsyncOperations.* @@ -384,4 +385,4 @@ class FutureBehavior extends munit.FunSuite { }).result, Failure(e2)) } -} \ No newline at end of file +} diff --git a/shared/src/test/scala/ListenerBehavior.scala b/shared/src/test/scala/ListenerBehavior.scala index 43685147..0897a1fe 100644 --- a/shared/src/test/scala/ListenerBehavior.scala +++ b/shared/src/test/scala/ListenerBehavior.scala @@ -3,7 +3,7 @@ import gears.async.Future import gears.async.Future.Promise import gears.async.Async import gears.async.Listener -import gears.async.given +import gears.async.default.given import scala.util.Success import java.util.concurrent.atomic.AtomicBoolean import gears.async.listeners.lockBoth diff --git a/shared/src/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala index 86587b4d..f803e06e 100644 --- a/shared/src/test/scala/SourceBehavior.scala +++ b/shared/src/test/scala/SourceBehavior.scala @@ -1,6 +1,7 @@ import gears.async.{Async, Future, Listener, given} import Async.either import gears.async.AsyncOperations.* +import gears.async.default.given import java.util.concurrent.CancellationException import scala.concurrent.ExecutionContext diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index 9d39eb21..ba786a51 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -1,4 +1,5 @@ import gears.async.{Async, Future, Task, TaskSchedule, alt, given} +import gears.async.default.given import Future.{*:, zip} import scala.concurrent.ExecutionContext @@ -85,4 +86,4 @@ class TaskScheduleBehavior extends munit.FunSuite { assert(end - start < 5 * 150) } -} \ No newline at end of file +} From 93878845d8071dcd4d6ca61cc857b7f11d221a48 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Wed, 25 Oct 2023 14:36:13 +0200 Subject: [PATCH 06/16] Move the timeout test to jvm --- jvm/src/test/scala/CancellationBehavior.scala | 21 +++++++++++++++++++ .../src/test/scala/CancellationBehavior.scala | 12 ----------- shared/src/test/scala/ChannelBehavior.scala | 3 +-- 3 files changed, 22 insertions(+), 14 deletions(-) create mode 100644 jvm/src/test/scala/CancellationBehavior.scala diff --git a/jvm/src/test/scala/CancellationBehavior.scala b/jvm/src/test/scala/CancellationBehavior.scala new file mode 100644 index 00000000..d53f8897 --- /dev/null +++ b/jvm/src/test/scala/CancellationBehavior.scala @@ -0,0 +1,21 @@ +import gears.async.{Async, Future, AsyncSupport, uninterruptible, given} +import gears.async.AsyncOperations.* +import gears.async.default.given +import scala.util.boundary +import boundary.break +import scala.concurrent.duration.{Duration, DurationInt} +import java.util.concurrent.CancellationException +import scala.util.Success +import scala.util.Properties + +// JVM-only since `munitTimeout` is not available on scala native. +// See (here)[https://scalameta.org/munit/docs/tests.html#customize-test-timeouts]. +class JVMCancellationBehavior extends munit.FunSuite: + override def munitTimeout: Duration = 2.seconds + test("no cancel -> timeout".fail): + Async.blocking: + val f = Future: + Thread.sleep(5000) + 1 + f.result + diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index a3930b02..0874c3b9 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -8,8 +8,6 @@ import java.util.concurrent.CancellationException import scala.util.Success class CancellationBehavior extends munit.FunSuite: - override def munitTimeout: Duration = 2.seconds - enum State: case Ready case Initialized(f: Future[?]) @@ -56,16 +54,6 @@ class CancellationBehavior extends munit.FunSuite: info.initialize(f) f - test("no cancel -> timeout".fail): - // munit v0.x only respects timeout for async tests - given scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global - scala.concurrent.Future: - Async.blocking: - val f = Future: - sleep(3000) - 1 - f.result - test("no cancel"): var x = 0 Async.blocking: diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index 04436f2a..adf23cba 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -11,8 +11,7 @@ import scala.util.Random import scala.collection.mutable.{ArrayBuffer, Set} import java.util.concurrent.atomic.AtomicInteger -@munit.IgnoreSuite -class ChannelBehavior extends munit.FunSuite { +abstract class ChannelBehavior extends munit.FunSuite { given ExecutionContext = ExecutionContext.global From 9124cc2d829e376dd0d4f0aff1bbd0fc10dacf23 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 30 Oct 2023 15:44:41 +0100 Subject: [PATCH 07/16] Properly check for completion in future addListener Otherwise a situation where Thread A | Thread B poll | complete addListener happens, and A gets deadlocked. --- shared/src/main/scala/async/futures.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index ea624b43..f37a1b13 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -54,7 +54,8 @@ object Future: else false def addListener(k: Listener[Try[T]]): Unit = synchronized: - waiting += k + if hasCompleted then k.completeNow(result, this) + else waiting += k def dropListener(k: Listener[Try[T]]): Unit = synchronized: waiting -= k From f1008a759964e8599d80c84c84981d9a3a90b17c Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Wed, 1 Nov 2023 13:59:40 +0100 Subject: [PATCH 08/16] Make sleeper thread a daemon --- native/src/main/scala/async/ForkJoinSupport.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala index 8a83d082..9038dbd3 100644 --- a/native/src/main/scala/async/ForkJoinSupport.scala +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -68,6 +68,7 @@ class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionConte } val sleeperThread = Thread(() => sleepLoop()) + sleeperThread.setDaemon(true) sleeperThread.start() } From 43415bef48d4f2c1eed065208d4aac7e4d289d16 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 3 Nov 2023 21:07:24 +0100 Subject: [PATCH 09/16] Use Boehm GC as Immix is crashing sometimes --- build.sbt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.sbt b/build.sbt index 3a152947..52bd51f2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,5 @@ import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType} +import scalanative.build._ ThisBuild / scalaVersion := "3.3.1" @@ -19,6 +20,7 @@ lazy val root = .nativeSettings(Seq( nativeConfig ~= { c => c.withMultithreadingSupport(true) + .withGC(GC.boehm) // immix doesn't work yet }, libraryDependencies += "org.scalameta" %%% "munit" % "1.0.0-M10+15-3940023e-SNAPSHOT" % Test )) From 41872bd3650879b5366601171438bee2bb638a61 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 1 Dec 2023 18:27:17 +0100 Subject: [PATCH 10/16] ci: Split jobs between native and jvm --- .github/workflows/ci.yml | 31 ++++++++++++++++++++++++++++--- dependencies/publish-deps.sh | 9 +++++++-- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ec28edd..435aa237 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -5,16 +5,41 @@ on: - main pull_request: jobs: - test: + test-jvm: strategy: fail-fast: false runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 + with: + submodules: recursive + fetch-depth: 0 - uses: coursier/cache-action@v6 - uses: actions/setup-java@v3 with: distribution: temurin java-version: 21 + - name: Build dependencies + run: ./dependencies/publish-deps.sh --scala-native-only + - name: Test + run: sbt rootJVM/test + test-native: + strategy: + fail-fast: false + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + fetch-depth: 0 + - uses: coursier/cache-action@v6 + - uses: actions/setup-java@v3 + with: + distribution: temurin + java-version: 19 + - name: Build dependencies + run: ./dependencies/publish-deps.sh + - name: Install scala-native dependencies + run: sudo apt-get install clang libstdc++-12-dev libgc-dev - name: Test - run: sbt test + run: sbt rootNative/test diff --git a/dependencies/publish-deps.sh b/dependencies/publish-deps.sh index 64257391..6c28bad6 100755 --- a/dependencies/publish-deps.sh +++ b/dependencies/publish-deps.sh @@ -4,5 +4,10 @@ set -e cd "$(dirname "${BASH_SOURCE[0]}")" # change to this directory -cd scala-native && sbt "publish-local-dev 3; ++3.1.2 publishLocal" && cd .. -cd munit && sbt "++3.1.2 munitNative/publishLocal" +cd scala-native && sbt 'publish-local-dev 3' && cd .. + +if test "$1" != "--scala-native-only"; then + cd scala-native && sbt '++3.1.2 publishLocal' && cd .. + cd munit && sbt "++3.1.2 munitNative/publishLocal" && cd .. +fi + From b23143a30846835fea647f68e364fa03ffddda22 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Mon, 4 Dec 2023 15:22:55 +0100 Subject: [PATCH 11/16] Try to set up caching of submodule outputs on CI --- .github/workflows/ci.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 435aa237..e541211c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,6 +19,14 @@ jobs: with: distribution: temurin java-version: 21 + - name: Get submodule status hash + id: get-submodules + run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT + - uses: actions/cache@v3 + with: + path: | + ./dependencies/**/target + key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }}-scala-native-only - name: Build dependencies run: ./dependencies/publish-deps.sh --scala-native-only - name: Test @@ -37,6 +45,14 @@ jobs: with: distribution: temurin java-version: 19 + - name: Get submodule status hash + id: get-submodules + run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT + - uses: actions/cache@v3 + with: + path: | + ./dependencies/**/target + key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }} - name: Build dependencies run: ./dependencies/publish-deps.sh - name: Install scala-native dependencies From 7717dcf5fcfa46a552a924bc581d6a462f833293 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 8 Dec 2023 00:13:02 +0100 Subject: [PATCH 12/16] Remove import of gears.async.given --- jvm/src/main/scala/PosixLikeIO/PIO.scala | 2 +- .../main/scala/PosixLikeIO/examples/clientAndServerUDP.scala | 2 +- jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala | 2 +- jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala | 2 +- jvm/src/main/scala/measurements/measureTimes.scala | 2 +- jvm/src/test/scala/CancellationBehavior.scala | 2 +- shared/src/test/scala/CancellationBehavior.scala | 2 +- shared/src/test/scala/ChannelBehavior.scala | 2 +- shared/src/test/scala/FutureBehavior.scala | 2 +- shared/src/test/scala/SourceBehavior.scala | 2 +- shared/src/test/scala/TaskScheduleBehavior.scala | 2 +- 11 files changed, 11 insertions(+), 11 deletions(-) diff --git a/jvm/src/main/scala/PosixLikeIO/PIO.scala b/jvm/src/main/scala/PosixLikeIO/PIO.scala index fa529556..afea8701 100644 --- a/jvm/src/main/scala/PosixLikeIO/PIO.scala +++ b/jvm/src/main/scala/PosixLikeIO/PIO.scala @@ -1,6 +1,6 @@ package PosixLikeIO -import gears.async.{Async, Future, given} +import gears.async.{Async, Future} import gears.async.default.given import Future.Promise diff --git a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala index 5e0989f9..83bae882 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/clientAndServerUDP.scala @@ -1,7 +1,7 @@ package PosixLikeIO.examples import gears.async.default.given -import gears.async.{Async, Future, given} +import gears.async.{Async, Future} import gears.async.AsyncOperations.* import PosixLikeIO.{PIOHelper, SocketUDP} diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala index 144c02ce..606f8bcf 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readAndWriteFile.scala @@ -1,6 +1,6 @@ package PosixLikeIO.examples -import gears.async.{Async, given} +import gears.async.Async import gears.async.default.given import PosixLikeIO.PIOHelper diff --git a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala index 4e936cf8..34d3035f 100644 --- a/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala +++ b/jvm/src/main/scala/PosixLikeIO/examples/readWholeFile.scala @@ -1,6 +1,6 @@ package PosixLikeIO.examples -import gears.async.{Async, given} +import gears.async.Async import gears.async.default.given import PosixLikeIO.PIOHelper diff --git a/jvm/src/main/scala/measurements/measureTimes.scala b/jvm/src/main/scala/measurements/measureTimes.scala index ff6a0382..8f624e90 100644 --- a/jvm/src/main/scala/measurements/measureTimes.scala +++ b/jvm/src/main/scala/measurements/measureTimes.scala @@ -1,7 +1,7 @@ package measurements import PosixLikeIO.PIOHelper -import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel, given} +import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel} import gears.async.default.given import java.io.{FileReader, FileWriter} diff --git a/jvm/src/test/scala/CancellationBehavior.scala b/jvm/src/test/scala/CancellationBehavior.scala index d53f8897..015238ec 100644 --- a/jvm/src/test/scala/CancellationBehavior.scala +++ b/jvm/src/test/scala/CancellationBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, AsyncSupport, uninterruptible, given} +import gears.async.{Async, Future, AsyncSupport, uninterruptible} import gears.async.AsyncOperations.* import gears.async.default.given import scala.util.boundary diff --git a/shared/src/test/scala/CancellationBehavior.scala b/shared/src/test/scala/CancellationBehavior.scala index 0874c3b9..7bbd593d 100644 --- a/shared/src/test/scala/CancellationBehavior.scala +++ b/shared/src/test/scala/CancellationBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, AsyncSupport, uninterruptible, given} +import gears.async.{Async, Future, AsyncSupport, uninterruptible} import gears.async.AsyncOperations.* import gears.async.default.given import scala.util.boundary diff --git a/shared/src/test/scala/ChannelBehavior.scala b/shared/src/test/scala/ChannelBehavior.scala index adf23cba..6d858513 100644 --- a/shared/src/test/scala/ChannelBehavior.scala +++ b/shared/src/test/scala/ChannelBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, BufferedChannel, ChannelClosedException, ChannelMultiplexer, Future, SyncChannel, Task, TaskSchedule, alt, altC, given} +import gears.async.{Async, BufferedChannel, ChannelClosedException, ChannelMultiplexer, Future, SyncChannel, Task, TaskSchedule, alt, altC} import gears.async.default.given import gears.async.AsyncOperations.* import Future.{*:, zip} diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index 8856b953..e770e5b8 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, Task, TaskSchedule, alt, altC, uninterruptible, given} +import gears.async.{Async, Future, Task, TaskSchedule, alt, altC, uninterruptible} import gears.async.default.given import gears.async.Future.{*:, Promise, zip} import gears.async.AsyncOperations.* diff --git a/shared/src/test/scala/SourceBehavior.scala b/shared/src/test/scala/SourceBehavior.scala index f803e06e..ddeb7628 100644 --- a/shared/src/test/scala/SourceBehavior.scala +++ b/shared/src/test/scala/SourceBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, Listener, given} +import gears.async.{Async, Future, Listener} import Async.either import gears.async.AsyncOperations.* import gears.async.default.given diff --git a/shared/src/test/scala/TaskScheduleBehavior.scala b/shared/src/test/scala/TaskScheduleBehavior.scala index ba786a51..b56303c0 100644 --- a/shared/src/test/scala/TaskScheduleBehavior.scala +++ b/shared/src/test/scala/TaskScheduleBehavior.scala @@ -1,4 +1,4 @@ -import gears.async.{Async, Future, Task, TaskSchedule, alt, given} +import gears.async.{Async, Future, Task, TaskSchedule, alt} import gears.async.default.given import Future.{*:, zip} From 22fcade9dc041e2ce94fbe8401dd80cb08ae2ed2 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 8 Dec 2023 00:28:41 +0100 Subject: [PATCH 13/16] Clean up comments --- jvm/src/main/scala/async/VThreadSupport.scala | 3 -- shared/src/main/scala/async/channels.scala | 33 ------------------- 2 files changed, 36 deletions(-) diff --git a/jvm/src/main/scala/async/VThreadSupport.scala b/jvm/src/main/scala/async/VThreadSupport.scala index 67836761..f4a5b11d 100644 --- a/jvm/src/main/scala/async/VThreadSupport.scala +++ b/jvm/src/main/scala/async/VThreadSupport.scala @@ -4,9 +4,6 @@ import scala.annotation.unchecked.uncheckedVariance import java.util.concurrent.locks.ReentrantLock import scala.concurrent.duration.FiniteDuration -// given VThreadScheduler.type = VThreadScheduler -// given VThreadSupport.type = VThreadSupport - object VThreadScheduler extends Scheduler: override def execute(body: Runnable): Unit = Thread.startVirtualThread(body) diff --git a/shared/src/main/scala/async/channels.scala b/shared/src/main/scala/async/channels.scala index 1d53d025..30dca95f 100644 --- a/shared/src/main/scala/async/channels.scala +++ b/shared/src/main/scala/async/channels.scala @@ -343,36 +343,3 @@ object ChannelMultiplexer: end ChannelMultiplexer - -// TODO: make this work -// @main def channelsMultipleSendersOneReader(): Unit = -// Async.blocking: -// var aa = false -// var ab = false -// var ac = false -// var b = false -// val c = SyncChannel[Int]() -// val f13 = Future: -// for (i <- 1 to 10000) -// c.send(i) -// ac = true -// val f11 = Future: -// for (i <- 1 to 10000) -// c.send(i) -// aa = true -// val f12 = Future: -// for (i <- 1 to 10000) -// c.send(i) -// ab = true -// val f2 = Future: -// var r = 0 -// for (i <- 1 to 30000) -// c.read() -// r += 1 -// b = true - -// f11.result -// f2.result -// f12.result -// f13.result -// println("all done? " + (aa && ab && ac && b)) From 7fcbbeda8ed960e91ccdf4afb6b3ac9fd4becdf7 Mon Sep 17 00:00:00 2001 From: Natsu Kagami Date: Fri, 8 Dec 2023 00:28:59 +0100 Subject: [PATCH 14/16] Fix timer not removing listeners on tick --- shared/src/main/scala/async/Timer.scala | 37 +++++++++++++------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/shared/src/main/scala/async/Timer.scala b/shared/src/main/scala/async/Timer.scala index 0079251b..9f4a129d 100644 --- a/shared/src/main/scala/async/Timer.scala +++ b/shared/src/main/scala/async/Timer.scala @@ -13,19 +13,26 @@ import scala.annotation.tailrec import java.util.concurrent.CancellationException /** - * Timer exposes a steady Async.Source of ticks that happens every `tickDuration` milliseconds. + * Timer exposes a steady [[Async.Source]] of ticks that happens every `tickDuration` milliseconds. * Note that the timer does not start ticking until `start` is called (which is a blocking operation, until the timer is cancelled). * - * You might want to manually `cancel` the timer, so that it gets garbage collected (before the enclosing `Async` scope ends). + * You might want to manually `cancel` the timer, so that it gets garbage collected (before the enclosing [[Async]] scope ends). */ class Timer(tickDuration: Duration) extends Cancellable { - var isCancelled = false + enum TimerEvent: + case Tick + case Cancelled - private class Source extends Async.OriginalSource[this.TimerEvent] { + private var isCancelled = false + + private object Source extends Async.OriginalSource[this.TimerEvent] { + val listeners = mutable.Set[Listener[TimerEvent]]() def tick() = synchronized { - listeners.foreach(_.completeNow(TimerEvent.Tick, this)) + listeners.filterInPlace(l => + l.completeNow(TimerEvent.Tick, this) + false + ) } - val listeners = mutable.Set[Listener[TimerEvent]]() override def poll(k: Listener[TimerEvent]): Boolean = if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else false // subscribing to a timer always takes you to the next tick override def dropListener(k: Listener[TimerEvent]): Unit = listeners -= k @@ -36,36 +43,30 @@ class Timer(tickDuration: Duration) extends Cancellable { if isCancelled then k.completeNow(TimerEvent.Cancelled, this) else listeners += k } - private val _src = Source() /** Ticks of the timer are delivered through this source. Note that ticks are ephemeral. */ - val src: Async.Source[this.TimerEvent] = _src + inline final def src: Async.Source[this.TimerEvent] = Source + /** Starts the timer. Suspends until the timer is cancelled. */ def start()(using Async, AsyncOperations): Unit = cancellationScope(this): loop() @tailrec private def loop()(using Async, AsyncOperations): Unit = if !isCancelled then - try - // println(s"Sleeping at ${new java.util.Date()}, ${isCancelled}, ${this}") - sleep(tickDuration.toMillis) + try sleep(tickDuration.toMillis) catch case _: CancellationException => cancel() if !isCancelled then - _src.tick() + Source.tick() loop() override def cancel(): Unit = synchronized { isCancelled = true } src.synchronized { - _src.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src)) - _src.listeners.clear() + Source.listeners.foreach(_.completeNow(TimerEvent.Cancelled, src)) + Source.listeners.clear() } - - enum TimerEvent: - case Tick - case Cancelled } From 53a06563691be5f2bf259faff3a35d0a703308f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20M=C3=BCller?= Date: Fri, 8 Dec 2023 19:23:59 +0100 Subject: [PATCH 15/16] Introduce withResolver.withResolver --- shared/src/main/scala/async/futures.scala | 44 +++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/shared/src/main/scala/async/futures.scala b/shared/src/main/scala/async/futures.scala index f37a1b13..54208832 100644 --- a/shared/src/main/scala/async/futures.scala +++ b/shared/src/main/scala/async/futures.scala @@ -170,6 +170,50 @@ object Future: def apply[T](body: Async ?=> T)(using Async): Future[T] = RunnableFuture(body) + /** The group of handlers to be used in [[withResolver]]. + * As a Future is completed only once, only one of resolve/reject/complete + * may be used and only once. + */ + trait Resolver[-T]: + /** Complete the future with a data item successfully */ + def resolve(item: T): Unit = complete(Success(item)) + /** Complete the future with a failure */ + def reject(exc: Throwable): Unit = complete(Failure(exc)) + + /** Complete the future with the result, be it Success or Failure */ + def complete(result: Try[T]): Unit + + /** Register a cancellation handler to be called when the created future + * is cancelled. Note that only one handler may be used. + */ + def onCancel(handler: () => Unit): Unit + end Resolver + + /** Create a future that may be completed asynchronously using external means. + * + * The body is run synchronously on the callers thread to setup an external + * asynchronous operation whose success/failure it communicates using the + * [[Resolver]] to complete the future. + * + * If the external operation supports cancellation, the body can register + * one handler using [[Resolver.onCancel]]. + */ + def withResolver[T](body: Resolver[T] => Unit): Future[T] = + val future = new CoreFuture[T] with Resolver[T] { + @volatile var cancelHandle = () => () + override def onCancel(handler: () => Unit): Unit = cancelHandle = handler + override def complete(result: Try[T]): Unit = super.complete(result) + + override def cancel(): Unit = + super.cancel() + cancelHandle() + reject(CancellationException()) + this.unlink() + } + body(future) + future + end withResolver + /** A future that immediately terminates with the given result */ def now[T](result: Try[T]): Future[T] = val f = CoreFuture[T]() From c4b844b119e52418b5eca60730449410f1f67428 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20M=C3=BCller?= Date: Fri, 8 Dec 2023 19:24:31 +0100 Subject: [PATCH 16/16] Optimize ForkJoinSupport systemtime usage and promise --- .../main/scala/async/ForkJoinSupport.scala | 41 +++++++++---------- shared/src/test/scala/FutureBehavior.scala | 4 +- shared/src/test/scala/Timer.scala | 10 ++++- 3 files changed, 30 insertions(+), 25 deletions(-) diff --git a/native/src/main/scala/async/ForkJoinSupport.scala b/native/src/main/scala/async/ForkJoinSupport.scala index 9038dbd3..94d9c85b 100644 --- a/native/src/main/scala/async/ForkJoinSupport.scala +++ b/native/src/main/scala/async/ForkJoinSupport.scala @@ -28,10 +28,7 @@ class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionConte import scala.collection.mutable private case class Sleeper(wakeTime: Deadline, toRun: Runnable): var isCancelled = false - private given Ordering[Sleeper] = new Ordering[Sleeper] { - import math.Ordered.orderingToOrdered - override def compare(x: Sleeper, y: Sleeper): Int = y.wakeTime.compare(x.wakeTime) - } + private given Ordering[Sleeper] = Ordering.by((sleeper: Sleeper) => sleeper.wakeTime).reverse private val sleepers = mutable.PriorityQueue.empty[Sleeper] private var sleepingUntil: Option[Deadline] = None @@ -48,19 +45,25 @@ class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionConte () => { sleeper.isCancelled = true } } - private def consideredOverdue(d: Deadline): Boolean = d.timeLeft <= 0.milli - // Sleep until the first timer is due, or .wait() otherwise private def sleepLoop(): Unit = this.synchronized { while (true) { sleepingUntil = sleepers.headOption.map(_.wakeTime) - sleepingUntil match - case None => this.wait() + val current = sleepingUntil match + case None => + this.wait() + Deadline.now case Some(value) => - if value.hasTimeLeft() then - this.wait(value.timeLeft.max(10.millis).toMillis) + val current0 = Deadline.now + val timeLeft = value - current0 + + if timeLeft.toNanos > 0 then + this.wait(timeLeft.toMillis.max(10)) + Deadline.now + else current0 + // Pop sleepers until no more available - while (sleepers.headOption.map(_.wakeTime.isOverdue()) == Some(true)) { + while (sleepers.headOption.exists(_.wakeTime <= current)) { val task = sleepers.dequeue() if !task.isCancelled then execute(task.toRun) } @@ -77,16 +80,12 @@ class SuspendExecutorWithSleep(exec: ExecutionContext) extends ExecutorWithSleep with AsyncOperations with NativeSuspend { type Scheduler = this.type - override def sleep(millis: Long)(using ac: Async): Unit = { - val sleepingFut = Promise[Unit]() - val innerCancellable = schedule(millis.millis, () => sleepingFut.complete(scala.util.Success(()))) - cancellationScope(() => - // we need to wrap the cancellable so that we mark the cancellation as well - sleepingFut.complete(scala.util.Failure(CancellationException())) - innerCancellable.cancel() - ): - sleepingFut.future.value - } + override def sleep(millis: Long)(using Async): Unit = + Future.withResolver[Unit]: resolver => + val cancellable = schedule(millis.millis, () => resolver.resolve(())) + resolver.onCancel(cancellable.cancel) + .link() + .value } class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool()) diff --git a/shared/src/test/scala/FutureBehavior.scala b/shared/src/test/scala/FutureBehavior.scala index e770e5b8..def286d6 100644 --- a/shared/src/test/scala/FutureBehavior.scala +++ b/shared/src/test/scala/FutureBehavior.scala @@ -259,13 +259,13 @@ class FutureBehavior extends munit.FunSuite { (Future { sleep(Random.between(200, 300)); throw e1 - } *: Future { + } *: Future { sleep(Random.between(200, 300)); throw e2 } *: Future { sleep(Random.between(50, 100)); throw e3 - } *: Future{EmptyTuple}).result, Failure(e3)) + } *: Future.now(Success(EmptyTuple))).result, Failure(e3)) } test("cancelled futures return the same constant CancellationException with no stack attached".ignore) { diff --git a/shared/src/test/scala/Timer.scala b/shared/src/test/scala/Timer.scala index e201dff5..18eed51d 100644 --- a/shared/src/test/scala/Timer.scala +++ b/shared/src/test/scala/Timer.scala @@ -8,13 +8,19 @@ import java.util.concurrent.CancellationException class TimerTest extends munit.FunSuite { import gears.async.default.given + test("sleeping does sleep") { + Async.blocking: + val now1 = System.currentTimeMillis() + sleep(200) + val now2 = System.currentTimeMillis() + assert(now2 - now1 > 150, now2 - now1) + } + test("TimerSleep1Second") { Async.blocking: - println("start of 1 second") val timer = Timer(1.second) Future { timer.start() } assert(Async.await(timer.src) == timer.TimerEvent.Tick) - println("end of 1 second") } def timeoutCancellableFuture[T](d: Duration, f: Future[T])(using Async, AsyncOperations): Future[T] =