diff --git a/equinox-fc/Domain/Inventory.fs b/equinox-fc/Domain/Inventory.fs index 14754695f..490d7c847 100644 --- a/equinox-fc/Domain/Inventory.fs +++ b/equinox-fc/Domain/Inventory.fs @@ -70,7 +70,7 @@ type Service2 internal (inventoryId, series : Series.Service, epochs : Epoch.Ser module internal Helpers = - let createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) = + let create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) = let remainingEpochCapacity (state: Epoch.Fold.State) = let currentLen = state.ids.Count max 0 (maxTransactionsPerEpoch - currentLen) @@ -79,9 +79,9 @@ module internal Helpers = module Cosmos = let create inventoryId maxTransactionsPerEpoch lookBackLimit (context, cache) = - let series = Series.Cosmos.createService (context, cache) - let epochs = Epoch.Cosmos.createService (context, cache) - Helpers.createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) + let series = Series.Cosmos.create (context, cache) + let epochs = Epoch.Cosmos.create (context, cache) + Helpers.create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) module Processor = diff --git a/equinox-fc/Domain/InventoryEpoch.fs b/equinox-fc/Domain/InventoryEpoch.fs index 5fca01731..74d5b6d50 100644 --- a/equinox-fc/Domain/InventoryEpoch.fs +++ b/equinox-fc/Domain/InventoryEpoch.fs @@ -3,13 +3,13 @@ /// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state module Fc.Inventory.Epoch +let [] Category = "InventoryEpoch" +let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId] + // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = - let [] CategoryId = "InventoryEpoch" - let (|For|) (inventoryId, epochId) = FsCodec.StreamName.compose CategoryId [InventoryId.toString inventoryId; InventoryEpochId.toString epochId] - type TransactionRef = { transactionId : InventoryTransactionId } type Snapshotted = { closed: bool; ids : InventoryTransactionId[] } @@ -70,9 +70,7 @@ let decideSync capacity events (state : Fold.State) : Result * Events.Event list let state' = Fold.fold state events { isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.For streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) +type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream) = /// Attempt ingestion of `events` into the cited Epoch. /// - None will be accepted if the Epoch is `closed` @@ -83,14 +81,16 @@ type Service internal (log, resolve, maxAttempts) = let stream = resolve (inventoryId, epochId) stream.Transact(decideSync capacity events) -let createService resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 2) +let create resolver = + let resolve locationId = + let stream = resolver (streamName locationId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = 2) + Service (resolve) module Cosmos = - open Equinox.Cosmos - let accessStrategy = Equinox.Cosmos.AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot) let resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve - let createService (context, cache) = createService (resolve (context, cache)) + let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let create (context, cache) = create (resolve (context, cache)) diff --git a/equinox-fc/Domain/InventorySeries.fs b/equinox-fc/Domain/InventorySeries.fs index 2e0ce93fe..a93f5a67d 100644 --- a/equinox-fc/Domain/InventorySeries.fs +++ b/equinox-fc/Domain/InventorySeries.fs @@ -3,13 +3,13 @@ /// See Inventory.Service for the surface API which manages the writing module Fc.Inventory.Series +let [] Category = "InventorySeries" +let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId) + // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = - let [] CategoryId = "InventorySeries" - let (|For|) inventoryId = FsCodec.StreamName.create CategoryId (InventoryId.toString inventoryId) - type Started = { epoch : InventoryEpochId } type Event = | Started of Started @@ -30,9 +30,7 @@ let interpretAdvanceIngestionEpoch epochId (state : Fold.State) = if queryActiveEpoch state >= epochId then [] else [Events.Started { epoch = epochId }] -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.For streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) +type Service internal (resolve : InventoryId -> Equinox.Stream) = member __.ReadIngestionEpoch(inventoryId) : Async = let stream = resolve inventoryId @@ -42,19 +40,21 @@ type Service internal (log, resolve, maxAttempts) = let stream = resolve inventoryId stream.Transact(interpretAdvanceIngestionEpoch epochId) -let createService resolve = - Service(Serilog.Log.ForContext(), resolve, maxAttempts = 2) +let create resolver = + let resolve locationId = + let stream = resolver (streamName locationId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = 2) + Service (resolve) module Cosmos = - open Equinox.Cosmos - + let accessStrategy = Equinox.Cosmos.AccessStrategy.LatestKnownEvent let resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // For this stream, we uniformly use stale reads as: // a) we don't require any information from competing writers // b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent let opt = Equinox.ResolveOption.AllowStale - fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt) - let createService (context, cache) = - createService (resolve (context, cache)) + fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt) + let create (context, cache) = + create (resolve (context, cache)) diff --git a/equinox-fc/Domain/InventoryTransaction.fs b/equinox-fc/Domain/InventoryTransaction.fs index 9ee61b472..ecb9acbc1 100644 --- a/equinox-fc/Domain/InventoryTransaction.fs +++ b/equinox-fc/Domain/InventoryTransaction.fs @@ -9,13 +9,13 @@ /// This represents the case where a 'happy path' actor died, or experienced another impediment on the path. module Fc.Inventory.Transaction +let [] Category = "InventoryTransaction" +let streamName transactionId = FsCodec.StreamName.create Category (InventoryTransactionId.toString transactionId) + // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = - let [] CategoryId = "InventoryTransaction" - let (|For|) transactionId = FsCodec.StreamName.create CategoryId (InventoryTransactionId.toString transactionId) - type AdjustmentRequested = { location : LocationId; quantity : int } type TransferRequested = { source : LocationId; destination : LocationId; quantity : int } type Removed = { balance : int } @@ -130,28 +130,28 @@ let decide update (state : Fold.State) : Action * Events.Event list = let state' = Fold.fold state events Fold.nextAction state', events -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.For streamId) = Equinox.Stream(log, resolve streamId, maxAttempts) +type Service internal (resolve : InventoryTransactionId -> Equinox.Stream) = member __.Apply(transactionId, update) : Async = let stream = resolve transactionId stream.Transact(decide update) -let createService resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) +let create resolver = + let resolve inventoryTransactionId = + let stream = resolver (streamName inventoryTransactionId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = 2) + Service (resolve) module Cosmos = - open Equinox.Cosmos - // in the happy path case, the event stream will typically be short, and the state cached, so snapshotting is less critical let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized // ... and there will generally be a single actor touching it at a given time, so we don't need to do a load (which would be more expensive than normal given the `accessStrategy`) before we sync let opt = Equinox.AllowStale let resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt) - let createService (context, cache) = createService (resolve (context, cache)) + let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt) + let createService (context, cache) = create (resolve (context, cache)) /// Handles requirement to infer when a transaction is 'stuck' /// Note we don't want to couple to the state in a deep manner; thus we track: @@ -188,3 +188,16 @@ module Watchdog = | Fold.Active startTime when startTime < cutoffTime -> Stuck | Fold.Active _ -> Active | Fold.Completed -> Complete + + let fold : Events.TimestampAndEvent seq -> Fold.State = + Fold.fold Fold.initial + + let (|FoldToWatchdogState|) events : Fold.State = + events + |> Seq.choose Events.codec.TryDecode + |> fold + + let (|Match|_|) = function + | FsCodec.StreamName.CategoryAndId (Category, InventoryTransactionId.Parse transId), FoldToWatchdogState state -> + Some (transId, state) + | _ -> None \ No newline at end of file diff --git a/equinox-fc/Domain/Location.fs b/equinox-fc/Domain/Location.fs index ed4fde94a..37c08479d 100644 --- a/equinox-fc/Domain/Location.fs +++ b/equinox-fc/Domain/Location.fs @@ -44,5 +44,5 @@ module Cosmos = let createService (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) = let series = Series.Cosmos.createService (context, cache, maxAttempts) - let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts) + let epochs = Epoch.Cosmos.create (context, cache, maxAttempts) create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) diff --git a/equinox-fc/Domain/LocationEpoch.fs b/equinox-fc/Domain/LocationEpoch.fs index b5c32f86f..a356c6a79 100644 --- a/equinox-fc/Domain/LocationEpoch.fs +++ b/equinox-fc/Domain/LocationEpoch.fs @@ -3,13 +3,13 @@ /// See Location.Service for the logic that allows competing readers/writers to co-operate in bringing this about module Fc.Location.Epoch +let [] Category = "LocationEpoch" +let streamName (locationId, epochId) = FsCodec.StreamName.compose Category [LocationId.toString locationId; LocationEpochId.toString epochId] + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = - let [] CategoryId = "LocationEpoch" - let (|For|) (locationId, epochId) = FsCodec.StreamName.compose CategoryId [LocationId.toString locationId; LocationEpochId.toString epochId] - type CarriedForward = { initial : int; recentTransactions : InventoryTransactionId[] } type Event = | CarriedForward of CarriedForward @@ -24,7 +24,7 @@ module Fold = type State = | Initial - | Open of Record list // reverse order, i.e. most revent first + | Open of Record list // reverse order, i.e. most recent first | Closed of Record list // trimmed and Record = | Init of Events.CarriedForward @@ -127,22 +127,23 @@ let decide transactionId command (state: Fold.State) = | Fold.Open (Fold.Current cur) -> (if accepted then Accepted cur else Denied), events | s -> failwithf "Unexpected state %A" s -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.For id) = Equinox.Stream(log, resolve id, maxAttempts) +type Service internal (resolve : LocationId * LocationEpochId -> Equinox.Stream) = member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async> = let stream = resolve (locationId, epochId) stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose) -let create resolve maxAttempts = Service(Serilog.Log.ForContext(), resolve, maxAttempts = maxAttempts) +let create resolver maxAttempts = + let resolve locId = + let stream = resolver (streamName locId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = maxAttempts) + Service (resolve) module Cosmos = - open Equinox.Cosmos - + let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized let resolve (context, cache) = - let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) - Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve - let createService (context, cache, maxAttempts) = + let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let create (context, cache, maxAttempts) = create (resolve (context, cache)) maxAttempts diff --git a/equinox-fc/Domain/LocationSeries.fs b/equinox-fc/Domain/LocationSeries.fs index 72306eb40..35c77707f 100644 --- a/equinox-fc/Domain/LocationSeries.fs +++ b/equinox-fc/Domain/LocationSeries.fs @@ -1,13 +1,13 @@ /// Manages the active epoch for a given Location module Fc.Location.Series +let [] Category = "LocationSeries" +let streamName locationId = FsCodec.StreamName.create Category (LocationId.toString locationId) + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care [] module Events = - let [] CategoryId = "LocationSeries" - let (|For|) locationId = FsCodec.StreamName.create CategoryId (LocationId.toString locationId) - type Started = { epoch : LocationEpochId } type Event = | Started of Started @@ -27,9 +27,7 @@ let interpretAdvanceIngestionEpoch (epochId : LocationEpochId) (state : Fold.Sta [if state |> Option.forall (fun s -> s < epochId) then yield Events.Started { epoch = epochId }] -type Service internal (log, resolve, maxAttempts) = - - let resolve (Events.For id) = Equinox.Stream(log, resolve id, maxAttempts) +type Service internal (resolve : LocationId -> Equinox.Stream) = member __.TryReadIngestionEpoch(locationId) : Async = let stream = resolve locationId @@ -39,7 +37,11 @@ type Service internal (log, resolve, maxAttempts) = let stream = resolve locationId stream.Transact(interpretAdvanceIngestionEpoch epochId) -let create resolve maxAttempts = Service(Serilog.Log.ForContext(), resolve, maxAttempts) +let create resolver maxAttempts = + let resolve locId = + let stream = resolver (streamName locId) + Equinox.Stream(Serilog.Log.ForContext(), stream, maxAttempts = maxAttempts) + Service (resolve) module Cosmos = diff --git a/equinox-fc/Watchdog/Handler.fs b/equinox-fc/Watchdog/Handler.fs index 908923a04..b96deb7a3 100644 --- a/equinox-fc/Watchdog/Handler.fs +++ b/equinox-fc/Watchdog/Handler.fs @@ -22,18 +22,10 @@ type Stats(log, ?statsInterval, ?stateInterval) = open Fc.Inventory.Transaction -let fold : Watchdog.Events.TimestampAndEvent seq -> Watchdog.Fold.State = - Watchdog.Fold.fold Watchdog.Fold.initial - -let (|FoldToWatchdogState|) (span : Propulsion.Streams.StreamSpan<_>) : Watchdog.Fold.State = - span.events - |> Seq.choose Watchdog.Events.codec.TryDecode - |> fold - let tryHandle driveTransaction (stream, span : Propulsion.Streams.StreamSpan<_>) : Async = async { let processingStuckCutoff = let now = DateTimeOffset.UtcNow in now.AddSeconds -10. - match stream, span with - | FsCodec.StreamName.CategoryAndId (Events.CategoryId, InventoryTransactionId.Parse transId), FoldToWatchdogState state -> + match stream, span.events with + | Watchdog.Match (transId, state) -> match Watchdog.categorize processingStuckCutoff state with | Watchdog.Complete -> return Outcome.Completed diff --git a/equinox-fc/Watchdog/Program.fs b/equinox-fc/Watchdog/Program.fs index 15d6e5693..610f722a5 100644 --- a/equinox-fc/Watchdog/Program.fs +++ b/equinox-fc/Watchdog/Program.fs @@ -312,7 +312,7 @@ module EventStoreContext = | e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (isWhitelisted e.EventStreamId) -> None | PropulsionStreamEvent e -> Some e -let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Events.CategoryId +let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Category let isTransactionStream : string -> bool = function sn -> sn.StartsWith transactionStreamPrefix let build (args : CmdParser.Arguments) = @@ -355,8 +355,8 @@ let build (args : CmdParser.Arguments) = let locations = let zeroBalance : Fc.Location.Epoch.Events.CarriedForward = { initial = 0; recentTransactions = [||] } let chooseTransactionIds = function - | Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids - | Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id + | Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids + | Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id let toBalanceCarriedForward (Fc.Location.Epoch.Fold.Current cur as records) : Fc.Location.Epoch.Events.CarriedForward = { initial = cur; recentTransactions = records |> Seq.collect chooseTransactionIds |> Seq.truncate 5 |> Seq.toArray } let shouldClose x = false