diff --git a/src/Propulsion.DynamoStore/DynamoStoreSource.fs b/src/Propulsion.DynamoStore/DynamoStoreSource.fs
index db54a875..03a68059 100644
--- a/src/Propulsion.DynamoStore/DynamoStoreSource.fs
+++ b/src/Propulsion.DynamoStore/DynamoStoreSource.fs
@@ -113,18 +113,20 @@ module private Impl =
             report None hydrated.Length
             yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) }
 
+/// Defines the strategy to use for hydrating the events prior to routing them to the Handler
 [<NoComparison; NoEquality>]
-type LoadMode =
-    /// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the index
-    | WithoutEventBodies of categoryFilter : (string -> bool)
-    /// Populates the Data/Meta fields for events; necessitates loads of all individual streams that pass the categoryFilter before they can be handled
-    | Hydrated of categoryFilter : (string -> bool)
-                  * degreeOfParallelism : int
+type EventLoadMode =
+    /// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the Index Table
+    | IndexOnly
+    /// Populates the Data/Meta fields for events matching the categories and/or categoryFilter
+    /// Requires a roundtrip per stream to the Store Table (constrained by streamParallelismLimit)
+    | WithData of /// Maximum concurrency for stream reads from the Store Table
+                  streamParallelismLimit : int
                   * /// Defines the Context to use when loading the Event Data/Meta
-                  storeContext : DynamoStoreContext
-module internal LoadMode =
+                  storeContext : DynamoStoreContext
+module internal EventLoadMode =
     let private mapTimelineEvent = FsCodec.Core.TimelineEvent.Map FsCodec.Deflate.EncodedToUtf8
-    let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
+    let private withData (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
         fun sn (i, cs : string array) ->
             if categoryFilter (FsCodec.StreamName.category sn) then
                 ValueSome (fun ct -> task {
@@ -135,18 +137,28 @@ module internal LoadMode =
         fun sn (i, cs) ->
             let renderEvent offset c = FsCodec.Core.TimelineEvent.Create(i + int64 offset, eventType = c, data = Unchecked.defaultof<_>)
             if categoryFilter (FsCodec.StreamName.category sn) then ValueSome (fun _ct -> task { return cs |> Array.mapi renderEvent }) else ValueNone
-    let map storeLog : LoadMode -> _ = function
-        | WithoutEventBodies categoryFilter -> false, withoutBodies categoryFilter, 1
-        | Hydrated (categoryFilter, dop, storeContext) ->
+    let mapFilters categories categoryFilter =
+        match categories, categoryFilter with
+        | None, None -> fun _ -> true
+        | Some categories, None -> fun x -> Array.contains x categories
+        | None, Some filter -> filter
+        | Some categories, Some filter -> fun x -> Array.contains x categories && filter x
+    let map categoryFilter storeLog : EventLoadMode -> _ = function
+        | IndexOnly -> false, withoutBodies categoryFilter, 1
+        | WithData (dop, storeContext) ->
             let eventsContext = Equinox.DynamoStore.Core.EventsContext(storeContext, storeLog)
-            true, withBodies eventsContext categoryFilter, dop
+            true, withData eventsContext categoryFilter, dop
 
 type DynamoStoreSource
     (   log : Serilog.ILogger, statsInterval,
         indexClient : DynamoStoreClient, batchSizeCutoff, tailSleepInterval,
         checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
-        // If the Handler does not utilize the Data/Meta of the events, we can avoid loading them from the Store
-        loadMode : LoadMode,
+        // If the Handler does not utilize the Data/Meta of the events, we can avoid having to read from the Store Table
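
The call-site impact of the `LoadMode` -> `EventLoadMode` migration is easiest to see at a construction site. A minimal sketch, assuming the dependency bindings are supplied by the application; the interval and batch-size literals are placeholders, not values this patch prescribes:

```fsharp
open System

// Illustrative consumer wiring (not part of the patch); only the mode/categories/categoryFilter
// arguments reflect the new API surface introduced above
let buildSource (log : Serilog.ILogger) (indexClient : Equinox.DynamoStore.DynamoStoreClient)
                (checkpoints : Propulsion.Feed.IFeedCheckpointStore) (sink : Propulsion.Streams.Default.Sink)
                (storeContext : Equinox.DynamoStore.DynamoStoreContext) hydrate =
    let mode =
        if hydrate then Propulsion.DynamoStore.EventLoadMode.WithData (4, storeContext) // up to 4 concurrent Store Table stream reads
        else Propulsion.DynamoStore.EventLoadMode.IndexOnly // no Store Table roundtrips at all
    Propulsion.DynamoStore.DynamoStoreSource(
        log, TimeSpan.FromMinutes 1., indexClient, 1000, TimeSpan.FromMilliseconds 500.,
        checkpoints, sink, mode,
        // both filters may be supplied; mapFilters then requires a category to pass both
        categories = [| "Orders" |],
        categoryFilter = (fun c -> not (c.StartsWith "$")))
```

Note the filtering now lives on the Source rather than inside the mode, so `IndexOnly` no longer needs a `categoryFilter` argument to be usable.
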
+        mode : EventLoadMode,
+        // The whitelist of Categories to use
+        ?categories,
+        // Predicate to filter Categories to use
+        ?categoryFilter : string -> bool,
         // Override default start position to be at the tail of the index. Default: Replay all events.
         ?startFromTail,
         // Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
@@ -162,7 +174,8 @@ type DynamoStoreSource
         sink,
         Impl.materializeIndexEpochAsBatchesOfStreamEvents
             (log, defaultArg sourceId FeedSourceId.wellKnownId, defaultArg storeLog log)
-            (LoadMode.map (defaultArg storeLog log) loadMode) batchSizeCutoff (DynamoStoreContext indexClient),
+            (EventLoadMode.map (EventLoadMode.mapFilters categories categoryFilter) (defaultArg storeLog log) mode)
+            batchSizeCutoff (DynamoStoreContext indexClient),
         Impl.renderPos,
         Impl.logReadFailure (defaultArg storeLog log),
         defaultArg readFailureSleepInterval (tailSleepInterval * 2.),
@@ -205,4 +218,3 @@ type DynamoStoreSource
                 // force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
                 return! x.Checkpoint()
             finally statsInterval.SleepUntilTriggerCleared() }
-
diff --git a/src/Propulsion.EventStoreDb/EventStoreSource.fs b/src/Propulsion.EventStoreDb/EventStoreSource.fs
index f787ac6c..d8146190 100644
--- a/src/Propulsion.EventStoreDb/EventStoreSource.fs
+++ b/src/Propulsion.EventStoreDb/EventStoreSource.fs
@@ -13,9 +13,9 @@ module private Impl =
     let private checkpointPos (xs : EventRecord array) =
         match Array.tryLast xs with Some e -> int64 e.Position.CommitPosition | None -> -1L
        |> Propulsion.Feed.Position.parse
-    let readBatch hydrateBodies batchSize categoryFilter (store : EventStoreClient) (pos, ct) = task {
+    let readBatch withData batchSize categoryFilter (store : EventStoreClient) (pos, ct) = task {
         let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p)
-        let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, hydrateBodies, cancellationToken = ct)
+        let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct)
         let! batch = res |> TaskSeq.map (fun e -> e.Event) |> TaskSeq.toArrayAsync
         return ({ checkpoint = checkpointPos batch; items = toItems categoryFilter batch; isTail = batch.LongLength <> batchSize } : Propulsion.Feed.Core.Batch<_>) }
@@ -36,11 +36,11 @@ type EventStoreSource
         checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
         categoryFilter : string -> bool,
         // If the Handler does not utilize the Data/Meta of the events, we can avoid shipping them from the Store in the first instance. Default false.
-        ?hydrateBodies,
+        ?withData,
         // Override default start position to be at the tail of the index. Default: Replay all events.
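
Here the change is confined to the rename of the optional argument. A sketch under assumed bindings; the argument order is inferred from the constructor body above, and `batchSize` is written as int64 to match the `batch.LongLength <> batchSize` comparison:

```fsharp
open System

// Illustrative wiring (not part of the patch)
let buildSource (log : Serilog.ILogger) (client : EventStore.Client.EventStoreClient)
                (checkpoints : Propulsion.Feed.IFeedCheckpointStore) (sink : Propulsion.Streams.Default.Sink) =
    Propulsion.EventStoreDb.EventStoreSource(
        log, TimeSpan.FromMinutes 1., client, 100L, TimeSpan.FromSeconds 0.5,
        checkpoints, sink, (fun category -> category = "Orders"),
        // ship Data/Meta from the store so the Handler can decode the events; omit for the lean default
        withData = true)
```
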
         ?startFromTail, ?sourceId) =
     inherit Propulsion.Feed.Core.AllFeedSource
         (   log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval,
-            Impl.readBatch (hydrateBodies = Some true) batchSize categoryFilter client, checkpoints, sink,
+            Impl.readBatch (withData = Some true) batchSize categoryFilter client, checkpoints, sink,
             ?establishOrigin = if startFromTail <> Some true then None else Some (Impl.readTailPositionForTranche log client))
diff --git a/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs b/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs
index 287d407e..fbf3e1c2 100644
--- a/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs
+++ b/src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs
@@ -11,13 +11,13 @@ module private Impl =
     let private readWithDataAsStreamEvent (struct (_sn, msg : SqlStreamStore.Streams.StreamMessage) as m) ct = task {
         let! json = msg.GetJsonData(ct)
         return toStreamEvent json m }
-    let readBatch hydrateBodies batchSize categoryFilter (store : SqlStreamStore.IStreamStore) (pos, ct) = task {
-        let! page = store.ReadAllForwards(Propulsion.Feed.Position.toInt64 pos, batchSize, hydrateBodies, ct)
+    let readBatch withData batchSize categoryFilter (store : SqlStreamStore.IStreamStore) (pos, ct) = task {
+        let! page = store.ReadAllForwards(Propulsion.Feed.Position.toInt64 pos, batchSize, withData, ct)
         let filtered = page.Messages |> Seq.choose (fun (msg : SqlStreamStore.Streams.StreamMessage) ->
             let sn = Propulsion.Streams.StreamName.internalParseSafe msg.StreamId
             if categoryFilter (FsCodec.StreamName.category sn) then Some struct (sn, msg) else None)
-        let! items = if not hydrateBodies then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq }
+        let! items = if not withData then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq }
                      else filtered |> Seq.map readWithDataAsStreamEvent |> Propulsion.Internal.Task.sequential ct
         return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd } : Propulsion.Feed.Core.Batch<_>) }
@@ -31,10 +31,10 @@ type SqlStreamStoreSource
         checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
         categoryFilter : string -> bool,
         // If the Handler does not require the Data/Meta of the events, the query to load the events can be much more efficient. Default: false
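
Same shape for SqlStreamStore; the distinction worth noting is that with `withData` set, each matching message incurs a `GetJsonData` roundtrip, executed sequentially within the batch. Sketch with assumed bindings and placeholder values:

```fsharp
open System

// Illustrative wiring (not part of the patch)
let buildSource (log : Serilog.ILogger) (store : SqlStreamStore.IStreamStore)
                (checkpoints : Propulsion.Feed.IFeedCheckpointStore) (sink : Propulsion.Streams.Default.Sink) =
    Propulsion.SqlStreamStore.SqlStreamStoreSource(
        log, TimeSpan.FromMinutes 1., store, 100, TimeSpan.FromSeconds 0.5,
        checkpoints, sink, (fun category -> category = "Orders"),
        withData = true) // hydrate: one GetJsonData per matching message, sequential within a batch
```
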
-        ?hydrateBodies,
+        ?withData,
         ?startFromTail, ?sourceId) =
     inherit Propulsion.Feed.Core.AllFeedSource
         (   log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval,
-            Impl.readBatch (hydrateBodies = Some true) batchSize categoryFilter store, checkpoints, sink,
+            Impl.readBatch (withData = Some true) batchSize categoryFilter store, checkpoints, sink,
             ?establishOrigin = if startFromTail <> Some true then None else Some (Impl.readTailPositionForTranche store))
diff --git a/tools/Propulsion.Tool/Args.fs b/tools/Propulsion.Tool/Args.fs
index 522c1d5b..c1ef2f5c 100644
--- a/tools/Propulsion.Tool/Args.fs
+++ b/tools/Propulsion.Tool/Args.fs
@@ -243,15 +243,17 @@ module Dynamo =
             | xs -> Log.Information("DynamoStoreSource Partition Filter {partitionIds}", xs)
                     (c, Some (Array.ofList xs))
-        match streamsDop with
-        | None ->
-            Log.Information("DynamoStoreSource NOT Hydrating events"); indexProps, None
-        | Some streamsDop ->
-            Log.Information("DynamoStoreSource Hydrater parallelism {streamsDop}", streamsDop)
-            let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
-            let context = readClient.ConnectStore("Store", table) |> DynamoStoreContext.create
-            indexProps, Some (context, streamsDop)
-
+        let loadMode =
+            match streamsDop with
+            | None ->
+                Log.Information("DynamoStoreSource IndexOnly mode")
+                Propulsion.DynamoStore.EventLoadMode.IndexOnly
+            | Some streamsDop ->
+                Log.Information("DynamoStoreSource WithData, parallelism limit {streamsDop}", streamsDop)
+                let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
+                let context = readClient.ConnectStore("Store", table) |> DynamoStoreContext.create
+                Propulsion.DynamoStore.EventLoadMode.WithData (streamsDop, context)
+        indexProps, loadMode
     member _.CreateContext(minItemSizeK) =
         let client = indexWriteClient.Value
         let queryMaxItems = 100
diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs
index ebd60b6e..2d6313e4 100644
--- a/tools/Propulsion.Tool/Program.fs
+++ b/tools/Propulsion.Tool/Program.fs
@@ -358,15 +358,10 @@ module Project =
                 (   Log.Logger, monitored, leases, group, observer,
                     startFromTail = startFromTail, ?maxItems = maxItems, ?lagReportFreq = sa.MaybeLogLagInterval)
             | Choice2Of3 sa ->
-                let (indexStore, indexFilter), maybeHydrate = sa.MonitoringParams()
+                let (indexStore, indexFilter), loadMode = sa.MonitoringParams()
                 let checkpoints =
                     let cache = Equinox.Cache (appName, sizeMb = 1)
                     sa.CreateCheckpointStore(group, cache, Log.forMetrics)
-                let loadMode =
-                    match maybeHydrate with
-                    | Some (context, streamsDop) ->
-                        Propulsion.DynamoStore.LoadMode.Hydrated (nullFilter, streamsDop, context)
-                    | None -> Propulsion.DynamoStore.LoadMode.WithoutEventBodies (fun _categoryName -> true)
                 Propulsion.DynamoStore.DynamoStoreSource(
                     Log.Logger, stats.StatsInterval, indexStore, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5,
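
For reference, the category filtering that `DynamoStoreSource` (and hence the tool's `Choice2Of3` path) now applies is the `EventLoadMode.mapFilters` combination. Since that function is `internal`, this is an illustrative mirror of its rules, not a public API:

```fsharp
// Mirrors EventLoadMode.mapFilters: no filters passes every category; when both are
// supplied, a category must be in the whitelist AND satisfy the predicate
let combineFilters (categories : string array option) (categoryFilter : (string -> bool) option) : string -> bool =
    match categories, categoryFilter with
    | None, None -> fun _ -> true
    | Some cats, None -> fun c -> Array.contains c cats
    | None, Some f -> f
    | Some cats, Some f -> fun c -> Array.contains c cats && f c
```
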