diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 498d2955..a5e42e93 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -88,11 +88,11 @@ jobs: - name: Make target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target + run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target - name: Compress target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') - run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target + run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target - name: Upload target directories if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main') diff --git a/build.sbt b/build.sbt index 3a7f4531..8c2f7167 100644 --- a/build.sbt +++ b/build.sbt @@ -60,8 +60,8 @@ lazy val root = (project in file(".")) IntegrationTest / classDirectory := (Test / classDirectory).value, IntegrationTest / parallelExecution := true ) - .aggregate(core, kernel, high, activemq, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy) - .dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy) + .aggregate(core, kernel, high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy) + .dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy) def module(name: String, directory: String = ".") = Project(s"pass4s-$name", file(directory) / name).settings(commonSettings) @@ -102,7 +102,7 @@ val nettySnykOverrides = Seq( "io.netty" % "netty-handler" % nettyVersion ) -lazy val activemq = module("activemq", directory = "connectors") +lazy val activemqAkka = module("activemq", directory = "connectors") .settings( name := "pass4s-connector-activemq", libraryDependencies ++= Seq( @@ -114,6 +114,21 @@ lazy val activemq = module("activemq", directory = "connectors") ) .dependsOn(core) +lazy val activemqPekko = module("activemq-pekko", directory = "connectors") + .settings( + mimaPreviousArtifacts := Set(), // Remove when 0.4.2 is released + name := "pass4s-connector-pekko-activemq", + resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", // Resolvers to be removed when stable version is released + resolvers ++= Resolver.sonatypeOssRepos("snapshots"), + libraryDependencies ++= Seq( + "org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+140-7d704044-SNAPSHOT", // TODO to be changed to stable release once https://github.com/apache/incubator-pekko-connectors/issues/210 is ready + "org.apache.activemq" % "activemq-pool" % Versions.ActiveMq, + "org.typelevel" %% "log4cats-core" % Versions.Log4Cats + ), + headerSources / excludeFilter := HiddenFileFilter || "taps.scala" + ) + .dependsOn(core) + lazy val kinesis = module("kinesis", directory = "connectors") .settings( name := "pass4s-connector-kinesis", @@ -206,7 +221,7 @@ lazy val docs = project // new documentation project WorkflowStep.Sbt(List("docs/mdoc")) ) ) - .dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy) + .dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy) .enablePlugins(MdocPlugin, DocusaurusPlugin) // misc @@ -223,7 +238,7 @@ lazy val demo = module("demo") "ch.qos.logback" % "logback-classic" % Versions.Logback ) ) - .dependsOn(activemq, sns, sqs, extra, logging) + .dependsOn(activemqPekko, sns, sqs, extra, logging) lazy val commonSettings = Seq( organization := "com.ocadotechnology", diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala new file mode 100644 index 00000000..cbebad37 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/ConnectionFactories.scala @@ -0,0 +1,52 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import cats.effect.Resource +import cats.effect.Sync +import cats.implicits._ +import org.apache.activemq.ActiveMQConnectionFactory +import org.apache.activemq.pool.PooledConnectionFactory + +/** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been extensively + * tested in production yet + */ +object ConnectionFactories { + + /** Creates a pooled ActiveMQ connection factory. + * + * Use `failover:(tcp://address)` to be sure that send operation is never failing. (Connection retries are handled by connection factory) + * Read documentation: https://activemq.apache.org/failover-transport-reference.html + * + * Use raw `tcp://address` to make send operation able to fail. This may be useful when working with outbox pattern + */ + def pooled[F[_]: Sync](username: String, password: String, url: String): Resource[F, PooledConnectionFactory] = + Resource.suspend { + unpooled[F](username, password, url).map(makePooled[F](_)) + } + + /** Creates an ActiveMQ connection factory. + */ + def unpooled[F[_]: Sync](username: String, password: String, url: String): F[ActiveMQConnectionFactory] = + Sync[F].delay(new ActiveMQConnectionFactory(username, password, url)) + + /** Wraps the base factory in a connection pool. + */ + def makePooled[F[_]: Sync](baseFactory: ActiveMQConnectionFactory): Resource[F, PooledConnectionFactory] = + Resource.make(Sync[F].delay(new PooledConnectionFactory(baseFactory)))(pcf => Sync[F].delay(pcf.stop())) + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala new file mode 100644 index 00000000..b81ce53c --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/JmsConnector.scala @@ -0,0 +1,129 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings} +import cats.effect.Resource +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource.JmsSourceSettings +import com.ocadotechnology.pass4s.connectors.pekko.activemq.consumer._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.producer._ +import com.ocadotechnology.pass4s.core.CommittableMessage +import com.ocadotechnology.pass4s.core.Connector +import com.ocadotechnology.pass4s.core.Destination +import com.ocadotechnology.pass4s.core.Message +import com.ocadotechnology.pass4s.core.Source +import fs2.Stream +import org.typelevel.log4cats.Logger + +import javax.jms.ConnectionFactory +import scala.concurrent.duration._ +import scala.reflect.runtime.universe._ +import cats.effect.kernel.Async + +trait Jms + +object Jms { + sealed trait Type extends Product with Serializable + + object Type { + final case object Queue extends Type + final case object Topic extends Type + } + +} + +final case class JmsSource private (name: String, sourceType: Jms.Type, settings: JmsSourceSettings) extends Source[Jms] { + override val capability: Type = typeOf[Jms] + + override val messageProcessingTimeout: Option[FiniteDuration] = Some(settings.messageProcessingTimeout) + override val cancelableMessageProcessing: Boolean = settings.cancelableMessageProcessing + override val maxConcurrent: Int = settings.parallelSessions + + def toDestination: JmsDestination = JmsDestination(name, sourceType) +} + +object JmsSource { + + final case class JmsSourceSettings( + // sets internal timeout on a message processing. JMS' ackTimeout will be (x + 1 second) * 1.2 + messageProcessingTimeout: FiniteDuration = 30.seconds, + cancelableMessageProcessing: Boolean = true, + parallelSessions: Int = 1, + restartSettings: RestartSettings = RestartSettings(minBackoff = 2.second, maxBackoff = 30.seconds, randomFactor = 0.2) + ) + + final case class RestartSettings(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double) { + val toAkka: AkkaRestartSettings = AkkaRestartSettings(minBackoff, maxBackoff, randomFactor) + } + + def queue(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Queue, settings) + + def topic(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Topic, settings) +} + +final case class JmsDestination private (name: String, destinationType: Jms.Type) extends Destination[Jms] { + override val capability: Type = typeOf[Jms] + + def toSource(settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, destinationType, settings) +} + +object JmsDestination { + def queue(name: String): JmsDestination = JmsDestination(name, Jms.Type.Queue) + + def topic(name: String): JmsDestination = JmsDestination(name, Jms.Type.Topic) +} + +object JmsConnector { + type JmsConnector[F[_]] = Connector.Aux[F, Jms, ConnectionFactory] + + /** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been + * extensively tested in production yet + */ + def singleBroker[F[_]: Logger: Async]( + username: String, + password: String, + url: String + )( + implicit as: ActorSystem + ): Resource[F, JmsConnector[F]] = + ConnectionFactories.pooled(username, password, url).flatMap(singleBroker[F](_)) + + /** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been + * extensively tested in production yet + */ + def singleBroker[F[_]: Logger: Async]( + connectionFactory: ConnectionFactory + )( + implicit as: ActorSystem + ): Resource[F, JmsConnector[F]] = + for { + producer <- createMessageProducer(connectionFactory) + } yield new Connector[F, Jms] { + + type Raw = ConnectionFactory + override val underlying: ConnectionFactory = connectionFactory + + override def consumeBatched[R >: Jms](source: Source[R]): Stream[F, List[CommittableMessage[F]]] = + consumeAndReconnectOnErrors(connectionFactory)(source).map(List(_)) + + override def produce[R >: Jms](message: Message[R]): F[Unit] = + producer(message) + + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala new file mode 100644 index 00000000..dc818c3b --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/common.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import org.apache.pekko.stream.connectors.{jms => pekkojms} + +private[activemq] object common { + + def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = { + case (name, Jms.Type.Topic) => pekkojms.Topic(name) + case (name, Jms.Type.Queue) => pekkojms.Queue(name) + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala new file mode 100644 index 00000000..1a8ab6e0 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/consumer.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer +import org.apache.pekko.stream.connectors.{jms => pekkojms} +import org.apache.pekko.stream.scaladsl.RestartSource +import cats.ApplicativeThrow +import cats.effect.Async +import cats.effect.Sync +import cats.implicits._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.taps._ +import com.ocadotechnology.pass4s.core.Message.Payload +import com.ocadotechnology.pass4s.core.CommittableMessage +import com.ocadotechnology.pass4s.core.Source +import fs2.Stream +import org.typelevel.log4cats.Logger + +import javax.jms +import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ +import scala.util.Try + +private[activemq] object consumer { + + def consumeAndReconnectOnErrors[F[_]: Async: Logger]( + connectionFactory: jms.ConnectionFactory + )( + source: Source[_] + )( + implicit as: ActorSystem + ): Stream[F, CommittableMessage[F]] = + for { + JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source)) + + jmsConsumerSettings = pekkojms + .JmsConsumerSettings(as, connectionFactory) + .withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2) + .withSessionCount(settings.parallelSessions) + .withFailStreamOnAckTimeout(true) + .withDestination(common.toPekkoDestination(name, sourceType)) + + txEnvelope <- RestartSource + .withBackoff(settings.restartSettings.toAkka) { () => + JmsConsumer.txSource(jmsConsumerSettings).named(getClass.getSimpleName) + } + .toStream[F]() + committableMessage <- Stream.eval(toCommittableMessage(txEnvelope)).unNone + } yield committableMessage + + private def extractJmsSource[F[_]: ApplicativeThrow](source: Source[_]): F[JmsSource] = + source match { + case jmsSource: JmsSource => jmsSource.pure[F] + case unsupportedDestination => + ApplicativeThrow[F].raiseError( + new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination") + ) + } + + private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = { + val commit = Sync[F].delay(txEnvelope.commit()) + val rollback = Sync[F].delay(txEnvelope.rollback()) + txEnvelope.message match { + case textMessage: jms.TextMessage => + CommittableMessage.instance(Payload(textMessage.getText, getHeaders(textMessage)), commit, _ => rollback).some.pure[F] + case unsupportedMessage => + Logger[F].warn(s"JmsConnector supports only TextMessages. Ignoring received message: $unsupportedMessage") *> rollback.as(None) + } + } + + // fixme: add headers/properties from underlying message - need to double check if all properties are returned by getPropertyNames + private def getHeaders(msg: jms.Message): Map[String, String] = + Try { + msg + .getPropertyNames + .asIterator() + .asInstanceOf[java.util.Iterator[String]] // Please forgive me I have to do this, but underneath it is really String + .asScala + .map(name => name -> msg.getStringProperty(name)) + .toMap + }.getOrElse(Map.empty) + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala new file mode 100644 index 00000000..26d9fce8 --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/producer.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2023 Ocado Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.connectors.jms.scaladsl.JmsProducer +import org.apache.pekko.stream.connectors.{jms => pekkojms} +import cats.ApplicativeThrow +import cats.effect.Concurrent +import cats.effect.Resource +import cats.effect.implicits._ +import cats.effect.kernel.Async +import cats.effect.kernel.Deferred +import cats.effect.kernel.Ref +import cats.effect.std.Queue +import cats.effect.std.Semaphore +import cats.implicits._ +import com.ocadotechnology.pass4s.connectors.pekko.activemq.taps._ +import com.ocadotechnology.pass4s.core.Destination +import com.ocadotechnology.pass4s.core.Message +import fs2.Pipe +import fs2.Stream + +import javax.jms + +private[activemq] object producer { + + type MessageProducer[F[_]] = Message[_] => F[Unit] + + private type Attempt = Either[Throwable, Unit] + private type Promise[F[_]] = Deferred[F, Attempt] + private type JmsPayload[F[_]] = pekkojms.JmsEnvelope[Promise[F]] + + def createMessageProducer[F[_]: Async]( + connectionFactory: jms.ConnectionFactory, + bufferSize: Int = 100 + )( + implicit as: ActorSystem + ): Resource[F, MessageProducer[F]] = + for { + queue <- Resource.eval(Queue.bounded[F, JmsPayload[F]](bufferSize)) + /** Stream.eval(queue.take) wouldn't work here because it takes only single element and terminates. In this case we need to take all + * elements but one by one as long as there's anything in the queue. Limit is set to one as we only process single message at a time, + * so that we don't reemit chukns in case of failure. + */ + _ <- Stream.fromQueueUnterminated(queue, limit = 1).through(sendMessageAndCompletePromise(connectionFactory)).compile.drain.background + } yield enqueueAndWaitForPromise[F](queue.offer) + + private def enqueueAndWaitForPromise[F[_]: Concurrent](enqueue: JmsPayload[F] => F[Unit]): MessageProducer[F] = + message => + for { + jmsDestination <- extractJmsDestination[F](message.destination) + promise <- Deferred[F, Attempt] + pekkoMessage = pekkojms.JmsTextMessage(message.payload.text, promise).withProperties(message.payload.metadata) + pekkoDestination = common.toPekkoDestination(jmsDestination.name, jmsDestination.destinationType) + _ <- enqueue(pekkoMessage.to(pekkoDestination)) + _ <- promise.get.rethrow + } yield () + + private def sendMessageAndCompletePromise[F[_]: Async]( + connectionFactory: jms.ConnectionFactory + )( + implicit as: ActorSystem + ): Pipe[F, JmsPayload[F], Unit] = { messages => + /* + * Note on `inflightMessages` Ref: + * Every message is added to Ref at the beginning of the message processing: + * - (happy path) after the message is sent the promise of this message is completed and the message is removed from Ref + * - (edge case) when `sendMessagePipe` crashes all inflightMessages are completed with Left and the Ref is cleaned + * + * Note on semaphore: + * Every operation on `inflightMessages` Ref is guarded by single permit uncancelable semaphore to guarantee + * that the promise completion and Ref update are atomic + */ + Stream.eval((Ref.of[F, Set[JmsPayload[F]]](Set.empty), Semaphore[F](n = 1)).tupled).flatMap { case (inflightMessages, semaphore) => + val jmsProducerSettings = pekkojms + .JmsProducerSettings(as, connectionFactory) + .withTopic("Pass4s.Default") // default destination is obligatory, but always overridden + + val sendMessagePipe: Pipe[F, JmsPayload[F], JmsPayload[F]] = + JmsProducer.flexiFlow[Promise[F]](jmsProducerSettings).named(getClass.getSimpleName).toPipe[F]() + + def addMessageToRef(pendingMessage: JmsPayload[F]) = + semaphore.permit.surround(inflightMessages.update(_ + pendingMessage)) + + def completeMessageAndRemoveFromRef(sentMessage: JmsPayload[F]) = + semaphore.permit.surround(sentMessage.passThrough.complete(Right(())).attempt *> inflightMessages.update(_ - sentMessage)) + + def failAllAndCleanRef(ex: Throwable) = + semaphore + .permit + .surround( + inflightMessages.get.flatMap(_.toList.traverse(_.passThrough.complete(Left(ex)).attempt)) *> inflightMessages.set(Set()) + ) + + messages + .evalTap(addMessageToRef) + .through(sendMessagePipe) + .attempts(Stream.constant(jmsProducerSettings.connectionRetrySettings.initialRetry)) + .evalMap(_.fold(failAllAndCleanRef, completeMessageAndRemoveFromRef)) + } + } + + private def extractJmsDestination[F[_]: ApplicativeThrow](destination: Destination[_]): F[JmsDestination] = + destination match { + case jmsDestination: JmsDestination => jmsDestination.pure[F] + case unsupportedDestination => + ApplicativeThrow[F].raiseError( + new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination") + ) + } + +} diff --git a/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala new file mode 100644 index 00000000..4a6260aa --- /dev/null +++ b/connectors/activemq-pekko/src/main/scala/com/ocadotechnology/pass4s/connectors/activemq/taps.scala @@ -0,0 +1,150 @@ +/* + * Copyright 2021 Martin Krasser + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ocadotechnology.pass4s.connectors.pekko.activemq + +import org.apache.pekko.stream.FlowShape +import org.apache.pekko.stream.Graph +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.OverflowStrategy +import org.apache.pekko.stream.QueueOfferResult +import org.apache.pekko.stream.SourceShape +import org.apache.pekko.stream.StreamDetachedException +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.SinkQueueWithCancel +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete +import cats.effect.Async + +import cats.effect.kernel.Resource.ExitCase +import cats.implicits._ +import fs2.Pipe +import fs2.Stream + +// Copied from https://github.com/krasserm/streamz due to the lack of CE3 support. https://github.com/krasserm/streamz/issues/85 +private[activemq] object taps { + + implicit class AkkaSourceDsl[A, M](source: Graph[SourceShape[A], M]) { + + def toStream[F[_]: Async](onMaterialization: M => Unit = _ => ())(implicit materializer: Materializer): Stream[F, A] = + akkaSourceToFs2Stream(source)(onMaterialization) + } + + implicit class AkkaFlowDsl[A, B, M](flow: Graph[FlowShape[A, B], M]) { + + def toPipe[F[_]: Async]( + onMaterialization: M => Unit = _ => () + )( + implicit materializer: Materializer + ): Pipe[F, A, B] = + akkaFlowToFs2Pipe(flow)(onMaterialization) + + } + + /** Converts an Akka Stream [[Graph]] of [[SourceShape]] to an FS2 [[Stream]]. The [[Graph]] is materialized when the [[Stream]]'s [[F]] + * in run. The materialized value can be obtained with the `onMaterialization` callback. + */ + private def akkaSourceToFs2Stream[F[_], A, M]( + source: Graph[SourceShape[A], M] + )( + onMaterialization: M => Unit + )( + implicit materializer: Materializer, + F: Async[F] + ): Stream[F, A] = + Stream.force { + F.delay { + val (mat, subscriber) = Source.fromGraph(source).toMat(Sink.queue[A]())(Keep.both).run() + onMaterialization(mat) + subscriberStream[F, A](subscriber) + } + } + + /** Converts an Akka Stream [[Graph]] of [[FlowShape]] to an FS2 [[Pipe]]. The [[Graph]] is materialized when the [[Pipe]]'s [[F]] in run. + * The materialized value can be obtained with the `onMaterialization` callback. + */ + private def akkaFlowToFs2Pipe[F[_], A, B, M]( + flow: Graph[FlowShape[A, B], M] + )( + onMaterialization: M => Unit + )( + implicit materializer: Materializer, + F: Async[F] + ): Pipe[F, A, B] = { s => + Stream.force { + F.delay { + val src = Source.queue[A](0, OverflowStrategy.backpressure) + val snk = Sink.queue[B]() + val ((publisher, mat), subscriber) = src.viaMat(flow)(Keep.both).toMat(snk)(Keep.both).run() + onMaterialization(mat) + transformerStream[F, A, B](subscriber, publisher, s) + } + } + } + + private def transformerStream[F[_]: Async, A, B]( + subscriber: SinkQueueWithCancel[B], + publisher: SourceQueueWithComplete[A], + stream: Stream[F, A] + ): Stream[F, B] = + subscriberStream[F, B](subscriber).concurrently(publisherStream[F, A](publisher, stream)) + + private def publisherStream[F[_], A]( + publisher: SourceQueueWithComplete[A], + stream: Stream[F, A] + )( + implicit F: Async[F] + ): Stream[F, Unit] = { + def publish(a: A): F[Option[Unit]] = + Async[F] + .fromFuture(F.delay(publisher.offer(a))) + .flatMap { + case QueueOfferResult.Enqueued => ().some.pure[F] + case QueueOfferResult.Failure(cause) => F.raiseError[Option[Unit]](cause) + case QueueOfferResult.QueueClosed => none[Unit].pure[F] + case QueueOfferResult.Dropped => + F.raiseError[Option[Unit]](new IllegalStateException("This should never happen because we use OverflowStrategy.backpressure")) + } + .recover { + // This handles a race condition between `interruptWhen` and `publish`. + // There's no guarantee that, when the akka sink is terminated, we will observe the + // `interruptWhen` termination before calling publish one last time. + // Such a call fails with StreamDetachedException + case _: StreamDetachedException => none[Unit] + } + + def watchCompletion: F[Unit] = Async[F].fromFuture(F.delay(publisher.watchCompletion())).void + def fail(e: Throwable): F[Unit] = F.delay(publisher.fail(e)) >> watchCompletion + def complete: F[Unit] = F.delay(publisher.complete()) >> watchCompletion + + stream + .interruptWhen(watchCompletion.attempt) + .evalMap(publish) + .unNoneTerminate + .onFinalizeCase { + case ExitCase.Succeeded | ExitCase.Canceled => complete + case ExitCase.Errored(e) => fail(e) + } + } + + private def subscriberStream[F[_], A](subscriber: SinkQueueWithCancel[A])(implicit F: Async[F]): Stream[F, A] = { + val pull = Async[F].fromFuture(F.delay(subscriber.pull())) + val cancel = F.delay(subscriber.cancel()) + Stream.repeatEval(pull).unNoneTerminate.onFinalize(cancel) + } + +} diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala index eb2596c1..2e0b2163 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/CirceDemo.scala @@ -24,8 +24,8 @@ import cats.effect.kernel.Sync import cats.effect.kernel.Temporal import cats.implicits._ import com.ocadotechnology.pass4s.circe.syntax._ -import com.ocadotechnology.pass4s.connectors.activemq.Jms -import com.ocadotechnology.pass4s.connectors.activemq.JmsSource +import com.ocadotechnology.pass4s.connectors.pekko.activemq.Jms +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource import com.ocadotechnology.pass4s.core.CommittableMessage import com.ocadotechnology.pass4s.core.Connector import com.ocadotechnology.pass4s.core.Message diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala index 265cbe61..8958c10d 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/DemoMain.scala @@ -31,8 +31,8 @@ import cats.effect.implicits._ import cats.implicits._ import cats.~> import com.ocadotechnology.pass4s.circe.syntax._ -import com.ocadotechnology.pass4s.connectors.activemq.Jms -import com.ocadotechnology.pass4s.connectors.activemq.JmsConnector +import com.ocadotechnology.pass4s.connectors.pekko.activemq.Jms +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsConnector import com.ocadotechnology.pass4s.core._ import com.ocadotechnology.pass4s.extra.MessageProcessor import com.ocadotechnology.pass4s.high._ @@ -92,7 +92,7 @@ object DemoMain extends IOApp { // // - val brokerResource = Akka + val brokerResource = Pekko .system[IO] .flatMap { implicit sys => implicit val connectorLogger: Logger[IO] = Slf4jLogger.getLoggerFromClass[IO](classOf[Connector[IO, Jms]]) diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala index 00072dfa..83f14cb7 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/destinations.scala @@ -16,8 +16,8 @@ package com.ocadotechnology.pass4s.demo -import com.ocadotechnology.pass4s.connectors.activemq.JmsDestination -import com.ocadotechnology.pass4s.connectors.activemq.JmsSource +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsDestination +import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource object Destinations { val inventoryEvents = JmsSource.queue("Inventory.Events") diff --git a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala similarity index 94% rename from demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala rename to demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala index 66ca380b..e72619f5 100644 --- a/demo/src/main/scala/com/ocadotechnology/pass4s/demo/akka.scala +++ b/demo/src/main/scala/com/ocadotechnology/pass4s/demo/pekko.scala @@ -16,14 +16,14 @@ package com.ocadotechnology.pass4s.demo -import akka.actor.ActorSystem +import org.apache.pekko.actor.ActorSystem import cats.effect.Async import cats.effect.Resource import cats.effect.Sync import cats.implicits._ -object Akka { +object Pekko { def system[F[_]: Async]: Resource[F, ActorSystem] = Resource.make(Sync[F].delay(ActorSystem()))(sys => Async[F].fromFuture(Sync[F].delay(sys.terminate())).void) } diff --git a/docs/getting-started.md b/docs/getting-started.md index 9c1e3109..bf7cdae4 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -30,9 +30,13 @@ The library is divided into multiple modules. If you're only interested in the b [ActiveMq](https://activemq.apache.org/) ```scala -// ActiveMQ connector +// ActiveMQ connector - based on Akka Alpakka "com.ocadotechnology" %% "pass4s-connector-activemq" % "@VERSION@" +// ActiveMQ pekko connector - based on Pekko Connectors +"com.ocadotechnology" %% "pass4s-connector-pekko-activemq" % "@VERSION@" ``` +⚠️ **Warning** Pekko connector is an experimental addition at the moment, as it is based on nightly build + [SNS/SQS](https://aws.amazon.com/blogs/aws/queues-and-notifications-now-best-friends/) ```scala diff --git a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala index 3615f622..1c279925 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/util/EmbeddedJmsBroker.scala @@ -1,7 +1,6 @@ package com.ocadotechnology.pass4s.util import akka.actor.ActorSystem - import cats.effect.IO import cats.effect.Resource import com.ocadotechnology.pass4s.connectors.activemq.JmsConnector diff --git a/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala b/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala index 619031f1..c113685a 100644 --- a/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala +++ b/src/it/scala/com/ocadotechnology/pass4s/util/LocalStackContainerUtils.scala @@ -29,7 +29,7 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueRequest import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest import software.amazon.awssdk.services.sqs.model.QueueAttributeName -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ import scala.jdk.CollectionConverters._ import scala.util.Random @@ -162,7 +162,7 @@ object LocalStackContainerUtils { ) _ <- kinesisClient.waiter.flatMap { waiter => val describeStreamRequest = DescribeStreamRequest.builder().streamName(sn).build() - IO.fromFuture(IO(waiter.waitUntilStreamExists(describeStreamRequest).toScala)) + IO.fromFuture(IO(waiter.waitUntilStreamExists(describeStreamRequest).asScala)) } } yield sn)(sn => kinesisClient.deleteStream(DeleteStreamRequest.builder().streamName(sn).build()).void)