Skip to content

Commit

Permalink
zio2 rc6 (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon authored May 11, 2022
1 parent 44e4860 commit 146b343
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 54 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val mainScala = "2.13.7"
val allScala = Seq("2.12.15", mainScala)

val zioVersion = "2.0.0-RC5"
val zioVersion = "2.0.0-RC6"
val akkaVersion = "2.6.19"

organization := "dev.zio"
Expand Down
14 changes: 7 additions & 7 deletions src/main/scala/zio/akka/cluster/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package zio.akka.cluster
import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props }
import akka.cluster.ClusterEvent._
import zio.Exit.{ Failure, Success }
import zio.{ Queue, Runtime, Task, ZIO }
import zio.{ Queue, Runtime, ZIO }

object Cluster {

private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] =
for {
actorSystem <- ZIO.service[ActorSystem]
cluster <- Task.attempt(akka.cluster.Cluster(actorSystem))
cluster <- ZIO.attempt(akka.cluster.Cluster(actorSystem))
} yield cluster

/**
Expand All @@ -19,7 +19,7 @@ object Cluster {
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] =
for {
cluster <- cluster
state <- Task.attempt(cluster.state)
state <- ZIO.attempt(cluster.state)
} yield state

/**
Expand All @@ -28,7 +28,7 @@ object Cluster {
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
_ <- Task.attempt(cluster.joinSeedNodes(seedNodes))
_ <- ZIO.attempt(cluster.joinSeedNodes(seedNodes))
} yield ()

/**
Expand All @@ -37,7 +37,7 @@ object Cluster {
val leave: ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
_ <- Task.attempt(cluster.leave(cluster.selfAddress))
_ <- ZIO.attempt(cluster.leave(cluster.selfAddress))
} yield ()

/**
Expand All @@ -61,9 +61,9 @@ object Cluster {
initialStateAsEvents: Boolean = false
): ZIO[ActorSystem, Throwable, Unit] =
for {
rts <- Task.runtime[ActorSystem]
rts <- ZIO.runtime[ActorSystem]
actorSystem <- ZIO.service[ActorSystem]
_ <- Task.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents))))
_ <- ZIO.attempt(actorSystem.actorOf(Props(new SubscriberActor(rts, queue, initialStateAsEvents))))
} yield ()

private[cluster] class SubscriberActor(
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/zio/akka/cluster/pubsub/PubSub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A]
object PubSub {

private def getMediator(actorSystem: ActorSystem): Task[ActorRef] =
Task.attempt(DistributedPubSub(actorSystem).mediator)
ZIO.attempt(DistributedPubSub(actorSystem).mediator)

/**
* Creates a new `Publisher[A]`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package zio.akka.cluster.pubsub.impl
import akka.actor.ActorRef
import akka.cluster.pubsub.DistributedPubSubMediator.Publish
import zio.akka.cluster.pubsub.{ MessageEnvelope, Publisher }
import zio.Task
import zio.{ Task, ZIO }

private[pubsub] trait PublisherImpl[A] extends Publisher[A] {
val getMediator: ActorRef

override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] =
Task.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
ZIO.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }
import zio.Exit.{ Failure, Success }
import zio.akka.cluster.pubsub.impl.SubscriberImpl.SubscriberActor
import zio.akka.cluster.pubsub.{ MessageEnvelope, Subscriber }
import zio.{ Promise, Queue, Runtime, Task }
import zio.{ Promise, Queue, Runtime, Task, ZIO }

private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] {
val getActorSystem: ActorSystem
val getMediator: ActorRef

override def listenWith(topic: String, queue: Queue[A], group: Option[String] = None): Task[Unit] =
for {
rts <- Task.runtime[Any]
rts <- ZIO.runtime[Any]
subscribed <- Promise.make[Nothing, Unit]
_ <- Task.attempt(
_ <- ZIO.attempt(
getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed)))
)
_ <- subscribed.await
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object Sharding {
for {
rts <- ZIO.runtime[ActorSystem with R]
actorSystem = rts.environment.get[ActorSystem]
shardingRegion <- Task.attempt(
shardingRegion <- ZIO.attempt(
ClusterSharding(actorSystem).start(
typeName = name,
entityProps = Props(new ShardEntity[R, Msg, State](rts)(onMessage)),
Expand Down Expand Up @@ -87,7 +87,7 @@ object Sharding {
for {
rts <- ZIO.runtime[ActorSystem]
actorSystem = rts.environment.get
shardingRegion <- Task.attempt(
shardingRegion <- ZIO.attempt(
ClusterSharding(actorSystem).startProxy(
typeName = name,
role,
Expand All @@ -114,16 +114,16 @@ object Sharding {
val getShardingRegion: ActorRef

override def send(entityId: String, data: Msg): Task[Unit] =
Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data)))
ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data)))

override def stop(entityId: String): Task[Unit] =
Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload))
ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload))

override def passivate(entityId: String): Task[Unit] =
Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload))
ZIO.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload))

override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] =
Task.fromFuture(_ =>
ZIO.fromFuture(_ =>
(getShardingRegion ? sharding.MessageEnvelope(entityId, MessagePayload(data)))
.mapTo[R]
)
Expand All @@ -139,10 +139,10 @@ object Sharding {
override def context: ActorContext = actorContext
override def id: String = actorContext.self.path.name
override def state: Ref[Option[State]] = ref
override def stop: UIO[Unit] = UIO.succeed(actorContext.stop(self))
override def passivate: UIO[Unit] = UIO.succeed(actorContext.parent ! Passivate(PoisonPill))
override def passivateAfter(duration: Duration): UIO[Unit] = UIO.succeed(actorContext.self ! SetTimeout(duration))
override def replyToSender[M](msg: M): Task[Unit] = Task.attempt(actorContext.sender() ! msg)
override def stop: UIO[Unit] = ZIO.succeed(actorContext.stop(self))
override def passivate: UIO[Unit] = ZIO.succeed(actorContext.parent ! Passivate(PoisonPill))
override def passivateAfter(duration: Duration): UIO[Unit] = ZIO.succeed(actorContext.self ! SetTimeout(duration))
override def replyToSender[M](msg: M): Task[Unit] = ZIO.attempt(actorContext.sender() ! msg)
}
val entity: ZLayer[Any, Nothing, Entity[State]] = ZLayer.succeed(service)

Expand Down
8 changes: 4 additions & 4 deletions src/test/scala/zio/akka/cluster/ClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import zio._

object ClusterSpec extends ZIOSpecDefault {

def spec: ZSpec[TestEnvironment, Any] =
def spec: Spec[TestEnvironment, Any] =
suite("ClusterSpec")(
test("receive cluster events") {
val config: Config = ConfigFactory.parseString(s"""
Expand All @@ -37,11 +37,11 @@ object ClusterSpec extends ZIOSpecDefault {
""".stripMargin)

val actorSystem: ZIO[Scope, Throwable, ActorSystem] =
ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
Task.fromFuture(_ => sys.terminate()).either
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys =>
ZIO.fromFuture(_ => sys.terminate()).either
)

assertM(
assertZIO(
for {
queue <- Cluster.clusterEvents()
_ <- Clock.sleep(5 seconds)
Expand Down
18 changes: 8 additions & 10 deletions src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import zio.test.Assertion._
import zio.test._
import zio.test.TestEnvironment
import zio.test.ZIOSpecDefault
import zio.{ ExecutionStrategy, Task, ZIO, ZLayer }
import zio.{ ExecutionStrategy, ZIO, ZLayer }

object PubSubSpec extends ZIOSpecDefault {

Expand All @@ -32,18 +32,16 @@ object PubSubSpec extends ZIOSpecDefault {
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
Task.fromFuture(_ => sys.terminate()).either
)
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys => ZIO.fromFuture(_ => sys.terminate()).either)
)

val topic = "topic"
val msg = "yo"

def spec: ZSpec[TestEnvironment, Any] =
def spec: Spec[TestEnvironment, Any] =
suite("PubSubSpec")(
test("send and receive a single message") {
assertM(
assertZIO(
for {
pubSub <- PubSub.createPubSub[String]
queue <- pubSub.listen(topic)
Expand All @@ -53,7 +51,7 @@ object PubSubSpec extends ZIOSpecDefault {
)(equalTo(msg)).provideLayer(actorSystem)
},
test("support multiple subscribers") {
assertM(
assertZIO(
for {
pubSub <- PubSub.createPubSub[String]
queue1 <- pubSub.listen(topic)
Expand All @@ -66,7 +64,7 @@ object PubSubSpec extends ZIOSpecDefault {
},
test("support multiple publishers") {
val msg2 = "what's up"
assertM(
assertZIO(
for {
pubSub <- PubSub.createPubSub[String]
queue <- pubSub.listen(topic)
Expand All @@ -79,7 +77,7 @@ object PubSubSpec extends ZIOSpecDefault {
},
test("send only one message to a single group") {
val group = "group"
assertM(
assertZIO(
for {
pubSub <- PubSub.createPubSub[String]
queue1 <- pubSub.listen(topic, Some(group))
Expand All @@ -93,7 +91,7 @@ object PubSubSpec extends ZIOSpecDefault {
test("send one message to each group") {
val group1 = "group1"
val group2 = "group2"
assertM(
assertZIO(
for {
pubSub <- PubSub.createPubSub[String]
queue1 <- pubSub.listen(topic, Some(group1))
Expand Down
30 changes: 14 additions & 16 deletions src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import zio.test.Assertion._
import zio.test._
import zio.test.TestEnvironment
import zio.test.ZIOSpecDefault
import zio.{ ExecutionStrategy, Promise, Task, UIO, ZIO, ZLayer }
import zio.{ ExecutionStrategy, Promise, UIO, ZIO, ZLayer }
import zio._

object ShardingSpec extends ZIOSpecDefault {
Expand Down Expand Up @@ -35,9 +35,7 @@ object ShardingSpec extends ZIOSpecDefault {
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
Task.fromFuture(_ => sys.terminate()).either
)
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config)))(sys => ZIO.fromFuture(_ => sys.terminate()).either)
)

val config2: Config = ConfigFactory.parseString(s"""
Expand All @@ -63,19 +61,19 @@ object ShardingSpec extends ZIOSpecDefault {
val actorSystem2: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config2)))(sys =>
Task.fromFuture(_ => sys.terminate()).either
ZIO.acquireRelease(ZIO.attempt(ActorSystem("Test", config2)))(sys =>
ZIO.fromFuture(_ => sys.terminate()).either
)
)

val shardId = "shard"
val shardName = "name"
val msg = "yo"

def spec: ZSpec[TestEnvironment, Any] =
def spec: Spec[TestEnvironment, Any] =
suite("ShardingSpec")(
test("send and receive a single message") {
assertM(
assertZIO(
for {
p <- Promise.make[Nothing, String]
onMessage = (msg: String) => p.succeed(msg).unit
Expand All @@ -88,15 +86,15 @@ object ShardingSpec extends ZIOSpecDefault {
test("send and receive a message using ask") {
val onMessage: String => ZIO[Entity[Any], Nothing, Unit] =
incomingMsg => ZIO.serviceWithZIO[Entity[Any]](_.replyToSender(incomingMsg).orDie)
assertM(
assertZIO(
for {
sharding <- Sharding.start(shardName, onMessage)
reply <- sharding.ask[String](shardId, msg)
} yield reply
)(equalTo(msg)).provideLayer(actorSystem)
},
test("gather state") {
assertM(
assertZIO(
for {
p <- Promise.make[Nothing, Boolean]
onMessage = (_: String) =>
Expand All @@ -120,7 +118,7 @@ object ShardingSpec extends ZIOSpecDefault {
)(equalTo((None, true))).provideLayer(actorSystem)
},
test("kill itself") {
assertM(
assertZIO(
for {
p <- Promise.make[Nothing, Option[Unit]]
onMessage = (msg: String) =>
Expand All @@ -142,7 +140,7 @@ object ShardingSpec extends ZIOSpecDefault {
)(isNone).provideLayer(actorSystem)
},
test("passivate") {
assertM(
assertZIO(
for {
p <- Promise.make[Nothing, Option[Unit]]
onMessage = (msg: String) =>
Expand All @@ -163,7 +161,7 @@ object ShardingSpec extends ZIOSpecDefault {
)(isNone).provideLayer(actorSystem)
},
test("passivateAfter") {
assertM(
assertZIO(
for {
p <- Promise.make[Nothing, Option[Unit]]
onMessage = (msg: String) =>
Expand All @@ -186,7 +184,7 @@ object ShardingSpec extends ZIOSpecDefault {
)(isNone).provideLayer(actorSystem)
},
test("work with 2 actor systems") {
assertM(
assertZIO(
ZIO.scoped {
actorSystem.build.flatMap(a1 =>
actorSystem2.build.flatMap(a2 =>
Expand Down Expand Up @@ -215,10 +213,10 @@ object ShardingSpec extends ZIOSpecDefault {
ZIO.serviceWithZIO[TestService](_.doSomething())

val l = ZLayer.succeed(new TestService {
override def doSomething(): UIO[String] = UIO.succeed("test")
override def doSomething(): UIO[String] = ZIO.succeed("test")
})

assertM(
assertZIO(
for {
p <- Promise.make[Nothing, String]
onMessage = (_: String) => (doSomething flatMap p.succeed).unit
Expand Down

0 comments on commit 146b343

Please sign in to comment.