Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Wip rfc #7

Open
wants to merge 5 commits into
base: wip-schema-evolution-II
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ object EventCompatibility {
eventCompatibility(event).map(decider).getOrElse(Continue)
}

case class BlockOnIncompatibility(compatibility: IncompatibilityReason) extends BlockReason
case class Incompatible(compatibility: IncompatibilityReason) extends BlockReason

def stopOnIncompatibility(implicit system: ActorSystem) = eventCompatibilityDecider {
incompatibility => Block(BlockOnIncompatibility(incompatibility))
incompatibility => Block(Incompatible(incompatibility))
}

def stopOnUnserializableKeepOthers(implicit system: ActorSystem) = eventCompatibilityDecider {
case _: MinorIncompatibility | _: NoRemotePayloadVersion | _: NoLocalPayloadVersion => Continue
case incompatibility => Block(BlockOnIncompatibility(incompatibility))
case incompatibility => Block(Incompatible(incompatibility))
}
}
154 changes: 102 additions & 52 deletions src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,21 @@ import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProcessor.ReplicationProcessResult
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.BlockAfter
import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.NoBlocker
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer

import scala.collection.immutable.Seq

trait EventLogOps {
// --- EventLog ---
private var _sequenceNr: Long = 0L
private var _versionVector: VectorTime = VectorTime.Zero
private var _deletionVector: VectorTime = VectorTime.Zero

var eventStore: Vector[EncodedEvent] = Vector.empty
private var progressStore: Map[String, Long] = Map.empty

def id: String

def sourceFilter: ReplicationFilter
def inboundReplicationProcessor(sourceLogId: String, currentVersionVector: VectorTime): ReplicationProcessor
def outboundReplicationProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor

def sequenceNr: Long =
_sequenceNr

Expand All @@ -34,33 +31,6 @@ trait EventLogOps {
def read(fromSequenceNr: Long): Seq[EncodedEvent] =
eventStore.drop(fromSequenceNr.toInt - 1)

def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter {
override def apply(event: EncodedEvent): Boolean = !event.before(versionVector)
}

def replicationReadFilter(targetFilter: ReplicationFilter, targetVersionVector: VectorTime): ReplicationFilter =
causalityFilter(targetVersionVector) and targetFilter and sourceFilter

def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult =
outboundReplicationProcessor(targetLogId, targetVersionVector, num)
.apply(read(fromSequenceNr), fromSequenceNr)

def progressRead(logId: String): Long =
progressStore.getOrElse(logId, 0L)

def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events, (evt, snr) => evt.emitted(id, snr))

def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = {
inboundReplicationProcessor(sourceLogId, versionVector)
.apply(events, progress).right.map {
case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress)
}
}

def progressWrite(progresses: Map[String, Long]): Unit =
progressStore = progressStore ++ progresses

private def write(events: Seq[EncodedEvent], prepare: (EncodedEvent, Long) => EncodedEvent): Seq[EncodedEvent] = {
var snr = _sequenceNr
var cvv = _versionVector
Expand All @@ -83,6 +53,39 @@ trait EventLogOps {

written
}

// --- Eventsourcing ---

def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events, (evt, snr) => evt.emitted(id, snr))

// --- Replication ---

private var progressStore: Map[String, Long] = Map.empty

def replicationWriteProcessor(sourceLogId: String, currentVersionVector: VectorTime): ReplicationProcessor
def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor

def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult =
replicationReadProcessor(targetLogId, targetVersionVector, num)
.apply(read(fromSequenceNr), fromSequenceNr)

def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter {
override def apply(event: EncodedEvent): Boolean = !event.before(versionVector)
}

def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = {
replicationWriteProcessor(sourceLogId, versionVector)
.apply(events, progress).right.map {
case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress)
}
}

def progressWrite(progresses: Map[String, Long]): Unit =
progressStore = progressStore ++ progresses

def progressRead(logId: String): Long =
progressStore.getOrElse(logId, 0L)
}

trait EventSubscribers {
Expand All @@ -102,31 +105,31 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto
import EventLog._
import context.system

/** Maps target log ids to replication filters */
private var targetFilters: Map[String, ReplicationFilter] =
Map.empty

private var eventCompatibilityDeciders: Map[String, ReplicationDecider] =
Map.empty
/* --- Eventsourcing --- */

override def receive = {
private def eventsourcingReceive: Receive = {
case Subscribe(subscriber) =>
subscribe(subscriber)
case Read(from) =>
val encoded = read(from)
sender() ! ReadSuccess(decode(encoded))
case Write(events) =>
val encoded = emissionWrite(encode(events))
val decoded = encoded.zip(events).map { case (enc, dec) => dec.copy(enc.metadata) }
sender() ! WriteSuccess(decoded)
publish(decoded)
}

/* --- Replication --- */

private def replicationReceive: Receive = {
case ReplicationRead(from, num, tlid, tvv) =>
replicationRead(from, num, tlid, tvv) match {
case Right((processedEvents, progress)) =>
sender() ! ReplicationReadSuccess(processedEvents, progress)
case Left(reason) =>
sender() ! ReplicationReadFailure(new ReplicationStoppedException(reason))
}
case Write(events) =>
val encoded = emissionWrite(encode(events))
val decoded = encoded.zip(events).map { case (enc, dec) => dec.copy(enc.metadata) }
sender() ! WriteSuccess(decoded)
publish(decoded)
case ReplicationWrite(events, sourceLogId, progress) =>
replicationWrite(events, progress, sourceLogId) match {
case Right((processedEvents, updatedProgress)) =>
Expand All @@ -139,26 +142,73 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto
}
case GetReplicationProgressAndVersionVector(logId) =>
sender() ! GetReplicationProgressAndVersionVectorSuccess(progressRead(logId), versionVector)
}

/* --- Replication Filters --- */

/** Maps target log ids to replication filters used for replication reads */
private var targetFilters: Map[String, ReplicationFilter] =
Map.empty

private def replicationFilterReceive: Receive = {
case AddTargetFilter(logId, filter) =>
targetFilters = targetFilters.updated(logId, filter)
}

/* --- RFC --- */

/** Maps target log ids to [[RedundantFilterConfig]]s used to build [[RfcBlocker]]s for replication reads */
private var redundantFilterConfigs: Map[String, RedundantFilterConfig] =
Map.empty


private def rfcReceive: Receive = {
case AddRedundantFilterConfig(logId, config) =>
redundantFilterConfigs += logId -> config
}

/* --- Scheme evolution --- */

/** Maps source log ids to [[ReplicationDecider]]s used for replication writes */
private var eventCompatibilityDeciders: Map[String, ReplicationDecider] =
Map.empty

private def schemaEvolutionReceive: Receive = {
case AddEventCompatibilityDecider(sourceLogId, processor) =>
eventCompatibilityDeciders += sourceLogId -> processor
case RemoveEventCompatibilityDecider(sourceLogId) =>
eventCompatibilityDeciders -= sourceLogId
}

override def inboundReplicationProcessor(sourceLogId: String, currentVersionVector: VectorTime) =
override def receive: Receive =
eventsourcingReceive orElse
replicationReceive orElse
replicationFilterReceive orElse
rfcReceive orElse
schemaEvolutionReceive

/* --- Replication processors --- */

override def replicationWriteProcessor(sourceLogId: String, currentVersionVector: VectorTime) =
ReplicationProcessor(
// replication
ReplicationDecider(causalityFilter(currentVersionVector))
.andThen(eventCompatibilityDeciders.getOrElse(sourceLogId, stopOnUnserializableKeepOthers)))
// schema evolution
.andThen(eventCompatibilityDeciders.getOrElse(sourceLogId, stopOnUnserializableKeepOthers)))

override def outboundReplicationProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int) =
// TODO RFC processor
override def replicationReadProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int) = {
val targetFilter = targetFilters.getOrElse(targetLogId, NoFilter)
val rfcBlocker = redundantFilterConfigs.get(targetLogId).map(_.rfcBlocker(targetVersionVector)).getOrElse(NoBlocker)
ReplicationProcessor(
ReplicationDecider(replicationReadFilter(targetFilter(targetLogId), targetVersionVector), new BlockAfter(num)))

private def targetFilter(logId: String): ReplicationFilter =
targetFilters.getOrElse(logId, NoFilter)
// replication
ReplicationDecider(causalityFilter(targetVersionVector))
// replication filters
.andThen(ReplicationDecider(targetFilter and sourceFilter))
// RFC
.andThen(ReplicationDecider(rfcBlocker))
// replication
.andThen(ReplicationDecider(BlockAfter(num))))
}
}

object EventLog {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.NoBlocker
import com.rbmhtechnology.eventuate.sandbox.ReplicationEndpoint.logId

case class RedundantFilterConfig(logName: String, endpointIds: Set[String] = Set.empty, foreign: Boolean = true) {
def rfcBlocker(targetVersionVector: VectorTime) =
if(endpointIds.isEmpty) NoBlocker
else RfcBlocker(targetVersionVector, endpointIds.map(logId(_, logName)), !foreign)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,15 @@ trait ReplicationBlocker {
}

object ReplicationBlocker {
class SequentialReplicationBlocker(blockers: Seq[ReplicationBlocker]) extends ReplicationBlocker {
case class SequentialReplicationBlocker(blockers: Seq[ReplicationBlocker]) extends ReplicationBlocker {
override def apply(event: EncodedEvent) = {
@tailrec
def go(blockers: Seq[ReplicationBlocker]): Option[BlockReason] =
blockers match {
case Nil => None
case h :: t =>
h(event) match {
case None => go(t)
case reason => reason
}
}
def go(blockers: Seq[ReplicationBlocker]): Option[BlockReason] = blockers match {
case Nil => None
case h :: t =>
val reason = h(event)
if(reason.isDefined) reason else go(t) // getOrElse violates tailrec
}
go(blockers)
}
}
Expand All @@ -31,7 +28,7 @@ object ReplicationBlocker {
override def apply(event: EncodedEvent) = None
}

class BlockAfter(n: Int) extends ReplicationBlocker {
case class BlockAfter(n: Int) extends ReplicationBlocker {
private var count: Int = 0
override def apply(event: EncodedEvent) =
if(count > n)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.NoBlocker
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Continue
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.ReplicationDecision

Expand All @@ -10,9 +9,14 @@ object ReplicationDecider {
case class Block(reason: BlockReason) extends ReplicationDecision
case object Continue extends ReplicationDecision

def apply(replicationFilter: ReplicationFilter, replicationBlocker: ReplicationBlocker = NoBlocker): ReplicationDecider = new ReplicationDecider {
def apply(filter: ReplicationFilter): ReplicationDecider = new ReplicationDecider {
override def apply(event: EncodedEvent) =
if (replicationFilter(event)) replicationBlocker(event).map(Block).getOrElse(Continue) else Filter
if (filter(event)) Continue else Filter
}

def apply(blocker: ReplicationBlocker): ReplicationDecider = new ReplicationDecider {
override def apply(event: EncodedEvent) =
blocker(event).map(Block).getOrElse(Continue)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import java.util.function.UnaryOperator

import akka.actor._
import akka.pattern.{ask, pipe}
import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.IncompatibilityReason
import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.typesafe.config._
Expand All @@ -30,7 +29,7 @@ class ReplicationEndpoint(
new AtomicReference(Map.empty)

val system: ActorSystem =
ActorSystem(s"$id-system", config)
ActorSystem(s"$id-system", config.withFallback(ConfigFactory.load()))

val settings: ReplicationSettings =
new ReplicationSettings(system.settings.config)
Expand All @@ -51,6 +50,9 @@ class ReplicationEndpoint(
def addTargetFilter(targetEndpointId: String, targetLogName: String, filter: ReplicationFilter): Unit =
eventLogs(targetLogName) ! AddTargetFilter(logId(targetEndpointId, targetLogName), filter)

def addRedundantFilterConfig(targetEndpointId: String, config: RedundantFilterConfig): Unit =
eventLogs(config.logName) ! AddRedundantFilterConfig(logId(targetEndpointId, config.logName), config)

def connect(remoteEndpoint: ReplicationEndpoint): Future[String] =
connect(remoteEndpoint.connectionAcceptor)

Expand Down Expand Up @@ -150,6 +152,9 @@ private class Replicator(sourceLogId: String, sourceLog: ActorRef, targetLogId:
case ReplicationReadSuccess(events, progress) =>
context.become(writing)
write(events, progress)
case ReplicationReadFailure(cause) =>
context.become(idle)
scheduleRead()
}

val writing: Receive = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ object ReplicationProtocol {

case class AddTargetFilter(targetLogId: String, filter: ReplicationFilter)

case class AddRedundantFilterConfig(targetLogId: String, config: RedundantFilterConfig)

case class GetReplicationSourceLogs(logNames: Set[String])
case class GetReplicationSourceLogsSuccess(endpointId: String, sourceLogs: Map[String, ActorRef])

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.RfcBlocker.RfcConditionViolated

object RfcBlocker {
case class RfcConditionViolated(eventTime: VectorTime, targetVersionVector: VectorTime, projectionProcessIds: Set[String], negateProjection: Boolean) extends BlockReason
}

case class RfcBlocker(targetVersionVector: VectorTime, processIds: Set[String], negateProjection: Boolean) extends ReplicationBlocker {
def apply(event: EncodedEvent): Option[BlockReason] =
if(event.metadata.vectorTimestamp.projection(processIds, negateProjection) <= targetVersionVector)
None
else
Some(RfcConditionViolated(event.metadata.vectorTimestamp, targetVersionVector, processIds, negateProjection))
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ case class VectorTime(value: Map[String, Long] = Map.empty) {
def merge(that: VectorTime): VectorTime =
copy(value.unionWith(that.value)(math.max))

def projection(processIds: Set[String], negate: Boolean = false): VectorTime =
copy(value.filterKeys(p => processIds.contains(p) != negate).view.force)

/**
* Returns `true` if this vector time is equivalent (equal) to `that`.
*/
Expand Down
2 changes: 2 additions & 0 deletions src/test/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
akka.actor.warn-about-java-serializer-usage = off
akka.log-dead-letters = off
Loading