Skip to content
This repository has been archived by the owner on Jun 1, 2021. It is now read-only.

Wip schema evolution ii #6

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.rbmhtechnology.eventuate.sandbox

case class EventMetadata(emitterId: String, emitterLogId: String, localLogId: String, localSequenceNr: Long, vectorTimestamp: VectorTime)
case class EventVersion(majorVersion: Int, minorVersion: Int)
case class EventManifest(schema: String, isStringManifest: Boolean, eventVersion: Option[EventVersion])
case class PayloadVersion(majorVersion: Int, minorVersion: Int)
case class EventManifest(schema: String, isStringManifest: Boolean, payloadVersion: Option[PayloadVersion])
case class EventBytes(bytes: Array[Byte], serializerId: Int, manifest: EventManifest)

sealed trait DurableEvent {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.rbmhtechnology.eventuate.sandbox

import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Block
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Continue
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.ReplicationDecision
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer.decode

import scala.reflect.ClassTag
import scala.reflect._
import scala.util.Failure
import scala.util.Success
import scala.util.Try


object EventCompatibility {
sealed trait IncompatibilityReason
case class MinorIncompatibility(event: EncodedEvent, required: PayloadVersion, supported: PayloadVersion) extends IncompatibilityReason
case class MajorIncompatibility(schema: String, required: PayloadVersion, supported: PayloadVersion) extends IncompatibilityReason
case class FailureOnDeserialization(serializerId: Int, schema: String, cause: Throwable) extends IncompatibilityReason
case class NoSerializer(serializerId: Int) extends IncompatibilityReason
case class NoLocalPayloadVersion(event: EncodedEvent, serializerId: Int) extends IncompatibilityReason
case class NoRemotePayloadVersion(event: EncodedEvent, serializerId: Int) extends IncompatibilityReason

def eventCompatibility(encoded: EncodedEvent)(implicit system: ActorSystem): Option[IncompatibilityReason]= {
val serializerId = encoded.payload.serializerId
val manifest = encoded.payload.manifest
val compatibility= for {
serializer <- toRight(SerializationExtension(system).serializerByIdentity.get(serializerId), NoSerializer(serializerId))
event <- toRight(decode(encoded).map(_ => encoded), FailureOnDeserialization(serializerId, manifest.schema, _ : Throwable))
payloadSerializer <- castOrLeft[EventPayloadSerializer, IncompatibilityReason](serializer, NoLocalPayloadVersion(event, serializerId))
payloadVersion <- toRight(manifest.payloadVersion, NoRemotePayloadVersion(event, serializerId))
_ <- compareVersions(event, payloadSerializer.payloadVersion(manifest.schema), payloadVersion)
} yield ()
compatibility.left.toOption
}

private def compareVersions(event: EncodedEvent, supported: PayloadVersion, required: PayloadVersion): Either.RightProjection[IncompatibilityReason, Unit] = {
val res = if(supported.majorVersion < required.majorVersion)
Left(MajorIncompatibility(event.payload.manifest.schema, required, supported))
else if(supported.majorVersion == required.majorVersion && supported.minorVersion < required.minorVersion)
Left(MinorIncompatibility(event, required, supported))
else
Right(())
res.right
}

private def castOrLeft[A : ClassTag, L](a: AnyRef, left: L): Either.RightProjection[L, A] =
Either.cond(classTag[A].runtimeClass.isAssignableFrom(a.getClass), a.asInstanceOf[A], left).right

private def toRight[L, R](option: Option[R], left: L): Either.RightProjection[L, R] =
Either.cond(option.isDefined, option.get, left).right

private def toRight[L, R](t: Try[R], makeLeft: Throwable => L): Either.RightProjection[L, R] =
t match {
case Success(r) => Right(r).right
case Failure(ex) => Left(makeLeft(ex)).right
}

def eventCompatibilityDecider(decider: IncompatibilityReason => ReplicationDecision)(implicit system: ActorSystem): ReplicationDecider =
new ReplicationDecider {
override def apply(event: EncodedEvent) =
eventCompatibility(event).map(decider).getOrElse(Continue)
}

case class BlockOnIncompatibility(compatibility: IncompatibilityReason) extends BlockReason

def stopOnIncompatibility(implicit system: ActorSystem) = eventCompatibilityDecider {
incompatibility => Block(BlockOnIncompatibility(incompatibility))
}

def stopOnUnserializableKeepOthers(implicit system: ActorSystem) = eventCompatibilityDecider {
case _: MinorIncompatibility | _: NoRemotePayloadVersion | _: NoLocalPayloadVersion => Continue
case incompatibility => Block(BlockOnIncompatibility(incompatibility))
}
}
70 changes: 54 additions & 16 deletions src/main/scala/com/rbmhtechnology/eventuate/sandbox/EventLog.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package com.rbmhtechnology.eventuate.sandbox

import akka.actor._
import com.rbmhtechnology.eventuate.sandbox.EventsourcingProtocol._
import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.stopOnUnserializableKeepOthers
import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProcessor.ReplicationProcessResult
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.BlockAfter
import com.rbmhtechnology.eventuate.sandbox.serializer.EventPayloadSerializer

import scala.collection.immutable.Seq
Expand All @@ -19,35 +22,41 @@ trait EventLogOps {
def id: String

def sourceFilter: ReplicationFilter
def targetFilter(targetLogId: String): ReplicationFilter
def inboundReplicationProcessor(sourceLogId: String, currentVersionVector: VectorTime): ReplicationProcessor
def outboundReplicationProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int): ReplicationProcessor

def sequenceNr: Long =
_sequenceNr

def versionVector: VectorTime =
_versionVector

def read(fromSquenceNr: Long): Seq[EncodedEvent] =
eventStore.drop(fromSquenceNr.toInt - 1)
def read(fromSequenceNr: Long): Seq[EncodedEvent] =
eventStore.drop(fromSequenceNr.toInt - 1)

def causalityFilter(versionVector: VectorTime): ReplicationFilter = new ReplicationFilter {
override def apply(event: EncodedEvent): Boolean = !event.before(versionVector)
}

def replicationReadFilter(targetLogId: String, targetVersionVector: VectorTime): ReplicationFilter =
causalityFilter(targetVersionVector) and targetFilter(targetLogId) and sourceFilter
def replicationReadFilter(targetFilter: ReplicationFilter, targetVersionVector: VectorTime): ReplicationFilter =
causalityFilter(targetVersionVector) and targetFilter and sourceFilter

def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): Seq[EncodedEvent] =
read(fromSequenceNr).filter(replicationReadFilter(targetLogId, targetVersionVector).apply).take(num)
def replicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime): ReplicationProcessResult =
outboundReplicationProcessor(targetLogId, targetVersionVector, num)
.apply(read(fromSequenceNr), fromSequenceNr)

def progressRead(logId: String): Long =
progressStore.getOrElse(logId, 0L)

def emissionWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events, (evt, snr) => evt.emitted(id, snr))

def replicationWrite(events: Seq[EncodedEvent]): Seq[EncodedEvent] =
write(events.filter(causalityFilter(_versionVector).apply), (evt, snr) => evt.replicated(id, snr))
def replicationWrite(events: Seq[EncodedEvent], progress: Long, sourceLogId: String): ReplicationProcessResult = {
inboundReplicationProcessor(sourceLogId, versionVector)
.apply(events, progress).right.map {
case (filtered, updatedProgress) => (write(filtered, (evt, snr) => evt.replicated(id, snr)), updatedProgress)
}
}

def progressWrite(progresses: Map[String, Long]): Unit =
progressStore = progressStore ++ progresses
Expand Down Expand Up @@ -97,32 +106,58 @@ class EventLog(val id: String, val sourceFilter: ReplicationFilter) extends Acto
private var targetFilters: Map[String, ReplicationFilter] =
Map.empty

private var eventCompatibilityDeciders: Map[String, ReplicationDecider] =
Map.empty

override def receive = {
case Subscribe(subscriber) =>
subscribe(subscriber)
case Read(from) =>
val encoded = read(from)
sender() ! ReadSuccess(decode(encoded))
case ReplicationRead(from, num, tlid, tvv) =>
val encoded = replicationRead(from, num, tlid, tvv)
sender() ! ReplicationReadSuccess(encoded, encoded.lastOption.map(_.metadata.localSequenceNr).getOrElse(from))
replicationRead(from, num, tlid, tvv) match {
case Right((processedEvents, progress)) =>
sender() ! ReplicationReadSuccess(processedEvents, progress)
case Left(reason) =>
sender() ! ReplicationReadFailure(new ReplicationStoppedException(reason))
}
case Write(events) =>
val encoded = emissionWrite(encode(events))
val decoded = encoded.zip(events).map { case (enc, dec) => dec.copy(enc.metadata) }
sender() ! WriteSuccess(decoded)
publish(decoded)
case ReplicationWrite(events, sourceLogId, progress) =>
val encoded = replicationWrite(events); progressWrite(Map(sourceLogId -> progress))
val decoded = decode(encoded)
sender() ! ReplicationWriteSuccess(encoded, sourceLogId, progress, versionVector)
publish(decoded)
replicationWrite(events, progress, sourceLogId) match {
case Right((processedEvents, updatedProgress)) =>
progressWrite(Map(sourceLogId -> updatedProgress))
val decoded = decode(processedEvents)
sender() ! ReplicationWriteSuccess(processedEvents, sourceLogId, progress, versionVector)
publish(decoded)
case Left(reason) =>
sender() ! ReplicationWriteFailure(new ReplicationStoppedException(reason))
}
case GetReplicationProgressAndVersionVector(logId) =>
sender() ! GetReplicationProgressAndVersionVectorSuccess(progressRead(logId), versionVector)
case AddTargetFilter(logId, filter) =>
targetFilters = targetFilters.updated(logId, filter)
case AddEventCompatibilityDecider(sourceLogId, processor) =>
eventCompatibilityDeciders += sourceLogId -> processor
case RemoveEventCompatibilityDecider(sourceLogId) =>
eventCompatibilityDeciders -= sourceLogId
}

def targetFilter(logId: String): ReplicationFilter =
override def inboundReplicationProcessor(sourceLogId: String, currentVersionVector: VectorTime) =
ReplicationProcessor(
ReplicationDecider(causalityFilter(currentVersionVector))
.andThen(eventCompatibilityDeciders.getOrElse(sourceLogId, stopOnUnserializableKeepOthers)))

override def outboundReplicationProcessor(targetLogId: String, targetVersionVector: VectorTime, num: Int) =
// TODO RFC processor
ReplicationProcessor(
ReplicationDecider(replicationReadFilter(targetFilter(targetLogId), targetVersionVector), new BlockAfter(num)))

private def targetFilter(logId: String): ReplicationFilter =
targetFilters.getOrElse(logId, NoFilter)
}

Expand All @@ -138,4 +173,7 @@ object EventLog {

def decode(events: Seq[EncodedEvent])(implicit system: ActorSystem): Seq[DecodedEvent] =
events.map(e => EventPayloadSerializer.decode(e).get)

class ReplicationStoppedException(reason: BlockReason)
extends IllegalStateException(s"Replication stopped: $reason")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.rbmhtechnology.eventuate.sandbox

import scala.annotation.tailrec
import scala.collection.immutable.Seq

trait BlockReason
case class MoreThanN(n: Int) extends BlockReason

trait ReplicationBlocker {
def apply(event: EncodedEvent): Option[BlockReason]
}

object ReplicationBlocker {
class SequentialReplicationBlocker(blockers: Seq[ReplicationBlocker]) extends ReplicationBlocker {
override def apply(event: EncodedEvent) = {
@tailrec
def go(blockers: Seq[ReplicationBlocker]): Option[BlockReason] =
blockers match {
case Nil => None
case h :: t =>
h(event) match {
case None => go(t)
case reason => reason
}
}
go(blockers)
}
}

object NoBlocker extends ReplicationBlocker {
override def apply(event: EncodedEvent) = None
}

class BlockAfter(n: Int) extends ReplicationBlocker {
private var count: Int = 0
override def apply(event: EncodedEvent) =
if(count > n)
Some(MoreThanN(n))
else {
count += 1
None
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.ReplicationBlocker.NoBlocker
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Continue
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.ReplicationDecision

object ReplicationDecider {
sealed trait ReplicationDecision
case object Filter extends ReplicationDecision
case class Block(reason: BlockReason) extends ReplicationDecision
case object Continue extends ReplicationDecision

def apply(replicationFilter: ReplicationFilter, replicationBlocker: ReplicationBlocker = NoBlocker): ReplicationDecider = new ReplicationDecider {
override def apply(event: EncodedEvent) =
if (replicationFilter(event)) replicationBlocker(event).map(Block).getOrElse(Continue) else Filter
}
}

trait ReplicationDecider { outer =>
def apply(event: EncodedEvent): ReplicationDecision

def andThen(nextDecider: ReplicationDecider) = new ReplicationDecider {
override def apply(event: EncodedEvent) = outer(event) match {
case Continue => nextDecider(event)
case result => result
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.util.function.UnaryOperator

import akka.actor._
import akka.pattern.{ask, pipe}

import com.rbmhtechnology.eventuate.sandbox.EventCompatibility.IncompatibilityReason
import com.rbmhtechnology.eventuate.sandbox.ReplicationFilter.NoFilter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProtocol._
import com.typesafe.config._
Expand Down Expand Up @@ -54,8 +54,15 @@ class ReplicationEndpoint(
def connect(remoteEndpoint: ReplicationEndpoint): Future[String] =
connect(remoteEndpoint.connectionAcceptor)

def connect(remoteAcceptor: ActorRef): Future[String] =
def connect(remoteEndpoint: ReplicationEndpoint, eventCompatibilityDeciders: Map[String, ReplicationDecider]): Future[String] =
connect(remoteEndpoint.connectionAcceptor, eventCompatibilityDeciders)

def connect(remoteAcceptor: ActorRef, eventCompatibilityDeciders: Map[String, ReplicationDecider] = Map.empty): Future[String] =
remoteAcceptor.ask(GetReplicationSourceLogs(logNames))(settings.askTimeout).mapTo[GetReplicationSourceLogsSuccess].map { reply =>
eventCompatibilityDeciders.foreach { case (logName, processor) =>
eventLogs.get(logName).foreach(_ ! AddEventCompatibilityDecider(logId(reply.endpointId, logName), processor))
}
//TODO make sure processors are added before replicators are started
val replicators = reply.sourceLogs.map {
case (logName, sourceLog) =>
val sourceLogId = logId(reply.endpointId, logName)
Expand All @@ -66,8 +73,12 @@ class ReplicationEndpoint(
reply.endpointId
}

def disconnect(remoteEndpointId: String): Unit =
def disconnect(remoteEndpointId: String): Unit = {
removeConnection(remoteEndpointId).foreach(system.stop)
eventLogs.foreach { case (logName, eventLog) =>
eventLog ! RemoveEventCompatibilityDecider(logId(remoteEndpointId, logName))
}
}

def terminate(): Future[Terminated] =
system.terminate()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.rbmhtechnology.eventuate.sandbox

import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Block
import com.rbmhtechnology.eventuate.sandbox.ReplicationDecider.Filter
import com.rbmhtechnology.eventuate.sandbox.ReplicationProcessor.ReplicationProcessResult

import scala.annotation.tailrec
import scala.collection.immutable.Seq

object ReplicationProcessor {
type ReplicationProcessResult = Either[BlockReason, (Seq[EncodedEvent], Long)]
}

case class ReplicationProcessor(replicationDecider: ReplicationDecider) {

def apply(events: Seq[EncodedEvent], progress: Long): ReplicationProcessResult = {
var lastProgress: Long = 0

@tailrec
def go(in: Seq[EncodedEvent], out: Vector[EncodedEvent]): ReplicationProcessResult = in match {
case seq if seq.isEmpty =>
Right(out, progress)
case seq =>
replicationDecider(seq.head) match {
case Block(reason) =>
Either.cond(lastProgress > 0, (out, lastProgress), reason)
case decision =>
lastProgress = seq.head.metadata.localSequenceNr
go(seq.tail, if(decision == Filter) out else out :+ seq.head)
}
}

go(events, Vector.empty)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import akka.actor.ActorRef
import scala.collection.immutable.Seq

object ReplicationProtocol {
case class AddEventCompatibilityDecider(sourceLogId: String, decider: ReplicationDecider)
case class RemoveEventCompatibilityDecider(sourceLogId: String)

case class AddTargetFilter(targetLogId: String, filter: ReplicationFilter)

case class GetReplicationSourceLogs(logNames: Set[String])
Expand All @@ -15,7 +18,9 @@ object ReplicationProtocol {

case class ReplicationRead(fromSequenceNr: Long, num: Int, targetLogId: String, targetVersionVector: VectorTime)
case class ReplicationReadSuccess(events: Seq[EncodedEvent], progress: Long)
case class ReplicationReadFailure(cause: Throwable)

case class ReplicationWrite(events: Seq[EncodedEvent], sourceLogId: String, progress: Long)
case class ReplicationWriteSuccess(events: Seq[EncodedEvent], sourceLogId: String, progress: Long, targetVersionVector: VectorTime)
case class ReplicationWriteFailure(cause: Throwable)
}
Loading