Skip to content

Commit

Permalink
Esdb+Sss Sources: Add fromTail, categoryFilter (#173)
Browse files Browse the repository at this point in the history
* Add Esdb/Sss categoryFilter/fromTail
* Standardise categoryFilter/startFromTail
* CosmosStoreSource: Expose tailSleepInterval
* FsCodec 3rc7.1 updates
* Add ReaderCheckpoint.MemoryStore
  • Loading branch information
bartelink authored Sep 8, 2022
1 parent 363784d commit 40565c6
Show file tree
Hide file tree
Showing 21 changed files with 152 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139)
- `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/140)
- `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165)
- `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173)
- `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141)
- `Propulsion.Tool`: Add support for [autoscaling throughput](https://docs.microsoft.com/en-us/azure/cosmos-db/provision-throughput-autoscale) of Cosmos containers and databases [#142](https://github.com/jet/propulsion/pull/142) :pray: [@brihadish](https://github.com/brihadish)

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Cosmos/Propulsion.Cosmos.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.Cosmos" Version="[2.6.0, 2.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
<PackageReference Include="Microsoft.Azure.DocumentDB.ChangeFeedProcessor" Version="2.4.0" />
</ItemGroup>

Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.CosmosStore/CosmosStoreParser.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module EquinoxSystemTextJsonParser =
unixEpoch.AddSeconds(ts.GetDouble())

/// Sanity check to determine whether the Document represents an `Equinox.Cosmos` >= 1.0 based batch
let tryParseEquinoxBatch streamFilter (d : System.Text.Json.JsonDocument) =
let tryParseEquinoxBatch categoryFilter (d : System.Text.Json.JsonDocument) =
let r = d.RootElement
let tryProp (id : string) : ValueOption<System.Text.Json.JsonElement> =
let mutable p = Unchecked.defaultof<_>
Expand All @@ -39,16 +39,16 @@ module EquinoxSystemTextJsonParser =
match tryProp "p" with
| ValueSome je when je.ValueKind = System.Text.Json.JsonValueKind.String && hasProp "i" && hasProp "n" && hasProp "e" ->
let streamName = je.GetString() |> FsCodec.StreamName.parse // we expect all Equinox data to adhere to "{category}-{aggregateId}" form (or we'll throw)
if streamFilter (FsCodec.StreamName.splitCategoryAndStreamId streamName) then ValueSome (struct (streamName, d.Cast<Batch>())) else ValueNone
if categoryFilter (FsCodec.StreamName.category streamName) then ValueSome (struct (streamName, d.Cast<Batch>())) else ValueNone
| _ -> ValueNone

/// Enumerates the events represented within a batch
let enumEquinoxCosmosEvents struct (streamName, batch : Batch) : Default.StreamEvent seq =
batch.e |> Seq.mapi (fun offset x -> streamName, FsCodec.Core.TimelineEvent.Create(batch.i + int64 offset, x.c, batch.MapData x.d, batch.MapData x.m, timestamp = x.t))

/// Collects all events with a Document [typically obtained via the CosmosDb ChangeFeed] that potentially represents an Equinox.Cosmos event-batch
let enumStreamEvents streamFilter d : Default.StreamEvent seq =
tryParseEquinoxBatch streamFilter d |> ValueOption.map enumEquinoxCosmosEvents |> ValueOption.defaultValue Seq.empty
let enumStreamEvents categoryFilter d : Default.StreamEvent seq =
tryParseEquinoxBatch categoryFilter d |> ValueOption.map enumEquinoxCosmosEvents |> ValueOption.defaultValue Seq.empty
#else
#if COSMOSV2
module EquinoxCosmosParser =
Expand Down
5 changes: 3 additions & 2 deletions src/Propulsion.CosmosStore/CosmosStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ type CosmosStoreSource =
static member Start
( log : ILogger,
monitored : Container, leases : Container, processorName, observer,
startFromTail, ?maxItems, ?lagReportFreq : TimeSpan, ?notifyError, ?customize) =
?maxItems, ?tailSleepInterval, ?startFromTail, ?lagReportFreq : TimeSpan, ?notifyError, ?customize) =
let databaseId, containerId = monitored.Database.Id, monitored.Id
#endif
let logLag (interval : TimeSpan) (remainingWork : (int*int64) list) = async {
Expand All @@ -158,7 +158,8 @@ type CosmosStoreSource =
let source =
ChangeFeedProcessor.Start
( log, monitored, leases, processorName, observer, ?notifyError=notifyError, ?customize=customize,
startFromTail=startFromTail, ?reportLagAndAwaitNextEstimation=maybeLogLag, ?maxItems=maxItems,
?maxItems = maxItems, ?feedPollDelay = tailSleepInterval, ?reportLagAndAwaitNextEstimation=maybeLogLag,
startFromTail = defaultArg startFromTail false,
leaseAcquireInterval=TimeSpan.FromSeconds 5., leaseRenewInterval=TimeSpan.FromSeconds 5., leaseTtl=TimeSpan.FromSeconds 10.)
lagReportFreq |> Option.iter (fun s -> log.Information("ChangeFeed {processorName} Lag stats interval {lagReportIntervalS:n0}s", processorName, s.TotalSeconds))
source
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
19 changes: 17 additions & 2 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@ module Events =
| Updated of Updated
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
#if MEMORYSTORE
let codec = FsCodec.Box.Codec.Create<Event>()
#else
#if DYNAMOSTORE
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Deflate.EncodeTryDeflate
let codec = FsCodec.SystemTextJson.Codec.Create<Event>() |> FsCodec.Deflate.EncodeUncompressed
#else
#if !COSMOSV3 && !COSMOSV2
let codec = FsCodec.SystemTextJson.CodecJsonElement.Create<Event>()
#else
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
#endif
#endif
#endif

module Fold =

Expand Down Expand Up @@ -133,6 +137,16 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events
let decider = resolve (source, tranche, consumerGroupName)
decider.Transact(decideOverride DateTimeOffset.UtcNow defaultCheckpointFrequency pos)

#if MEMORYSTORE
module MemoryStore =

open Equinox.MemoryStore

let create log (consumerGroupName, defaultCheckpointFrequency) context =
let cat = MemoryStoreCategory(context, Events.codec, Fold.fold, Fold.initial)
let resolve = Equinox.Decider.resolve log cat
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if DYNAMOSTORE
module DynamoStore =

Expand All @@ -143,7 +157,7 @@ module DynamoStore =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let cat = DynamoStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve = Equinox.Decider.resolve log cat
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
Service(streamName4 >> resolve, consumerGroupName, defaultCheckpointFrequency)
#else
#if !COSMOSV2 && !COSMOSV3
module CosmosStore =
Expand Down Expand Up @@ -188,3 +202,4 @@ module CosmosStore =
#endif
#endif
#endif
#endif
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore3/Propulsion.CosmosStore3.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.CosmosStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.27.0" />
</ItemGroup>

Expand Down
4 changes: 2 additions & 2 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module Events =
| Ingested of Ingested
| Closed
interface TypeShape.UnionContract.IUnionContract
let codec = EventCodec.create<Event>()
let codec = EventCodec.gen<Event>

let next (x : Events.StreamSpan) = int x.i + x.c.Length
/// Aggregates all spans per stream into a single Span from the lowest index to the highest
Expand Down Expand Up @@ -109,7 +109,7 @@ type Service internal (shouldClose, resolve : struct (AppendsTrancheId * Appends
let decider = resolve (trancheId, epochId)
if Array.isEmpty spans then async { return { accepted = [||]; closed = false; residual = [||] } } else // special-case null round-trips

let isSelf p = IndexStreamId.toStreamName p |> FsCodec.StreamName.splitCategoryAndStreamId |> ValueTuple.fst = Category
let isSelf p = match IndexStreamId.toStreamName p with FsCodec.StreamName.Category c -> c = Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
decider.TransactEx((fun c -> (Ingest.decide (shouldClose (c.StreamEventBytes, c.Version))) spans c.State), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)

Expand Down
11 changes: 5 additions & 6 deletions src/Propulsion.DynamoStore/AppendsIndex.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ module Events =
| Started of {| tranche : AppendsTrancheId; epoch : AppendsEpochId |}
| Snapshotted of {| active : Map<AppendsTrancheId, AppendsEpochId> |}
interface TypeShape.UnionContract.IUnionContract
let codec = EventCodec.create<Event>()
let codec = EventCodec.gen<Event>

module Fold =

Expand Down Expand Up @@ -52,10 +52,9 @@ type Service internal (resolve : unit -> Equinox.Decider<Events.Event, Fold.Stat

module Config =

let private createCategory store =
Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolveDecider log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamName >> resolveDecider log (context, Some cache))
let private createCategory store = Config.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) store
let resolve log store = createCategory store |> Equinox.Decider.resolve log
let create log (context, cache) = Service(streamName >> resolve log (context, Some cache))

/// On the Reading Side, there's no advantage to caching (as we have snapshots, and it's Dynamo)
module Reader =
Expand All @@ -80,4 +79,4 @@ module Reader =
let decider = resolve ()
decider.Query(readIngestionEpochId trancheId)

let create log context = Service(streamName >> Config.resolveDecider log (context, None))
let create log context = Service(streamName >> Config.resolve log (context, None))
30 changes: 15 additions & 15 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -141,39 +141,39 @@ module private Impl =

[<NoComparison; NoEquality>]
type LoadMode =
| All
| Filtered of filter : (struct (string * string) -> bool)
| Hydrated of filter : (struct (string * string) -> bool)
/// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the index
| WithoutEventBodies of categoryFilter : (string -> bool)
/// Populates the Data/Meta fields for events; necessitates loads of all individual streams that pass the categoryFilter before they can be handled
| Hydrated of categoryFilter : (string -> bool)
* degreeOfParallelism : int
* /// Defines the Context to use when loading the bodies
* /// Defines the Context to use when loading the Event Data/Meta
storeContext : DynamoStoreContext
module internal LoadMode =
let private mapTimelineEvent = FsCodec.Core.TimelineEvent.Map FsCodec.Deflate.EncodedToUtf8
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) filter =
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
fun sn (i, cs : string array) ->
if filter (FsCodec.StreamName.splitCategoryAndStreamId sn) then
if categoryFilter (FsCodec.StreamName.category sn) then
ValueSome (async { let! _pos, events = eventsContext.Read(FsCodec.StreamName.toString sn, i, maxCount = cs.Length)
return events |> Array.map mapTimelineEvent })
else ValueNone
let private withoutBodies filter =
let private withoutBodies categoryFilter =
fun sn (i, cs) ->
let renderEvent offset c = FsCodec.Core.TimelineEvent.Create(i + int64 offset, eventType = c, data = Unchecked.defaultof<_>)
if filter (FsCodec.StreamName.splitCategoryAndStreamId sn) then ValueSome (async { return cs |> Array.mapi renderEvent }) else ValueNone
if categoryFilter (FsCodec.StreamName.category sn) then ValueSome (async { return cs |> Array.mapi renderEvent }) else ValueNone
let map storeLog : LoadMode -> _ = function
| All -> false, withoutBodies (fun _ -> true), 1
| Filtered filter -> false, withoutBodies filter, 1
| Hydrated (filter, dop, storeContext) ->
| WithoutEventBodies categoryFilter -> false, withoutBodies categoryFilter, 1
| Hydrated (categoryFilter, dop, storeContext) ->
let eventsContext = Equinox.DynamoStore.Core.EventsContext(storeContext, storeLog)
true, withBodies eventsContext filter, dop
true, withBodies eventsContext categoryFilter, dop

type DynamoStoreSource
( log : Serilog.ILogger, statsInterval,
indexClient : DynamoStoreClient, batchSizeCutoff, tailSleepInterval,
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
// If the Handler does not utilize the bodies of the events, we can avoid loading them from the Store
// If the Handler does not utilize the Data/Meta of the events, we can avoid loading them from the Store
loadMode : LoadMode,
// Override default start position to be at the tail of the index (Default: Always replay all events)
?fromTail,
?startFromTail,
// Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
?storeLog,
?readFailureSleepInterval,
Expand All @@ -185,7 +185,7 @@ type DynamoStoreSource
(log, defaultArg sourceId FeedSourceId.wellKnownId, defaultArg storeLog log)
(LoadMode.map (defaultArg storeLog log) loadMode) batchSizeCutoff (DynamoStoreContext indexClient),
checkpoints,
( if fromTail <> Some true then None
( if startFromTail <> Some true then None
else Some (Impl.readTailPositionForTranche (defaultArg storeLog log) (DynamoStoreContext indexClient))),
sink,
Impl.renderPos,
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.12" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 4 additions & 2 deletions src/Propulsion.DynamoStore/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ module internal Config =

open Equinox.DynamoStore

let private defaultCacheDuration = System.TimeSpan.FromMinutes 20.

let private create codec initial fold accessStrategy (context, cache) =
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cs = match cache with None -> CachingStrategy.NoCaching | Some cache -> CachingStrategy.SlidingWindow (cache, defaultCacheDuration)
DynamoStoreCategory(context, codec, fold, initial, cs, accessStrategy)

let createSnapshotted codec initial fold (isOrigin, toSnapshot) (context, cache) =
Expand All @@ -74,7 +76,7 @@ module internal Config =

module internal EventCodec =

let create<'t when 't :> TypeShape.UnionContract.IUnionContract> () =
let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.Codec.Create<'t>() |> FsCodec.Deflate.EncodeTryDeflate
let private withUpconverter<'c, 'e when 'c :> TypeShape.UnionContract.IUnionContract> up : FsCodec.IEventCodec<'e, _, _> =
let down (_ : 'e) = failwith "Unexpected"
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.6" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
</ItemGroup>

<ItemGroup>
Expand Down
Loading

0 comments on commit 40565c6

Please sign in to comment.