Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eqxFc: Add Cosmos support #75

Draft
wants to merge 1 commit into
base: add-fc
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions equinox-fc/Domain.Tests/Fixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,23 @@ module EnvVar =

let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj

module Cosmos =

open Equinox.Cosmos
let connect () =
match EnvVar.tryGet "EQUINOX_COSMOS_CONNECTION", EnvVar.tryGet "EQUINOX_COSMOS_DATABASE", EnvVar.tryGet "EQUINOX_COSMOS_CONTAINER" with
| Some s, Some d, Some c ->
let appName = "Domain.Tests"
let discovery = Discovery.FromConnectionString s
let connector = Connector(TimeSpan.FromSeconds 5., 5, TimeSpan.FromSeconds 5., Serilog.Log.Logger)
let connection = connector.Connect(appName, discovery) |> Async.RunSynchronously
let context = Context(connection, d, c)
let cache = Equinox.Cache (appName, 10)
context, cache
| s, d, c ->
failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)"
(Option.isSome s) (Option.isSome d) (Option.isSome c)

module EventStore =

open Equinox.EventStore
Expand Down
14 changes: 14 additions & 0 deletions equinox-fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ let [<Property>] ``MemoryStore properties`` epochLen args =
let service = Location.MemoryStore.create (zero, cf, sc) store
run service args

type Cosmos(testOutput) =

let log = TestOutputLogger.create testOutput
do Serilog.Log.Logger <- log

let context, cache = Cosmos.connect ()

let [<Property(MaxTest=5, MaxFail=1)>] properties epochLen args =
let epochLen, idsWindow = max 1 epochLen, 5
let zero, cf, sc = Epoch.zeroBalance, Epoch.toBalanceCarriedForward idsWindow, Epoch.shouldClose epochLen

let service = Location.Cosmos.create (zero, cf, sc) (context, cache, 50)
run service args

type EventStore(testOutput) =

let log = TestOutputLogger.create testOutput
Expand Down
3 changes: 2 additions & 1 deletion equinox-fc/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@
<Compile Include="LocationSeries.fs" />
<Compile Include="LocationEpoch.fs" />
<Compile Include="Location.fs" />
<Compile Include="InventorySeries.fs" />
<Compile Include="InventoryEpoch.fs" />
<Compile Include="Inventory.fs" />
<Compile Include="StockTransaction.fs" />
<Compile Include="StockProcessManager.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Equinox.Cosmos" Version="2.1.0" />
<PackageReference Include="Equinox.EventStore" Version="2.1.0" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.1.0" />
</ItemGroup>

</Project>
7 changes: 7 additions & 0 deletions equinox-fc/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ module InventoryId =
let parse (value : string) : InventoryId = %value
let toString (value : InventoryId) : string = %value

type InventoryEpochId = int<inventoryEpochId>
and [<Measure>] inventoryEpochId
module InventoryEpochId =
let parse (value : int) : InventoryEpochId = %value
let next (value : InventoryEpochId) : InventoryEpochId = % (%value + 1)
let toString (value : InventoryEpochId) : string = string %value

type InventoryTransactionId = string<inventoryTransactionId>
and [<Measure>] inventoryTransactionId
module InventoryTransactionId =
Expand Down
65 changes: 46 additions & 19 deletions equinox-fc/Domain/Inventory.fs
Original file line number Diff line number Diff line change
@@ -1,48 +1,64 @@
namespace Fc.Domain.Inventory

open Equinox.Core // we use Equinox's AsyncCacheCell helper below
open FSharp.UMX

type internal IdsCache<'Id>() =
let all = System.Collections.Concurrent.ConcurrentDictionary<'Id, unit>() // Bounded only by relatively low number of physical pick tickets IRL
static member Create init = let x = IdsCache() in x.Add init; x
member __.Add ids = for x in ids do all.[x] <- ()
member __.Contains id = all.ContainsKey id

/// Ingests items into a log of items, making a best effort at deduplicating as it writes
/// Prior to first add, reads recent ids, in order to minimize the number of duplicated Ids we ingest
type Service internal (inventoryId, epochs : Epoch.Service) =
/// Maintains active Epoch Id in a thread-safe manner while ingesting items into the `series` of `epochs`
/// Prior to first add, reads `lookBack` epochs to seed the cache, in order to minimize the number of duplicated Ids we ingest
type Service internal (inventoryId, series : Series.Service, epochs : Epoch.Service, lookBack, capacity) =

static let log = Serilog.Log.ForContext<Service>()
let log = Serilog.Log.ForContext<Service>()

// Maintains what we believe to be the currently open EpochId
// Guaranteed to be set only after `previousIds.AwaitValue()`
let mutable activeEpochId = Unchecked.defaultof<_>

// We want max one request in flight to establish the pre-existing Events from which the TransactionIds cache will be seeded
let previousIds : AsyncCacheCell<Set<InventoryTransactionId>> =
let read = async { let! r = epochs.TryIngest(inventoryId, Seq.empty) in return r.transactionIds }
AsyncCacheCell read
let previousEpochs = AsyncCacheCell<AsyncCacheCell<Set<InventoryTransactionId>> list> <| async {
let! startingId = series.ReadIngestionEpoch(inventoryId)
activeEpochId <- %startingId
let read epochId = async { let! r = epochs.TryIngest(inventoryId, epochId, (fun _ -> 1), Seq.empty) in return r.transactionIds }
return [ for epoch in (max 0 (%startingId - lookBack)) .. (%startingId - 1) -> AsyncCacheCell(read %epoch) ] }

// TransactionIds cache - used to maintain a list of transactions that have already been ingested in order to avoid db round-trips
let previousIds : AsyncCacheCell<IdsCache<_>> = AsyncCacheCell <| async {
let! previousIds = previousIds.AwaitValue()
return IdsCache.Create(previousIds) }
let! previousEpochs = previousEpochs.AwaitValue()
let! ids = seq { for x in previousEpochs -> x.AwaitValue() } |> Async.Parallel
return IdsCache.Create(Seq.concat ids) }

let tryIngest events = async {
let! previousIds = previousIds.AwaitValue()
let initialEpochId = %activeEpochId

let rec aux totalIngested items = async {
let rec aux epochId totalIngested items = async {
let SeqPartition f = Seq.toArray >> Array.partition f
let dup, fresh = items |> SeqPartition (Epoch.Events.chooseInventoryTransactionId >> Option.exists previousIds.Contains)
let fullCount = List.length items
let dropping = fullCount - Array.length fresh
if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids}", dropping, fullCount, dup)
if dropping <> 0 then log.Information("Ignoring {count}/{fullCount} duplicate ids: {ids} for {epochId}", dropping, fullCount, dup, epochId)
if Array.isEmpty fresh then
return totalIngested
else
let! res = epochs.TryIngest(inventoryId, fresh)
log.Information("Added {count} items to {inventoryId:l}", res.added, inventoryId)
let! res = epochs.TryIngest(inventoryId, epochId, capacity, fresh)
log.Information("Added {count} items to {inventoryId:l}/{epochId}", res.added, inventoryId, epochId)
// The adding is potentially redundant; we don't care
previousIds.Add res.transactionIds
// Any writer noticing we've moved to a new epoch shares the burden of marking it active
if not res.isClosed && activeEpochId < %epochId then
log.Information("Marking {inventoryId:l}/{epochId} active", inventoryId, epochId)
do! series.AdvanceIngestionEpoch(inventoryId, epochId)
System.Threading.Interlocked.CompareExchange(&activeEpochId, %epochId, activeEpochId) |> ignore
let totalIngestedTransactions = totalIngested + res.added
return totalIngestedTransactions }
return! aux 0 events
match res.rejected with
| [] -> return totalIngestedTransactions
| rej -> return! aux (InventoryEpochId.next epochId) totalIngestedTransactions rej }
return! aux initialEpochId 0 events
}

/// Upon startup, we initialize the TransactionIds cache with recent epochs; we want to kick that process off before our first ingest
Expand All @@ -53,11 +69,22 @@ type Service internal (inventoryId, epochs : Epoch.Service) =

module internal Helpers =

let create inventoryId epochs =
Service(inventoryId, epochs)
let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs) =
let remainingEpochCapacity (state: Epoch.Fold.State) =
let currentLen = state.ids.Count
max 0 (maxTransactionsPerEpoch - currentLen)
Service(inventoryId, series, epochs, lookBack=lookBackLimit, capacity=remainingEpochCapacity)

module Cosmos =

let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) =
let series = Series.Cosmos.create (context, cache)
let epochs = Epoch.Cosmos.create (context, cache)
Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs)

module EventStore =

let create inventoryId (context, cache) =
let create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (context, cache) =
let series = Series.EventStore.create (context, cache)
let epochs = Epoch.EventStore.create (context, cache)
Helpers.create inventoryId epochs
Helpers.create inventoryId (maxTransactionsPerEpoch, lookBackLimit) (series, epochs)
58 changes: 46 additions & 12 deletions equinox-fc/Domain/InventoryEpoch.fs
Original file line number Diff line number Diff line change
@@ -1,68 +1,102 @@
/// Manages the ingestion (and deduplication based on a TransactionId) of events reflecting transfers or stock adjustments
/// that have been effected across a given set of Inventory
/// See Inventory.Service for surface level API which manages the ingestion
/// See Inventory.Service for surface level API which manages the ingestion, including transitioning to a new Epoch when an epoch reaches 'full' state
module Fc.Domain.Inventory.Epoch

let [<Literal>] Category = "InventoryEpoch"
let streamName inventoryId = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; "0"]
let streamName (inventoryId, epochId) = FsCodec.StreamName.compose Category [InventoryId.toString inventoryId; InventoryEpochId.toString epochId]

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

type TransactionRef = { transactionId : InventoryTransactionId }
type Snapshotted = { closed: bool; ids : InventoryTransactionId[] }

type Event =
| Adjusted of TransactionRef
| Transferred of TransactionRef
| Closed
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

/// Used for deduplicating input events
let chooseInventoryTransactionId = function
| Adjusted { transactionId = id } | Transferred { transactionId = id } -> Some id
| Closed | Snapshotted _ -> None

module Fold =

type State = { closed : bool; ids : Set<InventoryTransactionId> }
let initial = { closed = false; ids = Set.empty }
let evolve state = function
| (Events.Adjusted e | Events.Transferred e) -> { state with ids = Set.add e.transactionId state.ids }
| Events.Closed -> { state with closed = true }
| Events.Snapshotted e -> { closed = e.closed; ids = Set.ofArray e.ids }
let fold : State -> Events.Event seq -> State = Seq.fold evolve
let isOrigin = function Events.Snapshotted _ -> true | _ -> false
let snapshot s = Events.Snapshotted { closed = s.closed; ids = Array.ofSeq s.ids }

type Result =
{ /// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items
{ /// Indicates whether this epoch is closed (either previously or as a side-effect this time)
isClosed : bool
/// Count of items added to this epoch. May be less than requested due to removal of duplicates and/or rejected items
added : int
/// residual items that [are not duplicates and] were not accepted into this epoch
rejected : Events.Event list
/// identifiers for all items in this epoch
transactionIds : Set<InventoryTransactionId> }

let decideSync events (state : Fold.State) : Result * Events.Event list =
let decideSync capacity events (state : Fold.State) : Result * Events.Event list =
let isFresh = function
| Events.Adjusted { transactionId = id }
| Events.Transferred { transactionId = id } -> (not << state.ids.Contains) id
| Events.Closed | Events.Snapshotted _ -> false
let news = events |> Seq.filter isFresh |> List.ofSeq
let newCount = List.length news
let events = [ if newCount <> 0 then yield! news ]
let closed, allowing, markClosed, residual =
let newCount = List.length news
if state.closed then
true, 0, false, news
else
let capacityNow = capacity state
let accepting = min capacityNow newCount
let closing = accepting = capacityNow
let residual = List.skip accepting news
closing, accepting, closing, residual
let events =
[ if allowing <> 0 then yield! news
if markClosed then yield Events.Closed ]
let state' = Fold.fold state events
{ added = newCount; transactionIds = state'.ids }, events
{ isClosed = closed; added = allowing; rejected = residual; transactionIds = state'.ids }, events

type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =
type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Stream<Events.Event, Fold.State>) =

/// Attempt ingestion of `events` into the cited Epoch.
/// - None will be accepted if the Epoch is `closed`
/// - The `capacity` function will be passed a non-closed `state` in order to determine number of items that can be admitted prior to closing
/// - If the computed capacity result is >= the number of items being submitted (which may be 0), the Epoch will be marked Closed
/// NOTE the result may include rejected items (which the caller is expected to feed into a successor epoch)
member __.TryIngest(inventoryId, events) : Async<Result> =
let stream = resolve inventoryId
stream.Transact(decideSync events)
member __.TryIngest(inventoryId, epochId, capacity, events) : Async<Result> =
let stream = resolve (inventoryId, epochId)
stream.Transact(decideSync capacity events)

let create resolve =
let resolve ids =
let stream = resolve (streamName ids)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts=2)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service(resolve)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create resolver.Resolve

module EventStore =

open Equinox.EventStore
Expand Down
71 changes: 71 additions & 0 deletions equinox-fc/Domain/InventorySeries.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/// Manages a) the ingestion epoch id b) the current checkpointed read position for a long-running Inventory Series
/// See InventoryEpoch for the logic managing the actual events logged within a given epoch
/// See Inventory.Service for the surface API which manages the writing
module Fc.Domain.Inventory.Series

let [<Literal>] Category = "InventorySeries"
let streamName inventoryId = FsCodec.StreamName.create Category (InventoryId.toString inventoryId)

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
[<RequireQualifiedAccess>]
module Events =

type Started = { epoch : InventoryEpochId }
type Event =
| Started of Started
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Fold =

type State = InventoryEpochId option
let initial = None
let evolve _state = function
| Events.Started e -> Some e.epoch
let fold : State -> Events.Event seq -> State = Seq.fold evolve

let queryActiveEpoch state = state |> Option.defaultValue (InventoryEpochId.parse 0)

let interpretAdvanceIngestionEpoch epochId (state : Fold.State) =
if queryActiveEpoch state >= epochId then []
else [Events.Started { epoch = epochId }]

type Service internal (resolve : InventoryId -> Equinox.Stream<Events.Event, Fold.State>) =

member __.ReadIngestionEpoch(inventoryId) : Async<InventoryEpochId> =
let stream = resolve inventoryId
stream.Query queryActiveEpoch

member __.AdvanceIngestionEpoch(inventoryId, epochId) : Async<unit> =
let stream = resolve inventoryId
stream.Transact(interpretAdvanceIngestionEpoch epochId)

let create resolve =
// For this stream, we uniformly use stale reads as:
// a) we don't require any information from competing writers
// b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
let resolve locationId =
let stream = resolve (streamName locationId, opt)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service(resolve)

module Cosmos =

open Equinox.Cosmos

let accessStrategy = AccessStrategy.LatestKnownEvent
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create <| fun (id, opt) -> resolver.Resolve(id, opt)

module EventStore =

open Equinox.EventStore

let accessStrategy = AccessStrategy.LatestKnownEvent
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
create <| fun (id, opt) -> resolver.Resolve(id, opt)
7 changes: 7 additions & 0 deletions equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ module Helpers =
let create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs) =
Service(zeroBalance, toBalanceCarriedForward, shouldClose, series, epochs)

module Cosmos =

let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.create (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.create (context, cache, maxAttempts)
create (zeroBalance, toBalanceCarriedForward, shouldClose) (series, epochs)

module EventStore =

let create (zeroBalance, toBalanceCarriedForward, shouldClose) (context, cache, maxAttempts) =
Expand Down
Loading