Skip to content

Commit

Permalink
Add CountDownLatch to ChannelMultiplexer test
Browse files Browse the repository at this point in the history
ChannelMultiplexer is very dependent on subscribing order, and should be
either set up correctly or used by subscribers that don't really care
about past events.
  • Loading branch information
natsukagami committed Dec 10, 2023
1 parent 3fffabc commit 09bc8c1
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions shared/src/test/scala/ChannelBehavior.scala
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,12 @@ class ChannelBehavior extends munit.FunSuite {
test("ChannelMultiplexer multiple readers and writers") {
Async.blocking:
val m = ChannelMultiplexer[Int]()
val start = java.util.concurrent.CountDownLatch(5)

val f11 = Future:
val cc = SyncChannel[Int]()
m.addPublisher(cc)
start.countDown()
sleep(200)
for (i <- 0 to 3)
cc.send(i)
Expand All @@ -368,6 +370,7 @@ class ChannelBehavior extends munit.FunSuite {
val f12 = Future:
val cc = SyncChannel[Int]()
m.addPublisher(cc)
start.countDown()
sleep(200)
for (i <- 10 to 13)
cc.send(i)
Expand All @@ -376,13 +379,15 @@ class ChannelBehavior extends munit.FunSuite {
val f13 = Future:
val cc = SyncChannel[Int]()
m.addPublisher(cc)
start.countDown()
for (i <- 20 to 23)
cc.send(i)
m.removePublisher(cc)

val f21 = Future:
val cr = SyncChannel[Try[Int]]()
m.addSubscriber(cr)
start.countDown()
sleep(200)
val l = ArrayBuffer[Int]()
sleep(1000)
Expand All @@ -393,12 +398,14 @@ class ChannelBehavior extends munit.FunSuite {
val f22 = Future:
val cr = SyncChannel[Try[Int]]()
m.addSubscriber(cr)
start.countDown()
sleep(1500)
val l = ArrayBuffer[Int]()
for (i <- 1 to 12) {
l += cr.read().right.get.get
}

start.await()
Future { m.run() }

f21.result
Expand Down

0 comments on commit 09bc8c1

Please sign in to comment.