diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs index 8aff2d244..de4767ef4 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs @@ -92,6 +92,9 @@ module Config = | Config.Store.Cosmos (context, cache) -> let cat = Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache) cat.Resolve + | Config.Store.Dynamo (context, cache) -> + let cat = Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) + cat.Resolve let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider let shouldClose maxItemsPerEpoch candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch let create maxItemsPerEpoch = resolveDecider >> create_ (shouldClose maxItemsPerEpoch) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj b/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj index 64a31d0ea..2bce3de2c 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj @@ -18,12 +18,12 @@ - - - - + + + + - + diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs index c2523365d..27244297a 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs @@ -166,6 +166,9 @@ module Config = | Config.Store.Cosmos (context, cache) -> let cat = Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache) cat.Resolve + | Config.Store.Dynamo (context, cache) -> + let cat = Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) + cat.Resolve let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider let create_ pricer store = Service(resolveDecider store, calculatePrice pricer) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs index 5a1dc1cab..c6e5bfbb9 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs @@ -72,5 +72,8 @@ module Config = | Config.Store.Cosmos (context, cache) -> let cat = Config.Cosmos.createRollingState Events.codecJsonElement Fold.initial Fold.fold Fold.toSnapshot (context, cache) cat.Resolve + | Config.Store.Dynamo (context, cache) -> + let cat = Config.Dynamo.createRollingState Events.codec Fold.initial Fold.fold Fold.toSnapshot (context, cache) + cat.Resolve let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider let create = resolveDecider >> Service diff --git a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs index 05cbac18b..ab883a6a6 100644 --- a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs @@ -79,7 +79,7 @@ module Args = | Args.StoreArguments.Esdb a -> let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create Config.Store.Esdb (context, cache) - member x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config = + member private x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config = match mainStore with | Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache) | Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache) @@ -95,8 +95,9 @@ module Args = let context = a.Connect() |> DynamoStoreContext.create CheckpointStore.Config.Dynamo (context, cache) | _ -> failwith "unexpected" - member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore = - CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) Config.log config + member x.CreateCheckpointStore(mainStore) : Propulsion.Feed.IFeedCheckpointStore = + let config = x.CheckpointStoreConfig(mainStore) + CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) ECommerce.Domain.Config.log config /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv = @@ -113,7 +114,7 @@ let build (args : Args.Arguments) = let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = args.DumpStoreMetrics) Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval) let pumpSource = - let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) + let checkpoints = args.CreateCheckpointStore(store) let feed = ApiClient.TicketsFeed args.BaseUri let source = Propulsion.Feed.FeedSource( diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs index fd1221955..d46c177d2 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs @@ -96,7 +96,7 @@ module Dynamo = let table = a.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) let retries = a.GetResult(Retries, 1) let timeout = a.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds - let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout) + let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) member val Verbose = a.Contains Verbose member _.Connect() = connector.ConnectStore(connector.CreateClient(), "Main", table) diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj index a56a00c6d..e589b30e6 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj @@ -15,12 +15,12 @@ - - + + - - - + + + diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs index 9c00411d3..02586fa84 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs @@ -52,8 +52,8 @@ module CosmosStoreContext = type Equinox.DynamoStore.DynamoStoreClient with - member x.LogConfiguration(role : string, ?log) = - (defaultArg log Log.Logger).Information("DynamoStore {role:l} Table {table}", role) // TODO next ver has: , x.TableName) + member x.LogConfiguration(role, ?log) = + (defaultArg log Log.Logger).Information("DynamoStore {role:l} Table {table}", role, x.TableName) type Equinox.DynamoStore.DynamoStoreConnector with diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs index 5b888053e..efe1dce9d 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs @@ -108,7 +108,7 @@ module Dynamo = let streamsDop = a.GetResult(StreamsDop, 4) let timeout = a.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds let retries = a.GetResult(Retries, 9) - let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout) + let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) let client = connector.CreateClient() member val Verbose = a.Contains Verbose member _.Connect() = let mainClient = connector.ConnectStore(client, "Main", table) diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs index d3464eef2..a6c61908d 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs @@ -51,7 +51,7 @@ module Args = member x.DumpStoreMetrics = SourceArgs.dumpMetrics x.SourceArgs member val CheckpointInterval = TimeSpan.FromHours 1. member val CacheSizeMb = 10 - member x.CheckpointStoreConfig(mainStore : ECommerce.Domain.Config.Store<_>) : CheckpointStore.Config = + member private x.CheckpointStoreConfig(mainStore : ECommerce.Domain.Config.Store<_>) : CheckpointStore.Config = match mainStore with | ECommerce.Domain.Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache) | ECommerce.Domain.Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache) @@ -67,7 +67,8 @@ module Args = let context = a.Connect() |> DynamoStoreContext.create CheckpointStore.Config.Dynamo (context, cache) | _ -> failwith "unexpected" - member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore = + member x.CreateCheckpointStore(mainStore) : Propulsion.Feed.IFeedCheckpointStore = + let config = x.CheckpointStoreConfig(mainStore) CheckpointStore.create (x.Group, x.CheckpointInterval) ECommerce.Domain.Config.log config /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args @@ -94,8 +95,8 @@ let build (args : Args.Arguments) = let handle = Reactor.Config.create (store, store) let sink = buildSink args log handle let source = - let parseFeedDoc : _ -> _ = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents - >> Seq.filter (fun {stream = s } -> Reactor.isReactionStream s) + let parseFeedDoc = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents + >> Seq.filter (fun {stream = s } -> Reactor.isReactionStream s) let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(log, sink.StartIngester, parseFeedDoc) let leases, startFromTail, maxItems, lagFrequency = a.MonitoringParams(log) Propulsion.CosmosStore.CosmosStoreSource.Start(log, monitored, leases, args.Group, observer, startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) @@ -105,7 +106,7 @@ let build (args : Args.Arguments) = let context = storeClient |> DynamoStoreContext.create let sourceId = Propulsion.DynamoStore.FeedSourceId.wellKnownId let store = Config.Store.Dynamo (context, cache) - let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) + let checkpoints = args.CreateCheckpointStore(store) match a.MaybeOverrideRequested() with | None -> () | Some (trancheId, group, pos) -> Async.RunSynchronously <| async { @@ -115,7 +116,7 @@ let build (args : Args.Arguments) = let sink = buildSink args log handle let source = let indexClient, startFromTail, maxItems, streamsDop = a.MonitoringParams(log) - let loadMode = Propulsion.DynamoStore.LoadMode.WithBodies (Reactor.isReactionStream, streamsDop, streamsContext) + let loadMode = Propulsion.DynamoStore.LoadMode.Hydrated (Reactor.isReactionStream, streamsDop, streamsContext) Propulsion.DynamoStore.DynamoStoreSource( log, args.StatsInterval, indexClient, sourceId, maxItems, TimeSpan.FromSeconds 0.5, @@ -127,7 +128,7 @@ let build (args : Args.Arguments) = let context = conn |> EventStoreContext.create let sourceId = Propulsion.EventStoreDb.FeedSourceId.wellKnownId let store = Config.Store.Esdb (context, cache) - let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) + let checkpoints = args.CreateCheckpointStore(store) // TODO implement checkpoint reset mechanism let handle = Reactor.Config.create (store, store) let sink = buildSink args log handle @@ -148,7 +149,7 @@ let run (args : Args.Arguments) = async { Async.AwaitKeyboardInterruptAsTaskCancelledException() source.AwaitWithStopOnCancellation() sink.AwaitWithStopOnCancellation() ] - return! Async.Parallel (build args) |> Async.Ignore } + return! Async.Parallel work |> Async.Ignore } [] let main argv = diff --git a/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj b/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj index f29400393..a0fa945f3 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj @@ -14,7 +14,7 @@ - + runtime; build; native; contentfiles; analyzers; buildtransitive