Skip to content

Commit

Permalink
Update Reactor wiring
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed May 23, 2022
1 parent 2a1209f commit 448926e
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 269 deletions.
37 changes: 17 additions & 20 deletions Sample/ECommerce.Equinox/ECommerce.Api/Program.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module ECommerce.Api.Program

open ECommerce
open ECommerce.Infrastructure // Args etc
open Microsoft.AspNetCore.Hosting
open Microsoft.Extensions.DependencyInjection
open Serilog
Expand Down Expand Up @@ -29,34 +30,30 @@ module Args =
| Cosmos _ -> "specify CosmosDB input parameters"
| Dynamo _ -> "specify DynamoDB input parameters"
| Esdb _ -> "specify EventStore input parameters"
and [<NoComparison; NoEquality>] StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments
and [<RequireQualifiedAccess>]
Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Verbose
member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
member val Store : StoreArguments =
member val CacheSizeMb = 10
member val StoreArgs : Args.StoreArguments =
match a.TryGetSubCommand() with
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| Some (Parameters.Cosmos cosmos) -> Args.StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> Args.StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> Args.StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store"
member x.Connect() =
let cache = Equinox.Cache (AppName, sizeMb = 10)
match x.Store with
| StoreArguments.Cosmos ca ->
let context = ca.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
member x.VerboseStore = Args.verboseRequested x.StoreArgs
member x.Connect() : Domain.Config.Store<_> =
let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb)
match x.StoreArgs with
| Args.StoreArguments.Cosmos a ->
let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
Domain.Config.Store.Cosmos (context, cache)
| StoreArguments.Dynamo da ->
let context = da.Connect() |> DynamoStoreContext.create
| Args.StoreArguments.Dynamo a ->
let context = a.Connect() |> DynamoStoreContext.create
Domain.Config.Store.Dynamo (context, cache)
| StoreArguments.Esdb ea ->
let context = ea.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
| Args.StoreArguments.Esdb a ->
let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
Domain.Config.Store.Esdb (context, cache)
member x.StoreVerbose =
match x.Store with
| StoreArguments.Cosmos a -> a.Verbose
| StoreArguments.Dynamo a -> a.Verbose
| StoreArguments.Esdb a -> a.Verbose

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse tryGetConfigValue argv =
Expand Down Expand Up @@ -94,7 +91,7 @@ let run (args : Args.Arguments) =
let main argv =
try let args = Args.parse EnvVar.tryGet argv
let metrics = Sinks.tags AppName |> Sinks.equinoxMetricsOnly
try Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger()
try Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger()
try run args; 0
with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
Expand Down
4 changes: 3 additions & 1 deletion Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ module Events =

module Reactions =

let (|Decode|) (span : Propulsion.Streams.StreamSpan<_>) = span.events |> Seq.choose Events.codec.TryDecode |> Array.ofSeq
open FsCodec.Interop
let private codec = Events.codec.ToByteArrayCodec()
let (|Decode|) (span : Propulsion.Streams.StreamSpan<byte[]>) = span.events |> Seq.choose codec.TryDecode |> Array.ofSeq
let (|Parse|_|) = function
| StreamName sellerId, Decode events -> Some (sellerId, events)
| _ -> None
Expand Down
82 changes: 42 additions & 40 deletions Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ let [<Literal>] AppName = "FeedConsumer"
module Args =

open Argu
open Args.Esdb

[<NoEquality; NoComparison>]
type Parameters =
| [<AltCommandLine "-V"; Unique>] Verbose
Expand Down Expand Up @@ -44,8 +46,6 @@ module Args =
| Dynamo _ -> "specify DynamoDB input parameters"
| Esdb _ -> "specify EventStore input parameters"

[<NoComparison; NoEquality>]
type StoreArguments = Cosmos of Args.Cosmos.Arguments | Dynamo of Args.Dynamo.Arguments | Esdb of Args.Esdb.Arguments
type Arguments(c : Configuration, a : ParseResults<Parameters>) =
member val Verbose = a.Contains Verbose
member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort)
Expand All @@ -59,42 +59,44 @@ module Args =
member val CheckpointInterval = TimeSpan.FromHours 1.
member val TailSleepInterval = TimeSpan.FromSeconds 1.
member val ConsumerGroupName = "default"
member val CacheSizeMb = 10
member val Store : StoreArguments =
member val StoreArgs : Args.StoreArguments =
match a.TryGetSubCommand() with
| Some (Parameters.Cosmos cosmos) -> StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| Some (Parameters.Cosmos cosmos) -> Args.StoreArguments.Cosmos (Args.Cosmos.Arguments (c, cosmos))
| Some (Parameters.Dynamo dynamo) -> Args.StoreArguments.Dynamo (Args.Dynamo.Arguments (c, dynamo))
| Some (Parameters.Esdb es) -> Args.StoreArguments.Esdb (Args.Esdb.Arguments (c, es))
| _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store"
member x.Connect() =
let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb)
match x.Store with
| StoreArguments.Cosmos ca ->
let context = ca.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
Config.Store.Cosmos (context, cache), Equinox.CosmosStore.Core.Log.InternalMetrics.dump
| StoreArguments.Dynamo da ->
let context = da.Connect() |> DynamoStoreContext.create
Config.Store.Dynamo (context, cache), Equinox.DynamoStore.Core.Log.InternalMetrics.dump
| StoreArguments.Esdb ea ->
let context = ea.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
Config.Store.Esdb (context, cache), Equinox.EventStoreDb.Log.InternalMetrics.dump
member x.CheckpointStore(store) : Propulsion.Feed.IFeedCheckpointStore =
match store with
| Config.Store.Cosmos (context, cache) ->
Propulsion.Feed.ReaderCheckpoint.CosmosStore.create Config.log (x.ConsumerGroupName, x.CheckpointInterval) (context, cache)
| Config.Store.Dynamo (context, cache) ->
Propulsion.Feed.ReaderCheckpoint.DynamoStore.create Config.log (x.ConsumerGroupName, x.CheckpointInterval) (context, cache)
| Config.Store.Esdb _ ->
failwith "Propulsion.EventStoreDb does not implement a checkpointer, perhaps port https://github.com/absolutejam/Propulsion.EventStoreDB ?"
// or fork/finish https://github.com/jet/dotnet-templates/pull/81
// alternately one could use a SQL Server DB via Propulsion.SqlStreamStore
| Config.Store.Memory _ ->
failwith "It's possible to set up an in-memory projection, but that's in the context of a single process integration test as illustrated in https://github.com/jet/dotnet-templates/blob/master/equinox-shipping/Watchdog.Integration/ReactorFixture.fs"
member x.StoreVerbose =
match x.Store with
| StoreArguments.Cosmos a -> a.Verbose
| StoreArguments.Dynamo a -> a.Verbose
| StoreArguments.Esdb a -> a.Verbose
member x.VerboseStore = Args.verboseRequested x.StoreArgs
member x.DumpStoreMetrics = Args.dumpMetrics x.StoreArgs
member x.Connect() : Config.Store<_> =
let cache = Equinox.Cache (AppName, sizeMb = 10)
match x.StoreArgs with
| Args.StoreArguments.Cosmos a ->
let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
Config.Store.Cosmos (context, cache)
| Args.StoreArguments.Dynamo a ->
let context = a.Connect() |> DynamoStoreContext.create
Config.Store.Dynamo (context, cache)
| Args.StoreArguments.Esdb a ->
let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create
Config.Store.Esdb (context, cache)
member x.CheckpointStoreConfig(mainStore) : CheckpointStore.Config =
match mainStore with
| Config.Store.Cosmos (context, cache) -> CheckpointStore.Config.Cosmos (context, cache)
| Config.Store.Dynamo (context, cache) -> CheckpointStore.Config.Dynamo (context, cache)
| Config.Store.Memory _ -> failwith "Unexpected"
| Config.Store.Esdb (_, cache) ->
match x.StoreArgs with
| Args.StoreArguments.Esdb a ->
match a.CheckpointStoreArgs with
| CheckpointStoreArguments.Cosmos a ->
let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create
CheckpointStore.Config.Cosmos (context, cache)
| CheckpointStoreArguments.Dynamo a ->
let context = a.Connect() |> DynamoStoreContext.create
CheckpointStore.Config.Dynamo (context, cache)
| _ -> failwith "unexpected"
member x.CreateCheckpointStore(config) : Propulsion.Feed.IFeedCheckpointStore =
CheckpointStore.create (x.ConsumerGroupName, x.CheckpointInterval) Config.log config

/// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args
let parse tryGetConfigValue argv =
Expand All @@ -103,15 +105,15 @@ module Args =
Arguments(Configuration tryGetConfigValue, parser.ParseCommandLine argv)

let build (args : Args.Arguments) =
let store, dumpMetrics = args.Connect()
let store = args.Connect()

let log = Log.forGroup args.SourceId // needs to have a `group` tag for Propulsion.Streams Prometheus metrics
let sink =
let handle = Ingester.handle args.TicketsDop
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = dumpMetrics)
let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = args.DumpStoreMetrics)
Propulsion.Streams.StreamsProjector.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval)
let pumpSource =
let checkpoints = args.CheckpointStore(store)
let checkpoints = args.CreateCheckpointStore(args.CheckpointStoreConfig store)
let feed = ApiClient.TicketsFeed args.BaseUri
let source =
Propulsion.Feed.FeedSource(
Expand All @@ -130,7 +132,7 @@ let run args = async {
let main argv =
try let args = Args.parse EnvVar.tryGet argv
try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName)
Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.StoreVerbose).CreateLogger()
Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger()
try run args |> Async.RunSynchronously
with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2
finally Log.CloseAndFlush()
Expand Down
Loading

0 comments on commit 448926e

Please sign in to comment.