Skip to content

Commit

Permalink
Target Propulsion 2.13.0-beta.4
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 23, 2022
1 parent 448926e commit 9e9d873
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 27 deletions.
3 changes: 3 additions & 0 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ module Config =
| Config.Store.Cosmos (context, cache) ->
let cat = Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache)
cat.Resolve
| Config.Store.Dynamo (context, cache) ->
let cat = Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
cat.Resolve
let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider
let shouldClose maxItemsPerEpoch candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch
let create maxItemsPerEpoch = resolveDecider >> create_ (shouldClose maxItemsPerEpoch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.4.5" />
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.4.5" />
<PackageReference Include="Equinox.EventStoreDb" Version="4.0.0-beta.4.5" />
<PackageReference Include="Equinox.MemoryStore" Version="4.0.0-beta.4.5" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.2.2" />
<PackageReference Include="Propulsion" Version="2.13.0-beta.3" />
<PackageReference Include="Propulsion" Version="2.13.0-beta.4" />
</ItemGroup>

</Project>
3 changes: 3 additions & 0 deletions Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ module Config =
| Config.Store.Cosmos (context, cache) ->
let cat = Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache)
cat.Resolve
| Config.Store.Dynamo (context, cache) ->
let cat = Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache)
cat.Resolve
let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider
let create_ pricer store =
Service(resolveDecider store, calculatePrice pricer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,8 @@ module Config =
| Config.Store.Cosmos (context, cache) ->
let cat = Config.Cosmos.createRollingState Events.codecJsonElement Fold.initial Fold.fold Fold.toSnapshot (context, cache)
cat.Resolve
| Config.Store.Dynamo (context, cache) ->
let cat = Config.Dynamo.createRollingState Events.codec Fold.initial Fold.fold Fold.toSnapshot (context, cache)
cat.Resolve
let private resolveDecider store = streamName >> resolveStream store >> Config.createDecider
let create = resolveDecider >> Service
9 changes: 5 additions & 4 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ module Args =
| Args.StoreArguments.Esdb a ->
let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
Config.Store.Esdb (context, cache)
member x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config =
member private x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config =
match mainStore with
| Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache)
| Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache)
Expand All @@ -95,8 +95,9 @@ module Args =
let context = a.Connect() |> DynamoStoreContext.create
CheckpointStore.Config.Dynamo (context, cache)
| _ -> failwith "unexpected"
member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore =
CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) Config.log config
member x.CreateCheckpointStore(mainStore) : Propulsion.Feed.IFeedCheckpointStore =
let config = x.CheckpointStoreConfig(mainStore)
CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) ECommerce.Domain.Config.log config

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse tryGetConfigValue argv =
Expand All @@ -113,7 +114,7 @@ let build (args : Args.Arguments) =
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = args.DumpStoreMetrics)
Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval)
let pumpSource =
let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store)
let checkpoints = args.CreateCheckpointStore(store)
let feed = ApiClient.TicketsFeed args.BaseUri
let source =
Propulsion.Feed.FeedSource(
Expand Down
2 changes: 1 addition & 1 deletion Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ module Dynamo =
let table = a.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable)
let retries = a.GetResult(Retries, 1)
let timeout = a.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds
let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout)
let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
member val Verbose = a.Contains Verbose
member _.Connect() = connector.ConnectStore(connector.CreateClient(), "Main", table)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
<ItemGroup>
<PackageReference Include="Argu" Version="6.1.1" />
<PackageReference Include="Destructurama.FSharp" Version="1.2.0" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.DynamoStore.Prometheus" Version="4.0.0-beta.4.2" />
<PackageReference Include="Equinox.CosmosStore.Prometheus" Version="4.0.0-beta.4.5" />
<PackageReference Include="Equinox.DynamoStore.Prometheus" Version="4.0.0-beta.4.5" />
<PackageReference Include="prometheus-net.AspNetCore" Version="3.6.0" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.13.0-beta.2" />
<PackageReference Include="Propulsion.EventStoreDb" Version="2.13.0-beta.2" />
<PackageReference Include="Propulsion.DynamoStore" Version="2.13.0-beta.2" />
<PackageReference Include="Propulsion.CosmosStore" Version="2.13.0-beta.4" />
<PackageReference Include="Propulsion.EventStoreDb" Version="2.13.0-beta.4" />
<PackageReference Include="Propulsion.DynamoStore" Version="2.13.0-beta.4" />
<PackageReference Include="Serilog.Sinks.Async" Version="1.5.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="4.0.0" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ module CosmosStoreContext =

type Equinox.DynamoStore.DynamoStoreClient with

member x.LogConfiguration(role : string, ?log) =
(defaultArg log Log.Logger).Information("DynamoStore {role:l} Table {table}", role) // TODO next ver has: , x.TableName)
member x.LogConfiguration(role, ?log) =
(defaultArg log Log.Logger).Information("DynamoStore {role:l} Table {table}", role, x.TableName)

type Equinox.DynamoStore.DynamoStoreConnector with

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ module Dynamo =
let streamsDop = a.GetResult(StreamsDop, 4)
let timeout = a.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds
let retries = a.GetResult(Retries, 9)
let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout)
let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries)
let client = connector.CreateClient()
member val Verbose = a.Contains Verbose
member _.Connect() = let mainClient = connector.ConnectStore(client, "Main", table)
Expand Down
17 changes: 9 additions & 8 deletions Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ module Args =
member x.DumpStoreMetrics = SourceArgs.dumpMetrics x.SourceArgs
member val CheckpointInterval = TimeSpan.FromHours 1.
member val CacheSizeMb = 10
member x.CheckpointStoreConfig(mainStore : ECommerce.Domain.Config.Store<_>) : CheckpointStore.Config =
member private x.CheckpointStoreConfig(mainStore : ECommerce.Domain.Config.Store<_>) : CheckpointStore.Config =
match mainStore with
| ECommerce.Domain.Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache)
| ECommerce.Domain.Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache)
Expand All @@ -67,7 +67,8 @@ module Args =
let context = a.Connect() |> DynamoStoreContext.create
CheckpointStore.Config.Dynamo (context, cache)
| _ -> failwith "unexpected"
member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore =
member x.CreateCheckpointStore(mainStore) : Propulsion.Feed.IFeedCheckpointStore =
let config = x.CheckpointStoreConfig(mainStore)
CheckpointStore.create (x.Group, x.CheckpointInterval) ECommerce.Domain.Config.log config

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
Expand All @@ -94,8 +95,8 @@ let build (args : Args.Arguments) =
let handle = Reactor.Config.create (store, store)
let sink = buildSink args log handle
let source =
let parseFeedDoc : _ -> _ = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents
>> Seq.filter (fun {stream = s } -> Reactor.isReactionStream s)
let parseFeedDoc = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents
>> Seq.filter (fun {stream = s } -> Reactor.isReactionStream s)
let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(log, sink.StartIngester, parseFeedDoc)
let leases, startFromTail, maxItems, lagFrequency = a.MonitoringParams(log)
Propulsion.CosmosStore.CosmosStoreSource.Start(log, monitored, leases, args.Group, observer, startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency)
Expand All @@ -105,7 +106,7 @@ let build (args : Args.Arguments) =
let context = storeClient |> DynamoStoreContext.create
let sourceId = Propulsion.DynamoStore.FeedSourceId.wellKnownId
let store = Config.Store.Dynamo (context, cache)
let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store)
let checkpoints = args.CreateCheckpointStore(store)
match a.MaybeOverrideRequested() with
| None -> ()
| Some (trancheId, group, pos) -> Async.RunSynchronously <| async {
Expand All @@ -115,7 +116,7 @@ let build (args : Args.Arguments) =
let sink = buildSink args log handle
let source =
let indexClient, startFromTail, maxItems, streamsDop = a.MonitoringParams(log)
let loadMode = Propulsion.DynamoStore.LoadMode.WithBodies (Reactor.isReactionStream, streamsDop, streamsContext)
let loadMode = Propulsion.DynamoStore.LoadMode.Hydrated (Reactor.isReactionStream, streamsDop, streamsContext)
Propulsion.DynamoStore.DynamoStoreSource(
log, args.StatsInterval,
indexClient, sourceId, maxItems, TimeSpan.FromSeconds 0.5,
Expand All @@ -127,7 +128,7 @@ let build (args : Args.Arguments) =
let context = conn |> EventStoreContext.create
let sourceId = Propulsion.EventStoreDb.FeedSourceId.wellKnownId
let store = Config.Store.Esdb (context, cache)
let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store)
let checkpoints = args.CreateCheckpointStore(store)
// TODO implement checkpoint reset mechanism
let handle = Reactor.Config.create (store, store)
let sink = buildSink args log handle
Expand All @@ -148,7 +149,7 @@ let run (args : Args.Arguments) = async {
Async.AwaitKeyboardInterruptAsTaskCancelledException()
source.AwaitWithStopOnCancellation()
sink.AwaitWithStopOnCancellation() ]
return! Async.Parallel (build args) |> Async.Ignore<unit array> }
return! Async.Parallel work |> Async.Ignore<unit array> }

[<EntryPoint>]
let main argv =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.11.0"/>

<PackageReference Include="FsCheck.Xunit" Version="3.0.0-beta1"/>
<PackageReference Include="unquote" Version="5.0.0"/>
<PackageReference Include="unquote" Version="6.1.0"/>
<PackageReference Include="xunit" Version="2.4.1"/>
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down

0 comments on commit 9e9d873

Please sign in to comment.