From 8f7ffbf3b3a375a8f4c01a60f47e2d1a7d49b925 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sat, 30 Nov 2019 00:44:05 +0000 Subject: [PATCH] Allocation example from Equinox 174 --- equinox-fc/Domain.Tests/AllocationTests.fs | 10 + equinox-fc/Domain.Tests/AllocatorTests.fs | 59 +++++ equinox-fc/Domain.Tests/Domain.Tests.fsproj | 4 + equinox-fc/Domain.Tests/TicketListTests.fs | 45 ++++ equinox-fc/Domain.Tests/TicketTests.fs | 82 +++++++ equinox-fc/Domain/Allocation.fs | 252 ++++++++++++++++++++ equinox-fc/Domain/Allocator.fs | 74 ++++++ equinox-fc/Domain/Domain.fsproj | 6 + equinox-fc/Domain/Infrastructure.fs | 30 ++- equinox-fc/Domain/ListAllocation.fs | 22 ++ equinox-fc/Domain/Ticket.fs | 81 +++++++ equinox-fc/Domain/TicketList.fs | 65 +++++ 12 files changed, 729 insertions(+), 1 deletion(-) create mode 100644 equinox-fc/Domain.Tests/AllocationTests.fs create mode 100644 equinox-fc/Domain.Tests/AllocatorTests.fs create mode 100644 equinox-fc/Domain.Tests/TicketListTests.fs create mode 100644 equinox-fc/Domain.Tests/TicketTests.fs create mode 100644 equinox-fc/Domain/Allocation.fs create mode 100644 equinox-fc/Domain/Allocator.fs create mode 100644 equinox-fc/Domain/ListAllocation.fs create mode 100644 equinox-fc/Domain/Ticket.fs create mode 100644 equinox-fc/Domain/TicketList.fs diff --git a/equinox-fc/Domain.Tests/AllocationTests.fs b/equinox-fc/Domain.Tests/AllocationTests.fs new file mode 100644 index 000000000..f6c92ff1c --- /dev/null +++ b/equinox-fc/Domain.Tests/AllocationTests.fs @@ -0,0 +1,10 @@ +module AllocationTests + +open Allocation +open FsCheck.Xunit +open Swensen.Unquote + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/AllocatorTests.fs b/equinox-fc/Domain.Tests/AllocatorTests.fs new file mode 100644 index 000000000..fddfe5183 --- /dev/null +++ b/equinox-fc/Domain.Tests/AllocatorTests.fs @@ -0,0 +1,59 @@ +module AllocatorTests + +open Allocator +open FsCheck.Xunit +open Swensen.Unquote +open System + +type Command = + | Commence of AllocationId * DateTimeOffset + | Complete of AllocationId * Events.Reason + +type Result = + | Accepted + | Conflict of AllocationId + +let execute cmd state = + match cmd with + | Commence (a,c) -> + match decideCommence a c state with + | CommenceResult.Accepted, es -> Accepted,es + | CommenceResult.Conflict a, es -> Conflict a,es + | Complete (a,r) -> let es = decideComplete a r state in Accepted, es + +let [] properties c1 c2 = + let res,events = execute c1 Folds.initial + let state1 = Folds.fold Folds.initial events + match c1, res, events, state1 with + | Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> + test <@ a = ea && c = ec && state = Some e @> + | Complete _, Accepted, [], None -> + () // Non-applicable Complete requests are simply ignored + | _, res, l, _ -> + test <@ List.isEmpty l && res = Accepted @> + + let res,events = execute c2 state1 + let state2 = Folds.fold state1 events + match state1, c2, res, events, state2 with + // As per above, normal commence + | None, Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state -> + test <@ a = ea && c = ec && state = Some e @> + // Idempotent accept if same allocationId + | Some active as s1, Commence (a,_), Accepted, [], s2 -> + test <@ s1 = s2 && active.allocationId = a @> + // Conflict reports owner allocator + | Some active as s1, Commence (a2,_), Conflict a1, [], s2 -> + test <@ s1 = s2 && a2 <> a1 && a1 = active.allocationId @> + // Correct complete for same allocator is accepted + | Some active, Complete (a,r), Accepted, [Events.Completed { allocationId = ea; reason = er }], None -> + test <@ er = r && ea = a && active.allocationId = a @> + // Completes not for the same allocator are ignored + | Some active as s1, Complete (a,_), Accepted, [], s2 -> + test <@ active.allocationId <> a && s2 = s1 @> + | _, _, res, l, _ -> + test <@ List.isEmpty l && res = Accepted @> + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/Domain.Tests.fsproj b/equinox-fc/Domain.Tests/Domain.Tests.fsproj index 8ec512e47..30383b8f9 100644 --- a/equinox-fc/Domain.Tests/Domain.Tests.fsproj +++ b/equinox-fc/Domain.Tests/Domain.Tests.fsproj @@ -12,6 +12,10 @@ + + + + diff --git a/equinox-fc/Domain.Tests/TicketListTests.fs b/equinox-fc/Domain.Tests/TicketListTests.fs new file mode 100644 index 000000000..c2d8ea059 --- /dev/null +++ b/equinox-fc/Domain.Tests/TicketListTests.fs @@ -0,0 +1,45 @@ +module TicketListTests + +open FsCheck.Xunit +open Swensen.Unquote +open TicketList + +let [] properties c1 c2 = + let events = interpret c1 Folds.initial + let state1 = Folds.fold Folds.initial events + match c1, events, state1 with + // Empty request -> no Event + | (_,[]), [], state -> + test <@ Set.isEmpty state @> + | (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> + test <@ a = ea @> + test <@ state = set t @> + test <@ state = set et @> + | _, l, _ -> + test <@ List.isEmpty l @> + + let events = interpret c2 state1 + let state2 = Folds.fold state1 events + test <@ Folds.fold state2 [Folds.snapshot state2] = state2 @> + match state1, c2, events, state2 with + // Empty request -> no Event, same state + | s1, (_,[]), [], state -> + test <@ state = s1 @> + // Redundant request -> No Event, same state + | s1, (_,t), [], _ -> + test <@ Set.isSuperset s1 (set t) @> + // Two consecutive commands should both manifest in the state + | s1, (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state -> + test <@ a = ea @> + let et = Set et + test <@ Set.isSuperset (set t) et @> + test <@ Set.intersect s1 et |> Set.isEmpty @> + test <@ Set.isSuperset state s1 @> + test <@ Set.isSuperset state et @> + | _, _, l, _ -> + test <@ List.isEmpty l @> + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain.Tests/TicketTests.fs b/equinox-fc/Domain.Tests/TicketTests.fs new file mode 100644 index 000000000..32179871a --- /dev/null +++ b/equinox-fc/Domain.Tests/TicketTests.fs @@ -0,0 +1,82 @@ +module TicketTests + +open FsCheck.Xunit +open Swensen.Unquote +open Ticket +open Ticket.Folds + +/// We want to generate Allocate requests with and without the same listId in some cases +let (|MaybeSameCommands|) = function + | Allocate _ as x, Allocate _, cmd3, Choice1Of2 () -> x, x, cmd3 + | cmd1, (Allocate _ as x), Allocate _, Choice1Of2 () -> cmd1, x, x + | cmd1, cmd2, cmd3, (Choice1Of2 ()|Choice2Of2 ()) -> cmd1, cmd2, cmd3 + +/// Explicitly generate sequences with the same allocator running twice or three times +let (|MaybeSameIds|) = function + | Choice1Of4 a -> a, a, a + | Choice2Of4 (a,b) -> a, a, b + | Choice3Of4 (a,b) -> a, b, b + | Choice4Of4 (a,b,c) -> a, b, c + +let (|Invariants|) = function + // Revokes always succeed iff Unallocated + | Unallocated, Revoke, true, [], Unallocated -> + () + // Everything else fails + | _, _, res, e, _ -> + test <@ not res && List.isEmpty e @> + +let (|ReservedCases|_|) allocator = function + // Reserve given unallocated + | Unallocated, Reserve, true, [Events.Reserved { allocatorId = a }], state -> + test <@ a = allocator && state = Reserved a @> + Some () + // Idempotent reserve request + | Reserved a, Reserve, true, [], _ -> + test <@ a = allocator @> + Some () + // Revokes not by the owner are reported as successful, but we force the real owner to do the real relinquish + | (Reserved by | Allocated(by,_)), Revoke, true, [], _ -> + test <@ by <> allocator @> + Some () + // Revokes succeed iff by the owner + | (Reserved by | Allocated(by,_)), Revoke, true, [Events.Revoked], Unallocated -> + test <@ by = allocator @> + Some () + // Reservations can transition to Allocations as long as it's the same Allocator requesting + | Reserved a, Allocate l, true, [Events.Allocated { allocatorId = ea; listId = el }], Allocated (sa,sl) -> + test <@ a = allocator && a = ea && a = sa && l = el && l = sl @> + Some() + | _ -> None + +let [] properties (MaybeSameIds (a1,a2,a3)) (MaybeSameCommands (c1,c2,c3)) = + let res, events = decide a1 c1 Folds.initial + let state1 = Folds.fold Folds.initial events + + match Folds.initial, c1, res, events, state1 with + | _, Reserve, true, [Events.Reserved { allocatorId = a }], Reserved sa -> + test <@ a = a1 && sa = a1 @> + | Invariants -> () + + let res, events = decide a2 c2 state1 + let state2 = Folds.fold state1 events + match state1, c2, res, events, state2 with + | ReservedCases a2 -> () + | Invariants -> () + + let res, events = decide a3 c3 state2 + let state3 = Folds.fold state2 events + match state2, c3, res, events, state3 with + // Idempotent allocate ignore + | Allocated (a,l), Allocate l3, true, [], _ -> + test <@ a = a3 && l = l3 @> + // Allocated -> Revoked + | Allocated (a,_), Revoke, true, [Events.Revoked], Unallocated -> + test <@ a = a3 @> + | ReservedCases a3 -> () + | Invariants -> () + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/equinox-fc/Domain/Allocation.fs b/equinox-fc/Domain/Allocation.fs new file mode 100644 index 000000000..e89a81be8 --- /dev/null +++ b/equinox-fc/Domain/Allocation.fs @@ -0,0 +1,252 @@ +module Allocation + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +module Events = + + type Commenced = { ticketIds : TicketId[] } + type Tickets = { ticketIds : TicketId[] } + type Allocated = { ticketIds : TicketId[]; listId : TicketListId } + type Assigned = { listId : TicketListId } + type Snapshotted = { ticketIds : TicketId[] } + type Event = + /// Records full set of targets (so Abort can Revoke all potential in flight Reservations) + | Commenced of Commenced + /// Tickets verified as not being attainable (Allocated, not just Reserved) + | Failed of Tickets + /// Tickets verified as having been marked Reserved + | Reserved of Tickets + /// Confirming cited tickets are to be allocated to the cited list + | Allocated of Allocated + /// Records intention to release cited tickets (while Running, not implicitly via Aborted) + | Released of Tickets + /// Transitioning to phase where (Commenced-Allocated) get Returned by performing Releases on the Tickets + | Cancelled + /// Confirming cited tickets have been assigned to the list + | Assigned of Assigned + /// Records confirmed Revokes of cited Tickets + | Revoked of Tickets + /// Allocated + Returned = Commenced ==> Open for a new Commenced to happen + | Completed + // Dummy event to make Equinox.EventStore happy (see `module EventStore`) + | Snapshotted + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "Allocation" + +module Folds = + + type State = NotStarted | Running of States | Canceling of States | Completed + and States = + { unknown : Set + failed : Set + reserved : Set + assigning : Events.Allocated list + releasing : Set + stats : Stats } + and Stats = + { requested : int + denied : int + reserved : int + releasing : int + assigned : int list } + let (|Idle|Acquiring|Releasing|) = function NotStarted | Completed -> Idle | Running s -> Acquiring s | Canceling s -> Releasing s + module States = + let (|ToSet|) = set + let private withKnown xs x = { x with unknown = Set.difference x.unknown xs } + let withFailed (ToSet xs) x = { withKnown xs x with failed = x.failed |> Set.union xs } + let withReserved (ToSet xs) x = { withKnown xs x with reserved = x.reserved |> Set.union xs } + let withRevoked (ToSet xs) x = { withKnown xs x with reserved = Set.difference x.reserved xs } + let withReleasing (ToSet xs) x ={ withKnown xs x with releasing = x.releasing |> Set.union xs } // TODO + let withAssigned listId x = // TODO + let decided,remaining = x.assigning |> List.partition (fun x -> x.listId = listId) + let xs = seq { for x in decided do yield! x.ticketIds } + { withRevoked xs x with assigning = remaining } + let initial = NotStarted + let evolve state = function + | Events.Commenced e -> + match state with + | NotStarted -> Running { unknown = set e.ticketIds; failed = Set.empty; reserved = Set.empty; assigning = []; releasing = Set.empty + stats = { requested = 0; denied = 0; reserved = 0; releasing = 0; assigned = [] } } + | x -> failwithf "Can only Commence when NotStarted, not %A" x + | Events.Failed e -> + match state with + | Idle -> failwith "Cannot have Failed if Idle" + | Acquiring s -> Running (s |> States.withFailed e.ticketIds) + | Releasing s -> Canceling (s |> States.withFailed e.ticketIds) + | Events.Reserved e -> + match state with + | Idle -> failwith "Cannot have Reserved if Idle" + | Acquiring s -> Running (s |> States.withReserved e.ticketIds) + | Releasing s -> Canceling (s |> States.withReserved e.ticketIds) + | Events.Allocated e -> + match state with + | Idle -> failwith "Cannot have Allocating if Idle" + | Acquiring s -> Running { s with assigning = e :: s.assigning} + | Releasing s -> Canceling { s with assigning = e :: s.assigning} + | Events.Released e -> + match state with + | Idle -> failwith "Cannot have Releasing if Idle" + | Acquiring s -> Running (s |> States.withReleasing e.ticketIds) + | Releasing s -> Canceling (s |> States.withReleasing e.ticketIds) + | Events.Cancelled -> + match state with + | Acquiring s -> Canceling s + | x -> failwithf "Can only Abort when Running, not %A" x + | Events.Assigned e -> + match state with + | Idle -> failwith "Cannot have Allocated if Idle" + | Acquiring s -> Running (s |> States.withAssigned e.listId) + | Releasing s -> Canceling (s |> States.withAssigned e.listId) + | Events.Revoked e -> + match state with + | Idle -> failwith "Cannot have Released if Idle" + | Acquiring s -> Running (s |> States.withRevoked e.ticketIds) + | Releasing s -> Canceling (s |> States.withRevoked e.ticketIds) + | Events.Completed -> + match state with + | Acquiring s + | Releasing s when Set.isEmpty s.unknown && Set.isEmpty s.reserved && List.isEmpty s.assigning -> + Completed + | x -> failwithf "Can only Complete when reservations and unknowns resolved, not %A" x + | Events.Snapshotted -> state // Dummy event, see EventStore bindings + let fold : State -> Events.Event seq -> State = Seq.fold evolve + let isOrigin = function Events.Completed -> true | Events.Snapshotted | _ -> false + +/// Current state of the workflow based on the present state of the Aggregate +type ProcessState = + | NotStarted + | Running of reserved : TicketId list * toAssign : Events.Allocated list * toRelease : TicketId list * toReserve : TicketId list + | Idle of reserved : TicketId list + | Cancelling of toAssign : Events.Allocated list * toRelease : TicketId list + | Completed + static member FromFoldState = function + | Folds.NotStarted -> + NotStarted + | Folds.Running e -> + match Set.toList e.reserved, e.assigning, Set.toList e.releasing, Set.toList e.unknown with + | res, [], [], [] -> + Idle (reserved = res) + | res, ass, rel, tor -> + Running (reserved = res, toAssign = ass, toRelease = rel, toReserve = tor) + | Folds.Canceling e -> + Cancelling (toAssign = e.assigning, toRelease = [yield! e.reserved; yield! e.unknown; yield! e.releasing]) + | Folds.Completed -> + Completed + +/// Updates recording attained progress +type Update = + | Failed of tickets : TicketId list + | Reserved of tickets : TicketId list + | Assigned of listId : TicketListId + | Revoked of tickets : TicketId list + +let (|ToSet|) xs = set xs +let (|SetEmpty|_|) s = if Set.isEmpty s then Some () else None + +/// Map processed work to associated events that are to be recorded in the stream +let decideUpdate update state = + let owned (s : Folds.States) = Set.union s.releasing (set <| seq { yield! s.unknown; yield! s.reserved }) + match state, update with + | (Folds.Completed | Folds.NotStarted), (Failed _|Reserved _|Assigned _|Revoked _) as x -> + failwithf "Folds.Completed or NotStarted cannot handle (Failed|Revoked|Assigned) %A" x + | (Folds.Running s|Folds.Canceling s), Reserved (ToSet xs) -> + match set s.unknown |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Reserved { ticketIds = Set.toArray changed }] + | (Folds.Running s|Folds.Canceling s), Failed (ToSet xs) -> + match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Failed { ticketIds = Set.toArray changed }] + | (Folds.Running s|Folds.Canceling s), Revoked (ToSet xs) -> + match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Revoked { ticketIds = Set.toArray changed }] + | (Folds.Running s|Folds.Canceling s), Assigned listId -> + if s.assigning |> List.exists (fun x -> x.listId = listId) then [Events.Assigned { listId = listId }] else [] + +/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events +type private Accumulator() = + let acc = ResizeArray() + member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function + | res, [] -> res,state + | res, [e] -> acc.Add e; res,Folds.evolve state e + | res, xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + member __.Accumulated = List.ofSeq acc + +/// Impetus provided to the Aggregate Service from the Process Manager +type Command = + | Commence of tickets : TicketId list + | Apply of assign : Events.Allocated list * release : TicketId list + | Cancel + +/// Apply updates, decide whether Command is applicable, emit state reflecting work to be completed to conclude the in-progress workflow (if any) +let sync (updates : Update seq, command : Command) (state : Folds.State) : (bool*ProcessState) * Events.Event list = + let acc = Accumulator() + + (* Apply any updates *) + let mutable state = state + for x in updates do + let (),state' = acc.Ingest state ((),decideUpdate x state) + state <- state' + + (* Decide whether the Command is now acceptable *) + let accepted,state = + acc.Ingest state <| + match state, command with + (* Ignore on the basis of being idempotent in the face of retries *) + // TOCONSIDER how to represent that a request is being denied e.g. due to timeout vs due to being complete + | (Folds.Idle|Folds.Releasing _), Apply _ -> + false, [] + (* Defer; Need to allow current request to progress before it can be considered *) + | (Folds.Acquiring _|Folds.Releasing _), Commence _ -> + true, [] // TODO validate idempotent ? + (* Ok on the basis of idempotency *) + | (Folds.Idle|Folds.Releasing _), Cancel -> + true, [] + (* Ok; Currently idle, normal Commence request*) + | Folds.Idle, Commence tickets -> + true,[Events.Commenced { ticketIds = Array.ofList tickets }] + (* Ok; normal apply to distribute held tickets *) + | Folds.Acquiring s, Apply (assign,release) -> + let avail = System.Collections.Generic.HashSet s.reserved + let toAssign = [for a in assign -> { a with ticketIds = a.ticketIds |> Array.where avail.Remove }] + let toRelease = (Set.empty,release) ||> List.fold (fun s x -> if avail.Remove x then Set.add x s else s) + true, [ + for x in toAssign do if (not << Array.isEmpty) x.ticketIds then yield Events.Allocated x + match toRelease with SetEmpty -> () | toRelease -> yield Events.Released { ticketIds = Set.toArray toRelease }] + (* Ok, normal Cancel *) + | Folds.Acquiring _, Cancel -> + true, [Events.Cancelled] + + (* Yield outstanding processing requirements (if any), together with events accumulated based on the `updates` *) + (accepted, ProcessState.FromFoldState state), acc.Accumulated + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, AllocationId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + + member __.Sync(allocationId,updates,command) : Async = + let (Stream stream) = allocationId + stream.Transact(sync (updates,command)) + +module EventStore = + + open Equinox.EventStore + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent + let opt = Equinox.ResolveOption.AllowStale + // We should be reaching Completed state frequently so no actual Snapshots should get written + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve(id,opt) + let create (context,cache) = + Service(resolve (context,cache)) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent + let opt = Equinox.ResolveOption.AllowStale + // TODO impl snapshots + let makeEmptyUnfolds events _state = events,[] + let accessStrategy = AccessStrategy.Custom (Folds.isOrigin,makeEmptyUnfolds) + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve(id,opt) + let create (context,cache) = + Service(resolve (context,cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/Allocator.fs b/equinox-fc/Domain/Allocator.fs new file mode 100644 index 000000000..7b8c792df --- /dev/null +++ b/equinox-fc/Domain/Allocator.fs @@ -0,0 +1,74 @@ +module Allocator + +open System + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +module Events = + + type Commenced = { allocationId : AllocationId; cutoff : DateTimeOffset } + type Completed = { allocationId : AllocationId; reason : Reason } + and [)>] + Reason = Ok | TimedOut | Cancelled + type Snapshotted = { active : Commenced option } + type Event = + | Commenced of Commenced + | Completed of Completed + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "Allocator" + +module Folds = + + type State = Events.Commenced option + let initial = None + let evolve _state = function + | Events.Commenced e -> Some e + | Events.Completed _ -> None + let fold : State -> Events.Event seq -> State = Seq.fold evolve + +type CommenceResult = Accepted | Conflict of AllocationId + +let decideCommence allocationId cutoff : Folds.State -> CommenceResult*Events.Event list = function + | None -> Accepted, [Events.Commenced { allocationId = allocationId; cutoff = cutoff }] + | Some { allocationId = tid } when allocationId = tid -> Accepted, [] // Accept replay idempotently + | Some curr -> Conflict curr.allocationId, [] // Reject attempts at commencing overlapping transactions + +let decideComplete allocationId reason : Folds.State -> Events.Event list = function + | Some { allocationId = tid } when allocationId = tid -> [Events.Completed { allocationId = allocationId; reason = reason }] + | Some _ | None -> [] // Assume replay; accept but don't write + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, AllocatorId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + + member __.Commence(allocatorId, allocationId, cutoff) : Async = + let (Stream stream) = allocatorId + stream.Transact(decideCommence allocationId cutoff) + + member __.Complete(allocatorId, allocationId, reason) : Async = + let (Stream stream) = allocatorId + stream.Transact(decideComplete allocationId reason) + +module EventStore = + + open Equinox.EventStore + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent + let opt = Equinox.ResolveOption.AllowStale + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) + let create (context,cache) = + Service(resolve (context,cache)) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent + let opt = Equinox.ResolveOption.AllowStale + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) + let create (context,cache) = + Service(resolve (context,cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/Domain.fsproj b/equinox-fc/Domain/Domain.fsproj index 107408957..8db2109f1 100644 --- a/equinox-fc/Domain/Domain.fsproj +++ b/equinox-fc/Domain/Domain.fsproj @@ -11,9 +11,15 @@ + + + + + + diff --git a/equinox-fc/Domain/Infrastructure.fs b/equinox-fc/Domain/Infrastructure.fs index 7a1bfa843..55f110128 100644 --- a/equinox-fc/Domain/Infrastructure.fs +++ b/equinox-fc/Domain/Infrastructure.fs @@ -2,6 +2,8 @@ namespace global open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings +(* Locations *) + type LocationId = string and [] locationId module LocationId = @@ -13,4 +15,30 @@ and [] locationEpochId module LocationEpochId = let parse (value : int) : LocationEpochId = %value let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) - let toString (value : LocationEpochId) : string = string %value \ No newline at end of file + let toString (value : LocationEpochId) : string = string %value + +(* Tickets *) + +type TicketId = string +and [] ticketId +module TicketId = + let parse (value : string) : TicketId = let raw = value in %raw + let toString (value : TicketId) : string = %value + +type TicketListId = string +and [] ticketListId +module TicketListId = + let parse (value : string) : TicketListId = let raw = value in %raw + let toString (value : TicketListId) : string = %value + +type AllocationId = string +and [] allocationId +module AllocationId = + let parse (value : string) : AllocationId = let raw = value in %raw + let toString (value : AllocationId) : string = %value + +type AllocatorId = string +and [] allocatorId +module AllocatorId = + let parse (value : string) : AllocatorId = let raw = value in %raw + let toString (value : AllocatorId) : string = %value \ No newline at end of file diff --git a/equinox-fc/Domain/ListAllocation.fs b/equinox-fc/Domain/ListAllocation.fs new file mode 100644 index 000000000..51662d008 --- /dev/null +++ b/equinox-fc/Domain/ListAllocation.fs @@ -0,0 +1,22 @@ +module ListAllocation + +open System + +type Service(maxListLen, allocators : Allocator.Service, allocations : Allocation.Service, lists : TicketList.Service, tickets : Ticket.Service) = + + member __.Commence(allocatorId, allocationId, tickets, transactionTimeout) : Async<_> = async { + let cutoff = let now = DateTimeOffset.UtcNow in now.Add transactionTimeout + let! state = allocators.Commence(allocatorId, allocationId, cutoff) + // TODO cancel timed out conflicting work + let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Commence tickets) + return state } + + member __.Read(allocationId) : Async<_> = async { + let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Apply ([],[])) + // TODO incorporate allocator state + return state } + + member __.Cancel(allocatorId,allocationId) : Async<_> = async { + let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Cancel) + // TODO propagate to allocator with reason + return state } diff --git a/equinox-fc/Domain/Ticket.fs b/equinox-fc/Domain/Ticket.fs new file mode 100644 index 000000000..b5a98c057 --- /dev/null +++ b/equinox-fc/Domain/Ticket.fs @@ -0,0 +1,81 @@ +module Ticket + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +module Events = + + type Reserved = { allocatorId : AllocatorId } + type Allocated = { allocatorId : AllocatorId; listId : TicketListId } + + type Event = + | Reserved of Reserved + | Allocated of Allocated + | Revoked + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "Ticket" + +module Folds = + + type State = Unallocated | Reserved of by : AllocatorId | Allocated of by : AllocatorId * on : TicketListId + let initial = Unallocated + let private evolve _state = function + | Events.Reserved e -> Reserved e.allocatorId + | Events.Allocated e -> Allocated (e.allocatorId, e.listId) + | Events.Revoked -> Unallocated + // because each event supersedes the previous one, we only ever need to fold the last event + let fold state events = + Seq.tryLast events |> Option.fold evolve state + +type Command = + /// permitted if nobody owns it (or idempotently ok if we are the owner) + | Reserve + /// permitted if the allocator has it reserved (or idempotently ok if already on list) + | Allocate of on : TicketListId + /// must be performed by the owner; attempts by non-owner to deallocate get ignored as a new owner now has that responsibility + /// (but are not failures from an Allocator's perspective) + | Revoke + +let decide (allocator : AllocatorId) (command : Command) (state : Folds.State) : bool * Events.Event list = + match command, state with + | Reserve, Folds.Unallocated -> true,[Events.Reserved { allocatorId = allocator }] // normal case -> allow+record + | Reserve, Folds.Reserved by when by = allocator -> true,[] // idempotently permit + | Reserve, (Folds.Reserved _ | Folds.Allocated _) -> false,[] // report failure, nothing to write + | Allocate list, Folds.Allocated (by,l) when by = allocator && l = list -> true,[] // idempotent processing + | Allocate list, Folds.Reserved by when by = allocator -> true,[Events.Allocated { allocatorId = allocator; listId = list }] // normal + | Allocate _, (Folds.Allocated _ | Folds.Unallocated | Folds.Reserved _) -> false,[] // Fail if someone else has reserved or allocated, or we are jumping straight to Allocated without Reserving first + | Revoke, Folds.Unallocated -> true,[] // idempotent handling + | Revoke, (Folds.Reserved by | Folds.Allocated (by,_)) when by = allocator -> true,[Events.Revoked] // release Reservation or Allocation + | Revoke, (Folds.Reserved _ | Folds.Allocated _ ) -> true,[] // NOTE we report success of achieving the intent (but, critically, we leave it to the actual owner to manage any actual revoke) + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, TicketId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + + /// Attempts to achieve the intent represented by `command`. High level semantics as per comments on Command (see decide for lowdown) + /// `false` is returned if a competing allocator holds it (or we're attempting to jump straight to Allocated without first Reserving) + member __.Sync(pickTicketId, allocator, command : Command) : Async = + let (Stream stream) = pickTicketId + stream.Transact(decide allocator command) + +module EventStore = + + open Equinox.EventStore + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // because we only ever need the last event, we use the Equinox.EventStore access strategy that optimizes around that + Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let create (context,cache)= + Service(resolve (context,cache)) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // because we only ever need the last event to build the state, we feed the events we are writing + // (there's always exactly one if we are writing), into the unfolds slot so a single point read with etag check gets us state in one trip + Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let create (context,cache) = + Service(resolve (context,cache)) \ No newline at end of file diff --git a/equinox-fc/Domain/TicketList.fs b/equinox-fc/Domain/TicketList.fs new file mode 100644 index 000000000..f843e60c9 --- /dev/null +++ b/equinox-fc/Domain/TicketList.fs @@ -0,0 +1,65 @@ +module TicketList + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +module Events = + + type Allocated = { allocatorId : AllocatorId; ticketIds : TicketId[] } + type Snapshotted = { ticketIds : TicketId[] } + type Event = + | Allocated of Allocated + | Snapshotted of Snapshotted + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "TicketList" + +module Folds = + + type State = Set + let initial = Set.empty + let evolve state = function + | Events.Allocated e -> (state,e.ticketIds) ||> Array.fold (fun m x -> Set.add x m) + | Events.Snapshotted e -> Set.ofArray e.ticketIds + let fold : State -> Events.Event seq -> State = Seq.fold evolve + let isOrigin = function Events.Snapshotted _ -> true | Events.Allocated _ -> false + let snapshot state = Events.Snapshotted { ticketIds = Set.toArray state } + +let interpret (allocatorId : AllocatorId, allocated : TicketId list) (state : Folds.State) : Events.Event list = + match allocated |> Seq.except state |> Seq.distinct |> Seq.toArray with + | [||] -> [] + | news -> [Events.Allocated { allocatorId = allocatorId; ticketIds = news }] + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, TicketListId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 3) + + member __.Sync(pickListId,allocatorId,assignedTickets) : Async = + let (Stream stream) = pickListId + stream.Transact(interpret (allocatorId,assignedTickets)) + +module EventStore = + + open Equinox.EventStore + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers (which might cause us to have to retry a Transact and discover it is redundant), there is never a cost to being wrong + let opt = Equinox.ResolveOption.AllowStale + // we _could_ use this Access Strategy, but because we are only generally doing a single shot write, its unwarranted + // let accessStrategy = AccessStrategy.RollingSnapshots (Folds.isOrigin,Folds.snapshot) + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve(id,opt) + let create (context,cache) = + Service(resolve (context,cache)) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + // while there are competing writers (which might cause us to have to retry a Transact and discover it is redundant), there is never a cost to being wrong + let opt = Equinox.ResolveOption.AllowStale + // we want reads and writes (esp idempotent ones) to have optimal RU efficiency so we go the extra mile to do snapshotting into the Tip + let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot) + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve(id,opt) + let create (context,cache)= + Service(resolve (context,cache)) \ No newline at end of file