Zio2 rc5 (#161)
justcoon authored Apr 20, 2022
1 parent 50773d0 commit 44e4860
Showing 9 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,7 +1,7 @@
val mainScala = "2.13.7"
val allScala = Seq("2.12.15", mainScala)

val zioVersion = "2.0.0-RC3"
val zioVersion = "2.0.0-RC5"
val akkaVersion = "2.6.19"

organization := "dev.zio"
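This version bump is the whole build change; the source updates below just track the RC3-to-RC5 API renames. For context, a hedged sketch of how such a version val is typically consumed further down build.sbt — the actual dependency list is outside this hunk, so the module names here are illustrative:

    libraryDependencies ++= Seq(
      "dev.zio" %% "zio"      % zioVersion,
      "dev.zio" %% "zio-test" % zioVersion % Test
    )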
8 changes: 4 additions & 4 deletions src/main/scala/zio/akka/cluster/Cluster.scala
@@ -10,7 +10,7 @@ object Cluster {
private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] =
for {
actorSystem <- ZIO.service[ActorSystem]
-cluster <- Task(akka.cluster.Cluster(actorSystem))
+cluster <- Task.attempt(akka.cluster.Cluster(actorSystem))
} yield cluster

/**
@@ -19,7 +19,7 @@ object Cluster {
val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] =
for {
cluster <- cluster
-state <- Task(cluster.state)
+state <- Task.attempt(cluster.state)
} yield state

/**
@@ -28,7 +28,7 @@ object Cluster {
def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
-_ <- Task(cluster.joinSeedNodes(seedNodes))
+_ <- Task.attempt(cluster.joinSeedNodes(seedNodes))
} yield ()

/**
@@ -37,7 +37,7 @@ object Cluster {
val leave: ZIO[ActorSystem, Throwable, Unit] =
for {
cluster <- cluster
-_ <- Task(cluster.leave(cluster.selfAddress))
+_ <- Task.attempt(cluster.leave(cluster.selfAddress))
} yield ()

/**
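The change repeated throughout this file (and the rest of the commit) is mechanical: ZIO 2 renames the side-effect-capturing constructor, so every Task(...) becomes Task.attempt(...). A minimal standalone sketch of the pattern (helper name hypothetical):

    import akka.actor.ActorSystem
    import zio._

    // ZIO 1.x spelled this Task(...); ZIO 2 makes the side-effect capture explicit.
    def clusterFor(system: ActorSystem): Task[akka.cluster.Cluster] =
      Task.attempt(akka.cluster.Cluster(system))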
3 changes: 2 additions & 1 deletion src/main/scala/zio/akka/cluster/pubsub/PubSub.scala
@@ -30,7 +30,8 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A]

object PubSub {

-private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = Task(DistributedPubSub(actorSystem).mediator)
+private def getMediator(actorSystem: ActorSystem): Task[ActorRef] =
+  Task.attempt(DistributedPubSub(actorSystem).mediator)

/**
* Creates a new `Publisher[A]`.
2 changes: 1 addition & 1 deletion src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala
@@ -9,5 +9,5 @@ private[pubsub] trait PublisherImpl[A] extends Publisher[A] {
val getMediator: ActorRef

override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] =
-Task(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
+Task.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup))
}
2 changes: 1 addition & 1 deletion src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala
@@ -15,7 +15,7 @@ private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] {
for {
rts <- Task.runtime[Any]
subscribed <- Promise.make[Nothing, Unit]
-_ <- Task(
+_ <- Task.attempt(
getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed)))
)
_ <- subscribed.await
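For orientation, a hedged usage sketch of the pub/sub API these impls back, mirroring the shape of the project README (topic and payload invented):

    import akka.actor.ActorSystem
    import zio._
    import zio.akka.cluster.pubsub.PubSub

    val roundTrip: ZIO[ActorSystem, Throwable, String] =
      for {
        pubSub <- PubSub.createPubSub[String]      // obtains the DistributedPubSub mediator
        queue  <- pubSub.listen("my-topic")        // SubscriberImpl: an actor feeding a ZIO Queue
        _      <- pubSub.publish("my-topic", "hi") // PublisherImpl: fire-and-forget mediator tell
        item   <- queue.take
      } yield item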
10 changes: 5 additions & 5 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
@@ -46,7 +46,7 @@ object Sharding {
for {
rts <- ZIO.runtime[ActorSystem with R]
actorSystem = rts.environment.get[ActorSystem]
-shardingRegion <- Task(
+shardingRegion <- Task.attempt(
ClusterSharding(actorSystem).start(
typeName = name,
entityProps = Props(new ShardEntity[R, Msg, State](rts)(onMessage)),
@@ -87,7 +87,7 @@ object Sharding {
for {
rts <- ZIO.runtime[ActorSystem]
actorSystem = rts.environment.get
-shardingRegion <- Task(
+shardingRegion <- Task.attempt(
ClusterSharding(actorSystem).startProxy(
typeName = name,
role,
@@ -114,13 +114,13 @@
val getShardingRegion: ActorRef

override def send(entityId: String, data: Msg): Task[Unit] =
-Task(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data)))
+Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data)))

override def stop(entityId: String): Task[Unit] =
-Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload))
+Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload))

override def passivate(entityId: String): Task[Unit] =
-Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload))
+Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload))

override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] =
Task.fromFuture(_ =>
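Similarly, a hedged usage sketch of the sharding API touched above, assuming the RC5-era signatures visible in this diff (entity name and no-op behavior invented):

    import akka.actor.ActorSystem
    import zio._
    import zio.akka.cluster.sharding.Sharding

    val program: ZIO[ActorSystem, Throwable, Unit] =
      for {
        sharding <- Sharding.start[Any, String, Unit]("chat", _ => ZIO.unit)
        _        <- sharding.send("room-1", "hello") // MessageEnvelope with MessagePayload
        _        <- sharding.passivate("room-1")     // MessageEnvelope with PassivatePayload
      } yield ()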
12 changes: 7 additions & 5 deletions src/test/scala/zio/akka/cluster/ClusterSpec.scala
@@ -26,18 +26,20 @@ object ClusterSpec extends ZIOSpecDefault {
| enabled-transports = ["akka.remote.artery.canonical"]
| artery.canonical {
| hostname = "127.0.0.1"
-| port = 2551
+| port = 2554
| }
| }
| cluster {
-| seed-nodes = ["akka://[email protected]:2551"]
+| seed-nodes = ["akka://[email protected]:2554"]
| downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
| }
|}
""".stripMargin)

val actorSystem: ZIO[Scope, Throwable, ActorSystem] =
-ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either)
+ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
+  Task.fromFuture(_ => sys.terminate()).either
+)

assertM(
for {
@@ -53,7 +55,7 @@ object ClusterSpec extends ZIOSpecDefault {
.runCollect
.timeoutFail(new Exception("Timeout"))(10 seconds)
} yield items
-)(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem) ++ Clock.live)
+)(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem))
}
-)
+) @@ TestAspect.withLiveClock
}
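The acquire/release shape above is the ZIO 2 replacement for ZManaged-based resource layers. The same pattern outside a spec, as a minimal sketch (system name hypothetical):

    import akka.actor.ActorSystem
    import zio._

    val actorSystemLayer: ZLayer[Any, Throwable, ActorSystem] =
      ZLayer.scoped(
        ZIO.acquireRelease(Task.attempt(ActorSystem("Example")))(sys =>
          Task.fromFuture(_ => sys.terminate()).either // release must not fail
        )
      )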
8 changes: 5 additions & 3 deletions src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala
@@ -19,11 +19,11 @@ object PubSubSpec extends ZIOSpecDefault {
| enabled-transports = ["akka.remote.artery.canonical"]
| artery.canonical {
| hostname = "127.0.0.1"
-| port = 2551
+| port = 2553
| }
| }
| cluster {
-| seed-nodes = ["akka://[email protected]:2551"]
+| seed-nodes = ["akka://[email protected]:2553"]
| downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
| }
|}
@@ -32,7 +32,9 @@
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
-ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either)
+ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
+  Task.fromFuture(_ => sys.terminate()).either
+)
)

val topic = "topic"
33 changes: 18 additions & 15 deletions src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala
@@ -35,7 +35,9 @@ object ShardingSpec extends ZIOSpecDefault {
val actorSystem: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
-ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either)
+ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys =>
+  Task.fromFuture(_ => sys.terminate()).either
+)
)

val config2: Config = ConfigFactory.parseString(s"""
@@ -61,7 +63,9 @@
val actorSystem2: ZLayer[Any, Throwable, ActorSystem] =
ZLayer
.scoped(
-ZIO.acquireRelease(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either)
+ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config2)))(sys =>
+  Task.fromFuture(_ => sys.terminate()).either
+)
)

val shardId = "shard"
@@ -129,10 +133,9 @@
sharding <- Sharding.start(shardName, onMessage)
_ <- sharding.send(shardId, "set")
_ <- sharding.send(shardId, "die")
-_ <- ZIO.sleep(3 seconds)
-       .provideLayer(
-         Clock.live
-       ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
+_ <- Clock.sleep(
+       3 seconds
+     ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
_ <- sharding.send(shardId, "get")
res <- p.await
} yield res
@@ -151,10 +154,9 @@
sharding <- Sharding.start(shardName, onMessage)
_ <- sharding.send(shardId, "set")
_ <- sharding.passivate(shardId)
-_ <- ZIO.sleep(3 seconds)
-       .provideLayer(
-         Clock.live
-       ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
+_ <- Clock.sleep(
+       3 seconds
+     ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
_ <- sharding.send(shardId, "get")
res <- p.await
} yield res
@@ -175,10 +177,9 @@
sharding <- Sharding.start(shardName, onMessage)
_ <- sharding.send(shardId, "set")
_ <- sharding.send(shardId, "timeout")
-_ <- ZIO.sleep(3 seconds)
-       .provideLayer(
-         Clock.live
-       ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
+_ <- Clock.sleep(
+       3 seconds
+     ) // give time to the ShardCoordinator to notice the death of the actor and recreate one
_ <- sharding.send(shardId, "get")
res <- p.await
} yield res
@@ -227,6 +228,8 @@
} yield res
)(equalTo("test")).provideLayer(actorSystem ++ l)
}
-) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout(30.seconds)
+) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout(
+  30.seconds
+) @@ TestAspect.withLiveClock

}
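The test-clock changes in this commit all follow from the same ZIO 2 rule: zio-test runs effects against a virtual TestClock, so Clock.sleep suspends until the clock is adjusted. Rather than providing Clock.live at each call site as before, the suites now opt into wall-clock time wholesale. A sketch under the RC5-era zio-test API (suite content illustrative):

    import zio._
    import zio.test._

    object LiveClockExample extends ZIOSpecDefault {
      def spec =
        suite("live clock")(
          test("a real sleep elapses") {
            // without withLiveClock this would hang on the virtual TestClock
            Clock.sleep(10.millis).as(assertTrue(true))
          }
        ) @@ TestAspect.withLiveClock
    }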
