Skip to content

Commit

Permalink
feat: Handle heartbeat events
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Sep 19, 2024
1 parent 29ebc3b commit 451bb55
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2022 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.internal

import akka.annotation.InternalStableApi
import akka.persistence.query.UpdatedDurableState
import akka.persistence.query.typed.EventEnvelope

/**
* INTERNAL API
*
* See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency
* to akka-persistence-r2dbc here
*/
@InternalStableApi private[akka] object EnvelopeOrigin {
val SourceQuery = ""
val SourceBacktracking = "BT"
val SourcePubSub = "PS"
val SourceSnapshot = "SN"
val SourceHeartbeat = "HB"

def fromQuery(env: EventEnvelope[_]): Boolean =
env.source == SourceQuery

def fromBacktracking(env: EventEnvelope[_]): Boolean =
env.source == SourceBacktracking

def fromBacktracking(change: UpdatedDurableState[_]): Boolean =
change.value == null

def fromPubSub(env: EventEnvelope[_]): Boolean =
env.source == SourcePubSub

def fromSnapshot(env: EventEnvelope[_]): Boolean =
env.source == SourceSnapshot

def fromHeartbeat(env: EventEnvelope[_]): Boolean =
env.source == SourceHeartbeat

def isHeartbeatEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => fromHeartbeat(e)
case _ => false
}

def isFilteredEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => e.filtered
case _ => false
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,6 @@ private[akka] object EventPusherConsumerServiceImpl {

private val log = LoggerFactory.getLogger(getClass)

// See akka.persistence.r2dbc.internal.EnvelopeOrigin, but we don't have a dependency
// to akka-persistence-r2dbc here
def fromSnapshot(env: EventEnvelope[_]): Boolean =
env.source == "SN"

private case class Destination(
eventProducerPushDestination: EventProducerPushDestination,
sendEvent: (EventEnvelope[_], Boolean) => Future[Any],
Expand All @@ -79,6 +74,9 @@ private[akka] object EventPusherConsumerServiceImpl {
if (envelope.filtered) {
log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
Future.successful(Done)
} else if (EnvelopeOrigin.fromHeartbeat(envelope)) {
log.trace("Ignoring heartbeat event [{}]", envelope.persistenceId)
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
Expand Down Expand Up @@ -141,7 +139,7 @@ private[akka] object EventPusherConsumerServiceImpl {
persistenceId = envelope.persistenceId,
sequenceNumber = envelope.sequenceNr,
event = envelope.eventOption.getOrElse(FilteredPayload),
isSnapshotEvent = fromSnapshot(envelope),
isSnapshotEvent = EnvelopeOrigin.fromSnapshot(envelope),
fillSequenceNumberGaps = fillSequenceNumberGaps,
metadata = envelope.eventMetadata,
tags = envelope.tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import akka.annotation.InternalApi
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.internal.ReplicatedEventMetadata
import akka.projection.grpc.internal.EnvelopeOrigin
import akka.projection.grpc.internal.proto.ReplicaInfo

/**
Expand All @@ -33,7 +34,7 @@ import akka.projection.grpc.internal.proto.ReplicaInfo
// eventMetadata is not included in backtracking envelopes.
// Events from backtracking are lazily loaded via `loadEvent` if needed.
// Filter is done via `loadEvent` in that case.
if (envelope.eventOption.isEmpty)
if (envelope.eventOption.isEmpty || EnvelopeOrigin.fromHeartbeat(envelope))
true
else
envelope.eventMetadata match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ import akka.projection.grpc.replication.scaladsl.ReplicationSettings
import akka.stream.scaladsl.FlowWithContext
import akka.util.Timeout
import org.slf4j.LoggerFactory

import java.net.URLEncoder
import java.nio.charset.StandardCharsets

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.projection.grpc.internal.EnvelopeOrigin

/**
* INTERNAL API
*/
Expand Down Expand Up @@ -207,7 +209,20 @@ private[akka] object ReplicationImpl {
.mapAsyncPartitioned(parallelism = settings.parallelUpdates, perPartition = 1)(envelope =>
envelope.persistenceId) {
case (envelope, _) =>
if (!envelope.filtered) {
if (envelope.filtered) {
// Events not originating on sending side already are filtered/have no payload and end up here
if (log.isTraceEnabled)
log.traceN(
"[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])",
projectionKey,
remoteReplica.replicaId,
envelope.persistenceId,
envelope.sequenceNr)
Future.successful(Done)
} else if (EnvelopeOrigin.fromHeartbeat(envelope)) {
log.trace("Ignoring heartbeat event [{}]", envelope.persistenceId)
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata)
if replicatedEventMetadata.originReplica == settings.selfReplicaId =>
Expand Down Expand Up @@ -256,16 +271,6 @@ private[akka] object ReplicationImpl {
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
}
} else {
// Events not originating on sending side already are filtered/have no payload and end up here
if (log.isTraceEnabled)
log.traceN(
"[{}] ignoring filtered event from replica [{}] (pid [{}], seq_nr [{}])",
projectionKey,
remoteReplica.replicaId,
envelope.persistenceId,
envelope.sequenceNr)
Future.successful(Done)
}
}
.map(_ => Done)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object Dependencies {
val akka = sys.props.getOrElse("build.akka.version", "2.9.5")
val akkaPersistenceCassandra = "1.2.1"
val akkaPersistenceJdbc = "5.4.1"
val akkaPersistenceR2dbc = "1.2.4"
val akkaPersistenceR2dbc = "1.2.5+8-9388e4bd-SNAPSHOT"
val alpakka = "8.0.0"
val alpakkaKafka = sys.props.getOrElse("build.alpakka.kafka.version", "6.0.0")
val slick = "3.5.1"
Expand Down

0 comments on commit 451bb55

Please sign in to comment.