From 1a5c9d8c60fc7adb502659933476eb504a3f1cb4 Mon Sep 17 00:00:00 2001 From: Sheliak Lyr Date: Tue, 3 Sep 2024 10:43:46 +0200 Subject: [PATCH] Added automaticTopologyRecovery to Fs2RabbitConfig --- .../scala/dev/profunktor/fs2rabbit/algebra/Connection.scala | 1 + .../dev/profunktor/fs2rabbit/config/Fs2RabbitConfig.scala | 5 ++++- .../fs2rabbit/examples/DropwizardMetricsDemo.scala | 1 + .../dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala | 1 + .../scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala | 1 + .../dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala | 2 +- 6 files changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala index a8676178..2bcaf81a 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/algebra/Connection.scala @@ -133,6 +133,7 @@ object ConnectionResource { factory.setConnectionTimeout(conf.connectionTimeout.toMillis.toInt) factory.setRequestedHeartbeat(conf.requestedHeartbeat.toSeconds.toInt) factory.setAutomaticRecoveryEnabled(conf.automaticRecovery) + factory.setTopologyRecoveryEnabled(conf.automaticTopologyRecovery) if (conf.ssl) sslCtx.fold(factory.useSslProtocol())(factory.useSslProtocol) factory.setSaslConfig(saslConf) conf.username.foreach(factory.setUsername) diff --git a/core/src/main/scala/dev/profunktor/fs2rabbit/config/Fs2RabbitConfig.scala b/core/src/main/scala/dev/profunktor/fs2rabbit/config/Fs2RabbitConfig.scala index 5436f755..b3367f11 100644 --- a/core/src/main/scala/dev/profunktor/fs2rabbit/config/Fs2RabbitConfig.scala +++ b/core/src/main/scala/dev/profunktor/fs2rabbit/config/Fs2RabbitConfig.scala @@ -40,6 +40,7 @@ case class Fs2RabbitConfig( internalQueueSize: Option[Int], requestedHeartbeat: FiniteDuration, automaticRecovery: Boolean, + automaticTopologyRecovery: Boolean, clientProvidedConnectionName: Option[String] ) @@ -57,6 +58,7 @@ object Fs2RabbitConfig { internalQueueSize: Option[Int], requestedHeartbeat: FiniteDuration = FiniteDuration(ConnectionFactory.DEFAULT_HEARTBEAT, TimeUnit.SECONDS), automaticRecovery: Boolean = true, + automaticTopologyRecovery: Boolean = true, clientProvidedConnectionName: Option[String] = None ): Fs2RabbitConfig = Fs2RabbitConfig( nodes = NonEmptyList.one(Fs2RabbitNodeConfig(host, port)), @@ -70,6 +72,7 @@ object Fs2RabbitConfig { internalQueueSize = internalQueueSize, requestedHeartbeat = requestedHeartbeat, automaticRecovery = automaticRecovery, - clientProvidedConnectionName = clientProvidedConnectionName + clientProvidedConnectionName = clientProvidedConnectionName, + automaticTopologyRecovery = automaticTopologyRecovery ) } diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala index 28427d2f..4388ef33 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/DropwizardMetricsDemo.scala @@ -48,6 +48,7 @@ object DropwizardMetricsDemo extends IOApp.Simple { internalQueueSize = Some(500), requestedHeartbeat = 60.seconds, automaticRecovery = true, + automaticTopologyRecovery = true, clientProvidedConnectionName = Some("app:drop-wizard-metrics-demo") ) diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala index b48c3a1b..eb5032a4 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/IOAckerConsumer.scala @@ -38,6 +38,7 @@ object IOAckerConsumer extends IOApp.Simple { internalQueueSize = Some(500), requestedHeartbeat = 60.seconds, automaticRecovery = true, + automaticTopologyRecovery = true, clientProvidedConnectionName = Some("app:io-acker-consumer") ) diff --git a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala index f46877f7..79a87d90 100644 --- a/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala +++ b/examples/src/main/scala/dev/profunktor/fs2rabbit/examples/RPCDemo.scala @@ -50,6 +50,7 @@ object RPCDemo extends IOApp.Simple { internalQueueSize = Some(500), requestedHeartbeat = 60.seconds, automaticRecovery = true, + automaticTopologyRecovery = true, clientProvidedConnectionName = Some("app:rpc-demo") ) diff --git a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala index b1e01d93..d87e8c48 100644 --- a/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala +++ b/tests/src/test/scala/dev/profunktor/fs2rabbit/interpreter/Fs2RabbitSpec.scala @@ -69,7 +69,7 @@ trait Fs2RabbitSpec { self: BaseSpec => createConnectionChannel.use { implicit channel => randomQueueData .flatMap { case (q, x, _) => - declareQueue(DeclarationQueueConfig(q, Durable, Exclusive, AutoDelete, Map.empty)) *> + declareQueue(DeclarationQueueConfig(q, Durable, Exclusive, AutoDelete, Map.empty, None)) *> declareExchange(x, ExchangeType.Topic) } .as(emptyAssertion)