diff --git a/build.sbt b/build.sbt index 240b31c..7f08398 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ val mainScala = "2.13.7" val allScala = Seq("2.12.15", mainScala) -val zioVersion = "2.0.0-RC3" +val zioVersion = "2.0.0-RC5" val akkaVersion = "2.6.19" organization := "dev.zio" diff --git a/src/main/scala/zio/akka/cluster/Cluster.scala b/src/main/scala/zio/akka/cluster/Cluster.scala index 2e94bd5..d51af5b 100644 --- a/src/main/scala/zio/akka/cluster/Cluster.scala +++ b/src/main/scala/zio/akka/cluster/Cluster.scala @@ -10,7 +10,7 @@ object Cluster { private val cluster: ZIO[ActorSystem, Throwable, akka.cluster.Cluster] = for { actorSystem <- ZIO.service[ActorSystem] - cluster <- Task(akka.cluster.Cluster(actorSystem)) + cluster <- Task.attempt(akka.cluster.Cluster(actorSystem)) } yield cluster /** @@ -19,7 +19,7 @@ object Cluster { val clusterState: ZIO[ActorSystem, Throwable, CurrentClusterState] = for { cluster <- cluster - state <- Task(cluster.state) + state <- Task.attempt(cluster.state) } yield state /** @@ -28,7 +28,7 @@ object Cluster { def join(seedNodes: List[Address]): ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task(cluster.joinSeedNodes(seedNodes)) + _ <- Task.attempt(cluster.joinSeedNodes(seedNodes)) } yield () /** @@ -37,7 +37,7 @@ object Cluster { val leave: ZIO[ActorSystem, Throwable, Unit] = for { cluster <- cluster - _ <- Task(cluster.leave(cluster.selfAddress)) + _ <- Task.attempt(cluster.leave(cluster.selfAddress)) } yield () /** diff --git a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala index b34f2a8..5387304 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/PubSub.scala @@ -30,7 +30,8 @@ trait PubSub[A] extends Publisher[A] with Subscriber[A] object PubSub { - private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = Task(DistributedPubSub(actorSystem).mediator) + private def getMediator(actorSystem: ActorSystem): Task[ActorRef] = + Task.attempt(DistributedPubSub(actorSystem).mediator) /** * Creates a new `Publisher[A]`. diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala index 0e5bba5..975b851 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/PublisherImpl.scala @@ -9,5 +9,5 @@ private[pubsub] trait PublisherImpl[A] extends Publisher[A] { val getMediator: ActorRef override def publish(topic: String, data: A, sendOneMessageToEachGroup: Boolean = false): Task[Unit] = - Task(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) + Task.attempt(getMediator ! Publish(topic, MessageEnvelope(data), sendOneMessageToEachGroup)) } diff --git a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala index c2b3e46..36fc4f3 100644 --- a/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala +++ b/src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala @@ -15,7 +15,7 @@ private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] { for { rts <- Task.runtime[Any] subscribed <- Promise.make[Nothing, Unit] - _ <- Task( + _ <- Task.attempt( getActorSystem.actorOf(Props(new SubscriberActor[A](getMediator, topic, group, rts, queue, subscribed))) ) _ <- subscribed.await diff --git a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala index ec724b5..95ba9b8 100644 --- a/src/main/scala/zio/akka/cluster/sharding/Sharding.scala +++ b/src/main/scala/zio/akka/cluster/sharding/Sharding.scala @@ -46,7 +46,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem with R] actorSystem = rts.environment.get[ActorSystem] - shardingRegion <- Task( + shardingRegion <- Task.attempt( ClusterSharding(actorSystem).start( typeName = name, entityProps = Props(new ShardEntity[R, Msg, State](rts)(onMessage)), @@ -87,7 +87,7 @@ object Sharding { for { rts <- ZIO.runtime[ActorSystem] actorSystem = rts.environment.get - shardingRegion <- Task( + shardingRegion <- Task.attempt( ClusterSharding(actorSystem).startProxy( typeName = name, role, @@ -114,13 +114,13 @@ object Sharding { val getShardingRegion: ActorRef override def send(entityId: String, data: Msg): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, MessagePayload(data))) override def stop(entityId: String): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PoisonPillPayload)) override def passivate(entityId: String): Task[Unit] = - Task(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) + Task.attempt(getShardingRegion ! sharding.MessageEnvelope(entityId, PassivatePayload)) override def ask[R](entityId: String, data: Msg)(implicit tag: ClassTag[R], proof: R =!= Nothing): Task[R] = Task.fromFuture(_ => diff --git a/src/test/scala/zio/akka/cluster/ClusterSpec.scala b/src/test/scala/zio/akka/cluster/ClusterSpec.scala index 463ac9a..4750e64 100644 --- a/src/test/scala/zio/akka/cluster/ClusterSpec.scala +++ b/src/test/scala/zio/akka/cluster/ClusterSpec.scala @@ -26,18 +26,20 @@ object ClusterSpec extends ZIOSpecDefault { | enabled-transports = ["akka.remote.artery.canonical"] | artery.canonical { | hostname = "127.0.0.1" - | port = 2551 + | port = 2554 | } | } | cluster { - | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2554"] | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} """.stripMargin) val actorSystem: ZIO[Scope, Throwable, ActorSystem] = - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) assertM( for { @@ -53,7 +55,7 @@ object ClusterSpec extends ZIOSpecDefault { .runCollect .timeoutFail(new Exception("Timeout"))(10 seconds) } yield items - )(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem) ++ Clock.live) + )(isNonEmpty).provideLayer(ZLayer.scoped(actorSystem)) } - ) + ) @@ TestAspect.withLiveClock } diff --git a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala index 78827da..1b9e208 100644 --- a/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala +++ b/src/test/scala/zio/akka/cluster/pubsub/PubSubSpec.scala @@ -19,11 +19,11 @@ object PubSubSpec extends ZIOSpecDefault { | enabled-transports = ["akka.remote.artery.canonical"] | artery.canonical { | hostname = "127.0.0.1" - | port = 2551 + | port = 2553 | } | } | cluster { - | seed-nodes = ["akka://Test@127.0.0.1:2551"] + | seed-nodes = ["akka://Test@127.0.0.1:2553"] | downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" | } |} @@ -32,7 +32,9 @@ object PubSubSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val topic = "topic" diff --git a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala index 9ea8580..7a497b2 100644 --- a/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala +++ b/src/test/scala/zio/akka/cluster/sharding/ShardingSpec.scala @@ -35,7 +35,9 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val config2: Config = ConfigFactory.parseString(s""" @@ -61,7 +63,9 @@ object ShardingSpec extends ZIOSpecDefault { val actorSystem2: ZLayer[Any, Throwable, ActorSystem] = ZLayer .scoped( - ZIO.acquireRelease(Task(ActorSystem("Test", config2)))(sys => Task.fromFuture(_ => sys.terminate()).either) + ZIO.acquireRelease(Task.attempt(ActorSystem("Test", config2)))(sys => + Task.fromFuture(_ => sys.terminate()).either + ) ) val shardId = "shard" @@ -129,10 +133,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.send(shardId, "die") - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -151,10 +154,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.passivate(shardId) - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -175,10 +177,9 @@ object ShardingSpec extends ZIOSpecDefault { sharding <- Sharding.start(shardName, onMessage) _ <- sharding.send(shardId, "set") _ <- sharding.send(shardId, "timeout") - _ <- ZIO.sleep(3 seconds) - .provideLayer( - Clock.live - ) // give time to the ShardCoordinator to notice the death of the actor and recreate one + _ <- Clock.sleep( + 3 seconds + ) // give time to the ShardCoordinator to notice the death of the actor and recreate one _ <- sharding.send(shardId, "get") res <- p.await } yield res @@ -227,6 +228,8 @@ object ShardingSpec extends ZIOSpecDefault { } yield res )(equalTo("test")).provideLayer(actorSystem ++ l) } - ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout(30.seconds) + ) @@ TestAspect.executionStrategy(ExecutionStrategy.Sequential) @@ TestAspect.timeout( + 30.seconds + ) @@ TestAspect.withLiveClock }