From 18f0eb2f64a7ac89816725a6179467571a62a164 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 18 Jan 2024 23:12:17 +0000 Subject: [PATCH] Port to Equinox4rc16, Propulsion3rc10 --- .../ECommerce.Api/Program.fs | 22 ++-- .../ECommerce.Domain/ConfirmedEpoch.fs | 44 +++---- .../ECommerce.Domain/ConfirmedSeries.fs | 30 ++--- .../ECommerce.Domain/ECommerce.Domain.fsproj | 18 +-- .../ECommerce.Domain/ExactlyOnceIngester.fs | 2 +- .../ECommerce.Domain/ShoppingCart.fs | 54 ++++---- .../ECommerce.Domain/ShoppingCartSummary.fs | 26 ++-- .../ECommerce.Domain/Store.fs | 13 +- .../ECommerce.Domain/Streams.fs | 41 ++---- .../ECommerce.Domain/Types.fs | 5 + .../ECommerce.FeedConsumer/ApiClient.fs | 6 +- .../ECommerce.FeedConsumer/Ingester.fs | 9 +- .../ECommerce.FeedConsumer/Program.fs | 46 ++++--- .../ECommerce.Infrastructure/Args.fs | 99 +++++++------- .../ECommerce.Infrastructure.fsproj | 18 +-- .../Infrastructure.fs | 122 +++++++++--------- .../ECommerce.Infrastructure/SourceConfig.fs | 113 ++++++++-------- .../ECommerce.Reactor/Program.fs | 61 ++++----- .../ECommerce.Reactor/Reactor.fs | 27 ++-- .../ECommerce.Reactor/SourceArgs.fs | 84 ++++++------ .../ECommerce.Tests/ConfirmedIngesterTests.fs | 2 +- .../ECommerce.Tests/ECommerce.Tests.fsproj | 2 +- 22 files changed, 411 insertions(+), 433 deletions(-) diff --git a/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs b/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs index 5ad443c4e..4f22e1837 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Api/Program.fs @@ -1,7 +1,6 @@ module ECommerce.Api.Program open ECommerce -open ECommerce.Infrastructure // Args etc open Microsoft.AspNetCore.Hosting open Microsoft.Extensions.DependencyInjection open Serilog @@ -25,26 +24,26 @@ module Args = | [] Sss of ParseResults interface IArgParserTemplate with member a.Usage = a |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)" | Cosmos _ -> "specify CosmosDB input parameters" | Dynamo _ -> "specify DynamoDB input parameters" | Esdb _ -> "specify EventStore input parameters" | Sss _ -> "specify SqlStreamStore input parameters" and [] - Arguments(c : Configuration, a : ParseResults) = - member val Verbose = a.Contains Verbose - member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) + Arguments(c : Configuration, p : ParseResults) = + member val Verbose = p.Contains Verbose + member val PrometheusPort = p.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) member val CacheSizeMb = 10 member val StoreArgs : Args.StoreArgs = - match a.TryGetSubCommand() with + match p.TryGetSubCommand() with | Some (Parameters.Cosmos cosmos) -> Args.StoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Some (Parameters.Dynamo dynamo) -> Args.StoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) | Some (Parameters.Esdb es) -> Args.StoreArgs.Esdb (Args.Esdb.Arguments(c, es)) | Some (Parameters.Sss sss) -> Args.StoreArgs.Sss (Args.Sss.Arguments(c, sss)) - | _ -> Args.missingArg "Must specify one of cosmos, dynamo, esdb or sss for store" + | _ -> p.Raise "Must specify one of cosmos, dynamo, esdb or sss for store" member x.VerboseStore = Args.StoreArgs.verboseRequested x.StoreArgs - member x.Connect() : Domain.Config.Store<_> = + member x.Connect(): Store.Config = let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb) Args.StoreArgs.connectTarget x.StoreArgs cache @@ -77,8 +76,7 @@ let main argv = .Sinks(metrics, args.VerboseStore) .CreateLogger() try run args; 0 - with e when not (e :? Args.MissingArg) -> Log.Fatal(e, "Exiting"); 2 + with e -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn $"%s{msg}"; 1 - | :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 - | e -> eprintfn "Exception %s" e.Message; 1 + with:? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs index c5424d6c7..5de89457c 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs @@ -5,8 +5,8 @@ /// Each successive epoch is identified by an index, i.e. ConfirmedEpoch-0_0, then ConfirmedEpoch-0_1 module ECommerce.Domain.ConfirmedEpoch -let [] Category = "ConfirmedEpoch" -let streamId epochId = Equinox.StreamId.gen2 ConfirmedSeriesId.toString ConfirmedEpochId.toString (ConfirmedSeriesId.wellKnownId, epochId) +let [] CategoryName = "ConfirmedEpoch" +let streamId epochId = FsCodec.StreamId.gen2 ConfirmedSeriesId.toString ConfirmedEpochId.toString (ConfirmedSeriesId.wellKnownId, epochId) // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] @@ -19,11 +19,11 @@ module Events = | Ingested of Ingested | Closed interface TypeShape.UnionContract.IUnionContract - let codec = Config.EventCodec.gen - let codecJsonElement = Config.EventCodec.genJsonElement + let codec = Store.Codec.gen + let codecJsonElement = Store.Codec.genJsonElement let ofShoppingCartView cartId (view : ShoppingCart.Details.View) : Events.Cart = - { cartId = cartId; items = [| for i in view.items -> { productId = i.productId; unitPrice = i.unitPrice; quantity = i.quantity }|] } + { cartId = cartId; items = [| for i in view.items -> { productId = i.productId; unitPrice = i.unitPrice; quantity = i.quantity } |] } let itemId (x : Events.Cart) : CartId = x.cartId let (|ItemIds|) : Events.Cart[] -> CartId[] = Array.map itemId @@ -45,20 +45,20 @@ let notAlreadyIn (ids : CartId seq) = /// Manages ingestion of only items not already in the list /// Yields residual net of items already present in this epoch // NOTE See feedSource template for more advanced version handling splitting large input requests where epoch limit is strict -let decide shouldClose candidates (currentIds, closed as state) : ExactlyOnceIngester.IngestResult<_,_> * Events.Event list = +let decide shouldClose candidates (currentIds, closed as state) : ExactlyOnceIngester.IngestResult<_,_> * Events.Event[] = match closed, candidates |> Array.filter (notAlreadyIn currentIds) with | false, fresh -> let added, events = match fresh with - | [||] -> [||], [] + | [||] -> [||], [||] | ItemIds freshIds -> let closing = shouldClose currentIds freshIds let ingestEvent = Events.Ingested { carts = fresh } - freshIds, if closing then [ ingestEvent ; Events.Closed ] else [ ingestEvent ] + freshIds, if closing then [| ingestEvent ; Events.Closed |] else [| ingestEvent |] let _, closed = Fold.fold state events { accepted = added; closed = closed; residual = [||] }, events | true, fresh -> - { accepted = [||]; closed = true; residual = fresh }, [] + { accepted = [||]; closed = true; residual = fresh }, [||] // NOTE see feedSource for example of separating Service logic into Ingestion and Read Services in order to vary the folding and/or state held type Service internal @@ -72,7 +72,7 @@ type Service internal // NOTE decider which will initially transact against potentially stale cached state, which will trigger a // resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk // of writes are presumed to be coming from within this same process - decider.Transact(decide shouldClose carts, load = Equinox.AllowStale) + decider.Transact(decide shouldClose carts, load = Equinox.LoadOption.AnyCachedValue) /// Returns all the items currently held in the stream (Not using AllowStale on the assumption this needs to see updates from other apps) member _.Read epochId : Async = @@ -81,13 +81,13 @@ type Service internal module Config = - let private create_ shouldClose cat = Service(shouldClose, streamId >> Config.createDecider cat Category) + let private create_ shouldClose cat = Service(shouldClose, streamId >> Store.createDecider cat) let private (|Category|) = function - | Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store - | Config.Store.Cosmos (context, cache) -> Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache) - | Config.Store.Dynamo (context, cache) -> Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - | Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - | Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codecJsonElement Fold.initial Fold.fold (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) let shouldClose maxItemsPerEpoch candidateItems currentItems = Array.length currentItems + Array.length candidateItems >= maxItemsPerEpoch let create maxItemsPerEpoch (Category cat) = create_ (shouldClose maxItemsPerEpoch) cat @@ -116,9 +116,9 @@ module Reader = module Config = let private (|Category|) = function - | Config.Store.Memory store -> Config.Memory.create Events.codec initial fold store - | Config.Store.Cosmos (context, cache) -> Config.Cosmos.createUnoptimized Events.codecJsonElement initial fold (context, cache) - | Config.Store.Dynamo (context, cache) -> Config.Dynamo.createUnoptimized Events.codec initial fold (context, cache) - | Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec initial fold (context, cache) - | Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec initial fold (context, cache) - let create (Category cat) = Service(streamId >> Config.createDecider cat Category) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec initial fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codecJsonElement initial fold (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createUnoptimized CategoryName Events.codec initial fold (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec initial fold (context, cache) + | Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec initial fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedSeries.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedSeries.fs index 1d3c3ccaa..68d3d24e8 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedSeries.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedSeries.fs @@ -3,10 +3,10 @@ /// As an Epoch is marked `Closed`, the Ingester will mark a new Epoch `Started` on this aggregate via MarkIngestionEpochId module ECommerce.Domain.ConfirmedSeries -let [] Category = "ConfirmedSeries" +let [] CategoryName = "ConfirmedSeries" // TOCONSIDER: if you need multiple lists series/epochs in a single system, the Series and Epoch streams should have a SeriesId in the stream name // See also the implementation in the feedSource template, where the Series aggregate also functions as an index of series held in the system -let streamId () = Equinox.StreamId.gen ConfirmedSeriesId.toString ConfirmedSeriesId.wellKnownId +let streamId () = FsCodec.StreamId.gen ConfirmedSeriesId.toString ConfirmedSeriesId.wellKnownId // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] @@ -16,8 +16,8 @@ module Events = | Started of {| epochId : ConfirmedEpochId |} | Snapshotted of {| active : ConfirmedEpochId |} interface TypeShape.UnionContract.IUnionContract - let codec = Config.EventCodec.gen - let codecJsonElement = Config.EventCodec.genJsonElement + let codec = Store.Codec.gen + let codecJsonElement = Store.Codec.genJsonElement module Fold = @@ -31,9 +31,9 @@ module Fold = let isOrigin = function Events.Snapshotted _ -> true | _ -> false let toSnapshot s = Events.Snapshotted {| active = Option.get s |} -let interpret epochId (state : Fold.State) = - [if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ConfirmedEpochId.initial then - yield Events.Started {| epochId = epochId |}] +let interpret epochId (state : Fold.State) = [| + if state |> Option.forall (fun cur -> cur < epochId) && epochId >= ConfirmedEpochId.initial then + yield Events.Started {| epochId = epochId |}|] type Service internal (resolve : unit -> Equinox.Decider) = @@ -46,15 +46,15 @@ type Service internal (resolve : unit -> Equinox.Decider = - let decider = resolve() - decider.Transact(interpret epochId, load = Equinox.AllowStale) + let decider = resolve () + decider.Transact(interpret epochId, load = Equinox.LoadOption.AnyCachedValue) module Config = let private (|Category|) = function - | Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store - | Config.Store.Cosmos (context, cache) -> Config.Cosmos.createSnapshotted Events.codecJsonElement Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Config.Store.Dynamo (context, cache) -> Config.Dynamo.createSnapshotted Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) - | Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - | Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - let create (Category cat) = Service(streamId >> Config.createDecider cat Category) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJsonElement Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj b/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj index a7ba161e4..537e5fd1e 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ECommerce.Domain.fsproj @@ -6,7 +6,8 @@ - + + @@ -17,13 +18,14 @@ - - - - - - - + + + + + + + + diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ExactlyOnceIngester.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ExactlyOnceIngester.fs index c2734a462..12d7dba82 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ExactlyOnceIngester.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ExactlyOnceIngester.fs @@ -65,7 +65,7 @@ type Service<[]'id, 'req, 'res, 'outcome> internal /// a) back-off, re-read and retry if there's a concurrent write Optimistic Concurrency Check failure when writing the stream /// b) enter a prolonged period of retries if multiple concurrent writes trigger rate limiting and 429s from CosmosDB /// c) readers will less frequently encounter sustained 429s on the batch - let batchedIngest = Equinox.Core.AsyncBatchingGate(tryIngest, linger) + let batchedIngest = Equinox.Core.Batching.Batcher(tryIngest, linger) /// Run the requests over a chain of epochs. /// Returns the subset that actually got handled this time around (exclusive of items that did not trigger events per idempotency rules). diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs index 5c3bc621d..4d815e3ee 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCart.fs @@ -2,10 +2,9 @@ module ECommerce.Domain.ShoppingCart open Propulsion.Internal -let [] Category = "ShoppingCart" - -let streamId = Equinox.StreamId.gen CartId.toString -let [] (|StreamName|_|) = function FsCodec.StreamName.CategoryAndId (Category, CartId.Parse cartId) -> ValueSome cartId | _ -> ValueNone +let [] CategoryName = "ShoppingCart" +let streamId = FsCodec.StreamId.gen CartId.toString +let private catId = CategoryId(CategoryName, streamId, FsCodec.StreamId.dec CartId.(|Parse|)) module Events = @@ -16,15 +15,16 @@ module Events = | Confirmed of {| confirmedAt : System.DateTimeOffset |} | Registering of {| originEpoch : ConfirmedEpochId |} interface TypeShape.UnionContract.IUnionContract - let codec = Config.EventCodec.gen - let codecJsonElement = Config.EventCodec.genJsonElement + let codec = Store.Codec.gen + let codecJsonElement = Store.Codec.genJsonElement module Reactions = - let private codec = Events.codec - let (|Decode|) span = span |> Seq.chooseV codec.TryDecode |> Array.ofSeq - let [] (|Parse|_|) = function - | StreamName cartId, Decode events -> ValueSome (cartId, events) + let [] (|For|_|) = catId.TryDecode + let dec = Streams.Codec.dec + let config = catId.StreamName, dec + let [] (|Decode|_|) = function + | struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events) | _ -> ValueNone let chooseConfirmed = function | Events.Confirmed _ -> ValueSome () @@ -71,25 +71,25 @@ module Fold = | Events.Registering e -> { s with confirmedOriginEpoch = Some e.originEpoch } let fold = Seq.fold evolve -let decideInitialize clientId (s : Fold.State) = - if s.clientId <> None then [] - else [ Events.Initialized {| clientId = clientId |}] +let decideInitialize clientId (s : Fold.State) = [| + if Option.isNone s.clientId then + Events.Initialized {| clientId = clientId |} |] let decideAdd calculatePrice productId quantity state = async { match state with | s when Fold.isClosed s -> return invalidOp $"Adding product item for cart in '%A{s.status}' status is not allowed." | _ -> let! price = calculatePrice (productId, quantity) - return (), [ Events.ItemAdded {| productId = productId; unitPrice = price; quantity = quantity |} ] } + return (), [| Events.ItemAdded {| productId = productId; unitPrice = price; quantity = quantity |} |] } let decideRemove (productId, price) = function | s when Fold.isClosed s -> invalidOp $"Removing product item for cart in '%A{s.status}' status is not allowed." | _ -> - [ Events.ItemRemoved {| productId = productId; unitPrice = price |} ] + [| Events.ItemRemoved {| productId = productId; unitPrice = price |} |] let decideConfirm at = function - | s when Fold.isClosed s -> [] - | _ -> [ Events.Confirmed {| confirmedAt = at |} ] + | s when Fold.isClosed s -> [||] + | _ -> [| Events.Confirmed {| confirmedAt = at |} |] module Details = @@ -107,10 +107,10 @@ let summarizeWithOriginEpoch getActiveEpochId state = async { match state with | s when not (Fold.isClosed s) -> return failwith "Unexpected" | { confirmedOriginEpoch = Some originEpoch } as s -> - return (Details.render s |> Option.get, originEpoch), [] + return (Details.render s |> Option.get, originEpoch), [||] | { confirmedOriginEpoch = None } as s -> let! originEpoch = getActiveEpochId () - return (Details.render s |> Option.get, originEpoch), [ Events.Registering {| originEpoch = originEpoch |} ] } + return (Details.render s |> Option.get, originEpoch), [| Events.Registering {| originEpoch = originEpoch |} |] } type Service internal (resolve : CartId -> Equinox.Decider, calculatePrice : ProductId * int -> Async) = @@ -120,7 +120,7 @@ type Service internal (resolve : CartId -> Equinox.Decider Equinox.Decider = let decider = resolve cartId - decider.TransactAsync(summarizeWithOriginEpoch getActiveEpochId) + decider.Transact(summarizeWithOriginEpoch getActiveEpochId) /// Render view (and emit version on which it was based) for Denormalizer to store member _.SummarizeWithVersion(cartId) : Async = @@ -158,12 +158,12 @@ module Config = pricer productId let private (|Category|) = function - | Config.Store.Memory store -> Config.Memory.create Events.codec Fold.initial Fold.fold store - | Config.Store.Cosmos (context, cache) -> Config.Cosmos.createUnoptimized Events.codecJsonElement Fold.initial Fold.fold (context, cache) - | Config.Store.Dynamo (context, cache) -> Config.Dynamo.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - | Config.Store.Esdb (context, cache) -> Config.Esdb.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - | Config.Store.Sss (context, cache) -> Config.Sss.createUnoptimized Events.codec Fold.initial Fold.fold (context, cache) - let create_ pricer (Category cat) = Service(streamId >> Config.createDecider cat Category, calculatePrice pricer) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createUnoptimized CategoryName Events.codecJsonElement Fold.initial Fold.fold (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Esdb (context, cache) -> Store.Esdb.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + | Store.Config.Sss (context, cache) -> Store.Sss.createUnoptimized CategoryName Events.codec Fold.initial Fold.fold (context, cache) + let create_ pricer (Category cat) = Service(streamId >> Store.createDecider cat, calculatePrice pricer) let create = let defaultCalculator = RandomProductPriceCalculator() create_ defaultCalculator.Calculate diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs index d40dcd642..4ee02d6db 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ShoppingCartSummary.fs @@ -1,8 +1,8 @@ module ECommerce.Domain.ShoppingCartSummary -let [] Category = "ShoppingCartSummary" +let [] CategoryName = "ShoppingCartSummary" -let streamId = Equinox.StreamId.gen CartId.toString +let streamId = FsCodec.StreamId.gen CartId.toString module Events = @@ -13,8 +13,8 @@ module Events = type Event = | Ingested of Ingested interface TypeShape.UnionContract.IUnionContract - let codec = Config.EventCodec.gen - let codecJsonElement = Config.EventCodec.genJsonElement + let codec = Store.Codec.gen + let codecJsonElement = Store.Codec.genJsonElement module Fold = @@ -46,10 +46,10 @@ module Ingest = { clientId = view.clientId; status = mapStatus view.status items = [| for i in view.items -> { productId = i.productId; unitPrice = i.unitPrice; quantity = i.quantity } |] } - let decide (version : int64, value : Events.Summary) : Fold.State -> bool * Events.Event list = function - | Some { version = v } when v >= version -> false, [] - | None -> false, [] - | _ -> true, [ Events.Ingested { version = version; value = value } ] + let decide (version : int64, value : Events.Summary): Fold.State -> bool * Events.Event[] = function + | Some { version = v } when v >= version -> false, [||] + | None -> false, [||] + | _ -> true, [| Events.Ingested { version = version; value = value } |] type Service internal (resolve : CartId -> Equinox.Decider) = @@ -64,8 +64,8 @@ type Service internal (resolve : CartId -> Equinox.Decider Config.Memory.create Events.codec Fold.initial Fold.fold store - | Config.Store.Cosmos (context, cache) -> Config.Cosmos.createRollingState Events.codecJsonElement Fold.initial Fold.fold Fold.toSnapshot (context, cache) - | Config.Store.Dynamo (context, cache) -> Config.Dynamo.createRollingState Events.codec Fold.initial Fold.fold Fold.toSnapshot (context, cache) - | Config.Store.Esdb _ | Config.Store.Sss _ -> failwith "Not implemented: For EventStore/Sss its suggested to do a cached read from the write side" - let create (Category cat) = Service(streamId >> Config.createDecider cat Category) + | Store.Config.Memory store -> Store.Memory.create CategoryName Events.codec Fold.initial Fold.fold store + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createRollingState CategoryName Events.codecJsonElement Fold.initial Fold.fold Fold.toSnapshot (context, cache) + | Store.Config.Dynamo (context, cache) -> Store.Dynamo.createRollingState CategoryName Events.codec Fold.initial Fold.fold Fold.toSnapshot (context, cache) + | Store.Config.Esdb _ | Store.Config.Sss _ -> failwith "Not implemented: For EventStore/Sss its suggested to do a cached read from the write side" + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/Store.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/Store.fs index c8f58eb11..e1b0592cf 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/Store.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/Store.fs @@ -9,17 +9,16 @@ let createDecider category = Equinox.Decider.forStream Metrics.log category module Memory = let create name codec initial fold store : Equinox.Category<_, _, _> = - Equinox.MemoryStore.MemoryStoreCategory(store, name, codec, fold, initial) + Equinox.MemoryStore.MemoryStoreCategory(store, name, FsCodec.Compression.EncodeUncompressed codec, fold, initial) -module EventCodec = +module Codec = open FsCodec.SystemTextJson - let private defaultOptions = Options.Create(autoTypeSafeEnumToJsonString = true) - let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = - CodecJsonElement.Create<'t>(options = defaultOptions) let gen<'t when 't :> TypeShape.UnionContract.IUnionContract> = Codec.Create<'t>(options = defaultOptions) + let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> = + CodecJsonElement.Create<'t>(options = defaultOptions) let private defaultCacheDuration = System.TimeSpan.FromMinutes 20. @@ -80,8 +79,8 @@ module Sss = createCached name codec initial fold Equinox.SqlStreamStore.AccessStrategy.LatestKnownEvent (context, cache) [] -type Store<'t> = - | Memory of Equinox.MemoryStore.VolatileStore<'t> +type Config = + | Memory of Equinox.MemoryStore.VolatileStore)> | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Cache | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.Cache | Esdb of Equinox.EventStoreDb.EventStoreContext * Equinox.Cache diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/Streams.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/Streams.fs index 88c932b89..95dbfe02c 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/Streams.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/Streams.fs @@ -1,35 +1,22 @@ module Streams -type Event = FsCodec.ITimelineEvent -and EventBody = System.ReadOnlyMemory - -module Event = - - let private serdes = FsCodec.NewtonsoftJson.Serdes(Newtonsoft.Json.JsonSerializerSettings()) - let userContext (e : Event) = serdes.Deserialize(e.Meta) +open Serilog module Codec = - open FsCodec.NewtonsoftJson - open Serilog - type UserContext = Domain.Types.UserContext + let dec<'E when 'E :> TypeShape.UnionContract.IUnionContract> : Propulsion.Sinks.Codec<'E> = + FsCodec.SystemTextJson.Codec.Create<'E>() // options = Options.Default - let dec<'E when 'E :> TypeShape.UnionContract.IUnionContract> = - let up _ (typed: 'E) = typed - let down (_event: 'E) = failwith "only supports deserialization" - Codec.Create(up, down) + let private renderBody (x: Propulsion.Sinks.EventBody) = System.Text.Encoding.UTF8.GetString(x.Span) - let render (x : EventBody) : string = System.Text.Encoding.UTF8.GetString(x.Span) - /// Uses the supplied codec to decode the supplied event record `x` (iff at LogEventLevel.Debug, detail fails to `log` citing the `stream` and content) - let tryDecode (codec : FsCodec.IEventCodec<'Event, EventBody, unit>) (streamName : FsCodec.StreamName) (x : Event): ('Event * UserContext) voption = - match codec.Decode x with - | ValueNone -> - if Log.IsEnabled Serilog.Events.LogEventLevel.Debug then - Log.ForContext("event", render x.Data, true) - .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, x.EventType, streamName) + // Uses the supplied codec to decode the supplied event record (iff at LogEventLevel.Debug, failures are logged, citing `stream` and `.Data`) + let internal tryDecode<'E> (codec: Propulsion.Sinks.Codec<'E>) (streamName: FsCodec.StreamName) event = + match codec.Decode event with + | ValueNone when Log.IsEnabled Serilog.Events.LogEventLevel.Debug -> + Log.ForContext("eventData", renderBody event.Data) + .Debug("Codec {type} Could not decode {eventType} in {stream}", codec.GetType().FullName, event.EventType, streamName) ValueNone - | ValueSome d -> - ValueSome (d, Event.userContext x) - let decode codec struct (stream, events: Event[]) : ('e * UserContext) array = - events |> Array.chooseV (tryDecode codec stream) -let (|Decode|) = Codec.decode + | x -> x + +let (|Decode|) codec struct (stream, events: Propulsion.Sinks.Event[]): 'E[] = + events |> Propulsion.Internal.Array.chooseV (Codec.tryDecode codec stream) diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs index 5cfe0b5c7..394cbcb0f 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs @@ -71,3 +71,8 @@ module ConfirmedCheckpoint = let toEpochAndOffset (value : ConfirmedCheckpoint) : ConfirmedEpochId * int = let d, r = Math.DivRem(%value, factor) (ConfirmedEpochId.parse (int d)), int r + +/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers +type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) = + member _.StreamName = gen >> FsCodec.StreamName.create name + member _.TryDecode = FsCodec.StreamName.tryFind name >> ValueOption.map dec diff --git a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/ApiClient.fs b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/ApiClient.fs index e4c5a698f..ae4a956ce 100644 --- a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/ApiClient.fs +++ b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/ApiClient.fs @@ -68,13 +68,13 @@ type TicketsFeed(baseUri) = let client = new HttpClient(BaseAddress = baseUri) let tickets = Session(client).Tickets - let batch pg items : Propulsion.Feed.Page = + let batch pg items : Propulsion.Feed.Page = { checkpoint = TicketsCheckpoint.toPosition pg.checkpoint; items = items; isTail = not pg.closed } // TODO add retries - consumer loop will abort if this throws - member _.Poll(trancheId, pos, ct) = task { + member _.Poll(trancheId, pos) = async { let checkpoint = TicketsCheckpoint.ofPosition pos - let! pg = tickets.Poll(TrancheId.toFcId trancheId, checkpoint) |> fun f -> Async.StartImmediateAsTask(f, cancellationToken = ct) + let! pg = tickets.Poll(TrancheId.toFcId trancheId, checkpoint) let baseIndex = TicketsCheckpoint.toStreamIndex pg.position let map (x : ItemDto) : Ingester.PipelineEvent.Item = { id = x.id; payload = x.payload } let items = pg.tickets |> Array.mapi (fun i x -> Ingester.PipelineEvent.ofIndexAndItem (baseIndex + int64 i) (map x)) diff --git a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs index 291c7ded4..f1ac96400 100644 --- a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs +++ b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Ingester.fs @@ -36,14 +36,13 @@ module PipelineEvent = Unchecked.defaultof<_>, context = item) let (|ItemsForFc|_|) = function - | FsCodec.StreamName.CategoryAndIds (_, [|_ ; FcId.Parse fc|]), (s : Propulsion.Streams.StreamSpan) -> + | FsCodec.StreamName.Split (_, FsCodec.StreamId.Parse 2 [|_ ; FcId.Parse fc|]), (s : Propulsion.Sinks.Event[]) -> Some (fc, s |> Seq.map (fun e -> Unchecked.unbox e.Context)) | _ -> None -let handle maxDop - struct (stream, span) : Async = async { +let handle maxDop stream span: Async = async { match stream, span with - | PipelineEvent.ItemsForFc (fc, items) -> + | PipelineEvent.ItemsForFc (_fc, items) -> // Take chunks of max 1000 in order to make handler latency be less 'lumpy' // What makes sense in terms of a good chunking size will vary depending on the workload in question let ticketIds = seq { for x in items -> x.id } |> Seq.truncate 1000 |> Seq.toArray @@ -59,6 +58,6 @@ let handle maxDop }) let! added = Async.Parallel(maybeAdd, maxDegreeOfParallelism=maxDop) let outcome = { added = Seq.length added; notReady = results.Length - ready.Length; dups = results.Length - ticketIds.Length } - return Propulsion.Streams.SpanResult.PartiallyProcessed ticketIds.Length, outcome + return Propulsion.Sinks.StreamResult.PartiallyProcessed ticketIds.Length, outcome | x -> return failwithf "Unexpected stream %O" x } diff --git a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs index a887ed7fa..3e5d073ed 100644 --- a/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.FeedConsumer/Program.fs @@ -1,15 +1,14 @@ module ECommerce.FeedConsumer.Program -open ECommerce.Infrastructure // ConnectStore etc -open ECommerce.Domain // Config etc open Serilog open System type Configuration(tryGet) = inherit Args.Configuration(tryGet) - member _.BaseUri = base.get "API_BASE_URI" - member _.Group = base.get "API_CONSUMER_GROUP" + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" + member _.BaseUri = get "API_BASE_URI" + member _.Group = get "API_CONSUMER_GROUP" let [] AppName = "FeedConsumer" @@ -35,7 +34,7 @@ module Args = | [] Sss of ParseResults interface IArgParserTemplate with member a.Usage = a |> function - | Verbose _ -> "request verbose logging." + | Verbose -> "request verbose logging." | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off (optional if environment variable PROMETHEUS_PORT specified)" | Group _ -> "specify Api Consumer Group Id. (optional if environment variable API_CONSUMER_GROUP specified)" | BaseUri _ -> "specify Api endpoint. (optional if environment variable API_BASE_URI specified)" @@ -66,29 +65,29 @@ module Args = | Some (Parameters.Cosmos cosmos) -> Args.StoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Some (Parameters.Dynamo dynamo) -> Args.StoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) | Some (Parameters.Esdb es) -> Args.StoreArgs.Esdb (Args.Esdb.Arguments(c, es)) - | _ -> Args.missingArg "Must specify one of cosmos, dynamo or esdb for store" + | _ -> a.Raise "Must specify one of cosmos, dynamo or esdb for store" member x.VerboseStore = Args.StoreArgs.verboseRequested x.StoreArgs member x.DumpStoreMetrics = Args.StoreArgs.dumpMetrics x.StoreArgs - member x.Connect() : Config.Store<_> * Propulsion.Feed.IFeedCheckpointStore = + member x.Connect() : Store.Config * Propulsion.Feed.IFeedCheckpointStore = let cache = Equinox.Cache(AppName, sizeMb = x.CacheSizeMb) - let createCheckpoints = Args.Checkpoints.create (x.ConsumerGroupName, x.CheckpointInterval) Config.log + let createCheckpoints = Args.Checkpoints.create (x.ConsumerGroupName, x.CheckpointInterval) Store.Metrics.log match x.StoreArgs with | Args.StoreArgs.Cosmos a -> - let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create - let store = Config.Store.Cosmos (context, cache) - store, createCheckpoints (Args.Checkpoints.Store.Cosmos (context, cache)) + let context = a.Connect() |> Async.RunSynchronously + let store = Store.Config.Cosmos (context, cache) + store, createCheckpoints (Args.Checkpoints.Config.Cosmos (context, cache)) | Args.StoreArgs.Dynamo a -> - let context = a.Connect() |> DynamoStoreContext.create - let store = Config.Store.Dynamo (context, cache) - store, createCheckpoints (Args.Checkpoints.Store.Dynamo (context, cache)) + let context = a.Connect() + let store = Store.Config.Dynamo (context, cache) + store, createCheckpoints (Args.Checkpoints.Config.Dynamo (context, cache)) | Args.StoreArgs.Esdb a -> let context = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) |> EventStoreContext.create - let store = Config.Store.Esdb (context, cache) + let store = Store.Config.Esdb (context, cache) let checkpointStore = a.ConnectCheckpointStore(cache) store, createCheckpoints checkpointStore | Args.StoreArgs.Sss a -> let context = a.Connect() |> SqlStreamStoreContext.create - let store = Config.Store.Sss (context, cache) + let store = Store.Config.Sss (context, cache) let checkpointStore = a.CreateCheckpointStoreSql(x.ConsumerGroupName) store, checkpointStore @@ -105,14 +104,14 @@ let build (args : Args.Arguments) = let sink = let handle = Ingester.handle args.TicketsDop let stats = Ingester.Stats(log, args.StatsInterval, args.StateInterval, logExternalStats = args.DumpStoreMetrics) - Propulsion.Streams.StreamsSink.Start(log, args.MaxReadAhead, args.FcsDop, handle, stats, args.StatsInterval, Propulsion.Streams.Default.eventSize) + Propulsion.Sinks.Factory.StartConcurrent(log, args.MaxReadAhead, args.FcsDop, handle, stats) let pumpSource = let feed = ApiClient.TicketsFeed args.BaseUri let source = Propulsion.Feed.FeedSource( log, args.StatsInterval, args.SourceId, args.TailSleepInterval, - checkpoints, sink, feed.Poll) - source.Pump feed.ReadTranches + checkpoints, sink) + source.Start(feed.ReadTranches, fun t p -> feed.Poll(t, p)) sink, pumpSource open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException @@ -121,7 +120,7 @@ let run args = async { let sink, source = build args use _ = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException() - source + source.AwaitWithStopOnCancellation() sink.AwaitWithStopOnCancellation() |] |> Async.Parallel |> Async.Ignore } @@ -131,8 +130,7 @@ let main argv = try let metrics = Sinks.equinoxAndPropulsionFeedConsumerMetrics (Sinks.tags AppName) Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintfn "Exception %s" e.Message; 1 + with:? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs index 619c327dc..e139546d7 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Args.fs @@ -1,12 +1,8 @@ /// Commandline arguments and/or secrets loading specifications -module ECommerce.Infrastructure.Args +module Args -module Config = ECommerce.Domain.Config open System -exception MissingArg of message : string with override this.Message = this.message -let missingArg msg = raise (MissingArg msg) - let [] REGION = "EQUINOX_DYNAMO_REGION" let [] SERVICE_URL = "EQUINOX_DYNAMO_SERVICE_URL" let [] ACCESS_KEY = "EQUINOX_DYNAMO_ACCESS_KEY_ID" @@ -16,29 +12,29 @@ let [] INDEX_TABLE = "EQUINOX_DYNAMO_TABLE_INDEX" type Configuration(tryGet : string -> string option) = + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" member val tryGet = tryGet - member _.get key = match tryGet key with Some value -> value | None -> missingArg $"Missing Argument/Environment Variable %s{key}" - member x.CosmosConnection = x.get "EQUINOX_COSMOS_CONNECTION" - member x.CosmosDatabase = x.get "EQUINOX_COSMOS_DATABASE" - member x.CosmosContainer = x.get "EQUINOX_COSMOS_CONTAINER" + member _.CosmosConnection = get "EQUINOX_COSMOS_CONNECTION" + member _.CosmosDatabase = get "EQUINOX_COSMOS_DATABASE" + member _.CosmosContainer = get "EQUINOX_COSMOS_CONTAINER" - member x.DynamoServiceUrl = x.get SERVICE_URL - member x.DynamoAccessKey = x.get ACCESS_KEY - member x.DynamoSecretKey = x.get SECRET_KEY - member x.DynamoTable = x.get TABLE - member x.DynamoRegion = x.tryGet REGION + member _.DynamoServiceUrl = get SERVICE_URL + member _.DynamoAccessKey = get ACCESS_KEY + member _.DynamoSecretKey = get SECRET_KEY + member _.DynamoTable = get TABLE + member _.DynamoRegion = tryGet REGION - member x.EventStoreConnection = x.get "EQUINOX_ES_CONNECTION" - member x.EventStoreCredentials = x.get "EQUINOX_ES_CREDENTIALS" + member _.EventStoreConnection = get "EQUINOX_ES_CONNECTION" + // member _.EventStoreCredentials = get "EQUINOX_ES_CREDENTIALS" member _.MaybeEventStoreConnection = tryGet "EQUINOX_ES_CONNECTION" member _.MaybeEventStoreCredentials = tryGet "EQUINOX_ES_CREDENTIALS" - member x.SqlStreamStoreConnection = x.get "SQLSTREAMSTORE_CONNECTION" - member x.SqlStreamStoreCredentials = tryGet "SQLSTREAMSTORE_CREDENTIALS" - member x.SqlStreamStoreCredentialsCheckpoints = tryGet "SQLSTREAMSTORE_CREDENTIALS_CHECKPOINTS" - member x.SqlStreamStoreDatabase = x.get "SQLSTREAMSTORE_DATABASE" - member x.SqlStreamStoreContainer = x.get "SQLSTREAMSTORE_CONTAINER" + member _.SqlStreamStoreConnection = get "SQLSTREAMSTORE_CONNECTION" + member _.SqlStreamStoreCredentials = tryGet "SQLSTREAMSTORE_CREDENTIALS" + member _.SqlStreamStoreCredentialsCheckpoints = tryGet "SQLSTREAMSTORE_CREDENTIALS_CHECKPOINTS" + member _.SqlStreamStoreDatabase = get "SQLSTREAMSTORE_DATABASE" + member _.SqlStreamStoreContainer = get "SQLSTREAMSTORE_CONTAINER" member x.PrometheusPort = tryGet "PROMETHEUS_PORT" |> Option.map int @@ -47,9 +43,9 @@ type Configuration(tryGet : string -> string option) = module Checkpoints = [] - type Store = - | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Core.ICache - | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.Core.ICache + type Config = + | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Cache + | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.Cache (* Propulsion.EventStoreDb does not implement a native checkpoint storage mechanism, perhaps port https://github.com/absolutejam/Propulsion.EventStoreDB ? or fork/finish https://github.com/jet/dotnet-templates/pull/81 @@ -57,20 +53,20 @@ module Checkpoints = For now, we store the Checkpoints in one of the above stores as this sample uses one for the read models anyway *) - let create (consumerGroup, checkpointInterval) storeLog : Store -> Propulsion.Feed.IFeedCheckpointStore = function - | Store.Cosmos (context, cache) -> + let create (consumerGroup, checkpointInterval) storeLog: Config -> Propulsion.Feed.IFeedCheckpointStore = function + | Config.Cosmos (context, cache) -> Propulsion.Feed.ReaderCheckpoint.CosmosStore.create storeLog (consumerGroup, checkpointInterval) (context, cache) - | Store.Dynamo (context, cache) -> + | Config.Dynamo (context, cache) -> Propulsion.Feed.ReaderCheckpoint.DynamoStore.create storeLog (consumerGroup, checkpointInterval) (context, cache) let createCheckpointStore (group, checkpointInterval, store) : Propulsion.Feed.IFeedCheckpointStore = let checkpointStore = match store with - | Config.Store.Cosmos (context, cache) -> Store.Cosmos (context, cache) - | Config.Store.Dynamo (context, cache) -> Store.Dynamo (context, cache) - | Config.Store.Esdb _ - | Config.Store.Memory _ - | Config.Store.Sss _ -> failwith "unexpected" - create (group, checkpointInterval) Config.log checkpointStore + | Store.Config.Cosmos (context, cache) -> Config.Cosmos (context, cache) + | Store.Config.Dynamo (context, cache) -> Config.Dynamo (context, cache) + | Store.Config.Esdb _ + | Store.Config.Memory _ + | Store.Config.Sss _ -> failwith "unexpected" + create (group, checkpointInterval) Store.Metrics.log checkpointStore open Argu @@ -96,17 +92,17 @@ module Cosmos = | Retries _ -> "specify operation retries (default: 1)." | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds (default: 5)" type Arguments(c : Configuration, p : ParseResults) = - let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) let discovery = Equinox.CosmosStore.Discovery.ConnectionString connection let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 1) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let container = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let container = p.GetResult(Container, fun () -> c.CosmosContainer) member val Verbose = p.Contains Verbose - member _.Connect() = connector.ConnectStore("Target", database, container) + member _.Connect() = connector.Connect("Target", database, container) module Dynamo = @@ -150,9 +146,7 @@ module Dynamo = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) member val Verbose = p.Contains Verbose - member _.Connect() = connector.LogConfiguration() - let client = connector.CreateClient() - client.ConnectStore("Main", table) + member _.Connect() = connector.CreateClient().CreateContext("Main", table) module Esdb = @@ -198,16 +192,16 @@ module Esdb = match a.GetSubCommand() with | Cosmos cosmos -> SecondaryStoreArgs.Cosmos (Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> SecondaryStoreArgs.Dynamo (Dynamo.Arguments(c, dynamo)) - | _ -> missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> a.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member x.ConnectCheckpointStore(cache) = match x.SecondaryStoreArgs with | SecondaryStoreArgs.Cosmos a -> - let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create - Checkpoints.Store.Cosmos (context, cache) + let context = a.Connect() |> Async.RunSynchronously + Checkpoints.Config.Cosmos (context, cache) | SecondaryStoreArgs.Dynamo a -> - let context = a.Connect() |> DynamoStoreContext.create - Checkpoints.Store.Dynamo (context, cache) + let context = a.Connect() + Checkpoints.Config.Dynamo (context, cache) and [] SecondaryStoreArgs = @@ -238,7 +232,6 @@ module Sss = let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.SqlStreamStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.SqlStreamStoreCredentials) |> Option.toObj let schema = p.GetResult(Schema, null) - let checkpointEventInterval = TimeSpan.FromHours 1. // Ignored when storing to Propulsion.SqlStreamStore.ReaderCheckpoint member x.Connect() = let conn, creds, schema, autoCreate = connection, credentials, schema, false @@ -258,7 +251,7 @@ module Sss = cs member x.CreateCheckpointStoreSql(groupName) : Propulsion.Feed.IFeedCheckpointStore = let connectionString = x.BuildCheckpointsConnectionString() - Propulsion.SqlStreamStore.ReaderCheckpoint.Service(connectionString, groupName, checkpointEventInterval) + Propulsion.SqlStreamStore.ReaderCheckpoint.Service(connectionString, groupName) type [] StoreArgs = @@ -272,17 +265,17 @@ module StoreArgs = let connectTarget targetStore cache = match targetStore with | StoreArgs.Cosmos a -> - let context = a.Connect() |> Async.RunSynchronously |> CosmosStoreContext.create - Config.Store.Cosmos (context, cache) + let context = a.Connect() |> Async.RunSynchronously + Store.Config.Cosmos (context, cache) | StoreArgs.Dynamo a -> - let context = a.Connect() |> DynamoStoreContext.create - Config.Store.Dynamo (context, cache) + let context = a.Connect() + Store.Config.Dynamo (context, cache) | StoreArgs.Esdb a -> let context = a.Connect(Serilog.Log.Logger, "Main", EventStore.Client.NodePreference.Leader) |> EventStoreContext.create - Config.Store.Esdb (context, cache) + Store.Config.Esdb (context, cache) | StoreArgs.Sss a -> let context = a.Connect() |> SqlStreamStoreContext.create - Config.Store.Sss (context, cache) + Store.Config.Sss (context, cache) let verboseRequested = function | StoreArgs.Cosmos a -> a.Verbose | StoreArgs.Dynamo a -> a.Verbose diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj index b27aa1898..271393a4f 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/ECommerce.Infrastructure.fsproj @@ -11,17 +11,17 @@ - + - - - + + + - - - - - + + + + + diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs index 73549fede..da00aece9 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/Infrastructure.fs @@ -1,5 +1,5 @@ [] -module ECommerce.Infrastructure.Helpers +module Helpers open Serilog open System @@ -13,42 +13,55 @@ module EnvVar = let tryGet varName : string option = Environment.GetEnvironmentVariable varName |> Option.ofObj +type Equinox.CosmosStore.CosmosStoreContext with + + member x.LogConfiguration(role, databaseId: string, containerId: string) = + Log.Information("CosmosStore {role:l} {db}/{container} Tip maxEvents {maxEvents} maxSize {maxJsonLen} Query maxItems {queryMaxItems}", + role, databaseId, containerId, x.TipOptions.MaxEvents, x.TipOptions.MaxJsonLength, x.QueryOptions.MaxItems) + +type Equinox.CosmosStore.CosmosStoreClient with + + member x.CreateContext(role: string, databaseId, containerId, tipMaxEvents, ?queryMaxItems, ?tipMaxJsonLength, ?skipLog) = + let c = Equinox.CosmosStore.CosmosStoreContext(x, databaseId, containerId, tipMaxEvents, ?queryMaxItems = queryMaxItems, ?tipMaxJsonLength = tipMaxJsonLength) + if skipLog = Some true then () else c.LogConfiguration(role, databaseId, containerId) + c + +module CosmosStoreConnector = + + let private get (role: string) (client: Microsoft.Azure.Cosmos.CosmosClient) databaseId containerId = + Log.Information("CosmosDB {role} Database {database} Container {container}", role, databaseId, containerId) + client.GetDatabase(databaseId).GetContainer(containerId) + let getSource c = get "Source" c + let getLeases c = get "Leases" c + let getSourceAndLeases client databaseId containerId auxContainerId = + getSource client databaseId containerId, getLeases client databaseId auxContainerId + type Equinox.CosmosStore.CosmosStoreConnector with - member private x.LogConfiguration(connectionName, databaseId, containerId) = + member private x.LogConfiguration(role, databaseId: string, containers: string[]) = let o = x.Options let timeout, retries429, timeout429 = o.RequestTimeout, o.MaxRetryAttemptsOnRateLimitedRequests, o.MaxRetryWaitTimeOnRateLimitedRequests - Log.Information("CosmosDb {name} {mode} {endpointUri} timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", - connectionName, o.ConnectionMode, x.Endpoint, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds) - Log.Information("CosmosDb {name} Database {database} Container {container}", - connectionName, databaseId, containerId) - - /// Use sparingly; in general one wants to use CreateAndInitialize to avoid slow first requests - member x.CreateUninitialized(databaseId, containerId) = - x.CreateUninitialized().GetDatabase(databaseId).GetContainer(containerId) - - /// Connect a CosmosStoreClient, including warming up - member x.ConnectStore(connectionName, databaseId, containerId) = - x.LogConfiguration(connectionName, databaseId, containerId) - Equinox.CosmosStore.CosmosStoreClient.Connect(x.CreateAndInitialize, databaseId, containerId) - - /// Creates a CosmosClient suitable for running a CFP via CosmosStoreSource - member x.ConnectMonitored(databaseId, containerId, ?connectionName) = - x.LogConfiguration(defaultArg connectionName "Source", databaseId, containerId) - x.CreateUninitialized(databaseId, containerId) - - /// Connects to a Store as both a ChangeFeedProcessor Monitored Container and a CosmosStoreClient - member x.ConnectStoreAndMonitored(databaseId, containerId) = - let monitored = x.ConnectMonitored(databaseId, containerId, "Main") - let storeClient = Equinox.CosmosStore.CosmosStoreClient(monitored.Database.Client, databaseId, containerId) - storeClient, monitored - -module CosmosStoreContext = - - /// Create with default packing and querying policies. Search for other `module CosmosStoreContext` impls for custom variations - let create (storeClient : Equinox.CosmosStore.CosmosStoreClient) = - let maxEvents = 256 - Equinox.CosmosStore.CosmosStoreContext(storeClient, tipMaxEvents=maxEvents) + Log.Information("CosmosDB {role} {mode} {endpointUri} {db} {containers} timeout {timeout}s Throttling retries {retries}, max wait {maxRetryWaitTime}s", + role, o.ConnectionMode, x.Endpoint, databaseId, containers, timeout.TotalSeconds, retries429, let t = timeout429.Value in t.TotalSeconds) + member private x.CreateAndInitialize(role, databaseId, containers) = + x.LogConfiguration(role, databaseId, containers) + x.CreateAndInitialize(databaseId, containers) + // member private x.Connect(role, databaseId, containers) = + // x.LogConfiguration(role, databaseId, containers) + // x.Connect(databaseId, containers) + member private x.ConnectContexts(role, databaseId, containerId, ?auxContainerId): Async<_ * Equinox.CosmosStore.CosmosStoreContext> = async { + let! cosmosClient = x.CreateAndInitialize(role, databaseId, [| yield containerId; yield! Option.toList auxContainerId |]) + let client = Equinox.CosmosStore.CosmosStoreClient(cosmosClient) + let contexts = client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 100) + return cosmosClient, contexts } + /// Connect to the database (including verifying and warming up relevant containers), establish relevant CosmosStoreContexts required by Domain + member x.Connect(role, databaseId, containerId: string) = async { + let! _client, contexts = x.ConnectContexts(role, databaseId, containerId) + return contexts } + member x.ConnectWithFeed(databaseId, containerId, auxContainerId) = async { + let! client, context = x.ConnectContexts("Main", databaseId, containerId, auxContainerId) + let source, leases = CosmosStoreConnector.getSourceAndLeases client databaseId containerId auxContainerId + return context, source, leases } type Equinox.DynamoStore.DynamoStoreConnector with @@ -56,33 +69,24 @@ type Equinox.DynamoStore.DynamoStoreConnector with Log.Information("DynamoStore {endpoint} Timeout {timeoutS}s Retries {retries}", x.Endpoint, (let t = x.Timeout in t.TotalSeconds), x.Retries) + member x.CreateClient() = + x.LogConfiguration() + x.CreateDynamoStoreClient() + type Equinox.DynamoStore.DynamoStoreClient with - member internal x.LogConfiguration(role, ?log) = - (defaultArg log Log.Logger).Information("DynamoStore {role:l} Table {table} Archive {archive}", role, x.TableName, Option.toObj x.ArchiveTableName) - member client.CreateCheckpointService(consumerGroupName, cache, log, ?checkpointInterval) = - let checkpointInterval = defaultArg checkpointInterval (TimeSpan.FromHours 1.) - let context = Equinox.DynamoStore.DynamoStoreContext(client) - Propulsion.Feed.ReaderCheckpoint.DynamoStore.create log (consumerGroupName, checkpointInterval) (context, cache) + member x.CreateContext(role, table, ?queryMaxItems, ?maxBytes, ?archiveTableName: string) = + let queryMaxItems = defaultArg queryMaxItems 100 + let c = Equinox.DynamoStore.DynamoStoreContext(x, table, queryMaxItems = queryMaxItems, ?maxBytes = maxBytes, ?archiveTableName = archiveTableName) + Log.Information("DynamoStore {role:l} Table {table} Archive {archive} Tip thresholds: {maxTipBytes}b {maxTipEvents}e Query paging {queryMaxItems} items", + role, table, Option.toObj archiveTableName, c.TipOptions.MaxBytes, Option.toNullable c.TipOptions.MaxEvents, c.QueryOptions.MaxItems) + c type Equinox.DynamoStore.DynamoStoreContext with - member internal x.LogConfiguration(log : ILogger) = - log.Information("DynamoStore Tip thresholds: {maxTipBytes}b {maxTipEvents}e Query Paging {queryMaxItems} items", - x.TipOptions.MaxBytes, Option.toNullable x.TipOptions.MaxEvents, x.QueryOptions.MaxItems) - -type Amazon.DynamoDBv2.IAmazonDynamoDB with - - member x.ConnectStore(role, table) = - let storeClient = Equinox.DynamoStore.DynamoStoreClient(x, table) - storeClient.LogConfiguration(role) - storeClient - -module DynamoStoreContext = - - /// Create with default packing and querying policies. Search for other `module DynamoStoreContext` impls for custom variations - let create (storeClient : Equinox.DynamoStore.DynamoStoreClient) = - Equinox.DynamoStore.DynamoStoreContext(storeClient, queryMaxItems = 100) + member context.CreateCheckpointService(consumerGroupName, cache, log, ?checkpointInterval) = + let checkpointInterval = defaultArg checkpointInterval (TimeSpan.FromHours 1.) + Propulsion.Feed.ReaderCheckpoint.DynamoStore.create log (consumerGroupName, checkpointInterval) (context, cache) module EventStoreContext = @@ -109,16 +113,16 @@ module Sinks = let equinoxAndPropulsionMetrics tags (l : LoggerConfiguration) = l |> equinoxMetricsOnly tags - |> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags)) + |> _.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags)) let equinoxAndPropulsionReactorMetrics tags (l : LoggerConfiguration) = l |> equinoxAndPropulsionMetrics tags - |> fun l -> l.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) - .WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags)) // Esdb and Dynamo indirectly provide metrics via Feed + |> _.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) + .WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags)) // Esdb and Dynamo indirectly provide metrics via Feed let equinoxAndPropulsionFeedConsumerMetrics tags (l : LoggerConfiguration) = l |> equinoxAndPropulsionMetrics tags - |> fun l -> l.WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags)) + |> _.WriteTo.Sink(Propulsion.Feed.Prometheus.LogSink(tags)) let console (configuration : LoggerConfiguration) = let t = "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj} {NewLine}{Exception}" diff --git a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceConfig.fs b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceConfig.fs index abb7b968a..a8553efd2 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceConfig.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Infrastructure/SourceConfig.fs @@ -1,21 +1,23 @@ -namespace ECommerce.Infrastructure +namespace global open System +open System.Threading.Tasks [] type SourceConfig = | Memory of store : Equinox.MemoryStore.VolatileStore)> - | Cosmos of monitoredContainer : Microsoft.Azure.Cosmos.Container - * leasesContainer : Microsoft.Azure.Cosmos.Container - * checkpoints : CosmosFeedConfig - * tailSleepInterval : TimeSpan - | Dynamo of indexStore : Equinox.DynamoStore.DynamoStoreClient - * checkpoints : Propulsion.Feed.IFeedCheckpointStore - * loading : DynamoLoadModeConfig - * startFromTail : bool - * batchSizeCutoff : int - * tailSleepInterval : TimeSpan - * statsInterval : TimeSpan + | Cosmos of monitoredContainer: Microsoft.Azure.Cosmos.Container + * leasesContainer: Microsoft.Azure.Cosmos.Container + * checkpoints: CosmosFeedConfig + * tailSleepInterval: TimeSpan + * statsInterval: TimeSpan + | Dynamo of indexContext: Equinox.DynamoStore.DynamoStoreContext + * checkpoints: Propulsion.Feed.IFeedCheckpointStore + * loading: Propulsion.DynamoStore.EventLoadMode + * startFromTail: bool + * batchSizeCutoff: int + * tailSleepInterval: TimeSpan + * statsInterval: TimeSpan | Esdb of client : EventStore.Client.EventStoreClient * checkpoints : Propulsion.Feed.IFeedCheckpointStore * hydrateBodies : bool @@ -39,77 +41,74 @@ and [] DynamoLoadModeConfig = module SourceConfig = module Memory = open Propulsion.MemoryStore - let start log (sink : Propulsion.Streams.Default.Sink) categoryFilter - (store : Equinox.MemoryStore.VolatileStore<_>) : Propulsion.Pipeline * (TimeSpan -> Async) option = - let source = MemoryStoreSource(log, store, categoryFilter, sink) - source.Start(), Some (fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false)) + let start log (sink: Propulsion.Sinks.SinkPipeline) (categories: string[]) + (store: Equinox.MemoryStore.VolatileStore<_>): Propulsion.Pipeline * (TimeSpan -> Task) = + let source = MemoryStoreSource(log, store, categories, sink) + source.Start(), fun _propagationDelay -> source.Monitor.AwaitCompletion(ignoreSubsequent = false) module Cosmos = open Propulsion.CosmosStore - let start log (sink : Propulsion.Streams.Default.Sink) categoryFilter - (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval) : Propulsion.Pipeline * (TimeSpan -> Async) option = - let parseFeedDoc = EquinoxSystemTextJsonParser.enumStreamEvents categoryFilter - let observer = CosmosStoreSource.CreateObserver(log, sink.StartIngester, Seq.collect parseFeedDoc) + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (monitoredContainer, leasesContainer, checkpointConfig, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = + let parseFeedDoc = EquinoxSystemTextJsonParser.ofCategories categories let source = match checkpointConfig with | Ephemeral processorName -> - let withStartTime1sAgo (x : Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder) = + let withStartTime1sAgo (x: Microsoft.Azure.Cosmos.ChangeFeedProcessorBuilder) = x.WithStartTime(let t = DateTime.UtcNow in t.AddSeconds -1.) let lagFrequency = TimeSpan.FromMinutes 1. - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = true, customize = withStartTime1sAgo, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() | Persistent (processorName, startFromTail, maxItems, lagFrequency) -> - CosmosStoreSource.Start(log, monitoredContainer, leasesContainer, processorName, observer, - startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, - lagReportFreq = lagFrequency) - source, None + CosmosStoreSource(log, statsInterval, monitoredContainer, leasesContainer, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, + lagEstimationInterval = lagFrequency).Start() + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Dynamo = open Propulsion.DynamoStore - let start (log, storeLog) (sink : Propulsion.Streams.Default.Sink) categoryFilter - (indexStore, checkpoints, loadModeConfig, startFromTail, tailSleepInterval, batchSizeCutoff, statsInterval) : Propulsion.Pipeline * (TimeSpan -> Async) option = - let loadMode = - match loadModeConfig with - | Hydrate (monitoredContext, hydrationConcurrency) -> LoadMode.Hydrated (categoryFilter, hydrationConcurrency, monitoredContext) - let source = - DynamoStoreSource( - log, statsInterval, - indexStore, batchSizeCutoff, tailSleepInterval, - checkpoints, sink, loadMode, - startFromTail = startFromTail, storeLog = storeLog) - // trancheIds = [|Propulsion.Feed.TrancheId.parse "0"|]) // TEMP filter for additional clones of index data in target Table + let create (log, storeLog) (sink: Propulsion.Sinks.SinkPipeline) categories + (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) trancheIds = + DynamoStoreSource( + log, statsInterval, + indexContext, batchSizeCutoff, tailSleepInterval, + checkpoints, sink, loadMode, categories = categories, + startFromTail = startFromTail, storeLog = storeLog, ?trancheIds = trancheIds) + let start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) + : Propulsion.Pipeline * (TimeSpan -> Task) = + let source = create (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) None let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Esdb = open Propulsion.EventStoreDb - let start log (sink : Propulsion.Streams.Default.Sink) categoryFilter - (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) : Propulsion.Pipeline * (TimeSpan -> Async) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) categories + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval): Propulsion.Pipeline * (TimeSpan -> Task) = let source = EventStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, - checkpoints, sink, categoryFilter, hydrateBodies = hydrateBodies, startFromTail = startFromTail) + checkpoints, sink, categories, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) module Sss = open Propulsion.SqlStreamStore - let start log (sink : Propulsion.Streams.Default.Sink) categoryFilter - (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) : Propulsion.Pipeline * (TimeSpan -> Async) option = + let start log (sink: Propulsion.Sinks.SinkPipeline) categoryFilter + (client, checkpoints, withData, startFromTail, batchSize, tailSleepInterval, statsInterval) : Propulsion.Pipeline * (TimeSpan -> Task) = let source = SqlStreamStoreSource( log, statsInterval, client, batchSize, tailSleepInterval, - checkpoints, sink, categoryFilter, hydrateBodies = hydrateBodies, startFromTail = startFromTail) + checkpoints, sink, categoryFilter, withData = withData, startFromTail = startFromTail) let source = source.Start() - source, Some (fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false)) + source, fun propagationDelay -> source.Monitor.AwaitCompletion(propagationDelay, ignoreSubsequent = false) - let start (log, storeLog) sink categoryFilter : SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Async) option = function + let start (log, storeLog) sink categories: SourceConfig -> Propulsion.Pipeline * (TimeSpan -> Task) = function | SourceConfig.Memory volatileStore -> - Memory.start log sink categoryFilter volatileStore - | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) -> - Cosmos.start log sink categoryFilter (monitored, leases, checkpointConfig, tailSleepInterval) - | SourceConfig.Dynamo (indexStore, checkpoints, loading, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> - Dynamo.start (log, storeLog) sink categoryFilter (indexStore, checkpoints, loading, startFromTail, tailSleepInterval, batchSizeCutoff, statsInterval) + Memory.start log sink categories volatileStore + | SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) -> + Cosmos.start log sink categories (monitored, leases, checkpointConfig, tailSleepInterval, statsInterval) + | SourceConfig.Dynamo (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) -> + Dynamo.start (log, storeLog) sink categories (indexContext, checkpoints, loadMode, startFromTail, batchSizeCutoff, tailSleepInterval, statsInterval) | SourceConfig.Esdb (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) -> - Esdb.start log sink categoryFilter (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) + Esdb.start log sink categories (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) | SourceConfig.Sss (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) -> - Sss.start log sink categoryFilter (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) + Sss.start log sink categories (client, checkpoints, hydrateBodies, startFromTail, batchSize, tailSleepInterval, statsInterval) diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs index 42319aedf..cd65852e1 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/Program.fs @@ -1,12 +1,9 @@ module ECommerce.Reactor.Program -open ECommerce.Infrastructure open Serilog open System open System.Threading.Tasks -module Config = ECommerce.Domain.Config - let [] AppName = "ECommerce.Reactor" module Args = @@ -35,32 +32,32 @@ module Args = | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." | StateIntervalM _ -> "Interval at which to report Propulsion Statistics. Default: 10" | IdleDelayMs _ -> "Idle delay for scheduler. Default 1000ms" - | WakeForResults _ -> "Wake for all results to provide optimal throughput" + | WakeForResults -> "Wake for all results to provide optimal throughput" | Cosmos _ -> "specify CosmosDB input parameters." | Dynamo _ -> "specify DynamoDB input parameters." | Esdb _ -> "specify EventStoreDB input parameters." - type Arguments(c : SourceArgs.Configuration, a : ParseResults) = - let maxReadAhead = a.GetResult(MaxReadAhead, 2) - let maxConcurrentProcessors = a.GetResult(MaxWriters, 8) - let consumerGroupName = a.GetResult ConsumerGroupName + type Arguments(c : SourceArgs.Configuration, p : ParseResults) = + let maxReadAhead = p.GetResult(MaxReadAhead, 2) + let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) + let consumerGroupName = p.GetResult ConsumerGroupName member _.ProcessorParams() = Log.Information("Reacting... {consumerGroupName}, reading {maxReadAhead} ahead, {dop} writers", consumerGroupName, maxReadAhead, maxConcurrentProcessors) (consumerGroupName, maxReadAhead, maxConcurrentProcessors) - member val Verbose = a.Contains Verbose - member val PrometheusPort = a.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) + member val Verbose = p.Contains Verbose + member val PrometheusPort = p.TryGetResult PrometheusPort |> Option.orElseWith (fun () -> c.PrometheusPort) member val CacheSizeMb = 10 member val StatsInterval = TimeSpan.FromMinutes 1. - member val StateInterval = a.GetResult(StateIntervalM, 10.) |> TimeSpan.FromMinutes + member val StateInterval = p.GetResult(StateIntervalM, 10.) |> TimeSpan.FromMinutes member val PurgeInterval = TimeSpan.FromHours 1. - member val IdleDelay = a.GetResult(IdleDelayMs, 1000) |> TimeSpan.FromMilliseconds - member val WakeForResults = a.Contains WakeForResults + member val IdleDelay = p.GetResult(IdleDelayMs, 1000) |> TimeSpan.FromMilliseconds + member val WakeForResults = p.Contains WakeForResults member val Store : Choice = - match a.GetSubCommand() with + match p.GetSubCommand() with | Cosmos a -> Choice1Of3 <| SourceArgs.Cosmos.Arguments(c, a) | Dynamo a -> Choice2Of3 <| SourceArgs.Dynamo.Arguments(c, a) | Esdb a -> Choice3Of3 <| SourceArgs.Esdb.Arguments(c, a) - | a -> Args.missingArg $"Unexpected Store subcommand %A{a}" + | a -> p.Raise $"Unexpected Store subcommand %A{a}" member x.VerboseStore = match x.Store with | Choice1Of3 s -> s.Verbose | Choice2Of3 s -> s.Verbose @@ -70,31 +67,30 @@ module Args = | Choice2Of3 _ -> Equinox.DynamoStore.Core.Log.InternalMetrics.dump | Choice3Of3 _ -> Equinox.EventStoreDb.Log.InternalMetrics.dump - member x.ConnectStoreSourceAndTarget() : Config.Store<_> * (ILogger -> string -> SourceConfig) * Config.Store<_> = + member x.ConnectStoreSourceAndTarget() : Store.Config * (ILogger -> string -> SourceConfig) * Store.Config= let cache = Equinox.Cache (AppName, sizeMb = x.CacheSizeMb) match x.Store with | Choice1Of3 a -> - let client, monitored = a.ConnectStoreAndMonitored() - let buildSourceConfig log groupName = - let leases, startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams(log) + let context, monitored, leases = a.ConnectWithFeed() |> Async.RunSynchronously + let buildSourceConfig _log groupName = + let startFromTail, maxItems, tailSleepInterval, lagFrequency = a.MonitoringParams let checkpointConfig = CosmosFeedConfig.Persistent (groupName, startFromTail, maxItems, lagFrequency) - SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval) - let context = client |> CosmosStoreContext.create - let store = Config.Store.Cosmos (context, cache) + SourceConfig.Cosmos (monitored, leases, checkpointConfig, tailSleepInterval, x.StatsInterval) + let store = Store.Config.Cosmos (context, cache) store, buildSourceConfig, store | Choice2Of3 a -> let context = a.Connect() let buildSourceConfig log groupName = - let indexStore, startFromTail, batchSizeCutoff, tailSleepInterval, streamsDop = a.MonitoringParams(log) + let indexContext, startFromTail, batchSizeCutoff, tailSleepInterval, streamsDop = a.MonitoringParams(log) let checkpoints = a.CreateCheckpointStore(groupName, cache) - let load = DynamoLoadModeConfig.Hydrate (context, streamsDop) - SourceConfig.Dynamo (indexStore, checkpoints, load, startFromTail, batchSizeCutoff, tailSleepInterval, x.StatsInterval) - let store = Config.Store.Dynamo (context, cache) + let load = Propulsion.DynamoStore.WithData (streamsDop, context) + SourceConfig.Dynamo (indexContext, checkpoints, load, startFromTail, batchSizeCutoff, tailSleepInterval, x.StatsInterval) + let store = Store.Config.Dynamo (context, cache) store, buildSourceConfig, store | Choice3Of3 a -> let connection = a.Connect(Log.Logger, AppName, EventStore.Client.NodePreference.Leader) let context = connection |> EventStoreContext.create - let store = Config.Store.Esdb (context, cache) + let store = Store.Config.Esdb (context, cache) let targetStore = a.ConnectTarget(cache) let buildSourceConfig log groupName = let startFromTail, maxItems, tailSleepInterval = a.MonitoringParams(log) @@ -115,13 +111,13 @@ let build (args : Args.Arguments) = let consumerGroupName, maxReadAhead, maxConcurrentStreams = args.ProcessorParams() let store, buildSourceConfig, targetStore = args.ConnectStoreSourceAndTarget() let log = Log.forGroup consumerGroupName // needs to have a `group` tag for Propulsion.Streams Prometheus metrics - let filter, handle = Reactor.Config.create (store, targetStore) + let handle = Reactor.Config.create (store, targetStore) let stats = Reactor.Stats(log, args.StatsInterval, args.StateInterval, args.VerboseStore, logExternalStats = args.DumpStoreMetrics) let sink = Reactor.Config.StartSink(log, stats, handle, maxReadAhead, maxConcurrentStreams, wakeForResults = args.WakeForResults, idleDelay = args.IdleDelay, purgeInterval = args.PurgeInterval) let source, _awaitSource = let sourceConfig = buildSourceConfig log consumerGroupName - Reactor.Config.StartSource(log, sink, sourceConfig, filter) + Reactor.Config.StartSource(log, sink, sourceConfig) sink, source let run (args : Args.Arguments) = async { @@ -138,8 +134,7 @@ let main argv = let metrics = Sinks.tags AppName |> Sinks.equinoxAndPropulsionReactorMetrics try Log.Logger <- LoggerConfiguration().Configure(verbose=args.Verbose).Sinks(metrics, args.VerboseStore).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? Args.MissingArg) && not (e :? TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with e when not (e :? TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with Args.MissingArg msg -> eprintfn "%s" msg; 1 - | :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 - | e -> eprintfn "Exception %s" e.Message; 1 + with:? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs index c8aaed510..f6081741e 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/Reactor.fs @@ -1,7 +1,6 @@ module ECommerce.Reactor.Reactor open ECommerce.Domain -open ECommerce.Infrastructure // Exception open Metrics /// Gathers stats based on the outcome of each Span processed for emission, at intervals controlled by `StreamsConsumer` @@ -26,16 +25,14 @@ type Stats(log, statsInterval, stateInterval, verboseStore, ?logExternalStats) = match logExternalStats with None -> () | Some f -> f Serilog.Log.Logger base.DumpStats() -let isReactionStream = function - | ShoppingCart.Category -> true - | _ -> false +let reactionCategories = [| ShoppingCart.CategoryName |] let handle (cartSummary : ShoppingCartSummaryHandler.Service) (confirmedCarts : ConfirmedHandler.Service) - struct (stream, span) : Async = async { - match stream, span with - | ShoppingCart.Reactions.Parse (cartId, events) -> + stream span : Async = async { + match struct (stream, span) with + | ShoppingCart.Reactions.Decode (cartId, events) -> match events with | ShoppingCart.Reactions.Confirmed -> let! _done = confirmedCarts.TrySummarizeConfirmed(cartId) in () @@ -44,8 +41,8 @@ let handle | ShoppingCart.Reactions.StateChanged -> let! worked, version' = cartSummary.TryIngestSummary(cartId) let outcome = if worked then Outcome.Ok (1, Array.length span - 1) else Outcome.Skipped span.Length - return Propulsion.Streams.SpanResult.OverrideWritePosition version', outcome - | _ -> return Propulsion.Streams.SpanResult.AllProcessed, Outcome.NotApplicable span.Length + return Propulsion.Sinks.StreamResult.OverrideNextIndex version', outcome + | _ -> return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.NotApplicable span.Length | x -> return failwith $"Invalid event %A{x}" } // should be filtered by isReactionStream module Config = @@ -53,16 +50,16 @@ module Config = let create (sourceStore, targetStore) = let cartSummary = ShoppingCartSummaryHandler.Config.create (sourceStore, targetStore) let confirmedCarts = ConfirmedHandler.Config.create (sourceStore, targetStore) - isReactionStream, handle cartSummary confirmedCarts + handle cartSummary confirmedCarts type Config private () = static member StartSink(log : Serilog.ILogger, stats : Stats, - handle : struct (FsCodec.StreamName * Propulsion.Streams.Default.StreamSpan) -> - Async, + handle,// : (FsCodec.StreamName * Propulsion.Sinks.Event[]) -> + // Async<(Propulsion.Sinks.StreamResult * Outcome)>, maxReadAhead : int, maxConcurrentStreams : int, ?wakeForResults, ?idleDelay, ?purgeInterval) = - Propulsion.Streams.Default.Config.Start(log, maxReadAhead, maxConcurrentStreams, handle, stats, stats.StatsInterval.Period, + Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle, stats, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay, ?purgeInterval = purgeInterval) - static member StartSource(log, sink, sourceConfig, filter) = - SourceConfig.start (log, Config.log) sink filter sourceConfig + static member StartSource(log, sink, sourceConfig) = + SourceConfig.start (log, Store.Metrics.log) sink reactionCategories sourceConfig diff --git a/Sample/ECommerce.Equinox/ECommerce.Reactor/SourceArgs.fs b/Sample/ECommerce.Equinox/ECommerce.Reactor/SourceArgs.fs index 444e599a9..d19fd1945 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Reactor/SourceArgs.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Reactor/SourceArgs.fs @@ -1,11 +1,9 @@ -module ECommerce.Infrastructure.SourceArgs +module SourceArgs open Argu open Serilog open System -module Config = ECommerce.Domain.Config - type Configuration(tryGet) = inherit Args.Configuration(tryGet) member _.DynamoIndexTable = tryGet Args.INDEX_TABLE @@ -38,39 +36,33 @@ module Cosmos = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 30." | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `SourceContainer` + `-aux`." - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | MaxItems _ -> "maximum item count to supply for the Change Feed query. Default: use response size limit" | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" - type Arguments(c : Args.Configuration, p : ParseResults) = - let discovery = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString + type Arguments(c: Args.Configuration, p: ParseResults) = + let discovery = p.GetResult(Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString let mode = p.TryGetResult ConnectionMode let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds let retries = p.GetResult(Retries, 9) let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 30.) |> TimeSpan.FromSeconds let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.TryGetResult Database |> Option.defaultWith (fun () -> c.CosmosDatabase) - let containerId = p.TryGetResult Container |> Option.defaultWith (fun () -> c.CosmosContainer) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") let fromTail = p.Contains FromTail let maxItems = p.TryGetResult MaxItems let tailSleepInterval = TimeSpan.FromMilliseconds 500. let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes - member _.Verbose = p.Contains Verbose - member private _.ConnectLeases() = connector.CreateUninitialized(database, leaseContainerId) - member x.MonitoringParams(log : ILogger) = - let leases : Microsoft.Azure.Cosmos.Container = x.ConnectLeases() - log.Information("ChangeFeed Leases Database {db} Container {container}. MaxItems limited to {maxItems}", - leases.Database.Id, leases.Id, Option.toNullable maxItems) - if fromTail then log.Warning("(If new projector group) Skipping projection of all existing events.") - (leases, fromTail, maxItems, tailSleepInterval, lagFrequency) - member x.ConnectStoreAndMonitored() = - connector.ConnectStoreAndMonitored(database, containerId) + member val Verbose = p.Contains Verbose + member val MonitoringParams = fromTail, maxItems, tailSleepInterval, lagFrequency + member _.ConnectWithFeed() = connector.ConnectWithFeed(database, containerId, leaseContainerId) module Dynamo = type [] Parameters = | [] Verbose + | [] RegionProfile of string | [] ServiceUrl of string | [] AccessKey of string | [] SecretKey of string @@ -85,6 +77,10 @@ module Dynamo = interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "Include low level Store logging." + | RegionProfile _ -> "specify an AWS Region (aka System Name, e.g. \"us-east-1\") to connect to using the implicit AWS SDK/tooling config and/or environment variables etc. Optional if:\n" + + "1) $" + Args.REGION + " specified OR\n" + + "2) Explicit `ServiceUrl`/$" + Args.SERVICE_URL + "+`AccessKey`/$" + Args.ACCESS_KEY + "+`Secret Key`/$" + Args.SECRET_KEY + " specified.\n" + + "See https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html for details" | ServiceUrl _ -> "specify a server endpoint for a Dynamo account. (optional if environment variable " + Args.SERVICE_URL + " specified)" | AccessKey _ -> "specify an access key id for a Dynamo account. (optional if environment variable " + Args.ACCESS_KEY + " specified)" | SecretKey _ -> "specify a secret access key for a Dynamo account. (optional if environment variable " + Args.SECRET_KEY + " specified)" @@ -94,36 +90,43 @@ module Dynamo = | IndexTable _ -> "specify a table name for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: `Table`+`IndexSuffix`)" | IndexSuffix _ -> "specify a suffix for the index store. (optional if environment variable " + Args.INDEX_TABLE + " specified. default: \"-index\")" | MaxItems _ -> "maximum events to load in a batch. Default: 100" - | FromTail _ -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." | StreamsDop _ -> "parallelism when loading events from Store Feed Source. Default 4" - type Arguments(c : Configuration, p : ParseResults) = - let serviceUrl = p.TryGetResult ServiceUrl |> Option.defaultWith (fun () -> c.DynamoServiceUrl) - let accessKey = p.TryGetResult AccessKey |> Option.defaultWith (fun () -> c.DynamoAccessKey) - let secretKey = p.TryGetResult SecretKey |> Option.defaultWith (fun () -> c.DynamoSecretKey) - let table = p.TryGetResult Table |> Option.defaultWith (fun () -> c.DynamoTable) + type Arguments(c: Configuration, p: ParseResults) = + let conn = match p.TryGetResult RegionProfile |> Option.orElseWith (fun () -> c.DynamoRegion) with + | Some systemName -> + Choice1Of2 systemName + | None -> + let serviceUrl = p.GetResult(ServiceUrl, fun () -> c.DynamoServiceUrl) + let accessKey = p.GetResult(AccessKey, fun () -> c.DynamoAccessKey) + let secretKey = p.GetResult(SecretKey, fun () -> c.DynamoSecretKey) + Choice2Of2 (serviceUrl, accessKey, secretKey) + let connector = let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 9) + match conn with + | Choice1Of2 systemName -> + Equinox.DynamoStore.DynamoStoreConnector(systemName, timeout, retries) + | Choice2Of2 (serviceUrl, accessKey, secretKey) -> + Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) + let table = p.GetResult(Table, fun () -> c.DynamoTable) let indexSuffix = p.GetResult(IndexSuffix, "-index") - let indexTable = p.TryGetResult IndexTable |> Option.orElseWith (fun () -> c.DynamoIndexTable) |> Option.defaultWith (fun () -> table + indexSuffix) + let indexTable = p.GetResult(IndexTable, fun () -> defaultArg c.DynamoIndexTable (table + indexSuffix)) let fromTail = p.Contains FromTail let tailSleepInterval = TimeSpan.FromMilliseconds 500. let batchSizeCutoff = p.GetResult(MaxItems, 100) let streamsDop = p.GetResult(StreamsDop, 4) - let timeout = p.GetResult(RetriesTimeoutS, 60.) |> TimeSpan.FromSeconds - let retries = p.GetResult(Retries, 9) - let connector = Equinox.DynamoStore.DynamoStoreConnector(serviceUrl, accessKey, secretKey, timeout, retries) - let client = connector.CreateClient() - let indexStoreClient = lazy client.ConnectStore("Index", indexTable) + let client = lazy connector.CreateClient() + let indexContext = lazy client.Value.CreateContext("Index", indexTable) member val Verbose = p.Contains Verbose - member _.Connect() = connector.LogConfiguration() - client.ConnectStore("Main", table) |> DynamoStoreContext.create - member _.MonitoringParams(log : ILogger) = + member _.Connect() = client.Value.CreateContext("Main", table) + member _.MonitoringParams(log: ILogger) = log.Information("DynamoStoreSource BatchSizeCutoff {batchSizeCutoff} Hydrater parallelism {streamsDop}", batchSizeCutoff, streamsDop) - let indexStoreClient = indexStoreClient.Value + let indexContext = indexContext.Value if fromTail then log.Warning("(If new projector group) Skipping projection of all existing events.") - indexStoreClient, fromTail, batchSizeCutoff, tailSleepInterval, streamsDop + indexContext, fromTail, batchSizeCutoff, tailSleepInterval, streamsDop member _.CreateCheckpointStore(group, cache) = - let indexTable = indexStoreClient.Value - indexTable.CreateCheckpointService(group, cache, Config.log) + indexContext.Value.CreateCheckpointService(group, cache, Store.Metrics.log) module Esdb = @@ -180,7 +183,7 @@ module Esdb = match p.GetSubCommand() with | Cosmos cosmos -> Args.StoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> Args.StoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `esdb`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `esdb`" member x.ConnectTarget(cache) = Args.StoreArgs.connectTarget x.TargetStoreArgs cache @@ -217,7 +220,6 @@ module Sss = type Arguments(c : Configuration, p : ParseResults) = let startFromTail = p.Contains FromTail let tailSleepInterval = p.GetResult(Tail, 1.) |> TimeSpan.FromSeconds - let checkpointEventInterval = TimeSpan.FromHours 1. // Ignored when storing to Propulsion.SqlStreamStore.ReaderCheckpoint let batchSize = p.GetResult(BatchSize, 512) let connection = p.TryGetResult Connection |> Option.defaultWith (fun () -> c.SqlStreamStoreConnection) let credentials = p.TryGetResult Credentials |> Option.orElseWith (fun () -> c.SqlStreamStoreCredentials) |> Option.toObj @@ -246,9 +248,9 @@ module Sss = match p.GetSubCommand() with | Cosmos cosmos -> Args.StoreArgs.Cosmos (Args.Cosmos.Arguments(c, cosmos)) | Dynamo dynamo -> Args.StoreArgs.Dynamo (Args.Dynamo.Arguments(c, dynamo)) - | _ -> Args.missingArg "Must specify `cosmos` or `dynamo` target store when source is `sss`" + | _ -> p.Raise "Must specify `cosmos` or `dynamo` target store when source is `sss`" member x.CreateCheckpointStoreSql(groupName) : Propulsion.Feed.IFeedCheckpointStore = let connectionString = x.BuildCheckpointsConnectionString() - Propulsion.SqlStreamStore.ReaderCheckpoint.Service(connectionString, groupName, checkpointEventInterval) + Propulsion.SqlStreamStore.ReaderCheckpoint.Service(connectionString, groupName) member x.ConnectTarget(cache) = Args.StoreArgs.connectTarget x.TargetStoreArgs cache diff --git a/Sample/ECommerce.Equinox/ECommerce.Tests/ConfirmedIngesterTests.fs b/Sample/ECommerce.Equinox/ECommerce.Tests/ConfirmedIngesterTests.fs index c60ead5d1..535c13eae 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Tests/ConfirmedIngesterTests.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Tests/ConfirmedIngesterTests.fs @@ -27,7 +27,7 @@ type Custom = let [] properties shouldUseSameSut (Gap gap) (initialEpochId, NonEmptyArray initialItems) (NonEmptyArray items) = async { - let store = Equinox.MemoryStore.VolatileStore() |> Config.Store.Memory + let store = Equinox.MemoryStore.VolatileStore() |> Store.Config.Memory let mutable nextEpochId = initialEpochId for _ in 1 .. gap do nextEpochId <- ConfirmedEpochId.next nextEpochId diff --git a/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj b/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj index 62a2fb8b1..70c4aad01 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj +++ b/Sample/ECommerce.Equinox/ECommerce.Tests/ECommerce.Tests.fsproj @@ -13,7 +13,7 @@ - +