From 451bb5573d82abc6dc6e190e3a89a5aea262d89c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 19 Sep 2024 14:38:10 +0200 Subject: [PATCH] feat: Handle heartbeat events --- .../grpc/internal/EnvelopeOrigin.scala | 54 +++++++++++++++++++ .../EventPusherConsumerServiceImpl.scala | 10 ++-- .../internal/EventOriginFilter.scala | 3 +- .../internal/ReplicationImpl.scala | 29 +++++----- project/Dependencies.scala | 2 +- 5 files changed, 78 insertions(+), 20 deletions(-) create mode 100644 akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EnvelopeOrigin.scala diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EnvelopeOrigin.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EnvelopeOrigin.scala new file mode 100644 index 000000000..f3022f75b --- /dev/null +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EnvelopeOrigin.scala @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2022 - 2023 Lightbend Inc. + */ + +package akka.projection.grpc.internal + +import akka.annotation.InternalStableApi +import akka.persistence.query.UpdatedDurableState +import akka.persistence.query.typed.EventEnvelope + +/** + * INTERNAL API + * + * See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency + * to akka-persistence-r2dbc here + */ +@InternalStableApi private[akka] object EnvelopeOrigin { + val SourceQuery = "" + val SourceBacktracking = "BT" + val SourcePubSub = "PS" + val SourceSnapshot = "SN" + val SourceHeartbeat = "HB" + + def fromQuery(env: EventEnvelope[_]): Boolean = + env.source == SourceQuery + + def fromBacktracking(env: EventEnvelope[_]): Boolean = + env.source == SourceBacktracking + + def fromBacktracking(change: UpdatedDurableState[_]): Boolean = + change.value == null + + def fromPubSub(env: EventEnvelope[_]): Boolean = + env.source == SourcePubSub + + def fromSnapshot(env: EventEnvelope[_]): Boolean = + env.source == SourceSnapshot + + def fromHeartbeat(env: EventEnvelope[_]): Boolean = + env.source == SourceHeartbeat + + def isHeartbeatEvent(env: Any): Boolean = + env match { + case e: EventEnvelope[_] => fromHeartbeat(e) + case _ => false + } + + def isFilteredEvent(env: Any): Boolean = + env match { + case e: EventEnvelope[_] => e.filtered + case _ => false + } + +} diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala index 8e2aa2655..2cb983fed 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/EventPusherConsumerServiceImpl.scala @@ -49,11 +49,6 @@ private[akka] object EventPusherConsumerServiceImpl { private val log = LoggerFactory.getLogger(getClass) - // See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency - // to akka-persistence-r2dbc here - def fromSnapshot(env: EventEnvelope[_]): Boolean = - env.source == "SN" - private case class Destination( eventProducerPushDestination: EventProducerPushDestination, sendEvent: (EventEnvelope[_], Boolean) => Future[Any], @@ -79,6 +74,9 @@ private[akka] object EventPusherConsumerServiceImpl { if (envelope.filtered) { log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId) Future.successful(Done) + } else if (EnvelopeOrigin.fromHeartbeat(envelope)) { + log.trace("Ignoring heartbeat event [{}]", envelope.persistenceId) + Future.successful(Done) } else { envelope.eventMetadata match { case Some(replicatedEventMetadata: ReplicatedEventMetadata) => @@ -141,7 +139,7 @@ private[akka] object EventPusherConsumerServiceImpl { persistenceId = envelope.persistenceId, sequenceNumber = envelope.sequenceNr, event = envelope.eventOption.getOrElse(FilteredPayload), - isSnapshotEvent = fromSnapshot(envelope), + isSnapshotEvent = EnvelopeOrigin.fromSnapshot(envelope), fillSequenceNumberGaps = fillSequenceNumberGaps, metadata = envelope.eventMetadata, tags = envelope.tags, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala index 902fd4a63..7512561f7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/EventOriginFilter.scala @@ -8,6 +8,7 @@ import akka.annotation.InternalApi import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicaId import akka.persistence.typed.internal.ReplicatedEventMetadata +import akka.projection.grpc.internal.EnvelopeOrigin import akka.projection.grpc.internal.proto.ReplicaInfo /** @@ -33,7 +34,7 @@ import akka.projection.grpc.internal.proto.ReplicaInfo // eventMetadata is not included in backtracking envelopes. // Events from backtracking are lazily loaded via `loadEvent` if needed. // Filter is done via `loadEvent` in that case. - if (envelope.eventOption.isEmpty) + if (envelope.eventOption.isEmpty || EnvelopeOrigin.fromHeartbeat(envelope)) true else envelope.eventMetadata match { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index f3cac05ac..515cc825c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -48,12 +48,14 @@ import akka.projection.grpc.replication.scaladsl.ReplicationSettings import akka.stream.scaladsl.FlowWithContext import akka.util.Timeout import org.slf4j.LoggerFactory - import java.net.URLEncoder import java.nio.charset.StandardCharsets + import scala.concurrent.ExecutionContext import scala.concurrent.Future +import akka.projection.grpc.internal.EnvelopeOrigin + /** * INTERNAL API */ @@ -207,7 +209,20 @@ private[akka] object ReplicationImpl { .mapAsyncPartitioned(parallelism = settings.parallelUpdates, perPartition = 1)(envelope => envelope.persistenceId) { case (envelope, _) => - if (!envelope.filtered) { + if (envelope.filtered) { + // Events not originating on sending side already are filtered/have no payload and end up here + if (log.isTraceEnabled) + log.traceN( + "[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])", + projectionKey, + remoteReplica.replicaId, + envelope.persistenceId, + envelope.sequenceNr) + Future.successful(Done) + } else if (EnvelopeOrigin.fromHeartbeat(envelope)) { + log.trace("Ignoring heartbeat event [{}]", envelope.persistenceId) + Future.successful(Done) + } else { envelope.eventMetadata match { case Some(replicatedEventMetadata: ReplicatedEventMetadata) if replicatedEventMetadata.originReplica == settings.selfReplicaId => @@ -256,16 +271,6 @@ private[akka] object ReplicationImpl { s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" + ", is the remote entity really a Replicated Event Sourced Entity?") } - } else { - // Events not originating on sending side already are filtered/have no payload and end up here - if (log.isTraceEnabled) - log.traceN( - "[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])", - projectionKey, - remoteReplica.replicaId, - envelope.persistenceId, - envelope.sequenceNr) - Future.successful(Done) } } .map(_ => Done) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 492b2e4bb..239f0f129 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,7 +22,7 @@ object Dependencies { val akka = sys.props.getOrElse("build.akka.version", "2.9.5") val akkaPersistenceCassandra = "1.2.1" val akkaPersistenceJdbc = "5.4.1" - val akkaPersistenceR2dbc = "1.2.4" + val akkaPersistenceR2dbc = "1.2.5+8-9388e4bd-SNAPSHOT" val alpakka = "8.0.0" val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "6.0.0") val slick = "3.5.1"