From beae6c56799d1bee89b0777174062bdf587be9a9 Mon Sep 17 00:00:00 2001 From: Volker Stampa Date: Fri, 10 Feb 2017 15:36:36 +0100 Subject: [PATCH 1/3] Add documentation for TLS setup Akka remoting support TLS. The documentation describes how akka remoting needs to be configured for this and how corresponding keystores can be prepared. Closes #380 --- src/sphinx/code/create-keystore.sh | 93 ++++++++++++++++++++++++++ src/sphinx/conf/tls.conf | 20 ++++++ src/sphinx/reference/configuration.rst | 30 +++++++-- src/sphinx/reference/event-log.rst | 2 + 4 files changed, 140 insertions(+), 5 deletions(-) create mode 100644 src/sphinx/code/create-keystore.sh create mode 100644 src/sphinx/conf/tls.conf diff --git a/src/sphinx/code/create-keystore.sh b/src/sphinx/code/create-keystore.sh new file mode 100644 index 00000000..d8dc0cfd --- /dev/null +++ b/src/sphinx/code/create-keystore.sh @@ -0,0 +1,93 @@ +#!/bin/bash -eu + +export PW=<...> + +CaAlias=rootca +CaKeystore=rootca.jks +CaExport=root.crt + +CertAlias=serverandclient +CertKeystore=keystore.jks +CertSigningRequest=serverandclient.csr +CertSigned=serverandclient.crt + +logStep() { + echo + echo '=========' "$@" + echo +} + +rm -f "$CaKeystore" "$CertKeystore" "$CaExport" "$CertSigningRequest" "$CertSigned" + +logStep Create root CA in $CaKeystore +keytool -genkeypair \ + -alias "$CaAlias" \ + -dname "CN=CA" \ + -keystore "$CaKeystore" \ + -keypass:env PW \ + -storepass:env PW \ + -keyalg RSA \ + -keysize 4096 \ + -ext KeyUsage:critical="keyCertSign" \ + -ext BasicConstraints:critical="ca:true" \ + -validity 3700 + +logStep Export root CA to $CaExport for import into truststore +keytool -export \ + -alias "$CaAlias" \ + -file "$CaExport" \ + -keypass:env PW \ + -storepass:env PW \ + -keystore "$CaKeystore" \ + -rfc + +logStep Make $CertKeystore trust CA as signer +keytool -import \ + -alias "$CaAlias" \ + -file "$CaExport" \ + -keystore "$CertKeystore" \ + -storepass:env PW << EOF +yes +EOF + +logStep Create certificate for client and server auth in $CertKeystore +keytool -genkeypair \ + -alias "$CertAlias" \ + -dname "CN=Unknown" \ + -keystore "$CertKeystore" \ + -keypass:env PW \ + -storepass:env PW \ + -keyalg RSA \ + -keysize 2048 \ + -validity 385 + +logStep Create a certificate signing request: $CertSigningRequest +keytool -certreq \ + -alias "$CertAlias" \ + -keypass:env PW \ + -storepass:env PW \ + -keystore "$CertKeystore" \ + -file "$CertSigningRequest" + +logStep Sign certificate with CA: $CertSigned +keytool -gencert \ + -alias "$CaAlias" \ + -keypass:env PW \ + -storepass:env PW \ + -keystore "$CaKeystore" \ + -infile "$CertSigningRequest" \ + -outfile "$CertSigned" \ + -ext KeyUsage:critical="digitalSignature,keyEncipherment" \ + -rfc + +logStep Import signed certificate back into $CertKeystore +keytool -import \ + -alias "$CertAlias" \ + -file "$CertSigned" \ + -keystore "$CertKeystore" \ + -storepass:env PW + +logStep List $CertKeystore +keytool -list \ + -keystore "$CertKeystore" \ + -storepass:env PW diff --git a/src/sphinx/conf/tls.conf b/src/sphinx/conf/tls.conf new file mode 100644 index 00000000..c11d506d --- /dev/null +++ b/src/sphinx/conf/tls.conf @@ -0,0 +1,20 @@ +akka { + remote { + enabled-transports = [akka.remote.netty.ssl] + netty.ssl.security { + key-store = "/path/to/keystore.jks" + trust-store = "/path/to/keystore.jks" + + key-store-password = ... + key-password = ... + trust-store-password = ... + + protocol = "TLSv1.2" + + enabled-algorithms = ["TLS_DHE_RSA_WITH_AES_256_GCM_SHA384"] + + random-number-generator = "AES256CounterSecureRNG" + require-mutual-authentication = on + } + } +} diff --git a/src/sphinx/reference/configuration.rst b/src/sphinx/reference/configuration.rst index c1ee9809..c516a23a 100644 --- a/src/sphinx/reference/configuration.rst +++ b/src/sphinx/reference/configuration.rst @@ -1,29 +1,49 @@ +.. _transport-security: + +Transport Security +------------------ + +In a setup with several locations Eventuate uses `Akka Remoting`_ for communication between locations. Akka Remoting supports using TLS as transport protocol. In combination with mutual authentication (client and server) this allows to prevent that untrusted nodes can connect to a replication network. The configuration snippet listed here acts as an example for how to set this up. For more details see `Configuring SSL/TLS for Akka Remoting`_. + +.. literalinclude:: ../conf/tls.conf + +This example uses a single file for key- and trust-store. Having a single self-signed certificate in this store suffices to establish mutually authenticated connections via TLS between locations. However a self-signed key makes replacing the certificate in a rolling upgrade tedious. The following script (derived from the scripts of the documentation of `Lightbend's SSL-Config library`_) creates a root certificate that is used to sign the certificate for client and server authentication. The root certificate is kept in the private keystore ``rootca.jks``, which does not have to be deployed with the application. It is imported as ``trustedCertEntry`` into the keystore ``keystore.jks``, which is the one referenced by the akka configuration. + +.. literalinclude:: ../code/create-keystore.sh + +If the certificate for client and server authentication shall be replaced, corresponding key-stores can be deployed location by location without the need to stop the entire replication network at once. + +For debugging TLS connections please read `Debugging SSL/TLS Connections`_ from the java documentation. + .. _configuration: -------------- Configuration ------------- This is the reference configuration of Eventuate. It is processed by Typesafe's config_ library and can be overridden by applications: eventuate-core --------------- +~~~~~~~~~~~~~~ .. literalinclude:: ../../../eventuate-core/src/main/resources/reference.conf eventuate-crdt --------------- +~~~~~~~~~~~~~~ .. literalinclude:: ../../../eventuate-crdt/src/main/resources/reference.conf eventuate-log-cassandra ------------------------ +~~~~~~~~~~~~~~~~~~~~~~~ .. literalinclude:: ../../../eventuate-log-cassandra/src/main/resources/reference.conf eventuate-log-leveldb ---------------------- +~~~~~~~~~~~~~~~~~~~~~ .. literalinclude:: ../../../eventuate-log-leveldb/src/main/resources/reference.conf .. _config: https://github.com/typesafehub/config +.. _Akka Remoting: http://doc.akka.io/docs/akka/2.4/scala/remoting.html +.. _Configuring SSL/TLS for Akka Remoting: http://doc.akka.io/docs/akka/2.4/scala/remoting.html#Configuring_SSL_TLS_for_Akka_Remoting +.. _Lightbend's SSL-Config library: http://typesafehub.github.io/ssl-config/CertificateGeneration.html#using-keytool +.. _Debugging SSL/TLS Connections: http://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/ReadDebug.html \ No newline at end of file diff --git a/src/sphinx/reference/event-log.rst b/src/sphinx/reference/event-log.rst index 8c61380a..540fbcdf 100644 --- a/src/sphinx/reference/event-log.rst +++ b/src/sphinx/reference/event-log.rst @@ -120,6 +120,8 @@ Each location has a ``ReplicationEndpoint`` that manages the local event logs. R .. includecode:: ../conf/location-1.conf :snippet: remoting-conf +Next to TCP Akka Remoting also support TLS as transport protocol. See :ref:`transport-security` for details on how to set this up. + The network address of the replication endpoint at location ``1`` is: .. includecode:: ../conf/location-1.conf From 928eac9f359fc6b2795736cb95a940eee4041480 Mon Sep 17 00:00:00 2001 From: Mike Slinn Date: Tue, 21 Mar 2017 21:13:55 -0700 Subject: [PATCH 2/3] Tweaking for visually impaired people --- src/sphinx/_ext/tabbedcode.css | 15 ++- src/sphinx/_themes/rbmh/layout.html | 10 +- src/sphinx/_themes/rbmh/static/css/theme.css | 100 +++++++++++++++---- 3 files changed, 94 insertions(+), 31 deletions(-) diff --git a/src/sphinx/_ext/tabbedcode.css b/src/sphinx/_ext/tabbedcode.css index a4b2101c..dbd8141f 100644 --- a/src/sphinx/_ext/tabbedcode.css +++ b/src/sphinx/_ext/tabbedcode.css @@ -1,7 +1,7 @@ ul.tabbed-code-selector { margin-bottom: 0 !important; position: relative; - bottom: -22px; + bottom: -12px; list-style-type: none; display: inline-flex; } @@ -16,12 +16,19 @@ ul.tabbed-code-selector li { float: left; border: 1px solid #e1e4e5; background-color: #f5f5f5; + border-bottom: none; color: #646567; } ul.tabbed-code-selector li:hover { background-color: #ebebeb; } ul.tabbed-code-selector li.selected { - background-color: #f8f8f8; - border-bottom: 1px solid #f8f8f8; -} \ No newline at end of file + background-color: #fbf97a; + border-bottom: none; + color: black; +} + +ul.tabbed-code-selector li[class^='highlight'] { + border-top-right-radius: 8px; + border-top-left-radius: 8px; +} diff --git a/src/sphinx/_themes/rbmh/layout.html b/src/sphinx/_themes/rbmh/layout.html index 194f4782..cd0a0c4f 100644 --- a/src/sphinx/_themes/rbmh/layout.html +++ b/src/sphinx/_themes/rbmh/layout.html @@ -95,15 +95,15 @@ {# Not strictly valid HTML, but it's the only way to display/scale it properly, without weird scripting or heaps of work #} {% endif %} - + {% include "searchbox.html" %} - + {% endblock %} diff --git a/src/sphinx/_themes/rbmh/static/css/theme.css b/src/sphinx/_themes/rbmh/static/css/theme.css index 5e8c9583..569ed249 100644 --- a/src/sphinx/_themes/rbmh/static/css/theme.css +++ b/src/sphinx/_themes/rbmh/static/css/theme.css @@ -30,7 +30,7 @@ audio:not([controls]) { } html { - font-size: 100%; + font-size: 12pt; -webkit-text-size-adjust: 100%; -ms-text-size-adjust: 100%; } @@ -75,7 +75,7 @@ mark { pre, code, .rst-content tt, kbd, samp { font-family: monospace, serif; _font-family: "courier new", monospace; - font-size: 1em; + font-size: 90%; } pre { @@ -3259,6 +3259,10 @@ input[type="radio"][disabled], input[type="checkbox"][disabled] { cursor: not-allowed; } +table.footnote code .pre { + font-size: 10pt; +} + .wy-checkbox, .wy-radio { margin: 6px 0; color: #404040; @@ -3628,7 +3632,7 @@ html { body { font-family: "open-sans", "Helvetica Neue", Arial, sans-serif; font-weight: normal; - color: #404040; + color: #242424; min-height: 100%; overflow-x: hidden; background: #edf0f2; @@ -3712,7 +3716,12 @@ p { line-height: 24px; margin: 0; font-size: 16px; - margin-bottom: 24px; + margin-bottom: 12px; + text-align: justify; +} + +p + p { + margin-top: 12px; } h1 { @@ -3751,16 +3760,16 @@ hr { code, .rst-content tt { white-space: nowrap; max-width: 100%; - background: #fff; - border: solid 1px #e1e4e5; - font-size: 75%; - padding: 0 5px; - font-family: "open-sans", "Helvetica Neue", Arial, sans-serif; - color: #E74C3C; + background: none; + /*border: solid 1px #e1e4e5;*/ + font-size: 90%; + /*padding: 0 5px;*/ + font-family: "open-sans", "Helvetica Neue", sans-serif; + color: black; overflow-x: auto; } code.code-large, .rst-content tt.code-large { - font-size: 90%; + font-size: 100%; } .wy-plain-list-disc, .rst-content .section ul, .rst-content .toctree-wrapper ul, article ul { @@ -3771,6 +3780,7 @@ code.code-large, .rst-content tt.code-large { .wy-plain-list-disc li, .rst-content .section ul li, .rst-content .toctree-wrapper ul li, article ul li { list-style: disc; margin-left: 24px; + margin-top: 6pt; } .wy-plain-list-disc li p:last-child, .rst-content .section ul li p:last-child, .rst-content .toctree-wrapper ul li p:last-child, article ul li p:last-child { margin-bottom: 0; @@ -3788,6 +3798,48 @@ code.code-large, .rst-content tt.code-large { list-style: decimal; } +.toctree-l2 { + text-indent: 0.5em; + white-space: nowrap; +} +.toctree-l3 { + text-indent: 1em; + white-space: nowrap; +} +.toctree-l4 { + text-indent: 1.5em; + white-space: nowrap; +} +.toctree-l5 { + text-indent: 2em; + white-space: nowrap; +} +.toctree-l6 { + text-indent: 2.5em; + white-space: nowrap; +} + +.document .section h1 { + color: black; + font-size: 36px; +} +.document .section h2 { + font-size: 30px; + color: #202325; +} +.document .section h3 { + font-size: 24px; + color: #343739; +} +.document .section h4 { + font-size: 22px; + color: #494c4e; +} +.document .section h5 { + font-size: 18px; + color: #56595b; +} + .wy-plain-list-decimal, .rst-content .section ol, .rst-content ol.arabic, article ol { list-style: decimal; line-height: 24px; @@ -3796,6 +3848,7 @@ code.code-large, .rst-content tt.code-large { .wy-plain-list-decimal li, .rst-content .section ol li, .rst-content ol.arabic li, article ol li { list-style: decimal; margin-left: 24px; + margin-top: 6pt; } .wy-plain-list-decimal li p:last-child, .rst-content .section ol li p:last-child, .rst-content ol.arabic li p:last-child, article ol li p:last-child { margin-bottom: 0; @@ -3837,6 +3890,7 @@ code.code-large, .rst-content tt.code-large { background: #fff; margin: 1px 0 24px 0; } + .codeblock div[class^='highlight'], pre.literal-block div[class^='highlight'], .rst-content .literal-block div[class^='highlight'], div[class^='highlight'] div[class^='highlight'] { border: none; background: none; @@ -3860,7 +3914,7 @@ div[class^='highlight'] td.code { div[class^='highlight'] pre { white-space: pre; margin: 0; - padding: 12px 12px; + /*padding: 12px 12px;*/ font-family: "open-sans", "Helvetica Neue", Arial, sans-serif; font-size: 12px; line-height: 1.5; @@ -4619,7 +4673,7 @@ h4 { } .wy-menu-vertical a, .wy-menu-vertical li.current a { - color: #646567; + color: #242424; } .wy-menu-vertical a:hover, .wy-menu-vertical li.current > a:hover, .wy-menu-vertical li.current a:hover { @@ -4642,9 +4696,9 @@ h4 { background-color: #f2940a; } -.rst-content .warning { +/*.rst-content .warning { background-color: rgba(242, 148, 10, 0.1); -} +}*/ .btn-neutral { background-color: #f8f8f8; @@ -4659,7 +4713,8 @@ h4 { } code, .rst-content tt, .rst-content tt { - color: #db0a40; + color: black; + background: none; font-family: "Lucida Console", Monaco, monospace; } @@ -4767,10 +4822,11 @@ footer p { background: #f8f8f8 !important; padding: 7px 7px 7px 10px !important; border: 1px solid #ddd !important; - -moz-box-shadow: 3px 3px rgba(0, 0, 0, 0.1) !important; + border-radius: 4px; + /*-moz-box-shadow: 3px 3px rgba(0, 0, 0, 0.1) !important; -webkit-box-shadow: 3px 3px rgba(0, 0, 0, 0.1) !important; - box-shadow: 3px 3px rgba(0, 0, 0, 0.1) !important; - margin: 20px 0 20px 0 !important; + box-shadow: 3px 3px rgba(0, 0, 0, 0.1) !important;*/ + margin: 10px 0 10px 0 !important; } div[class^='highlight'] pre { @@ -4783,7 +4839,7 @@ div[class^='highlight'] pre { } body { - color: #646567; + color: #242424; } blockquote { @@ -5109,12 +5165,12 @@ td input:focus { } .rst-content .footnote-reference, .rst-content .citation-reference { vertical-align: super; - font-size: 90%; + font-size: 8pt; } .rst-content table.docutils.citation, .rst-content table.docutils.footnote { background: none; border: none; - color: #999; + /* color: #999;*/ } .rst-content table.docutils.citation td, .rst-content table.docutils.citation tr, .rst-content table.docutils.footnote td, .rst-content table.docutils.footnote tr { border: none; From 5d045c0f72413bce70717bd0492fa6552c0f981d Mon Sep 17 00:00:00 2001 From: Volker Stampa Date: Wed, 29 Mar 2017 11:18:06 +0200 Subject: [PATCH 3/3] Use EventId in PersistOnEventRequests Instead of using the in case of disaster recovery potentially unstable sequence number as id for persist on event requests use a stable EventId that is composed of the sequence number of the emitter of the event and the corresponding process id. Closes #385 --- .../DurableEventSerializerSpec.scala | 3 +- .../serializer/SnapshotSerializerSpec.scala | 4 +- .../main/protobuf/DurableEventFormats.proto | 6 + .../src/main/protobuf/SnapshotFormats.proto | 3 +- .../eventuate/DurableEvent.scala | 26 ++- .../eventuate/EventsourcedActor.scala | 4 +- .../eventuate/EventsourcedVersion.scala | 5 +- .../eventuate/PersistOnEvent.scala | 59 +++++-- .../DelegatingDurableEventSerializer.scala | 17 +- .../serializer/SnapshotSerializer.scala | 10 +- .../rbmhtechnology/eventuate/BaseSpec.java | 2 +- .../eventuate/PersistOnEventSpec.scala | 156 +++++++++++++----- ...ersistOnEventWithRecoverySpecLeveldb.scala | 96 +++++++++++ .../eventuate/RecoverySpecLeveldb.scala | 21 +-- 14 files changed, 324 insertions(+), 88 deletions(-) create mode 100644 eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala diff --git a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala index 1ba9417d..86245c0c 100644 --- a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala +++ b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/DurableEventSerializerSpec.scala @@ -106,7 +106,8 @@ object DurableEventSerializerSpec { localLogId = "p3", localSequenceNr = 17L, deliveryId = Some("x"), - persistOnEventSequenceNr = Some(12L)) + persistOnEventSequenceNr = Some(12L), + persistOnEventId = Some(EventId("p4", 0L))) } class DurableEventSerializerSpec extends WordSpec with Matchers with Inside with BeforeAndAfterAll { diff --git a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala index 82429f68..34d3b918 100644 --- a/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala +++ b/eventuate-core/src/it/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializerSpec.scala @@ -46,8 +46,8 @@ object SnapshotSerializerSpec { DeliveryAttempt("4", payload, destination)) def persistOnEventRequests(payload: Any) = Vector( - PersistOnEventRequest(7L, Vector(PersistOnEventInvocation(payload, Set("a"))), 17), - PersistOnEventRequest(8L, Vector(PersistOnEventInvocation(payload, Set("b"))), 17)) + PersistOnEventRequest(7L, None, Vector(PersistOnEventInvocation(payload, Set("a"))), 17), + PersistOnEventRequest(8L, Some(EventId("p-a", 3L)), Vector(PersistOnEventInvocation(payload, Set("b"))), 17)) def snapshot(payload: Any, destination: ActorPath) = Snapshot(payload, "x", last(payload), vectorTime(17, 18), event.localSequenceNr, diff --git a/eventuate-core/src/main/protobuf/DurableEventFormats.proto b/eventuate-core/src/main/protobuf/DurableEventFormats.proto index 86e2173d..071d79d4 100644 --- a/eventuate-core/src/main/protobuf/DurableEventFormats.proto +++ b/eventuate-core/src/main/protobuf/DurableEventFormats.proto @@ -31,4 +31,10 @@ message DurableEventFormat { optional int64 localSequenceNr = 9; optional int64 persistOnEventSequenceNr = 10; optional string deliveryId = 11; + optional EventIdFormat persistOnEventId = 12; +} + +message EventIdFormat { + optional string processId = 1; + optional int64 sequenceNr = 2; } diff --git a/eventuate-core/src/main/protobuf/SnapshotFormats.proto b/eventuate-core/src/main/protobuf/SnapshotFormats.proto index 6cad7cf2..b295738d 100644 --- a/eventuate-core/src/main/protobuf/SnapshotFormats.proto +++ b/eventuate-core/src/main/protobuf/SnapshotFormats.proto @@ -37,9 +37,10 @@ message DeliveryAttemptFormat { } message PersistOnEventRequestFormat { - optional int64 persistOnEventSequenceNr = 1; + optional int64 sequenceNr = 1; repeated PersistOnEventInvocationFormat invocation = 2; optional int32 instanceId = 3; + optional EventIdFormat eventId = 4; } message PersistOnEventInvocationFormat { diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala index 626b017b..fb468396 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/DurableEvent.scala @@ -18,6 +18,16 @@ package com.rbmhtechnology.eventuate import scala.collection.immutable.Seq +/** + * Unique id of a [[DurableEvent]]. + * + * This is a stable id of an event across all replicated logs. + * + * @param processId the id of the event log the initially wrote the event. + * @param sequenceNr the initial sequence number in this log. + */ +case class EventId(processId: String, sequenceNr: Long) + /** * Provider API. * @@ -43,7 +53,14 @@ import scala.collection.immutable.Seq * @param deliveryId Delivery id chosen by an application that persisted this event with [[ConfirmedDelivery.persistConfirmation]]. * @param persistOnEventSequenceNr Sequence number of the event that caused the emission of this event in an event handler. * Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event - * with `persistOnEvent`. + * with `persistOnEvent`. Actually superseded by `persistOnEventId`, but still + * has to be maintained for backwards compatibility. It is required for confirmation + * of old [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest]]s from + * a snapshot that do not have [[com.rbmhtechnology.eventuate.PersistOnEvent.PersistOnEventRequest.persistOnEventId]] + * set. + * @param persistOnEventId event id of the event that caused the emission of this event in an event handler. + * Defined if an [[EventsourcedActor]] with a [[PersistOnEvent]] mixin emitted this event + * with `persistOnEvent`. */ case class DurableEvent( payload: Any, @@ -56,15 +73,16 @@ case class DurableEvent( localLogId: String = DurableEvent.UndefinedLogId, localSequenceNr: Long = DurableEvent.UndefinedSequenceNr, deliveryId: Option[String] = None, - persistOnEventSequenceNr: Option[Long] = None) { + persistOnEventSequenceNr: Option[Long] = None, + persistOnEventId: Option[EventId] = None) { import DurableEvent._ /** * Unique event identifier. */ - def id: VectorTime = - vectorTimestamp + val id: EventId = + EventId(processId, vectorTimestamp.localTime(processId)) /** * Returns `true` if this event did not happen before or at the given `vectorTime` diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala index f42cada3..ac1d2867 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedActor.scala @@ -145,12 +145,12 @@ trait EventsourcedActor extends EventsourcedView with EventsourcedVersion { messageStash.unstash() } } - case PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations, iid) => if (iid == instanceId) { + case PersistOnEventRequest(persistOnEventSequenceNr, persistOnEventId, invocations, iid) => if (iid == instanceId) { writeOrDelay { writeHandlers = Vector.fill(invocations.length)(PersistOnEvent.DefaultHandler) writeRequests = invocations.map { case PersistOnEventInvocation(event, customDestinationAggregateIds) => - durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr)) + durableEvent(event, customDestinationAggregateIds, None, Some(persistOnEventSequenceNr), persistOnEventId) } } } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala index 1038f077..04282258 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/EventsourcedVersion.scala @@ -39,7 +39,7 @@ trait EventsourcedVersion extends EventsourcedView { * Internal API. */ private[eventuate] def durableEvent(payload: Any, customDestinationAggregateIds: Set[String], - deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None): DurableEvent = + deliveryId: Option[String] = None, persistOnEventSequenceNr: Option[Long] = None, persistOnEventId: Option[EventId] = None): DurableEvent = DurableEvent( payload = payload, emitterId = id, @@ -47,7 +47,8 @@ trait EventsourcedVersion extends EventsourcedView { customDestinationAggregateIds = customDestinationAggregateIds, vectorTimestamp = currentVersion, deliveryId = deliveryId, - persistOnEventSequenceNr = persistOnEventSequenceNr) + persistOnEventSequenceNr = persistOnEventSequenceNr, + persistOnEventId = persistOnEventId) /** * Internal API. diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala index 7411dca2..415dea09 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/PersistOnEvent.scala @@ -17,12 +17,11 @@ package com.rbmhtechnology.eventuate import scala.collection.immutable.SortedMap - import com.rbmhtechnology.eventuate.EventsourcedView.Handler import scala.util._ -object PersistOnEvent { +private[eventuate] object PersistOnEvent { /** * Records a `persistOnEvent` invocation. */ @@ -30,13 +29,17 @@ object PersistOnEvent { /** * A request sent by [[PersistOnEvent]] instances to `self` in order to persist events recorded by `invocations`. + * @param persistOnEventSequenceNr the sequence number of the event that caused this request. + * @param persistOnEventId [[EventId]] of the event that caused this request. This is optional for backwards + * compatibility, as old snapshots might contain `PersistOnEventRequest`s + * without this field being defined. */ - case class PersistOnEventRequest(persistOnEventSequenceNr: Long, invocations: Vector[PersistOnEventInvocation], instanceId: Int) + case class PersistOnEventRequest(persistOnEventSequenceNr: Long, persistOnEventId: Option[EventId], invocations: Vector[PersistOnEventInvocation], instanceId: Int) /** * Default `persist` handler to use when processing [[PersistOnEventRequest]]s in [[EventsourcedActor]]. */ - private[eventuate] val DefaultHandler: Handler[Any] = { + val DefaultHandler: Handler[Any] = { case Success(_) => case Failure(e) => throw new PersistOnEventException(e) } @@ -61,7 +64,22 @@ trait PersistOnEvent extends EventsourcedActor { import PersistOnEvent._ private var invocations: Vector[PersistOnEventInvocation] = Vector.empty - private var requests: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty + /** + * [[PersistOnEventRequest]] by sequence number of the event that caused the persist on event request. + * + * This map keeps the requests in the order they were submitted. + */ + private var requestsBySequenceNr: SortedMap[Long, PersistOnEventRequest] = SortedMap.empty + + /** + * [[PersistOnEventRequest]] by [[EventId]] of the event that caused the persist on event request. + * + * This map ensures that requests can be confirmed properly even if the sequence number of the event + * that caused the request changed its local sequence number due to a disaster recovery. + * + * @see https://github.com/RBMHTechnology/eventuate/issues/385 + */ + private var requestsByEventId: Map[EventId, PersistOnEventRequest] = Map.empty /** * Asynchronously persists the given `event`. Applications that want to handle the persisted event should define @@ -77,13 +95,10 @@ trait PersistOnEvent extends EventsourcedActor { */ override private[eventuate] def receiveEvent(event: DurableEvent): Unit = { super.receiveEvent(event) - - event.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr => - if (event.emitterId == id) confirmRequest(persistOnEventSequenceNr) - } + if (event.emitterId == id) findPersistOnEventRequest(event).foreach(confirmRequest) if (invocations.nonEmpty) { - deliverRequest(PersistOnEventRequest(lastSequenceNr, invocations, instanceId)) + deliverRequest(PersistOnEventRequest(lastSequenceNr, Some(lastHandledEvent.id), invocations, instanceId)) invocations = Vector.empty } } @@ -92,7 +107,7 @@ trait PersistOnEvent extends EventsourcedActor { * Internal API. */ override private[eventuate] def snapshotCaptured(snapshot: Snapshot): Snapshot = { - requests.values.foldLeft(super.snapshotCaptured(snapshot)) { + requestsBySequenceNr.values.foldLeft(super.snapshotCaptured(snapshot)) { case (s, pr) => s.addPersistOnEventRequest(pr) } } @@ -103,7 +118,9 @@ trait PersistOnEvent extends EventsourcedActor { override private[eventuate] def snapshotLoaded(snapshot: Snapshot): Unit = { super.snapshotLoaded(snapshot) snapshot.persistOnEventRequests.foreach { pr => - requests = requests + (pr.persistOnEventSequenceNr -> pr.copy(instanceId = instanceId)) + val requestWithUpdatedInstanceId = pr.copy(instanceId = instanceId) + requestsBySequenceNr += (pr.persistOnEventSequenceNr -> requestWithUpdatedInstanceId) + pr.persistOnEventId.foreach(requestsByEventId += _ -> requestWithUpdatedInstanceId) } } @@ -119,18 +136,26 @@ trait PersistOnEvent extends EventsourcedActor { * Internal API. */ private[eventuate] def unconfirmedRequests: Set[Long] = - requests.keySet + requestsBySequenceNr.keySet private def deliverRequest(request: PersistOnEventRequest): Unit = { - requests = requests + (request.persistOnEventSequenceNr -> request) + requestsBySequenceNr += request.persistOnEventSequenceNr -> request + request.persistOnEventId.foreach(requestsByEventId += _ -> request) if (!recovering) self ! request } - private def confirmRequest(persistOnEventSequenceNr: Long): Unit = { - requests = requests - persistOnEventSequenceNr + private def confirmRequest(request: PersistOnEventRequest): Unit = { + request.persistOnEventId.foreach(requestsByEventId -= _) + requestsBySequenceNr -= request.persistOnEventSequenceNr } - private def redeliverUnconfirmedRequests(): Unit = requests.foreach { + private def findPersistOnEventRequest(event: DurableEvent) = + event + .persistOnEventId.flatMap(requestsByEventId.get) + // Fallback for old events that have no persistOnEventId + .orElse(event.persistOnEventSequenceNr.flatMap(requestsBySequenceNr.get)) + + private def redeliverUnconfirmedRequests(): Unit = requestsBySequenceNr.foreach { case (_, request) => self ! request } } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala index abff6ed8..009da82e 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/DelegatingDurableEventSerializer.scala @@ -102,6 +102,9 @@ abstract class DurableEventSerializer( durableEvent.persistOnEventSequenceNr.foreach { persistOnEventSequenceNr => builder.setPersistOnEventSequenceNr(persistOnEventSequenceNr) } + durableEvent.persistOnEventId.foreach { persistOnEventId => + builder.setPersistOnEventId(eventIdFormatBuilder(persistOnEventId)) + } durableEvent.emitterAggregateId.foreach { id => builder.setEmitterAggregateId(id) @@ -114,6 +117,13 @@ abstract class DurableEventSerializer( builder } + def eventIdFormatBuilder(eventId: EventId) = { + val builder = EventIdFormat.newBuilder() + builder.setProcessId(eventId.processId) + builder.setSequenceNr(eventId.sequenceNr) + builder + } + // -------------------------------------------------------------------------------- // fromBinary helpers // -------------------------------------------------------------------------------- @@ -128,6 +138,7 @@ abstract class DurableEventSerializer( val deliveryId = if (durableEventFormat.hasDeliveryId) Some(durableEventFormat.getDeliveryId) else None val persistOnEventSequenceNr = if (durableEventFormat.hasPersistOnEventSequenceNr) Some(durableEventFormat.getPersistOnEventSequenceNr) else None + val persistOnEventId = if (durableEventFormat.hasPersistOnEventId) Some(eventId(durableEventFormat.getPersistOnEventId)) else None DurableEvent( payload = payloadSerializer.payload(durableEventFormat.getPayload), @@ -140,6 +151,10 @@ abstract class DurableEventSerializer( localLogId = durableEventFormat.getLocalLogId, localSequenceNr = durableEventFormat.getLocalSequenceNr, deliveryId = deliveryId, - persistOnEventSequenceNr = persistOnEventSequenceNr) + persistOnEventSequenceNr = persistOnEventSequenceNr, + persistOnEventId = persistOnEventId) } + + def eventId(eventId: EventIdFormat) = + EventId(eventId.getProcessId, eventId.getSequenceNr) } diff --git a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala index 74839060..ec46c915 100644 --- a/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala +++ b/eventuate-core/src/main/scala/com/rbmhtechnology/eventuate/serializer/SnapshotSerializer.scala @@ -98,7 +98,10 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { private def persistOnEventRequestFormatBuilder(persistOnEventRequest: PersistOnEventRequest): PersistOnEventRequestFormat.Builder = { val builder = PersistOnEventRequestFormat.newBuilder - builder.setPersistOnEventSequenceNr(persistOnEventRequest.persistOnEventSequenceNr) + builder.setSequenceNr(persistOnEventRequest.persistOnEventSequenceNr) + persistOnEventRequest.persistOnEventId.foreach { eventId => + builder.setEventId(eventSerializer.eventIdFormatBuilder(eventId)) + } builder.setInstanceId(persistOnEventRequest.instanceId) persistOnEventRequest.invocations.foreach { invocation => @@ -191,8 +194,11 @@ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { invocationsBuilder += persistOnEventInvocation(pif) } + val persistOnEventReference = if (persistOnEventRequestFormat.hasEventId) Some(eventSerializer.eventId(persistOnEventRequestFormat.getEventId)) else None + PersistOnEventRequest( - persistOnEventRequestFormat.getPersistOnEventSequenceNr, + persistOnEventRequestFormat.getSequenceNr, + persistOnEventReference, invocationsBuilder.result(), persistOnEventRequestFormat.getInstanceId) } diff --git a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java index cf459ac4..cbc9a843 100644 --- a/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java +++ b/eventuate-core/src/test/java/com/rbmhtechnology/eventuate/BaseSpec.java @@ -105,7 +105,7 @@ protected DurableEvent createEvent(final Object payload, final long sequenceNr) @SuppressWarnings("unchecked") protected DurableEvent createEvent(final Object payload, final long sequenceNr, final String emitterId, final String logId, final VectorTime timestamp) { return new DurableEvent(payload, emitterId, Option.empty(), Set$.MODULE$.empty(), 0L, - timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty()); + timestamp, logId, logId, sequenceNr, Option.empty(), Option.empty(), Option.empty()); } protected VectorTime timestamp(final long a, final long b) { diff --git a/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala b/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala index 07f80988..c14bc4a7 100644 --- a/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala +++ b/eventuate-core/src/test/scala/com/rbmhtechnology/eventuate/PersistOnEventSpec.scala @@ -82,8 +82,8 @@ object PersistOnEventSpec { } } - def event(payload: Any, sequenceNr: Long, persistOnEventSequenceNr: Option[Long] = None): DurableEvent = - DurableEvent(payload, emitterIdA, None, Set(), 0L, timestamp(sequenceNr), logIdA, logIdA, sequenceNr, None, persistOnEventSequenceNr) + def event(payload: Any, sequenceNr: Long, persistOnEventEvent: Option[DurableEvent] = None): DurableEvent = + DurableEvent(payload, emitterIdA, None, Set(), 0L, timestamp(sequenceNr), logIdA, logIdA, sequenceNr, None, persistOnEventEvent.map(_.localSequenceNr), persistOnEventEvent.map(_.id)) } class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach { @@ -131,16 +131,17 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike "An EventsourcedActor with PersistOnEvent" must { "support persistence in event handler" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - write.events(0).persistOnEventSequenceNr should be(Some(1L)) - write.events(1).persistOnEventSequenceNr should be(Some(1L)) + write.events(0).persistOnEventId should be(Some(eventA.id)) + write.events(1).persistOnEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -150,47 +151,49 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "support cascading persistence" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("b", 1L)) + val eventB = event("b", 1L) + actor ! Written(eventB) deliverProbe.expectMsg(Set(1L)) val write1 = logProbe.expectMsgClass(classOf[Write]) - write1.events(0).persistOnEventSequenceNr should be(Some(1L)) - write1.events(1).persistOnEventSequenceNr should be(Some(1L)) + write1.events(0).persistOnEventId should be(Some(eventB.id)) + write1.events(1).persistOnEventId should be(Some(eventB.id)) - logProbe.sender() ! WriteSuccess(Seq( - event(write1.events(0).payload, 2L, Some(1L)), - event(write1.events(1).payload, 3L, Some(1L))), write1.correlationId, instanceId) + val eventC = event(write1.events(0).payload, 2L, Some(eventB)) + val eventC2 = event(write1.events(1).payload, 3L, Some(eventB)) + logProbe.sender() ! WriteSuccess(Seq(eventC, eventC2), write1.correlationId, instanceId) deliverProbe.expectMsg(Set(2L)) deliverProbe.expectMsg(Set(2L)) persistProbe.expectMsg("c-1") val write2 = logProbe.expectMsgClass(classOf[Write]) - write2.events(0).persistOnEventSequenceNr should be(Some(2L)) - logProbe.sender() ! WriteSuccess(Seq(event(write2.events(0).payload, 4L, Some(2L))), write2.correlationId, instanceId) + write2.events(0).persistOnEventId should be(Some(eventC.id)) + logProbe.sender() ! WriteSuccess(Seq(event(write2.events(0).payload, 4L, Some(eventC))), write2.correlationId, instanceId) deliverProbe.expectMsg(Set()) persistProbe.expectMsg("c-2") } "confirm persistence with self-emitted events only" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - actor ! Written(event("x-1", 2L, Some(1L)).copy(emitterId = emitterIdB)) - actor ! Written(event("x-2", 3L, Some(1L)).copy(emitterId = emitterIdB)) + actor ! Written(event("x-1", 2L, Some(eventA)).copy(emitterId = emitterIdB)) + actor ! Written(event("x-2", 3L, Some(eventA)).copy(emitterId = emitterIdB)) deliverProbe.expectMsg(Set(1L)) deliverProbe.expectMsg(Set(1L)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 4L, Some(1L)), - event(write.events(1).payload, 5L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 4L, Some(eventA)), + event(write.events(1).payload, 5L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -200,31 +203,32 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "re-attempt persistence on failed write after restart" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write1 = logProbe.expectMsgClass(classOf[Write]) - write1.events(0).persistOnEventSequenceNr should be(Some(1L)) - write1.events(1).persistOnEventSequenceNr should be(Some(1L)) + write1.events(0).persistOnEventId should be(Some(eventA.id)) + write1.events(1).persistOnEventId should be(Some(eventA.id)) // application crash and restart logProbe.sender() ! WriteFailure(Seq( - event(write1.events(0).payload, 0L, Some(1L)), - event(write1.events(1).payload, 0L, Some(1L))), TestException, write1.correlationId, instanceId) + event(write1.events(0).payload, 0L, Some(eventA)), + event(write1.events(1).payload, 0L, Some(eventA))), TestException, write1.correlationId, instanceId) processRecover(actor, instanceId + 1, Seq(event("a", 1L))) deliverProbe.expectMsg(Set(1L)) val write2 = logProbe.expectMsgClass(classOf[Write]) - write2.events(0).persistOnEventSequenceNr should be(Some(1L)) - write2.events(1).persistOnEventSequenceNr should be(Some(1L)) + write2.events(0).persistOnEventId should be(Some(eventA.id)) + write2.events(1).persistOnEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write2.events(0).payload, 2L, Some(1L)), - event(write2.events(1).payload, 3L, Some(1L))), write2.correlationId, instanceId + 1) + event(write2.events(0).payload, 2L, Some(eventA)), + event(write2.events(1).payload, 3L, Some(eventA))), write2.correlationId, instanceId + 1) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -234,18 +238,19 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike } "not re-attempt persistence on successful write after restart" in { val actor = recoveredTestActor(stateSync = true) - actor ! Written(event("a", 1L)) + val eventA = event("a", 1L) + actor ! Written(eventA) deliverProbe.expectMsg(Set(1L)) val write = logProbe.expectMsgClass(classOf[Write]) - write.events(0).persistOnEventSequenceNr should be(Some(1L)) - write.events(1).persistOnEventSequenceNr should be(Some(1L)) + write.events(0).persistOnEventId should be(Some(eventA.id)) + write.events(1).persistOnEventId should be(Some(eventA.id)) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId) deliverProbe.expectMsg(Set()) deliverProbe.expectMsg(Set()) @@ -256,8 +261,41 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike actor ! "boom" processRecover(actor, instanceId + 1, Seq( event("a", 1L), - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L)))) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA)))) + + deliverProbe.expectMsg(Set(1L)) + deliverProbe.expectMsg(Set()) + deliverProbe.expectMsg(Set()) + + persistProbe.expectNoMsg(timeout) + persistProbe.expectNoMsg(timeout) + } + "not re-attempt persistence on successful write of events without persistOnEventReference after restart" in { + val actor = recoveredTestActor(stateSync = true) + val eventA = event("a", 1L) + actor ! Written(eventA) + + deliverProbe.expectMsg(Set(1L)) + + val write = logProbe.expectMsgClass(classOf[Write]) + + write.events(0).persistOnEventId should be(Some(eventA.id)) + write.events(1).persistOnEventId should be(Some(eventA.id)) + + val persistedOnA = Seq( + event(write.events(0).payload, 2L, Some(eventA)).copy(persistOnEventId = None), + event(write.events(1).payload, 3L, Some(eventA))) + logProbe.sender() ! WriteSuccess(persistedOnA, write.correlationId, instanceId) + + deliverProbe.expectMsg(Set()) + deliverProbe.expectMsg(Set()) + + persistProbe.expectMsg("a-1") + persistProbe.expectMsg("a-2") + + actor ! "boom" + processRecover(actor, instanceId + 1, eventA +: persistedOnA) deliverProbe.expectMsg(Set(1L)) deliverProbe.expectMsg(Set()) @@ -270,12 +308,13 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike val actor = recoveredTestActor(stateSync = true) actor ! Written(event("boom", 1L)) - processRecover(actor, instanceId + 1, Seq(event("a", 1L))) + val eventA = event("a", 1L) + processRecover(actor, instanceId + 1, Seq(eventA)) val write = logProbe.expectMsgClass(classOf[Write]) logProbe.sender() ! WriteSuccess(Seq( - event(write.events(0).payload, 2L, Some(1L)), - event(write.events(1).payload, 3L, Some(1L))), write.correlationId, instanceId + 1) + event(write.events(0).payload, 2L, Some(eventA)), + event(write.events(1).payload, 3L, Some(eventA))), write.correlationId, instanceId + 1) logProbe.expectNoMsg(timeout) } "save a snapshot with persistOnEvent requests" in { @@ -287,7 +326,7 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike actor ! "snap" val save = logProbe.expectMsgClass(classOf[SaveSnapshot]) - val expected = PersistOnEventRequest(1L, Vector( + val expected = PersistOnEventRequest(1L, Some(EventId("logA", 1L)), Vector( PersistOnEventInvocation("x-1", Set("14")), PersistOnEventInvocation("x-2", Set("15"))), instanceId) @@ -317,12 +356,13 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike "recover from a snapshot with persistOnEvent requests whose execution succeeded" in { val actor = recoveredTestActor(stateSync = false) - actor ! Written(event("x", 1L)) + val eventX = event("x", 1L) + actor ! Written(eventX) val write1 = logProbe.expectMsgClass(classOf[Write]) val written = List( - event(write1.events(0).payload, 2L, Some(1L)), - event(write1.events(1).payload, 3L, Some(1L))) + event(write1.events(0).payload, 2L, Some(eventX)), + event(write1.events(1).payload, 3L, Some(eventX))) actor ! "snap" @@ -338,12 +378,38 @@ class PersistOnEventSpec extends TestKit(ActorSystem("test")) with WordSpecLike logProbe.sender() ! ReplaySuccess(Nil, 3L, instanceId + 1) logProbe.expectNoMsg(timeout) } + "recover from a snapshot with persistOnEvent requests without persistOnEventReferences whose execution succeeded" in { + val actor = recoveredTestActor(stateSync = false) + + val eventX = event("x", 1L) + actor ! Written(eventX) + + val write1 = logProbe.expectMsgClass(classOf[Write]) + val written = List( + event(write1.events(0).payload, 2L, Some(eventX)), + event(write1.events(1).payload, 3L, Some(eventX))) + + actor ! "snap" + + val save = logProbe.expectMsgClass(classOf[SaveSnapshot]) + val snapshotWithoutReferences = save.snapshot.copy(persistOnEventRequests = save.snapshot.persistOnEventRequests.map(_.copy(persistOnEventId = None))) + + actor ! "boom" + + logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId + 1)) + logProbe.sender() ! LoadSnapshotSuccess(Some(snapshotWithoutReferences), instanceId + 1) + logProbe.expectMsg(Replay(2L, Some(actor), instanceId + 1)) + logProbe.sender() ! ReplaySuccess(written, 3L, instanceId + 1) + logProbe.expectMsg(Replay(4L, None, instanceId + 1)) + logProbe.sender() ! ReplaySuccess(Nil, 3L, instanceId + 1) + logProbe.expectNoMsg(timeout) + } "be tolerant to changing actor paths across incarnations" in { val actor = unrecoveredTestActor(stateSync = false) val path = ActorPath.fromString("akka://test/user/invalid") val requests = Vector( - PersistOnEventRequest(3L, Vector(PersistOnEventInvocation("y", Set())), instanceId), - PersistOnEventRequest(4L, Vector(PersistOnEventInvocation("z", Set())), instanceId)) + PersistOnEventRequest(3L, Some(EventId("p-2", 2L)), Vector(PersistOnEventInvocation("y", Set())), instanceId), + PersistOnEventRequest(4L, None, Vector(PersistOnEventInvocation("z", Set())), instanceId)) val snapshot = Snapshot("foo", emitterIdA, event("x", 2), timestamp(2), 2, persistOnEventRequests = requests) logProbe.expectMsg(LoadSnapshot(emitterIdA, instanceId)) diff --git a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala new file mode 100644 index 00000000..e1bdbd4f --- /dev/null +++ b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/PersistOnEventWithRecoverySpecLeveldb.scala @@ -0,0 +1,96 @@ +/* + * Copyright 2015 - 2016 Red Bull Media House GmbH - all rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.rbmhtechnology.eventuate + +import java.util.UUID + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.testkit.TestProbe +import com.rbmhtechnology.eventuate.ReplicationIntegrationSpec.replicationConnection +import com.rbmhtechnology.eventuate.utilities._ +import org.apache.commons.io.FileUtils +import org.scalatest.Matchers +import org.scalatest.WordSpec + +import scala.concurrent.duration.DurationInt + +object PersistOnEventWithRecoverySpecLeveldb { + class OnBEmitRandomActor(val eventLog: ActorRef, probe: TestProbe) extends EventsourcedActor with PersistOnEvent { + + override def id = getClass.getName + + override def onCommand = Actor.emptyBehavior + + override def onEvent = { + case "A" => + case "B" => persistOnEvent(UUID.randomUUID().toString) + case uuid: String => probe.ref ! uuid + } + } + + def persistOnEventProbe(locationA1: Location, log: ActorRef) = { + val probe = locationA1.probe + locationA1.system.actorOf(Props(new OnBEmitRandomActor(log, probe))) + probe + } + + val noMsgTimeout = 100.millis +} + +class PersistOnEventWithRecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecLeveldb { + import RecoverySpecLeveldb._ + import PersistOnEventWithRecoverySpecLeveldb._ + + override val logFactory: String => Props = + id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) + + "An EventsourcedActor with PersistOnEvent" must { + "not re-attempt persistence on successful write after reordering of events through disaster recovery" in { + val locationB = location("B", customConfig = RecoverySpecLeveldb.config) + def newLocationA = location("A", customConfig = RecoverySpecLeveldb.config) + val locationA1 = newLocationA + + val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port))) + def newEndpointA(l: Location, activate: Boolean) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = activate) + val endpointA1 = newEndpointA(locationA1, activate = true) + + val targetA = endpointA1.target("L1") + val logDirA = logDirectory(targetA) + val targetB = endpointB.target("L1") + val a1Probe = persistOnEventProbe(locationA1, targetA.log) + + write(targetA, List("A")) + write(targetB, List("B")) + val event = a1Probe.expectMsgClass(classOf[String]) + assertConvergence(Set("A", "B", event), endpointA1, endpointB) + + locationA1.terminate().await + FileUtils.deleteDirectory(logDirA) + + val locationA2 = newLocationA + val endpointA2 = newEndpointA(locationA2, activate = false) + endpointA2.recover().await + + val a2Probe = persistOnEventProbe(locationA2, endpointA2.logs("L1")) + a2Probe.expectMsg(event) + a2Probe.expectNoMsg(noMsgTimeout) + assertConvergence(Set("A", "B", event), endpointA2, endpointB) + } + } +} diff --git a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala index 13c0453b..77f94804 100644 --- a/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala +++ b/eventuate-log-leveldb/src/it/scala/com/rbmhtechnology/eventuate/RecoverySpecLeveldb.scala @@ -49,6 +49,15 @@ object RecoverySpecLeveldb { } } + def assertConvergence(expected: Set[String], endpoints: ReplicationEndpoint*): Unit = { + val probes = endpoints.map { endpoint => + val probe = new TestProbe(endpoint.system) + endpoint.system.actorOf(Props(new ConvergenceView(s"p-${endpoint.id}", endpoint.logs("L1"), expected.size, probe.ref))) + probe + } + probes.foreach(_.expectMsg(expected)) + } + val config = ConfigFactory.parseString( """ |eventuate.log.replication.retry-delay = 1s @@ -96,15 +105,6 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecL override val logFactory: String => Props = id => SingleLocationSpecLeveldb.TestEventLog.props(id, batching = true) - def assertConvergence(expected: Set[String], endpoints: ReplicationEndpoint*): Unit = { - val probes = endpoints.map { endpoint => - val probe = new TestProbe(endpoint.system) - endpoint.system.actorOf(Props(new ConvergenceView(s"p-${endpoint.id}", endpoint.logs("L1"), expected.size, probe.ref))) - probe - } - probes.foreach(_.expectMsg(expected)) - } - "Replication endpoint recovery" must { "disallow activation of endpoint during and after recovery" in { val locationA = location("A", customConfig = RecoverySpecLeveldb.config) @@ -350,7 +350,8 @@ class RecoverySpecLeveldb extends WordSpec with Matchers with MultiLocationSpecL val locationA1 = newLocationA val endpointB = locationB.endpoint(Set("L1"), Set(replicationConnection(locationA1.port)), applicationVersion = oldVersion) - def newEndpointA(l: Location, version: ApplicationVersion, activate: Boolean) = l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), activate = activate) + def newEndpointA(l: Location, version: ApplicationVersion, activate: Boolean) = + l.endpoint(Set("L1"), Set(replicationConnection(locationB.port)), applicationVersion = version, activate = activate) val endpointA1 = newEndpointA(locationA1, oldVersion, activate = true) val targetA = endpointA1.target("L1")