Skip to content

Commit

Permalink
Apply style changes from 4.2
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 10, 2020
1 parent 5ce4774 commit ba7c315
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 76 deletions.
8 changes: 4 additions & 4 deletions equinox-fc/Domain/Inventory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Service2 internal (inventoryId, series : Series.Service, epochs : Epoch.Ser

module internal Helpers =

let createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) =
let create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs) =
let remainingEpochCapacity (state: Epoch.Fold.State) =
let currentLen = state.ids.Count
max 0 (maxTransactionsPerEpoch - currentLen)
Expand All @@ -79,9 +79,9 @@ module internal Helpers =
module Cosmos =

let create inventoryId maxTransactionsPerEpoch lookBackLimit (context, cache) =
let series = Series.Cosmos.createService (context, cache)
let epochs = Epoch.Cosmos.createService (context, cache)
Helpers.createService inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs)
let series = Series.Cosmos.create (context, cache)
let epochs = Epoch.Cosmos.create (context, cache)
Helpers.create inventoryId maxTransactionsPerEpoch lookBackLimit (series, epochs)

module Processor =

Expand Down
24 changes: 12 additions & 12 deletions equinox-fc/Domain/InventoryEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
/// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state
module Fc.Inventory.Epoch

let [<Literal>] Category = "InventoryEpoch"
let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

let [<Literal>] CategoryId = "InventoryEpoch"
let (|For|) (inventoryId, epochId) = FsCodec.StreamName.compose CategoryId [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]

type TransactionRef = { transactionId : InventoryTransactionId }
type Snapshotted = { closed: bool; ids : InventoryTransactionId[] }

Expand Down Expand Up @@ -70,9 +70,7 @@ let decideSync capacity events (state : Fold.State) : Result * Events.Event list
let state' = Fold.fold state events
{ isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream<Events.Event, Fold.State>) =

/// Attempt ingestion of `events` into the cited Epoch.
/// - None will be accepted if the Epoch is `closed`
Expand All @@ -83,14 +81,16 @@ type Service internal (log, resolve, maxAttempts) =
let stream = resolve (inventoryId, epochId)
stream.Transact(decideSync capacity events)

let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 2)
let create resolver =
let resolve locationId =
let stream = resolver (streamName locationId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service (resolve)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = Equinox.Cosmos.AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let createService (context, cache) = createService (resolve (context, cache))
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let create (context, cache) = create (resolve (context, cache))
28 changes: 14 additions & 14 deletions equinox-fc/Domain/InventorySeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
/// See Inventory.Service for the surface API which manages the writing
module Fc.Inventory.Series

let [<Literal>] Category = "InventorySeries"
let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

let [<Literal>] CategoryId = "InventorySeries"
let (|For|) inventoryId = FsCodec.StreamName.create CategoryId (InventoryId.toString inventoryId)

type Started = { epoch : InventoryEpochId }
type Event =
| Started of Started
Expand All @@ -30,9 +30,7 @@ let interpretAdvanceIngestionEpoch epochId (state : Fold.State) =
if queryActiveEpoch state >= epochId then []
else [Events.Started { epoch = epochId }]

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.ReadIngestionEpoch(inventoryId) : Async<InventoryEpochId> =
let stream = resolve inventoryId
Expand All @@ -42,19 +40,21 @@ type Service internal (log, resolve, maxAttempts) =
let stream = resolve inventoryId
stream.Transact(interpretAdvanceIngestionEpoch epochId)

let createService resolve =
Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 2)
let create resolver =
let resolve locationId =
let stream = resolver (streamName locationId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service (resolve)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = Equinox.Cosmos.AccessStrategy.LatestKnownEvent
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
// For this stream, we uniformly use stale reads as:
// a) we don't require any information from competing writers
// b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt)
let createService (context, cache) =
createService (resolve (context, cache))
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let create (context, cache) =
create (resolve (context, cache))
37 changes: 25 additions & 12 deletions equinox-fc/Domain/InventoryTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
/// This represents the case where a 'happy path' actor died, or experienced another impediment on the path.
module Fc.Inventory.Transaction

let [<Literal>] Category = "InventoryTransaction"
let streamName transactionId = FsCodec.StreamName.create Category (InventoryTransactionId.toString transactionId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

let [<Literal>] CategoryId = "InventoryTransaction"
let (|For|) transactionId = FsCodec.StreamName.create CategoryId (InventoryTransactionId.toString transactionId)

type AdjustmentRequested = { location : LocationId; quantity : int }
type TransferRequested = { source : LocationId; destination : LocationId; quantity : int }
type Removed = { balance : int }
Expand Down Expand Up @@ -130,28 +130,28 @@ let decide update (state : Fold.State) : Action * Events.Event list =
let state' = Fold.fold state events
Fold.nextAction state', events

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)
type Service internal (resolve : InventoryTransactionId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.Apply(transactionId, update) : Async<Action> =
let stream = resolve transactionId
stream.Transact(decide update)

let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)
let create resolver =
let resolve inventoryTransactionId =
let stream = resolver (streamName inventoryTransactionId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service (resolve)

module Cosmos =

open Equinox.Cosmos

// in the happy path case, the event stream will typically be short, and the state cached, so snapshotting is less critical
let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
// ... and there will generally be a single actor touching it at a given time, so we don't need to do a load (which would be more expensive than normal given the `accessStrategy`) before we sync
let opt = Equinox.AllowStale
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let createService (context, cache) = createService (resolve (context, cache))
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let createService (context, cache) = create (resolve (context, cache))

/// Handles requirement to infer when a transaction is 'stuck'
/// Note we don't want to couple to the state in a deep manner; thus we track:
Expand Down Expand Up @@ -188,3 +188,16 @@ module Watchdog =
| Fold.Active startTime when startTime < cutoffTime -> Stuck
| Fold.Active _ -> Active
| Fold.Completed -> Complete

let fold : Events.TimestampAndEvent seq -> Fold.State =
Fold.fold Fold.initial

let (|FoldToWatchdogState|) events : Fold.State =
events
|> Seq.choose Events.codec.TryDecode
|> fold

let (|Match|_|) = function
| FsCodec.StreamName.CategoryAndId (Category, InventoryTransactionId.Parse transId), FoldToWatchdogState state ->
Some (transId, state)
| _ -> None
2 changes: 1 addition & 1 deletion equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ module Cosmos =

let createService (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.createService (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.create (context, cache, maxAttempts)
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs)
27 changes: 14 additions & 13 deletions equinox-fc/Domain/LocationEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
/// See Location.Service for the logic that allows competing readers/writers to co-operate in bringing this about
module Fc.Location.Epoch

let [<Literal>] Category = "LocationEpoch"
let streamName (locationId, epochId) = FsCodec.StreamName.compose Category [LocationId.toString locationId; LocationEpochId.toString epochId]

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

let [<Literal>] CategoryId = "LocationEpoch"
let (|For|) (locationId, epochId) = FsCodec.StreamName.compose CategoryId [LocationId.toString locationId; LocationEpochId.toString epochId]

type CarriedForward = { initial : int; recentTransactions : InventoryTransactionId[] }
type Event =
| CarriedForward of CarriedForward
Expand All @@ -24,7 +24,7 @@ module Fold =

type State =
| Initial
| Open of Record list // reverse order, i.e. most revent first
| Open of Record list // reverse order, i.e. most recent first
| Closed of Record list // trimmed
and Record =
| Init of Events.CarriedForward
Expand Down Expand Up @@ -127,22 +127,23 @@ let decide transactionId command (state: Fold.State) =
| Fold.Open (Fold.Current cur) -> (if accepted then Accepted cur else Denied), events
| s -> failwithf "Unexpected state %A" s

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
type Service internal (resolve : LocationId * LocationEpochId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async<Result<'R>> =
let stream = resolve (locationId, epochId)
stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose)

let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = maxAttempts)
let create resolver maxAttempts =
let resolve locId =
let stream = resolver (streamName locId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
Service (resolve)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve
let createService (context, cache, maxAttempts) =
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let create (context, cache, maxAttempts) =
create (resolve (context, cache)) maxAttempts
16 changes: 9 additions & 7 deletions equinox-fc/Domain/LocationSeries.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
/// Manages the active epoch for a given Location
module Fc.Location.Series

let [<Literal>] Category = "LocationSeries"
let streamName locationId = FsCodec.StreamName.create Category (LocationId.toString locationId)

// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

let [<Literal>] CategoryId = "LocationSeries"
let (|For|) locationId = FsCodec.StreamName.create CategoryId (LocationId.toString locationId)

type Started = { epoch : LocationEpochId }
type Event =
| Started of Started
Expand All @@ -27,9 +27,7 @@ let interpretAdvanceIngestionEpoch (epochId : LocationEpochId) (state : Fold.Sta

[if state |> Option.forall (fun s -> s < epochId) then yield Events.Started { epoch = epochId }]

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
type Service internal (resolve : LocationId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.TryReadIngestionEpoch(locationId) : Async<LocationEpochId option> =
let stream = resolve locationId
Expand All @@ -39,7 +37,11 @@ type Service internal (log, resolve, maxAttempts) =
let stream = resolve locationId
stream.Transact(interpretAdvanceIngestionEpoch epochId)

let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts)
let create resolver maxAttempts =
let resolve locId =
let stream = resolver (streamName locId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
Service (resolve)

module Cosmos =

Expand Down
12 changes: 2 additions & 10 deletions equinox-fc/Watchdog/Handler.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,10 @@ type Stats(log, ?statsInterval, ?stateInterval) =

open Fc.Inventory.Transaction

let fold : Watchdog.Events.TimestampAndEvent seq -> Watchdog.Fold.State =
Watchdog.Fold.fold Watchdog.Fold.initial

let (|FoldToWatchdogState|) (span : Propulsion.Streams.StreamSpan<_>) : Watchdog.Fold.State =
span.events
|> Seq.choose Watchdog.Events.codec.TryDecode
|> fold

let tryHandle driveTransaction (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
let processingStuckCutoff = let now = DateTimeOffset.UtcNow in now.AddSeconds -10.
match stream, span with
| FsCodec.StreamName.CategoryAndId (Events.CategoryId, InventoryTransactionId.Parse transId), FoldToWatchdogState state ->
match stream, span.events with
| Watchdog.Match (transId, state) ->
match Watchdog.categorize processingStuckCutoff state with
| Watchdog.Complete ->
return Outcome.Completed
Expand Down
6 changes: 3 additions & 3 deletions equinox-fc/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ module EventStoreContext =
| e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (isWhitelisted e.EventStreamId) -> None
| PropulsionStreamEvent e -> Some e

let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Events.CategoryId
let transactionStreamPrefix = sprintf "%s-" Fc.Inventory.Transaction.Category
let isTransactionStream : string -> bool = function sn -> sn.StartsWith transactionStreamPrefix

let build (args : CmdParser.Arguments) =
Expand Down Expand Up @@ -355,8 +355,8 @@ let build (args : CmdParser.Arguments) =
let locations =
let zeroBalance : Fc.Location.Epoch.Events.CarriedForward = { initial = 0; recentTransactions = [||] }
let chooseTransactionIds = function
| Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids
| Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id
| Fc.Location.Epoch.Fold.Init { recentTransactions = ids } -> Seq.ofArray ids
| Fc.Location.Epoch.Fold.Step { id = id } -> Seq.singleton id
let toBalanceCarriedForward (Fc.Location.Epoch.Fold.Current cur as records) : Fc.Location.Epoch.Events.CarriedForward =
{ initial = cur; recentTransactions = records |> Seq.collect chooseTransactionIds |> Seq.truncate 5 |> Seq.toArray }
let shouldClose x = false
Expand Down

0 comments on commit ba7c315

Please sign in to comment.