Skip to content

Commit

Permalink
zio2 (#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
justcoon authored Jun 26, 2022
1 parent 6e7f0bb commit d492325
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 15 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
val mainScala = "2.13.7"
val allScala = Seq("2.12.15", mainScala)

val zioVersion = "2.0.0-RC6"
val zioVersion = "2.0.0"
val akkaVersion = "2.6.19"

organization := "dev.zio"
Expand Down
12 changes: 7 additions & 5 deletions src/main/scala/zio/akka/cluster/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package zio.akka.cluster

import akka.actor.{ Actor, ActorSystem, Address, PoisonPill, Props }
import akka.cluster.ClusterEvent._
import zio.Exit.{ Failure, Success }
import zio.{ Queue, Runtime, ZIO }
import zio.{ Exit, Queue, Runtime, Unsafe, ZIO }

object Cluster {

Expand Down Expand Up @@ -78,9 +77,12 @@ object Cluster {

def receive: Actor.Receive = {
case ev: ClusterDomainEvent =>
rts.unsafeRunAsyncWith(queue.offer(ev)) {
case Success(_) => ()
case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
Unsafe.unsafeCompat { implicit u =>
val fiber = rts.unsafe.fork(queue.offer(ev))
fiber.unsafe.addObserver {
case Exit.Success(_) => ()
case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
}
}
()
case _ => ()
Expand Down
16 changes: 10 additions & 6 deletions src/main/scala/zio/akka/cluster/pubsub/impl/SubscriberImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package zio.akka.cluster.pubsub.impl

import akka.actor.{ Actor, ActorRef, ActorSystem, PoisonPill, Props }
import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }
import zio.Exit.{ Failure, Success }
import zio.akka.cluster.pubsub.impl.SubscriberImpl.SubscriberActor
import zio.akka.cluster.pubsub.{ MessageEnvelope, Subscriber }
import zio.{ Promise, Queue, Runtime, Task, ZIO }
import zio.{ Exit, Promise, Queue, Runtime, Task, Unsafe, ZIO }

private[pubsub] trait SubscriberImpl[A] extends Subscriber[A] {
val getActorSystem: ActorSystem
Expand Down Expand Up @@ -36,12 +35,17 @@ object SubscriberImpl {

def receive: Actor.Receive = {
case SubscribeAck(_) =>
rts.unsafeRunSync(subscribed.succeed(()))
Unsafe.unsafeCompat { implicit u =>
rts.unsafe.run(subscribed.succeed(())).getOrThrow()
}
()
case MessageEnvelope(msg) =>
rts.unsafeRunAsyncWith(queue.offer(msg.asInstanceOf[A])) {
case Success(_) => ()
case Failure(cause) => if (cause.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
Unsafe.unsafeCompat { implicit u =>
val fiber = rts.unsafe.fork(queue.offer(msg.asInstanceOf[A]))
fiber.unsafe.addObserver {
case Exit.Success(_) => ()
case Exit.Failure(c) => if (c.isInterrupted) self ! PoisonPill // stop listening if the queue was shut down
}
}
()
}
Expand Down
11 changes: 8 additions & 3 deletions src/main/scala/zio/akka/cluster/sharding/Sharding.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.pattern.{ ask => askPattern }
import akka.util.Timeout
import zio.akka.cluster.sharding
import zio.akka.cluster.sharding.MessageEnvelope.{ MessagePayload, PassivatePayload, PoisonPillPayload }
import zio.{ =!=, Ref, Runtime, Tag, Task, UIO, ZIO, ZLayer }
import zio.{ =!=, Ref, Runtime, Tag, Task, UIO, Unsafe, ZIO, ZLayer }

/**
* A `Sharding[M]` is able to send messages of type `M` to a sharded entity or to stop one.
Expand Down Expand Up @@ -133,7 +133,10 @@ object Sharding {
onMessage: Msg => ZIO[Entity[State] with R, Nothing, Unit]
) extends Actor {

val ref: Ref[Option[State]] = rts.unsafeRun(Ref.make[Option[State]](None))
val ref: Ref[Option[State]] =
Unsafe.unsafeCompat { implicit u =>
rts.unsafe.run(Ref.make[Option[State]](None)).getOrThrow()
}
val actorContext: ActorContext = context
val service: Entity.Service[State] = new Entity.Service[State] {
override def context: ActorContext = actorContext
Expand All @@ -154,7 +157,9 @@ object Sharding {
case p: Passivate =>
actorContext.parent ! p
case MessagePayload(msg) =>
rts.unsafeRunSync(onMessage(msg.asInstanceOf[Msg]).provideSomeLayer[R](entity))
Unsafe.unsafeCompat { implicit u =>
rts.unsafe.run(onMessage(msg.asInstanceOf[Msg]).provideSomeLayer[R](entity)).getOrThrow()
}
()
case _ =>
}
Expand Down

0 comments on commit d492325

Please sign in to comment.