Skip to content

Commit

Permalink
Merge pull request #21 from natsukagami/cross-platform
Browse files Browse the repository at this point in the history
Multiplatform support
  • Loading branch information
natsukagami authored Dec 8, 2023
2 parents 1b374df + c4b844b commit fbcdccf
Show file tree
Hide file tree
Showing 40 changed files with 457 additions and 204 deletions.
47 changes: 44 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,57 @@ on:
- main
pull_request:
jobs:
test:
test-jvm:
strategy:
fail-fast: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0
- uses: coursier/cache-action@v6
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 21
- name: Get submodule status hash
id: get-submodules
run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT
- uses: actions/cache@v3
with:
path: |
./dependencies/**/target
key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }}-scala-native-only
- name: Build dependencies
run: ./dependencies/publish-deps.sh --scala-native-only
- name: Test
run: sbt rootJVM/test
test-native:
strategy:
fail-fast: false
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0
- uses: coursier/cache-action@v6
- uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 19
- name: Get submodule status hash
id: get-submodules
run: echo "submodules=$(git submodule status | sha256sum | awk '{print $1}')" >> $GITHUB_OUTPUT
- uses: actions/cache@v3
with:
path: |
./dependencies/**/target
key: ${{ runner.os }}-${{ steps.get-submodules.outputs.submodules }}
- name: Build dependencies
run: ./dependencies/publish-deps.sh
- name: Install scala-native dependencies
run: sudo apt-get install clang libstdc++-12-dev libgc-dev
- name: Test
run: sbt test
run: sbt rootNative/test
7 changes: 7 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[submodule "dependencies/scala-native"]
path = dependencies/scala-native
url = [email protected]:scala-native/scala-native.git
[submodule "dependencies/munit"]
path = dependencies/munit
url = [email protected]:natsukagami/munit.git
branch = use-0.5-snapshot-sn
7 changes: 7 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Contributing

## Building `gears`

`gears` currently require:
- **On the JVM**: JVM with support for virtual threads. This usually means JVM 21+, or 19+ with `--enable-preview`.
- **On Scala Native**: Scala Native with delimited continuations support. See the pinned versions in [`dependencies`](./dependencies/README.md).
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@



This is a proof of concept for a base library for asynchronous computing in direct style. The library needs either fibers or virtual threads as a basis. It is at present highly experimental, incomplete and provisional. It is not yet extensively tested and not optimized at all.

The concepts and code here should be regarded as a strawman, in the sense of "meant to be knocked down".
Expand Down
24 changes: 20 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}
import scalanative.build._

ThisBuild / scalaVersion := "3.3.1"

lazy val root = project
lazy val root =
crossProject(JVMPlatform, NativePlatform)
.crossType(CrossType.Full)
.in(file("."))
.settings(
.settings(Seq(
name := "Gears",
organization := "ch.epfl.lamp",
version := "0.1.0-SNAPSHOT",
libraryDependencies += "org.scalameta" %% "munit" % "0.7.29" % Test
)
testFrameworks += new TestFramework("munit.Framework")
))
.jvmSettings(Seq(
javaOptions += "--version 21",
libraryDependencies += "org.scalameta" %% "munit" % "1.0.0-M10" % Test
))
.nativeSettings(Seq(
nativeConfig ~= { c =>
c.withMultithreadingSupport(true)
.withGC(GC.boehm) // immix doesn't work yet
},
libraryDependencies += "org.scalameta" %%% "munit" % "1.0.0-M10+15-3940023e-SNAPSHOT" % Test
))
22 changes: 22 additions & 0 deletions dependencies/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Custom Dependencies for Scala Native

Scala Native requires some libraries to be compiled from source and `publishLocal`'d.

### TL; DR

```bash
./publish-deps.sh
```

### What are included?

- The current snapshot version of Scala Native, pinned in `scala-native`: for the delimited continuation support.
This needs to be published for both `3.3.1` (for `gears`) and `3.1.2` (for `munit`):
```bash
sbt "publish-local-dev 3; ++3.1.2 publishLocal"
```
- A fork of `munit` that uses the above snapshot, with a simple fix (https://github.com/scalameta/munit/pull/714) to make it compile.
Pinned in `munit`.
```bash
sbt "munitNative/publishLocal"
```
1 change: 1 addition & 0 deletions dependencies/munit
Submodule munit added at 394002
13 changes: 13 additions & 0 deletions dependencies/publish-deps.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

set -e

cd "$(dirname "${BASH_SOURCE[0]}")" # change to this directory

cd scala-native && sbt 'publish-local-dev 3' && cd ..

if test "$1" != "--scala-native-only"; then
cd scala-native && sbt '++3.1.2 publishLocal' && cd ..
cd munit && sbt "++3.1.2 munitNative/publishLocal" && cd ..
fi

1 change: 1 addition & 0 deletions dependencies/scala-native
Submodule scala-native added at f4b907
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO

import gears.async.{Async, Future, given}
import gears.async.{Async, Future}
import gears.async.default.given
import Future.Promise

import java.net.{DatagramPacket, DatagramSocket, InetAddress, InetSocketAddress, ServerSocket, Socket}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, Future, given}
import gears.async.default.given
import gears.async.{Async, Future}
import gears.async.AsyncOperations.*
import PosixLikeIO.{PIOHelper, SocketUDP}

Expand All @@ -9,7 +10,6 @@ import java.nio.ByteBuffer
import java.nio.file.StandardOpenOption
import scala.concurrent.ExecutionContext


@main def clientAndServerUDP(): Unit =
given ExecutionContext = ExecutionContext.global
Async.blocking:
Expand All @@ -30,6 +30,5 @@ import scala.concurrent.ExecutionContext
val messageReceived = String(responseDatagram.getData.slice(0, responseDatagram.getLength), "UTF-8").toInt
println("Sent " + value.toString + " and got " + messageReceived.toString + " in return.")


Async.await(client(100))
Async.await(server)
Async.await(server)
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, given}
import gears.async.Async
import gears.async.default.given
import PosixLikeIO.PIOHelper

import java.nio.ByteBuffer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package PosixLikeIO.examples

import gears.async.{Async, given}
import gears.async.Async
import gears.async.default.given
import PosixLikeIO.PIOHelper

import java.nio.ByteBuffer
Expand Down
7 changes: 7 additions & 0 deletions jvm/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package gears.async.default

import gears.async._

given AsyncOperations = JvmAsyncOperations
given VThreadSupport.type = VThreadSupport
given VThreadSupport.Scheduler = VThreadScheduler
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package gears.async

given JvmAsyncOperations.type = JvmAsyncOperations

object JvmAsyncOperations extends AsyncOperations:

private def jvmInterruptible[T](fn: => T)(using Async): T =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import scala.annotation.unchecked.uncheckedVariance
import java.util.concurrent.locks.ReentrantLock
import scala.concurrent.duration.FiniteDuration

given VThreadScheduler.type = VThreadScheduler
given VThreadSupport.type = VThreadSupport

object VThreadScheduler extends Scheduler:
override def execute(body: Runnable): Unit = Thread.startVirtualThread(body)

override def schedule(delay: FiniteDuration, body: Runnable): Cancellable =
val th = Thread.startVirtualThread: () =>
Thread.sleep(delay.toMillis)
body.run()
() => th.interrupt() // TODO this may interrupt the body after sleeping
execute(body)
() => th.interrupt()

object VThreadSupport extends AsyncSupport:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package measurements

import PosixLikeIO.PIOHelper
import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel, given}
import gears.async.{Async, BufferedChannel, ChannelMultiplexer, Future, SyncChannel}
import gears.async.default.given

import java.io.{FileReader, FileWriter}
import java.nio.file.{Files, NoSuchFileException, Paths, StandardOpenOption}
Expand Down
21 changes: 21 additions & 0 deletions jvm/src/test/scala/CancellationBehavior.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import gears.async.{Async, Future, AsyncSupport, uninterruptible}
import gears.async.AsyncOperations.*
import gears.async.default.given
import scala.util.boundary
import boundary.break
import scala.concurrent.duration.{Duration, DurationInt}
import java.util.concurrent.CancellationException
import scala.util.Success
import scala.util.Properties

// JVM-only since `munitTimeout` is not available on scala native.
// See (here)[https://scalameta.org/munit/docs/tests.html#customize-test-timeouts].
class JVMCancellationBehavior extends munit.FunSuite:
override def munitTimeout: Duration = 2.seconds
test("no cancel -> timeout".fail):
Async.blocking:
val f = Future:
Thread.sleep(5000)
1
f.result

10 changes: 10 additions & 0 deletions native/src/main/scala/async/DefaultSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package gears.async.default

import gears.async._
import gears.async.native.ForkJoinSupport

object DefaultSupport extends ForkJoinSupport

given AsyncSupport = DefaultSupport
given DefaultSupport.Scheduler = DefaultSupport
given AsyncOperations = DefaultSupport
91 changes: 91 additions & 0 deletions native/src/main/scala/async/ForkJoinSupport.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package gears.async.native

import gears.async._
import scala.scalanative.runtime.{Continuations => native}
import java.util.concurrent.ForkJoinPool
import scala.concurrent.ExecutionContext
import scala.concurrent.JavaConversions._
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicBoolean
import gears.async.Future.Promise
import java.util.concurrent.CancellationException

class NativeContinuation[-T, +R] private[native] (val cont: T => R) extends Suspension[T, R]:
def resume(arg: T): R = cont(arg)

trait NativeSuspend extends SuspendSupport:
type Label[R] = native.BoundaryLabel[R]
type Suspension[T, R] = NativeContinuation[T, R]

override def boundary[R](body: (Label[R]) ?=> R): R =
native.boundary(body)

override def suspend[T, R](body: Suspension[T, R] => R)(using Label[R]): T = native.suspend[T, R](f => body(NativeContinuation(f)))
end NativeSuspend

/** Spawns a single thread that does all the sleeping. */
class ExecutorWithSleepThread(val exec: ExecutionContext) extends ExecutionContext with Scheduler {
import scala.collection.mutable
private case class Sleeper(wakeTime: Deadline, toRun: Runnable):
var isCancelled = false
private given Ordering[Sleeper] = Ordering.by((sleeper: Sleeper) => sleeper.wakeTime).reverse
private val sleepers = mutable.PriorityQueue.empty[Sleeper]
private var sleepingUntil: Option[Deadline] = None

override def execute(body: Runnable): Unit = exec.execute(body)
override def reportFailure(cause: Throwable): Unit = exec.reportFailure(cause)
override def schedule(delay: FiniteDuration, body: Runnable): Cancellable = {
val sleeper = Sleeper(delay.fromNow, body)
// push to the sleeping priority queue
this.synchronized {
sleepers += sleeper
val shouldWake = sleepingUntil.map(sleeper.wakeTime < _).getOrElse(true)
if shouldWake then this.notifyAll()
}
() => { sleeper.isCancelled = true }
}

// Sleep until the first timer is due, or .wait() otherwise
private def sleepLoop(): Unit = this.synchronized {
while (true) {
sleepingUntil = sleepers.headOption.map(_.wakeTime)
val current = sleepingUntil match
case None =>
this.wait()
Deadline.now
case Some(value) =>
val current0 = Deadline.now
val timeLeft = value - current0

if timeLeft.toNanos > 0 then
this.wait(timeLeft.toMillis.max(10))
Deadline.now
else current0

// Pop sleepers until no more available
while (sleepers.headOption.exists(_.wakeTime <= current)) {
val task = sleepers.dequeue()
if !task.isCancelled then execute(task.toRun)
}
}
}

val sleeperThread = Thread(() => sleepLoop())
sleeperThread.setDaemon(true)
sleeperThread.start()
}

class SuspendExecutorWithSleep(exec: ExecutionContext) extends ExecutorWithSleepThread(exec)
with AsyncSupport
with AsyncOperations
with NativeSuspend {
type Scheduler = this.type
override def sleep(millis: Long)(using Async): Unit =
Future.withResolver[Unit]: resolver =>
val cancellable = schedule(millis.millis, () => resolver.resolve(()))
resolver.onCancel(cancellable.cancel)
.link()
.value
}

class ForkJoinSupport extends SuspendExecutorWithSleep(new ForkJoinPool())
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
addSbtPlugin("org.portable-scala" % "sbt-scala-native-crossproject" % "1.2.0")
addSbtPlugin("org.scala-native" % "sbt-scala-native" % "0.5.0-SNAPSHOT")
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ trait AsyncOperations:
def sleep(millis: Long)(using Async): Unit

object AsyncOperations:
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
inline def sleep(millis: Long)(using AsyncOperations, Async): Unit =
summon[AsyncOperations].sleep(millis)
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ trait Cancellable:
/** Add this cancellable to the given group after removing
* it from the previous group in which it was.
*/
def link(group: CompletionGroup): this.type =
def link(group: CompletionGroup): this.type = synchronized:
this.group.drop(this)
this.group = group
this.group.add(this)
Expand Down
Loading

0 comments on commit fbcdccf

Please sign in to comment.