diff --git a/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs b/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs index 93a336d15..b738002b3 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs @@ -1,6 +1,7 @@ module ECommerce.Api.Program open ECommerce +open ECommerce.Infrastructure // Args etc open Microsoft.AspNetCore.Hosting open Microsoft.Extensions.DependencyInjection open Serilog @@ -29,34 +30,30 @@ module Args = | Cosmos _ -> "specify CosmosDB input parameters" | Dynamo _ -> "specify DynamoDB input parameters" | Esdb _ -> "specify EventStore input parameters" - and [] StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments and [] Arguments(c : Configuration, a : ParseResults) = member val Verbose = a.Contains Verbose member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) - member val Store : StoreArguments = + member val CacheSizeMb = 10 + member val StoreArgs : Args.StoreArguments = match a.TryGetSubCommand() with - | Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos)) - | Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo)) - | Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es)) + | Some (Parameters.Cosmos cosmos) -> Args.StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos)) + | Some (Parameters.Dynamo dynamo) -> Args.StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo)) + | Some (Parameters.Esdb es) -> Args.StoreArguments.Esdb (Args.Esdb.Arguments (c, es)) | _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store" - member x.Connect() = - let cache = Equinox.Cache (AppName, sizeMb = 10) - match x.Store with - | StoreArguments.Cosmos ca -> - let context = ca.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create + member x.VerboseStore = Args.verboseRequested x.StoreArgs + member x.Connect() : Domain.Config.Store<_> = + let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb) + match x.StoreArgs with + | Args.StoreArguments.Cosmos a -> + let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create Domain.Config.Store.Cosmos (context, cache) - | StoreArguments.Dynamo da -> - let context = da.Connect() |> DynamoStoreContext.create + | Args.StoreArguments.Dynamo a -> + let context = a.Connect() |> DynamoStoreContext.create Domain.Config.Store.Dynamo (context, cache) - | StoreArguments.Esdb ea -> - let context = ea.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create + | Args.StoreArguments.Esdb a -> + let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create Domain.Config.Store.Esdb (context, cache) - member x.StoreVerbose = - match x.Store with - | StoreArguments.Cosmos a -> a.Verbose - | StoreArguments.Dynamo a -> a.Verbose - | StoreArguments.Esdb a -> a.Verbose /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv = @@ -94,7 +91,7 @@ let run (args : Args.Arguments) = let main argv = try let args = Args.parse EnvVar.tryGet argv let metrics = Sinks.tags AppName |> Sinks.equinoxMetricsOnly - try Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger() + try Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger() try run args; 0 with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs index 5eece4336..c2523365d 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs @@ -19,7 +19,9 @@ module Events = module Reactions = - let (|Decode|) (span : Propulsion.Streams.StreamSpan<_>) = span.events |> Seq.choose Events.codec.TryDecode |> Array.ofSeq + open FsCodec.Interop + let private codec = Events.codec.ToByteArrayCodec() + let (|Decode|) (span : Propulsion.Streams.StreamSpan) = span.events |> Seq.choose codec.TryDecode |> Array.ofSeq let (|Parse|_|) = function | StreamName sellerId, Decode events -> Some (sellerId, events) | _ -> None diff --git a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs index 78d62f865..05cbac18b 100644 --- a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs @@ -16,6 +16,8 @@ let [] AppName = "FeedConsumer" module Args = open Argu + open Args.Esdb + [] type Parameters = | [] Verbose @@ -44,8 +46,6 @@ module Args = | Dynamo _ -> "specify DynamoDB input parameters" | Esdb _ -> "specify EventStore input parameters" - [] - type StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments type Arguments(c : Configuration, a : ParseResults) = member val Verbose = a.Contains Verbose member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) @@ -59,42 +59,44 @@ module Args = member val CheckpointInterval = TimeSpan.FromHours 1. member val TailSleepInterval = TimeSpan.FromSeconds 1. member val ConsumerGroupName = "default" - member val CacheSizeMb = 10 - member val Store : StoreArguments = + member val StoreArgs : Args.StoreArguments = match a.TryGetSubCommand() with - | Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos)) - | Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo)) - | Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es)) + | Some (Parameters.Cosmos cosmos) -> Args.StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos)) + | Some (Parameters.Dynamo dynamo) -> Args.StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo)) + | Some (Parameters.Esdb es) -> Args.StoreArguments.Esdb (Args.Esdb.Arguments (c, es)) | _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store" - member x.Connect() = - let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb) - match x.Store with - | StoreArguments.Cosmos ca -> - let context = ca.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create - Config.Store.Cosmos (context, cache), Equinox.CosmosStore.Core.Log.InternalMetrics.dump - | StoreArguments.Dynamo da -> - let context = da.Connect() |> DynamoStoreContext.create - Config.Store.Dynamo (context, cache), Equinox.DynamoStore.Core.Log.InternalMetrics.dump - | StoreArguments.Esdb ea -> - let context = ea.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create - Config.Store.Esdb (context, cache), Equinox.EventStoreDb.Log.InternalMetrics.dump - member x.CheckpointStore(store) : Propulsion.Feed.IFeedCheckpointStore = - match store with - | Config.Store.Cosmos (context, cache) -> - Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Config.log (x.ConsumerGroupName, x.CheckpointInterval) (context, cache) - | Config.Store.Dynamo (context, cache) -> - Propulsion.Feed.ReaderCheckpoint.DynamoStore.create Config.log (x.ConsumerGroupName, x.CheckpointInterval) (context, cache) - | Config.Store.Esdb _ -> - failwith "Propulsion.EventStoreDb does not implement a checkpointer, perhaps port https://github.com/absolutejam/Propulsion.EventStoreDB ?" - // or fork/finish https://github.com/jet/dotnet-templates/pull/81 - // alternately one could use a SQL Server DB via Propulsion.SqlStreamStore - | Config.Store.Memory _ -> - failwith "It's possible to set up an in-memory projection, but that's in the context of a single process integration test as illustrated in https://github.com/jet/dotnet-templates/blob/master/equinox-shipping/Watchdog.Integration/ReactorFixture.fs" - member x.StoreVerbose = - match x.Store with - | StoreArguments.Cosmos a -> a.Verbose - | StoreArguments.Dynamo a -> a.Verbose - | StoreArguments.Esdb a -> a.Verbose + member x.VerboseStore = Args.verboseRequested x.StoreArgs + member x.DumpStoreMetrics = Args.dumpMetrics x.StoreArgs + member x.Connect() : Config.Store<_> = + let cache = Equinox.Cache (AppName, sizeMb = 10) + match x.StoreArgs with + | Args.StoreArguments.Cosmos a -> + let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create + Config.Store.Cosmos (context, cache) + | Args.StoreArguments.Dynamo a -> + let context = a.Connect() |> DynamoStoreContext.create + Config.Store.Dynamo (context, cache) + | Args.StoreArguments.Esdb a -> + let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create + Config.Store.Esdb (context, cache) + member x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config = + match mainStore with + | Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache) + | Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache) + | Config.Store.Memory _ -> failwith "Unexpected" + | Config.Store.Esdb (_, cache) -> + match x.StoreArgs with + | Args.StoreArguments.Esdb a -> + match a.CheckpointStoreArgs with + | CheckpointStoreArguments.Cosmos a -> + let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create + CheckpointStore.Config.Cosmos (context, cache) + | CheckpointStoreArguments.Dynamo a -> + let context = a.Connect() |> DynamoStoreContext.create + CheckpointStore.Config.Dynamo (context, cache) + | _ -> failwith "unexpected" + member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore = + CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) Config.log config /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv = @@ -103,15 +105,15 @@ module Args = Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv) let build (args : Args.Arguments) = - let store, dumpMetrics = args.Connect() + let store = args.Connect() let log = Log.forGroup args.SourceId // needs to have a `group` tag for Propulsion.Streams Prometheus metrics let sink = let handle = Ingester.handle args.TicketsDop - let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = dumpMetrics) + let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = args.DumpStoreMetrics) Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval) let pumpSource = - let checkpoints = args.CheckpointStore(store) + let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) let feed = ApiClient.TicketsFeed args.BaseUri let source = Propulsion.Feed.FeedSource( @@ -130,7 +132,7 @@ let run args = async { let main argv = try let args = Args.parse EnvVar.tryGet argv try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) - Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger() + Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger() try run args |> Async.RunSynchronously with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs new file mode 100644 index 000000000..fd1221955 --- /dev/null +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs @@ -0,0 +1,161 @@ +module ECommerce.Infrastructure.Args + +open Serilog +open System + +exception MissingArg of message : string with override this.Message = this.message +let missingArg msg = raise (MissingArg msg) + +let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" +let [] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID" +let [] SECRET_KEY = "EQUINOX_DYNAMO_SECRET_ACCESS_KEY" +let [] TABLE = "EQUINOX_DYNAMO_TABLE" +let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" + +type Configuration(tryGet : string -> string option) = + + member val tryGet = tryGet + member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" + + member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" + member x.CosmosDatabase = x.get "EQUINOX_COSMOS_DATABASE" + member x.CosmosContainer = x.get "EQUINOX_COSMOS_CONTAINER" + + member x.DynamoServiceUrl = x.get SERVICE_URL + member x.DynamoAccessKey = x.get ACCESS_KEY + member x.DynamoSecretKey = x.get SECRET_KEY + member x.DynamoTable = x.get TABLE + + member x.EventStoreConnection = x.get "EQUINOX_ES_CONNECTION" + member _.EventStoreCredentials = tryGet "EQUINOX_ES_CREDENTIALS" + + member x.PrometheusPort = tryGet "PROMETHEUS_PORT" |> Option.map int + +open Argu + +module Cosmos = + + [] + type Parameters = + | [] Verbose + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + interface IArgParserTemplate with + member a.Usage = a |> function + | Verbose _ -> "request verbose logging." + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" + | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" + | Container _ -> "specify a container name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" + | Timeout _ -> "specify operation timeout in seconds (default: 5)." + | Retries _ -> "specify operation retries (default: 1)." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" + + type Arguments(c : Configuration, a : ParseResults) = + let discovery = a.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + let mode = a.TryGetResult ConnectionMode + let timeout = a.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = a.GetResult(Retries, 1) + let maxRetryWaitTime = a.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) + let database = a.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) + let container = a.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + member val Verbose = a.Contains Verbose + member _.Connect() = connector.ConnectStore("Main", database, container) + +module Dynamo = + + [] + type Parameters = + | [] Verbose + | [] ServiceUrl of string + | [] AccessKey of string + | [] SecretKey of string + | [] Table of string + | [] Retries of int + | [] RetriesTimeoutS of float + interface IArgParserTemplate with + member a.Usage = a |> function + | Verbose -> "Include low level Store logging." + | ServiceUrl _ -> "specify a server endpoint for a Dynamo account. (optional if environment variable " + SERVICE_URL + " specified)" + | AccessKey _ -> "specify an access key id for a Dynamo account. (optional if environment variable " + ACCESS_KEY + " specified)" + | SecretKey _ -> "specify a secret access key for a Dynamo account. (optional if environment variable " + SECRET_KEY + " specified)" + | Table _ -> "specify a table name for the primary store. (optional if environment variable " + TABLE + " specified)" + | Retries _ -> "specify operation retries (default: 1)." + | RetriesTimeoutS _ -> "specify max wait-time including retries in seconds (default: 5)" + + type Arguments(c : Configuration, a : ParseResults) = + let serviceUrl = a.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) + let accessKey = a.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) + let secretKey = a.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) + let table = a.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + let retries = a.GetResult(Retries, 1) + let timeout = a.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds + let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout) + member val Verbose = a.Contains Verbose + member _.Connect() = connector.ConnectStore(connector.CreateClient(), "Main", table) + +module Esdb = + + [] + type Parameters = + | [] Verbose + | [] Connection of string + | [] Credentials of string + | [] Timeout of float + | [] Retries of int + + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | Verbose -> "Include low level Store logging." + | Connection _ -> "EventStore Connection String. (optional if environment variable EQUINOX_ES_CONNECTION specified)" + | Credentials _ -> "Credentials string for EventStore (used as part of connection string, but NOT logged). Default: use EQUINOX_ES_CREDENTIALS environment variable (or assume no credentials)" + | Timeout _ -> "specify operation timeout in seconds. Default: 20." + | Retries _ -> "specify operation retries. Default: 3." + + (* Propulsion.EventStoreDb does not implement a checkpointer, perhaps port https://github.com/absolutejam/Propulsion.EventStoreDB ? + or fork/finish https://github.com/jet/dotnet-templates/pull/81 + alternately one could use a SQL Server DB via Propulsion.SqlStreamStore *) + + | Cosmos _ -> "CosmosDB (Checkpoint) Store parameters." + | Dynamo _ -> "DynamoDB (Checkpoint) Store parameters." + + [] + type CheckpointStoreArguments = Cosmos of Cosmos.Arguments | Dynamo of Dynamo.Arguments + + type Arguments(c : Configuration, a : ParseResults) = + member val ConnectionString = a.TryGetResult(Connection) |> Option.defaultWith (fun () -> c.EventStoreConnection) + member val Credentials = a.TryGetResult(Credentials) |> Option.orElseWith (fun () -> c.EventStoreCredentials) |> Option.toObj + member val Retries = a.GetResult(Retries, 3) + member val Timeout = a.GetResult(Timeout, 20.) |> TimeSpan.FromSeconds + member _.Verbose = a.Contains Verbose + member val CheckpointStoreArgs : CheckpointStoreArguments = + match a.GetSubCommand() with + | Cosmos cosmos -> CheckpointStoreArguments.Cosmos(Cosmos.Arguments (c, cosmos)) + | Dynamo dynamo -> CheckpointStoreArguments.Dynamo(Dynamo.Arguments (c, dynamo)) + | _ -> missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + member x.Connect(log : ILogger, appName, nodePreference) = + let connection = x.ConnectionString + log.Information("EventStore {discovery}", connection) + let discovery = String.Join(";", connection, x.Credentials) |> Equinox.EventStoreDb.Discovery.ConnectionString + let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] + Equinox.EventStoreDb.EventStoreConnector(x.Timeout, x.Retries, tags = tags) + .Establish(appName, discovery, Equinox.EventStoreDb.ConnectionStrategy.ClusterSingle nodePreference) + +[] +type StoreArguments = Cosmos of Cosmos.Arguments | Dynamo of Dynamo.Arguments | Esdb of Esdb.Arguments +let verboseRequested = function + | StoreArguments.Cosmos a -> a.Verbose + | StoreArguments.Dynamo a -> a.Verbose + | StoreArguments.Esdb a -> a.Verbose +let dumpMetrics = function + | StoreArguments.Cosmos a -> Equinox.CosmosStore.Core.Log.InternalMetrics.dump + | StoreArguments.Dynamo a -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump + | StoreArguments.Esdb a -> Equinox.EventStoreDb.Log.InternalMetrics.dump diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/CheckpointStore.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/CheckpointStore.fs new file mode 100644 index 000000000..915043910 --- /dev/null +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/CheckpointStore.fs @@ -0,0 +1,12 @@ +module ECommerce.Infrastructure.CheckpointStore + +[] +type Config = + | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Core.ICache + | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.Core.ICache + +let create (consumerGroup, checkpointInterval) storeLog : Config -> Propulsion.Feed.IFeedCheckpointStore = function + | Config.Cosmos (context, cache) -> + Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (consumerGroup, checkpointInterval) (context, cache) + | Config.Dynamo (context, cache) -> + Propulsion.Feed.ReaderCheckpoint.DynamoStore.create storeLog (consumerGroup, checkpointInterval) (context, cache) diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj index 35a2b5083..a56a00c6d 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj @@ -7,6 +7,9 @@ + + + diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs index 970def9d0..9c00411d3 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs @@ -1,5 +1,5 @@ [] -module ECommerce.Infrastructure +module ECommerce.Infrastructure.Helpers open Serilog open System @@ -140,137 +140,6 @@ let startMetricsServer port : IDisposable = Log.Information("Prometheus /metrics endpoint on port {port}", port) { new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() } -module Args = - - exception MissingArg of message : string with override this.Message = this.message - let missingArg msg = raise (MissingArg msg) - - let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" - let [] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID" - let [] SECRET_KEY = "EQUINOX_DYNAMO_SECRET_ACCESS_KEY" - let [] TABLE = "EQUINOX_DYNAMO_TABLE" - let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" - - type Configuration(tryGet : string -> string option) = - - member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" - - member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" - member x.CosmosDatabase = x.get "EQUINOX_COSMOS_DATABASE" - member x.CosmosContainer = x.get "EQUINOX_COSMOS_CONTAINER" - - member x.DynamoServiceUrl = x.get SERVICE_URL - member x.DynamoAccessKey = x.get ACCESS_KEY - member x.DynamoSecretKey = x.get SECRET_KEY - member x.DynamoTable = x.get TABLE - - member x.EventStoreConnection = x.get "EQUINOX_ES_CONNECTION" - member _.EventStoreCredentials = tryGet "EQUINOX_ES_CREDENTIALS" - - member x.PrometheusPort = tryGet "PROMETHEUS_PORT" |> Option.map int - - open Argu - - module Cosmos = - - [] - type Parameters = - | [] Verbose - | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode - | [] Connection of string - | [] Database of string - | [] Container of string - | [] Timeout of float - | [] Retries of int - | [] RetriesWaitTime of float - interface IArgParserTemplate with - member a.Usage = a |> function - | Verbose _ -> "request verbose logging." - | ConnectionMode _ -> "override the connection mode. Default: Direct." - | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" - | Database _ -> "specify a database name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" - | Container _ -> "specify a container name for Cosmos store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" - | Timeout _ -> "specify operation timeout in seconds (default: 5)." - | Retries _ -> "specify operation retries (default: 1)." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" - - type Arguments(c : Configuration, a : ParseResults) = - let discovery = a.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString - let mode = a.TryGetResult ConnectionMode - let timeout = a.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds - let retries = a.GetResult(Retries, 1) - let maxRetryWaitTime = a.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = a.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = a.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) - member val Verbose = a.Contains Verbose - member _.Connect() = connector.ConnectStore("Main", database, container) - - module Dynamo = - - [] - type Parameters = - | [] Verbose - | [] ServiceUrl of string - | [] AccessKey of string - | [] SecretKey of string - | [] Table of string - | [] Retries of int - | [] RetriesTimeoutS of float - interface IArgParserTemplate with - member a.Usage = a |> function - | Verbose -> "Include low level Store logging." - | ServiceUrl _ -> "specify a server endpoint for a Dynamo account. (optional if environment variable " + SERVICE_URL + " specified)" - | AccessKey _ -> "specify an access key id for a Dynamo account. (optional if environment variable " + ACCESS_KEY + " specified)" - | SecretKey _ -> "specify a secret access key for a Dynamo account. (optional if environment variable " + SECRET_KEY + " specified)" - | Table _ -> "specify a table name for the primary store. (optional if environment variable " + TABLE + " specified)" - | Retries _ -> "specify operation retries (default: 1)." - | RetriesTimeoutS _ -> "specify max wait-time including retries in seconds (default: 5)" - - type Arguments(c : Configuration, a : ParseResults) = - let serviceUrl = a.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = a.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = a.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) - let table = a.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) - let retries = a.GetResult(Retries, 1) - let timeout = a.GetResult(RetriesTimeoutS, 5.) |> TimeSpan.FromSeconds - let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout) - member val Verbose = a.Contains Verbose - member _.Connect() = connector.ConnectStore("Main", table) - - module Esdb = - - [] - type Parameters = - | [] Verbose - | [] Connection of string - | [] Credentials of string - | [] Timeout of float - | [] Retries of int - // | [] HeartbeatTimeout of float - interface IArgParserTemplate with - member a.Usage = a |> function - | Verbose -> "Include low level Store logging." - | Connection _ -> "EventStore Connection String. (optional if environment variable EQUINOX_ES_CONNECTION specified)" - | Credentials _ -> "Credentials string for EventStore (used as part of connection string, but NOT logged). Default: use EQUINOX_ES_CREDENTIALS environment variable (or assume no credentials)" - | Timeout _ -> "specify operation timeout in seconds. Default: 20." - | Retries _ -> "specify operation retries. Default: 3." - - type Arguments(c : Configuration, a : ParseResults) = - member val ConnectionString = a.TryGetResult(Connection) |> Option.defaultWith (fun () -> c.EventStoreConnection) - member val Credentials = a.TryGetResult(Credentials) |> Option.orElseWith (fun () -> c.EventStoreCredentials) |> Option.toObj - member val Retries = a.GetResult(Retries, 3) - member val Timeout = a.GetResult(Timeout, 20.) |> TimeSpan.FromSeconds - member _.Verbose = a.Contains Verbose - member x.Connect(log: ILogger, appName, nodePreference) = - let connection = x.ConnectionString - log.Information("EventStore {discovery}", connection) - let discovery = String.Join(";", connection, x.Credentials) |> Equinox.EventStoreDb.Discovery.ConnectionString - let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] - Equinox.EventStoreDb.EventStoreConnector(x.Timeout, x.Retries, tags = tags) - .Establish(appName, discovery, Equinox.EventStoreDb.ConnectionStrategy.ClusterSingle nodePreference) - module Exception = let dump verboseStore (log : ILogger) (exn : exn) = diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Args.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs similarity index 94% rename from Sample/ECommerce.Equinox/ECommerce.Reactor/Args.fs rename to Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs index 25b40cc21..5b888053e 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Args.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceArgs.fs @@ -1,7 +1,6 @@ -module ECommerce.Reactor.Args +module ECommerce.Infrastructure.SourceArgs open Argu -open ECommerce open Serilog open System @@ -9,7 +8,7 @@ type Configuration(tryGet) = inherit Args.Configuration(tryGet) member _.DynamoIndexTable = tryGet Args.INDEX_TABLE -module CosmosSource = +module Cosmos = type [] Parameters = | [] Verbose @@ -56,15 +55,15 @@ module CosmosSource = let lagFrequency = a.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes member val Verbose = a.Contains Verbose member private _.ConnectLeases() = connector.CreateUninitialized(database, leaseContainer) + member x.ConnectStoreAndMonitored() = connector.ConnectStoreAndMonitored(database, container) member x.MonitoringParams(log : ILogger) = let leases : Microsoft.Azure.Cosmos.Container = x.ConnectLeases() log.Information("ChangeFeed Leases Database {db} Container {container}. MaxItems limited to {maxItems}", leases.Database.Id, leases.Id, Option.toNullable maxItems) if fromTail then log.Warning("(If new projector group) Skipping projection of all existing events.") (leases, fromTail, maxItems, lagFrequency) - member x.ConnectStoreAndMonitored() = connector.ConnectStoreAndMonitored(database, container) -module DynamoSource = +module Dynamo = [] type Parameters = @@ -140,7 +139,7 @@ module DynamoSource = | Position _ -> "Override to specified position" | Tranche _ -> "Specify tranche to override" -module EsdbSource = +module Esdb = [] type Parameters = @@ -162,27 +161,47 @@ module EsdbSource = | Cosmos _ -> "CosmosDB (Checkpoint) Store parameters." | Dynamo _ -> "DynamoDB (Checkpoint) Store parameters." + + [] + type TargetStoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments + type Arguments(c : Configuration, a : ParseResults) = + let tailSleepInterval = TimeSpan.FromSeconds 0.5 + let maxItems = 100 member val Verbose = a.Contains Verbose member val ConnectionString = a.TryGetResult(Connection) |> Option.defaultWith (fun () -> c.EventStoreConnection) member val Credentials = a.TryGetResult(Credentials) |> Option.orElseWith (fun () -> c.EventStoreCredentials) |> Option.toObj member val Retries = a.GetResult(Retries, 3) member val Timeout = a.GetResult(Timeout, 20.) |> TimeSpan.FromSeconds -(* + member x.Connect(log: ILogger, appName, nodePreference) = let connection = x.ConnectionString log.Information("EventStore {discovery}", connection) let discovery = String.Join(";", connection, x.Credentials) |> Equinox.EventStoreDb.Discovery.ConnectionString let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string] Equinox.EventStoreDb.EventStoreConnector(x.Timeout, x.Retries, tags=tags) - .Establish(appName, discovery, Equinox.EventStoreDb.ConnectionStrategy.ClusterSingle nodePreference) *) - + .Establish(appName, discovery, Equinox.EventStoreDb.ConnectionStrategy.ClusterSingle nodePreference) + member _.MonitoringParams(log : ILogger) = + log.Information("EventStoreSource MaxItems {maxItems}", maxItems) + maxItems, tailSleepInterval member val CheckpointInterval = TimeSpan.FromHours 1. - member val CheckpointStore : Choice = + member val TargetStoreArgs : TargetStoreArguments = match a.GetSubCommand() with - | Cosmos cosmos -> Choice1Of2 <| Args.Cosmos.Arguments (c, cosmos) - | Dynamo dynamo -> Choice2Of2 <| Args.Dynamo.Arguments (c, dynamo) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` checkpoint store when source is `es`" + | Cosmos cosmos -> TargetStoreArguments.Cosmos(Args.Cosmos.Arguments (c, cosmos)) + | Dynamo dynamo -> TargetStoreArguments.Dynamo(Args.Dynamo.Arguments (c, dynamo)) + | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + +[] +type Arguments = Cosmos of Cosmos.Arguments | Dynamo of Dynamo.Arguments | Esdb of Esdb.Arguments +let verboseRequested = function + | Arguments.Cosmos a -> a.Verbose + | Arguments.Dynamo a -> a.Verbose + | Arguments.Esdb a -> a.Verbose +let dumpMetrics = function + | Arguments.Cosmos a -> Equinox.CosmosStore.Core.Log.InternalMetrics.dump + | Arguments.Dynamo a -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump + | Arguments.Esdb a -> Equinox.EventStoreDb.Log.InternalMetrics.dump + (* type [] Parameters = | [] FromTail diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/ECommerce.Reactor.fsproj b/Sample/ECommerce.Equinox/ECommerce.Reactor/ECommerce.Reactor.fsproj index a9c7516da..caf0985ea 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/ECommerce.Reactor.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/ECommerce.Reactor.fsproj @@ -11,7 +11,6 @@ - diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs index 8ed578096..d3464eef2 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs @@ -1,115 +1,160 @@ module ECommerce.Reactor.Program -open ECommerce.Infrastructure +open ECommerce.Infrastructure // Args, SourceArgs open Serilog open System +let [] AppName = "ECommerce.Reactor" + module Args = open Argu - open ECommerce.Reactor.Args + open SourceArgs.Esdb [] type Parameters = | [] Verbose | [] PrometheusPort of int - | [] ProcessorName of string + | [] Group of string | [] MaxReadAhead of int | [] MaxWriters of int - | [] Cosmos of ParseResults - | [] Dynamo of ParseResults - | [] Esdb of ParseResults + | [] Cosmos of ParseResults + | [] Dynamo of ParseResults + | [] Esdb of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Verbose -> "request Verbose Logging. Default: off." | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)" - | ProcessorName _ -> "Projector consumer group name." + | Group _ -> "Projector consumer group name." | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 16." | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." | Cosmos _ -> "specify CosmosDB input parameters." | Dynamo _ -> "specify DynamoDB input parameters." | Esdb _ -> "specify EventStoreDB input parameters." - and Arguments(c : Configuration, a : ParseResults) = + and Arguments(c : SourceArgs.Configuration, a : ParseResults) = member val Verbose = a.Contains Verbose member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) - member val ProcessorName = a.GetResult ProcessorName + member val Group = a.GetResult Group member val MaxReadAhead = a.GetResult(MaxReadAhead, 16) member val MaxConcurrentStreams = a.GetResult(MaxWriters, 8) // 1ms -> 10ms reduces CPU consumption from ~5s/min to .7s/min member val IdleDelay = TimeSpan.FromMilliseconds 10. member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 5. - member val Source : Choice = + member val SourceArgs : SourceArgs.Arguments = match a.GetSubCommand() with - | Cosmos a -> Choice1Of3 <| CosmosSource.Arguments(c, a) - | Dynamo a -> Choice2Of3 <| DynamoSource.Arguments(c, a) - | Esdb a -> Choice3Of3 <| EsdbSource.Arguments(c, a) + | Cosmos a -> SourceArgs.Arguments.Cosmos(SourceArgs.Cosmos.Arguments(c, a)) + | Dynamo a -> SourceArgs.Arguments.Dynamo(SourceArgs.Dynamo.Arguments(c, a)) + | Esdb a -> SourceArgs.Arguments.Esdb(SourceArgs.Esdb.Arguments(c, a)) | a -> Args.missingArg $"Unexpected Store subcommand %A{a}" - member x.StoreVerbose = match x.Source with - | Choice1Of3 s -> s.Verbose - | Choice2Of3 s -> s.Verbose - | Choice3Of3 s -> s.Verbose + member x.VerboseStore = SourceArgs.verboseRequested x.SourceArgs + member x.DumpStoreMetrics = SourceArgs.dumpMetrics x.SourceArgs + member val CheckpointInterval = TimeSpan.FromHours 1. + member val CacheSizeMb = 10 + member x.CheckpointStoreConfig(mainStore : ECommerce.Domain.Config.Store<_>) : CheckpointStore.Config = + match mainStore with + | ECommerce.Domain.Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache) + | ECommerce.Domain.Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache) + | ECommerce.Domain.Config.Store.Memory _ -> failwith "Unexpected" + | ECommerce.Domain.Config.Store.Esdb (_, cache) -> + match x.SourceArgs with + | SourceArgs.Arguments.Esdb a -> + match a.TargetStoreArgs with + | TargetStoreArguments.Cosmos a -> + let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create + CheckpointStore.Config.Cosmos (context, cache) + | TargetStoreArguments.Dynamo a -> + let context = a.Connect() |> DynamoStoreContext.create + CheckpointStore.Config.Dynamo (context, cache) + | _ -> failwith "unexpected" + member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore = + CheckpointStore.create (x.Group, x.CheckpointInterval) ECommerce.Domain.Config.log config /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv : Arguments = let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name let parser = ArgumentParser.Create(programName=programName) - Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv) - -let [] AppName = "ECommerce.Reactor" + Arguments(SourceArgs.Configuration tryGetConfigValue, parser.ParseCommandLine argv) open Propulsion.CosmosStore.Infrastructure // AwaitKeyboardInterruptAsTaskCancelledException open ECommerce.Domain +let buildSink (args : Args.Arguments) log (handle : FsCodec.StreamName * Propulsion.Streams.StreamSpan -> Async<_ * _>) = + let stats = Reactor.Stats(log, args.StatsInterval, args.StateInterval, args.VerboseStore, logExternalStats = args.DumpStoreMetrics) + Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats, args.StatsInterval, idleDelay = args.IdleDelay) let build (args : Args.Arguments) = - - match args.Source with - | Choice1Of2 cosmos (context, monitored, leases, processorName, startFromTail, maxItems, lagFrequency) -> - let cache = Equinox.Cache(AppName, sizeMb = 10) + let log = Log.forGroup args.Group // needs to have a `group` tag for Propulsion.Streams Prometheus metrics + let cache = Equinox.Cache(AppName, sizeMb = 10) + match args.SourceArgs with + | SourceArgs.Arguments.Cosmos a-> + let client, monitored = a.ConnectStoreAndMonitored() + let context = client |> CosmosStoreContext.create let store = Config.Store.Cosmos (context, cache) let handle = Reactor.Config.create (store, store) - let stats = Reactor.Stats(Log.Logger, args.StatsInterval, args.StateInterval) - let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats, args.StatsInterval) + let sink = buildSink args log handle let source = - let mapToStreamItems = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents - let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems) - Propulsion.CosmosStore.CosmosStoreSource.Start(Log.Logger, monitored, leases, processorName, observer, startFromTail, ?maxItems=maxItems, lagReportFreq=lagFrequency) - [ Async.AwaitKeyboardInterruptAsTaskCancelledException() - source.AwaitWithStopOnCancellation() - sink.AwaitWithStopOnCancellation() ] - | Choice1Of2 (srcE, context, spec) -> - let connectEs () = srcE.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStoreDb.ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader) - let connectProjEs () = srcE.ConnectProj(Log.Logger, Log.Logger, AppName, EventStore.Client.NodePreference.Follower) - - let cache = Equinox.Cache(AppName, sizeMb = 10) - let checkpoints = Checkpoints.Cosmos.create spec.groupName (context, cache) - let esStore = - let esConn = connectEs () - Config.Store.Esdb (EventStoreContext.create esConn, cache) - let cosmosStore = Config.Store.Cosmos (context, cache) - let handle = Reactor.Config.create (esStore, cosmosStore) - let stats = Reactor.Stats(Log.Logger, args.StatsInterval, args.StateInterval) - let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats, args.StatsInterval) - let runPipeline = - let tryMapEvent (x : EventStore.ClientAPI.ResolvedEvent) = - match x.Event with - | e when not e.IsJson || e.EventStreamId.StartsWith "$" -> None - | PropulsionStreamEvent e -> Some e - EventStoreSource.Run( - Log.Logger, sink, checkpoints, connectProjEs, spec, tryMapEvent, - args.MaxReadAhead, args.StatsInterval) - [ runPipeline; sink.AwaitWithStopOnCancellation() ] + let parseFeedDoc : _ -> _ = Seq.collect Propulsion.CosmosStore.EquinoxSystemTextJsonParser.enumStreamEvents + >> Seq.filter (fun {stream = s } -> Reactor.isReactionStream s) + let observer = Propulsion.CosmosStore.CosmosStoreSource.CreateObserver(log, sink.StartIngester, parseFeedDoc) + let leases, startFromTail, maxItems, lagFrequency = a.MonitoringParams(log) + Propulsion.CosmosStore.CosmosStoreSource.Start(log, monitored, leases, args.Group, observer, startFromTail, ?maxItems = maxItems, lagReportFreq = lagFrequency) + sink, source + | SourceArgs.Arguments.Dynamo a -> + let storeClient, streamsContext = a.Connect() + let context = storeClient |> DynamoStoreContext.create + let sourceId = Propulsion.DynamoStore.FeedSourceId.wellKnownId + let store = Config.Store.Dynamo (context, cache) + let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) + match a.MaybeOverrideRequested() with + | None -> () + | Some (trancheId, group, pos) -> Async.RunSynchronously <| async { + let checkpointsStore : Propulsion.Feed.ReaderCheckpoint.Service = downcast checkpoints + do! checkpointsStore.Override(sourceId, trancheId, pos) } + let handle = Reactor.Config.create (store, store) + let sink = buildSink args log handle + let source = + let indexClient, startFromTail, maxItems, streamsDop = a.MonitoringParams(log) + let loadMode = Propulsion.DynamoStore.LoadMode.WithBodies (Reactor.isReactionStream, streamsDop, streamsContext) + Propulsion.DynamoStore.DynamoStoreSource( + log, args.StatsInterval, + indexClient, sourceId, maxItems, TimeSpan.FromSeconds 0.5, + checkpoints, sink, loadMode, fromTail = startFromTail, storeLog = Config.log + ).Start() + sink, source + | SourceArgs.Arguments.Esdb a -> + let conn = a.Connect(log, AppName, EventStore.Client.NodePreference.Leader) + let context = conn |> EventStoreContext.create + let sourceId = Propulsion.EventStoreDb.FeedSourceId.wellKnownId + let store = Config.Store.Esdb (context, cache) + let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store) + // TODO implement checkpoint reset mechanism + let handle = Reactor.Config.create (store, store) + let sink = buildSink args log handle + let source = + let maxItems, tailSleepInterval = a.MonitoringParams(log) + let includeBodies = true + Propulsion.EventStoreDb.EventStoreSource( + log, args.StatsInterval, + conn.ReadConnection, sourceId, maxItems, tailSleepInterval, + checkpoints, sink, includeBodies (* TODO impl , storeLog = Config.log *) + ).Start() + sink, source let run (args : Args.Arguments) = async { use _ = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj + let sink, source = build args + let work = [ + Async.AwaitKeyboardInterruptAsTaskCancelledException() + source.AwaitWithStopOnCancellation() + sink.AwaitWithStopOnCancellation() ] return! Async.Parallel (build args) |> Async.Ignore } [] let main argv = try let args = Args.parse EnvVar.tryGet argv let metrics = Sinks.tags AppName |> Sinks.equinoxAndPropulsionReactorMetrics - try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger() + try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger() try run args |> Async.RunSynchronously; 0 with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs index cd1b9912d..462ed515b 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs @@ -26,10 +26,15 @@ type Stats(log, statsInterval, stateInterval, verboseStore, ?logExternalStats) = match logExternalStats with None -> () | Some f -> f Serilog.Log.Logger base.DumpStats() +let isReactionStream = function + | ShoppingCart.StreamName _ -> true + | _ -> false +let filterReactorEvents seq = seq |> Seq.filter (fun ({ stream = sn } : Propulsion.Streams.StreamEvent<_>) -> isReactionStream sn) + let handle (cartSummary : ShoppingCartSummaryHandler.Service) (confirmedCarts : ConfirmedHandler.Service) - (stream, span : Propulsion.Streams.StreamSpan<_>) = async { + (stream, span : Propulsion.Streams.StreamSpan) = async { match stream, span with | ShoppingCart.Reactions.Parse (cartId, events) -> match events with @@ -42,11 +47,12 @@ let handle return Propulsion.Streams.SpanResult.OverrideWritePosition version', (if worked then Outcome.Ok (1, span.events.Length - 1) else Outcome.Skipped span.events.Length) | _ -> return Propulsion.Streams.SpanResult.AllProcessed, Outcome.NotApplicable span.events.Length - | _ -> return Propulsion.Streams.SpanResult.AllProcessed, Outcome.NotApplicable span.events.Length } + | x -> return failwith $"Invalid event %A{x}" } // should be filtered by filterReactorEvents +// | _ -> return Propulsion.Streams.SpanResult.AllProcessed, Outcome.NotApplicable span.events.Length } module Config = - let create (sourceStore, cosmosStore) = - let cartSummary = ShoppingCartSummaryHandler.Config.create (sourceStore, cosmosStore) - let confirmedCarts = ConfirmedHandler.Config.create (sourceStore, cosmosStore) + let create (sourceStore, targetStore) = + let cartSummary = ShoppingCartSummaryHandler.Config.create (sourceStore, targetStore) + let confirmedCarts = ConfirmedHandler.Config.create (sourceStore, targetStore) handle cartSummary confirmedCarts diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/ShoppingCartSummaryHandler.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/ShoppingCartSummaryHandler.fs index 52bc3c6ba..468e5de19 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/ShoppingCartSummaryHandler.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/ShoppingCartSummaryHandler.fs @@ -13,7 +13,7 @@ type Service(source : ShoppingCart.Service, destination : ShoppingCartSummary.Se module Config = - let create (sourceStore, destinationStore) = + let create (sourceStore, targetStore) = let source = ShoppingCart.Config.create sourceStore - let destination = ShoppingCartSummary.Config.create destinationStore + let destination = ShoppingCartSummary.Config.create targetStore Service(source, destination)