Skip to content

Commit

Permalink
DynamoStore: LoadMode category lists, withData naming (#204)
Browse files Browse the repository at this point in the history
Co-authored-by: Einar Norðfjörð <[email protected]>
  • Loading branch information
bartelink and nordfjord authored Feb 16, 2023
1 parent 2fc9231 commit c612eaa
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 41 deletions.
46 changes: 29 additions & 17 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,20 @@ module private Impl =
report None hydrated.Length
yield struct (sw.Elapsed, finalBatch epochId (version, state) hydrated) }

/// Defines the strategy to use for hydrating the events prior to routing them to the Handler
[<NoComparison; NoEquality>]
type LoadMode =
/// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the index
| WithoutEventBodies of categoryFilter : (string -> bool)
/// Populates the Data/Meta fields for events; necessitates loads of all individual streams that pass the categoryFilter before they can be handled
| Hydrated of categoryFilter : (string -> bool)
* degreeOfParallelism : int
type EventLoadMode =
/// Skip loading of Data/Meta for events; this is the most efficient mode as it means the Source only needs to read from the Index Table
| IndexOnly
/// Populates the Data/Meta fields for events matching the categories and/or categoryFilter
/// Requires a roundtrip per stream to the Store Table (constrained by streamParallelismLimit)
| WithData of /// Maximum concurrency for stream reads from the Store Table
streamParallelismLimit : int
* /// Defines the Context to use when loading the Event Data/Meta
storeContext : DynamoStoreContext
module internal LoadMode =
storeContext : DynamoStoreContext
module internal EventLoadMode =
let private mapTimelineEvent = FsCodec.Core.TimelineEvent.Map FsCodec.Deflate.EncodedToUtf8
let private withBodies (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
let private withData (eventsContext : Equinox.DynamoStore.Core.EventsContext) categoryFilter =
fun sn (i, cs : string array) ->
if categoryFilter (FsCodec.StreamName.category sn) then
ValueSome (fun ct -> task {
Expand All @@ -135,18 +137,28 @@ module internal LoadMode =
fun sn (i, cs) ->
let renderEvent offset c = FsCodec.Core.TimelineEvent.Create(i + int64 offset, eventType = c, data = Unchecked.defaultof<_>)
if categoryFilter (FsCodec.StreamName.category sn) then ValueSome (fun _ct -> task { return cs |> Array.mapi renderEvent }) else ValueNone
let map storeLog : LoadMode -> _ = function
| WithoutEventBodies categoryFilter -> false, withoutBodies categoryFilter, 1
| Hydrated (categoryFilter, dop, storeContext) ->
let mapFilters categories categoryFilter =
match categories, categoryFilter with
| None, None -> fun _ -> true
| Some categories, None -> fun x -> Array.contains x categories
| None, Some filter -> filter
| Some categories, Some filter -> fun x -> Array.contains x categories && filter x
let map categoryFilter storeLog : EventLoadMode -> _ = function
| IndexOnly -> false, withoutBodies categoryFilter, 1
| WithData (dop, storeContext) ->
let eventsContext = Equinox.DynamoStore.Core.EventsContext(storeContext, storeLog)
true, withBodies eventsContext categoryFilter, dop
true, withData eventsContext categoryFilter, dop

type DynamoStoreSource
( log : Serilog.ILogger, statsInterval,
indexClient : DynamoStoreClient, batchSizeCutoff, tailSleepInterval,
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
// If the Handler does not utilize the Data/Meta of the events, we can avoid loading them from the Store
loadMode : LoadMode,
// If the Handler does not utilize the Data/Meta of the events, we can avoid having to read from the Store Table
mode : EventLoadMode,
// The whitelist of Categories to use
?categories,
// Predicate to filter Categories to use
?categoryFilter : string -> bool,
// Override default start position to be at the tail of the index. Default: Replay all events.
?startFromTail,
// Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
Expand All @@ -162,7 +174,8 @@ type DynamoStoreSource
sink,
Impl.materializeIndexEpochAsBatchesOfStreamEvents
(log, defaultArg sourceId FeedSourceId.wellKnownId, defaultArg storeLog log)
(LoadMode.map (defaultArg storeLog log) loadMode) batchSizeCutoff (DynamoStoreContext indexClient),
(EventLoadMode.map (EventLoadMode.mapFilters categories categoryFilter) (defaultArg storeLog log) mode)
batchSizeCutoff (DynamoStoreContext indexClient),
Impl.renderPos,
Impl.logReadFailure (defaultArg storeLog log),
defaultArg readFailureSleepInterval (tailSleepInterval * 2.),
Expand Down Expand Up @@ -205,4 +218,3 @@ type DynamoStoreSource
// force a final attempt to flush anything not already checkpointed (normally checkpointing is at 5s intervals)
return! x.Checkpoint()
finally statsInterval.SleepUntilTriggerCleared() }

8 changes: 4 additions & 4 deletions src/Propulsion.EventStoreDb/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ module private Impl =
let private checkpointPos (xs : EventRecord array) =
match Array.tryLast xs with Some e -> int64 e.Position.CommitPosition | None -> -1L
|> Propulsion.Feed.Position.parse
let readBatch hydrateBodies batchSize categoryFilter (store : EventStoreClient) (pos, ct) = task {
let readBatch withData batchSize categoryFilter (store : EventStoreClient) (pos, ct) = task {
let pos = let p = pos |> Propulsion.Feed.Position.toInt64 |> uint64 in Position(p, p)
let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, hydrateBodies, cancellationToken = ct)
let res = store.ReadAllAsync(Direction.Forwards, pos, batchSize, withData, cancellationToken = ct)
let! batch = res |> TaskSeq.map (fun e -> e.Event) |> TaskSeq.toArrayAsync
return ({ checkpoint = checkpointPos batch; items = toItems categoryFilter batch; isTail = batch.LongLength <> batchSize } : Propulsion.Feed.Core.Batch<_>) }

Expand All @@ -36,11 +36,11 @@ type EventStoreSource
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
categoryFilter : string -> bool,
// If the Handler does not utilize the Data/Meta of the events, we can avoid shipping them from the Store in the first instance. Default false.
?hydrateBodies,
?withData,
// Override default start position to be at the tail of the index. Default: Replay all events.
?startFromTail,
?sourceId) =
inherit Propulsion.Feed.Core.AllFeedSource
( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval,
Impl.readBatch (hydrateBodies = Some true) batchSize categoryFilter client, checkpoints, sink,
Impl.readBatch (withData = Some true) batchSize categoryFilter client, checkpoints, sink,
?establishOrigin = if startFromTail <> Some true then None else Some (Impl.readTailPositionForTranche log client))
10 changes: 5 additions & 5 deletions src/Propulsion.SqlStreamStore/SqlStreamStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ module private Impl =
let private readWithDataAsStreamEvent (struct (_sn, msg : SqlStreamStore.Streams.StreamMessage) as m) ct = task {
let! json = msg.GetJsonData(ct)
return toStreamEvent json m }
let readBatch hydrateBodies batchSize categoryFilter (store : SqlStreamStore.IStreamStore) (pos, ct) = task {
let! page = store.ReadAllForwards(Propulsion.Feed.Position.toInt64 pos, batchSize, hydrateBodies, ct)
let readBatch withData batchSize categoryFilter (store : SqlStreamStore.IStreamStore) (pos, ct) = task {
let! page = store.ReadAllForwards(Propulsion.Feed.Position.toInt64 pos, batchSize, withData, ct)
let filtered = page.Messages
|> Seq.choose (fun (msg : SqlStreamStore.Streams.StreamMessage) ->
let sn = Propulsion.Streams.StreamName.internalParseSafe msg.StreamId
if categoryFilter (FsCodec.StreamName.category sn) then Some struct (sn, msg) else None)
let! items = if not hydrateBodies then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq }
let! items = if not withData then task { return filtered |> Seq.map (toStreamEvent null) |> Array.ofSeq }
else filtered |> Seq.map readWithDataAsStreamEvent |> Propulsion.Internal.Task.sequential ct
return ({ checkpoint = Propulsion.Feed.Position.parse page.NextPosition; items = items; isTail = page.IsEnd } : Propulsion.Feed.Core.Batch<_>) }

Expand All @@ -31,10 +31,10 @@ type SqlStreamStoreSource
checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink,
categoryFilter : string -> bool,
// If the Handler does not require the Data/Meta of the events, the query to load the events can be much more efficient. Default: false
?hydrateBodies,
?withData,
?startFromTail,
?sourceId) =
inherit Propulsion.Feed.Core.AllFeedSource
( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval,
Impl.readBatch (hydrateBodies = Some true) batchSize categoryFilter store, checkpoints, sink,
Impl.readBatch (withData = Some true) batchSize categoryFilter store, checkpoints, sink,
?establishOrigin = if startFromTail <> Some true then None else Some (Impl.readTailPositionForTranche store))
20 changes: 11 additions & 9 deletions tools/Propulsion.Tool/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,17 @@ module Dynamo =
| xs ->
Log.Information("DynamoStoreSource Partition Filter {partitionIds}", xs)
(c, Some (Array.ofList xs))
match streamsDop with
| None ->
Log.Information("DynamoStoreSource NOT Hydrating events"); indexProps, None
| Some streamsDop ->
Log.Information("DynamoStoreSource Hydrater parallelism {streamsDop}", streamsDop)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
let context = readClient.ConnectStore("Store", table) |> DynamoStoreContext.create
indexProps, Some (context, streamsDop)

let loadMode =
match streamsDop with
| None ->
Log.Information("DynamoStoreSource IndexOnly mode")
Propulsion.DynamoStore.EventLoadMode.IndexOnly
| Some streamsDop ->
Log.Information("DynamoStoreSource WithData, parallelism limit {streamsDop}", streamsDop)
let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
let context = readClient.ConnectStore("Store", table) |> DynamoStoreContext.create
Propulsion.DynamoStore.EventLoadMode.WithData (streamsDop, context)
indexProps, loadMode
member _.CreateContext(minItemSizeK) =
let client = indexWriteClient.Value
let queryMaxItems = 100
Expand Down
7 changes: 1 addition & 6 deletions tools/Propulsion.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,10 @@ module Project =
( Log.Logger, monitored, leases, group, observer,
startFromTail = startFromTail, ?maxItems = maxItems, ?lagReportFreq = sa.MaybeLogLagInterval)
| Choice2Of3 sa ->
let (indexStore, indexFilter), maybeHydrate = sa.MonitoringParams()
let (indexStore, indexFilter), loadMode = sa.MonitoringParams()
let checkpoints =
let cache = Equinox.Cache (appName, sizeMb = 1)
sa.CreateCheckpointStore(group, cache, Log.forMetrics)
let loadMode =
match maybeHydrate with
| Some (context, streamsDop) ->
Propulsion.DynamoStore.LoadMode.Hydrated (nullFilter, streamsDop, context)
| None -> Propulsion.DynamoStore.LoadMode.WithoutEventBodies (fun _categoryName -> true)
Propulsion.DynamoStore.DynamoStoreSource(
Log.Logger, stats.StatsInterval,
indexStore, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5,
Expand Down

0 comments on commit c612eaa

Please sign in to comment.