Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Commit

Permalink
Allow EventsourcedProcessor to post-process events
Browse files Browse the repository at this point in the history
- Closes #370 in which an EventsourcedProcessor was not able to define custom routing destination
  aggregate IDs for the events produced by `processEvent`
- Introduces method EventsourcedProcessor.postProcessDurableEvent: DurableEvent => DurableEvent
  which defaults to identity and allows manipulation of the DurableEvent instance created as a
  consequence of new events generated by `processEvent`
  • Loading branch information
danbim committed Dec 19, 2016
1 parent 8f7aa6c commit 2264feb
Showing 1 changed file with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.typesafe.config.Config
import scala.collection.immutable.Seq
import scala.concurrent._
import scala.concurrent.duration._
import scala.util._

private class EventsourcedProcessorSettings(config: Config) {
val readTimeout =
Expand Down Expand Up @@ -98,6 +97,14 @@ trait EventsourcedProcessor extends EventsourcedWriter[Long, Long] with ActorLog
*/
def processEvent: Process

/**
* Override to allow post-processing of DurableEvent instances wrapping events generated by `processEvent`.
*
* Amongst other things, this can e.g. be used to set different or additional aggregate IDs for custom routing
* destinations (which by default take the same routing destinations as the original event that was processed).
*/
def postProcessDurableEvent: DurableEvent => DurableEvent = identity[DurableEvent]

/**
* Returns an command [[BehaviorContext]] that doesn't allow event handler behavior changes. An
* attempt to change the event handler behavior with `eventContext.become()` will throw an
Expand All @@ -113,7 +120,9 @@ trait EventsourcedProcessor extends EventsourcedWriter[Long, Long] with ActorLog
case payload if processEvent.isDefinedAt(payload) =>
val currentProcessedEvents = processEvent(payload)
if (lastSequenceNr > processingProgress)
processedEvents = processedEvents :+ currentProcessedEvents.map(createEvent(_, lastHandledEvent.customDestinationAggregateIds))
processedEvents = processedEvents :+ currentProcessedEvents.map { e =>
postProcessDurableEvent(createEvent(e, lastHandledEvent.customDestinationAggregateIds))
}
}

/**
Expand Down

0 comments on commit 2264feb

Please sign in to comment.