Skip to content

Commit

Permalink
Introduce pekko-based activemq connector (#313)
Browse files Browse the repository at this point in the history
Introduce experimental implementation of activemq connector based on nightly build of pekko-connectors (alpakka replacement for pekko).
  • Loading branch information
majk-p committed Aug 7, 2023
1 parent 27660a8 commit 537ddad
Show file tree
Hide file tree
Showing 15 changed files with 622 additions and 20 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target
run: mkdir -p core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target
run: tar cf targets.tar core/target addons/phobos/target target addons/extra/target demo/target addons/plaintext/target connectors/activemq-pekko/target connectors/sns/target mdoc/target addons/s3proxy/target connectors/kinesis/target connectors/sqs/target kernel/target addons/logging/target connectors/activemq/target addons/circe/target high/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
Expand Down
25 changes: 20 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ lazy val root = (project in file("."))
IntegrationTest / classDirectory := (Test / classDirectory).value,
IntegrationTest / parallelExecution := true
)
.aggregate(core, kernel, high, activemq, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy)
.dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.aggregate(core, kernel, high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, phobos, plaintext, extra, logging, demo, s3Proxy)
.dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy)

def module(name: String, directory: String = ".") = Project(s"pass4s-$name", file(directory) / name).settings(commonSettings)

Expand Down Expand Up @@ -102,7 +102,7 @@ val nettySnykOverrides = Seq(
"io.netty" % "netty-handler" % nettyVersion
)

lazy val activemq = module("activemq", directory = "connectors")
lazy val activemqAkka = module("activemq", directory = "connectors")
.settings(
name := "pass4s-connector-activemq",
libraryDependencies ++= Seq(
Expand All @@ -114,6 +114,21 @@ lazy val activemq = module("activemq", directory = "connectors")
)
.dependsOn(core)

lazy val activemqPekko = module("activemq-pekko", directory = "connectors")
.settings(
mimaPreviousArtifacts := Set(), // Remove when 0.4.2 is released
name := "pass4s-connector-pekko-activemq",
resolvers += "Apache Snapshots" at "https://repository.apache.org/content/repositories/snapshots/", // Resolvers to be removed when stable version is released
resolvers ++= Resolver.sonatypeOssRepos("snapshots"),
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-connectors-jms" % "0.0.0+140-7d704044-SNAPSHOT", // TODO to be changed to stable release once https://github.com/apache/incubator-pekko-connectors/issues/210 is ready
"org.apache.activemq" % "activemq-pool" % Versions.ActiveMq,
"org.typelevel" %% "log4cats-core" % Versions.Log4Cats
),
headerSources / excludeFilter := HiddenFileFilter || "taps.scala"
)
.dependsOn(core)

lazy val kinesis = module("kinesis", directory = "connectors")
.settings(
name := "pass4s-connector-kinesis",
Expand Down Expand Up @@ -206,7 +221,7 @@ lazy val docs = project // new documentation project
WorkflowStep.Sbt(List("docs/mdoc"))
)
)
.dependsOn(high, activemq, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.dependsOn(high, activemqAkka, activemqPekko, kinesis, sns, sqs, circe, logging, extra, s3Proxy)
.enablePlugins(MdocPlugin, DocusaurusPlugin)

// misc
Expand All @@ -223,7 +238,7 @@ lazy val demo = module("demo")
"ch.qos.logback" % "logback-classic" % Versions.Logback
)
)
.dependsOn(activemq, sns, sqs, extra, logging)
.dependsOn(activemqPekko, sns, sqs, extra, logging)

lazy val commonSettings = Seq(
organization := "com.ocadotechnology",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.pekko.activemq

import cats.effect.Resource
import cats.effect.Sync
import cats.implicits._
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.pool.PooledConnectionFactory

/** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been extensively
* tested in production yet
*/
object ConnectionFactories {

/** Creates a pooled ActiveMQ connection factory.
*
* Use `failover:(tcp://address)` to be sure that send operation is never failing. (Connection retries are handled by connection factory)
* Read documentation: https://activemq.apache.org/failover-transport-reference.html
*
* Use raw `tcp://address` to make send operation able to fail. This may be useful when working with outbox pattern
*/
def pooled[F[_]: Sync](username: String, password: String, url: String): Resource[F, PooledConnectionFactory] =
Resource.suspend {
unpooled[F](username, password, url).map(makePooled[F](_))
}

/** Creates an ActiveMQ connection factory.
*/
def unpooled[F[_]: Sync](username: String, password: String, url: String): F[ActiveMQConnectionFactory] =
Sync[F].delay(new ActiveMQConnectionFactory(username, password, url))

/** Wraps the base factory in a connection pool.
*/
def makePooled[F[_]: Sync](baseFactory: ActiveMQConnectionFactory): Resource[F, PooledConnectionFactory] =
Resource.make(Sync[F].delay(new PooledConnectionFactory(baseFactory)))(pcf => Sync[F].delay(pcf.stop()))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.pekko.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{RestartSettings => AkkaRestartSettings}
import cats.effect.Resource
import com.ocadotechnology.pass4s.connectors.pekko.activemq.JmsSource.JmsSourceSettings
import com.ocadotechnology.pass4s.connectors.pekko.activemq.consumer._
import com.ocadotechnology.pass4s.connectors.pekko.activemq.producer._
import com.ocadotechnology.pass4s.core.CommittableMessage
import com.ocadotechnology.pass4s.core.Connector
import com.ocadotechnology.pass4s.core.Destination
import com.ocadotechnology.pass4s.core.Message
import com.ocadotechnology.pass4s.core.Source
import fs2.Stream
import org.typelevel.log4cats.Logger

import javax.jms.ConnectionFactory
import scala.concurrent.duration._
import scala.reflect.runtime.universe._
import cats.effect.kernel.Async

trait Jms

object Jms {
sealed trait Type extends Product with Serializable

object Type {
final case object Queue extends Type
final case object Topic extends Type
}

}

final case class JmsSource private (name: String, sourceType: Jms.Type, settings: JmsSourceSettings) extends Source[Jms] {
override val capability: Type = typeOf[Jms]

override val messageProcessingTimeout: Option[FiniteDuration] = Some(settings.messageProcessingTimeout)
override val cancelableMessageProcessing: Boolean = settings.cancelableMessageProcessing
override val maxConcurrent: Int = settings.parallelSessions

def toDestination: JmsDestination = JmsDestination(name, sourceType)
}

object JmsSource {

final case class JmsSourceSettings(
// sets internal timeout on a message processing. JMS' ackTimeout will be (x + 1 second) * 1.2
messageProcessingTimeout: FiniteDuration = 30.seconds,
cancelableMessageProcessing: Boolean = true,
parallelSessions: Int = 1,
restartSettings: RestartSettings = RestartSettings(minBackoff = 2.second, maxBackoff = 30.seconds, randomFactor = 0.2)
)

final case class RestartSettings(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double) {
val toAkka: AkkaRestartSettings = AkkaRestartSettings(minBackoff, maxBackoff, randomFactor)
}

def queue(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Queue, settings)

def topic(name: String, settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, Jms.Type.Topic, settings)
}

final case class JmsDestination private (name: String, destinationType: Jms.Type) extends Destination[Jms] {
override val capability: Type = typeOf[Jms]

def toSource(settings: JmsSourceSettings = JmsSourceSettings()): JmsSource = JmsSource(name, destinationType, settings)
}

object JmsDestination {
def queue(name: String): JmsDestination = JmsDestination(name, Jms.Type.Queue)

def topic(name: String): JmsDestination = JmsDestination(name, Jms.Type.Topic)
}

object JmsConnector {
type JmsConnector[F[_]] = Connector.Aux[F, Jms, ConnectionFactory]

/** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been
* extensively tested in production yet
*/
def singleBroker[F[_]: Logger: Async](
username: String,
password: String,
url: String
)(
implicit as: ActorSystem
): Resource[F, JmsConnector[F]] =
ConnectionFactories.pooled(username, password, url).flatMap(singleBroker[F](_))

/** This implementation is EXPERIMENTAL - use at your own risk This module relies on SNAPSHOT version of Pekko that has not been
* extensively tested in production yet
*/
def singleBroker[F[_]: Logger: Async](
connectionFactory: ConnectionFactory
)(
implicit as: ActorSystem
): Resource[F, JmsConnector[F]] =
for {
producer <- createMessageProducer(connectionFactory)
} yield new Connector[F, Jms] {

type Raw = ConnectionFactory
override val underlying: ConnectionFactory = connectionFactory

override def consumeBatched[R >: Jms](source: Source[R]): Stream[F, List[CommittableMessage[F]]] =
consumeAndReconnectOnErrors(connectionFactory)(source).map(List(_))

override def produce[R >: Jms](message: Message[R]): F[Unit] =
producer(message)

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.pekko.activemq

import org.apache.pekko.stream.connectors.{jms => pekkojms}

private[activemq] object common {

def toPekkoDestination: (String, Jms.Type) => pekkojms.Destination = {
case (name, Jms.Type.Topic) => pekkojms.Topic(name)
case (name, Jms.Type.Queue) => pekkojms.Queue(name)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2023 Ocado Technology
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ocadotechnology.pass4s.connectors.pekko.activemq

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.jms.scaladsl.JmsConsumer
import org.apache.pekko.stream.connectors.{jms => pekkojms}
import org.apache.pekko.stream.scaladsl.RestartSource
import cats.ApplicativeThrow
import cats.effect.Async
import cats.effect.Sync
import cats.implicits._
import com.ocadotechnology.pass4s.connectors.pekko.activemq.taps._
import com.ocadotechnology.pass4s.core.Message.Payload
import com.ocadotechnology.pass4s.core.CommittableMessage
import com.ocadotechnology.pass4s.core.Source
import fs2.Stream
import org.typelevel.log4cats.Logger

import javax.jms
import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
import scala.util.Try

private[activemq] object consumer {

def consumeAndReconnectOnErrors[F[_]: Async: Logger](
connectionFactory: jms.ConnectionFactory
)(
source: Source[_]
)(
implicit as: ActorSystem
): Stream[F, CommittableMessage[F]] =
for {
JmsSource(name, sourceType, settings) <- Stream.eval(extractJmsSource[F](source))

jmsConsumerSettings = pekkojms
.JmsConsumerSettings(as, connectionFactory)
.withAckTimeout((settings.messageProcessingTimeout + 1.second) * 1.2)
.withSessionCount(settings.parallelSessions)
.withFailStreamOnAckTimeout(true)
.withDestination(common.toPekkoDestination(name, sourceType))

txEnvelope <- RestartSource
.withBackoff(settings.restartSettings.toAkka) { () =>
JmsConsumer.txSource(jmsConsumerSettings).named(getClass.getSimpleName)
}
.toStream[F]()
committableMessage <- Stream.eval(toCommittableMessage(txEnvelope)).unNone
} yield committableMessage

private def extractJmsSource[F[_]: ApplicativeThrow](source: Source[_]): F[JmsSource] =
source match {
case jmsSource: JmsSource => jmsSource.pure[F]
case unsupportedDestination =>
ApplicativeThrow[F].raiseError(
new UnsupportedOperationException(s"JmsConnector does not support destination: $unsupportedDestination")
)
}

private def toCommittableMessage[F[_]: Sync: Logger](txEnvelope: pekkojms.TxEnvelope): F[Option[CommittableMessage[F]]] = {
val commit = Sync[F].delay(txEnvelope.commit())
val rollback = Sync[F].delay(txEnvelope.rollback())
txEnvelope.message match {
case textMessage: jms.TextMessage =>
CommittableMessage.instance(Payload(textMessage.getText, getHeaders(textMessage)), commit, _ => rollback).some.pure[F]
case unsupportedMessage =>
Logger[F].warn(s"JmsConnector supports only TextMessages. Ignoring received message: $unsupportedMessage") *> rollback.as(None)
}
}

// fixme: add headers/properties from underlying message - need to double check if all properties are returned by getPropertyNames
private def getHeaders(msg: jms.Message): Map[String, String] =
Try {
msg
.getPropertyNames
.asIterator()
.asInstanceOf[java.util.Iterator[String]] // Please forgive me I have to do this, but underneath it is really String
.asScala
.map(name => name -> msg.getStringProperty(name))
.toMap
}.getOrElse(Map.empty)

}
Loading

0 comments on commit 537ddad

Please sign in to comment.