From 420e299daef5778953397b7bec4601765383614b Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 30 Dec 2021 17:23:03 +0000 Subject: [PATCH] Finish ConfirmedController --- .../Controllers/ConfirmedCartsController.fs | 64 ++++++++++--------- .../ECommerce.Domain/ConfirmedEpoch.fs | 4 +- .../ECommerce.Domain/Types.fs | 23 ++++++- 3 files changed, 57 insertions(+), 34 deletions(-) diff --git a/Sample/ECommerce.Equinox/ECommerce.Api/Controllers/ConfirmedCartsController.fs b/Sample/ECommerce.Equinox/ECommerce.Api/Controllers/ConfirmedCartsController.fs index 8f3f2098f..a4680290b 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Api/Controllers/ConfirmedCartsController.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Api/Controllers/ConfirmedCartsController.fs @@ -4,54 +4,56 @@ open Microsoft.AspNetCore.Mvc open ECommerce.Domain -type TicketsTranchesDto = { activeEpochs : TrancheReferenceDto[] } - and TrancheReferenceDto = { fc : FcId; epochId : ConfirmedEpochId } +type TranchesDto = { activeEpochs : TrancheReferenceDto[] } + and TrancheReferenceDto = { seriesId : ConfirmedSeriesId; epochId : ConfirmedEpochId } -type SliceDto = { closed : bool; tickets : ItemDto[]; position : TicketsCheckpoint; checkpoint : TicketsCheckpoint } - and ItemDto = { id : TicketId; payload : string } -module ItemDto = +module TranchesDto = - let ofDto (x : TicketsEpoch.Events.Item) : ItemDto = - { id = x.id; payload = x.payload } + let ofEpochId epochId = + { activeEpochs = [| { seriesId = ConfirmedSeriesId.wellKnownId; epochId = epochId } |]} + +type SliceDto = { closed : bool; carts : CartDto[]; position : ConfirmedCheckpoint; checkpoint : ConfirmedCheckpoint } + and CartDto = { id : CartId; items : ItemDto[] } + and ItemDto = { productId : ProductId; unitPrice : decimal; quantity : int } + +module CartDto = + + let ofDto (x : ConfirmedEpoch.Events.Cart) : CartDto = + { id = x.cartId + items = [| for x in x.items -> { productId = x.productId; unitPrice = x.unitPrice; quantity = x.quantity } |] } module Checkpoint = - let ofEpochAndOffset (epoch : TicketsEpochId) (offset : int) = - TicketsCheckpoint.ofEpochAndOffset epoch offset + let ofEpochAndOffset (epoch : ConfirmedEpochId) (offset : int) = + ConfirmedCheckpoint.ofEpochAndOffset epoch offset - let ofState (epochId : TicketsEpochId) (s : TicketsEpoch.Reader.StateDto) = - TicketsCheckpoint.ofEpochContent epochId s.closed s.tickets.Length + let ofState (epochId : ConfirmedEpochId) (s : ConfirmedEpoch.Reader.StateDto) = + ConfirmedCheckpoint.ofEpochContent epochId s.closed s.carts.Length [] -type TicketsController(tickets : TicketsIngester.Service, series : TicketsSeries.Service, epochs : TicketsEpoch.Reader.Service) = +type ConfirmedController(series : ConfirmedSeries.Service, epochs : ConfirmedEpoch.Reader.Service) = inherit ControllerBase() - [] - member _.Post(fc : FcId, ticket : TicketId, [] payload) = async { - let! _added = tickets.ForFc(fc).TryIngest({ id = ticket; payload = payload}) - () - } - [] - member _.ListTranches() : Async = async { - let! active = series.ReadIngestionEpochs() - return { activeEpochs = [| for x in active -> { fc = x.fc; epochId = x.ingestionEpochId } |]} + member _.ListTranches() : Async = async { + let! active = series.ReadIngestionEpochId() + return TranchesDto.ofEpochId active } - [] - member _.ReadTranche(fcId : FcId, epoch : TicketsEpochId) : Async = async { - let! state = epochs.Read(fcId, epoch) + [] + member _.ReadTranche(epoch : ConfirmedEpochId) : Async = async { + let! state = epochs.Read(epoch) // TOCONSIDER closed should control cache header let pos, checkpoint = Checkpoint.ofEpochAndOffset epoch 0, Checkpoint.ofState epoch state - return { closed = state.closed; tickets = Array.map ItemDto.ofDto state.tickets; position = pos; checkpoint = checkpoint } + return { closed = state.closed; carts = Array.map CartDto.ofDto state.carts; position = pos; checkpoint = checkpoint } } - [] - member _.Poll(fcId : FcId, token : System.Nullable) : Async = async { - let pos = if token.HasValue then token.Value else TicketsCheckpoint.initial - let epochId, offset = TicketsCheckpoint.toEpochAndOffset pos - let! state = epochs.Read(fcId, epochId) + [] + member _.Poll(token : System.Nullable) : Async = async { + let pos = if token.HasValue then token.Value else ConfirmedCheckpoint.initial + let epochId, offset = ConfirmedCheckpoint.toEpochAndOffset pos + let! state = epochs.Read(epochId) // TOCONSIDER closed should control cache header let pos, checkpoint = Checkpoint.ofEpochAndOffset epochId offset, Checkpoint.ofState epochId state - return { closed = state.closed; tickets = Array.skip offset state.tickets |> Array.map ItemDto.ofDto; position = pos; checkpoint = checkpoint } + return { closed = state.closed; carts = Array.skip offset state.carts |> Array.map CartDto.ofDto; position = pos; checkpoint = checkpoint } } diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs index 994938e0c..3e6b7f80a 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/ConfirmedEpoch.fs @@ -68,12 +68,12 @@ type Service internal /// Ingest the supplied items. Yields relevant elements of the post-state to enable generation of stats /// and facilitate deduplication of incoming items in order to avoid null store round-trips where possible - member _.Ingest(epochId, items) = + member _.Ingest(epochId, carts) = let decider = resolveStale epochId /// NOTE decider which will initially transact against potentially stale cached state, which will trigger a /// resync if another writer has gotten in before us. This is a conscious decision in this instance; the bulk /// of writes are presumed to be coming from within this same process - decider.Transact(decide shouldClose items) + decider.Transact(decide shouldClose carts) /// Returns all the items currently held in the stream (Not using AllowStale on the assumption this needs to see updates from other apps) member _.Read epochId : Async = diff --git a/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs b/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs index 888616c20..5cfe0b5c7 100644 --- a/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs +++ b/Sample/ECommerce.Equinox/ECommerce.Domain/Types.fs @@ -47,6 +47,27 @@ type [] confirmedEpochId type ConfirmedEpochId = int module ConfirmedEpochId = let initial = 0 -// let value (value : ConfirmedEpochId) : int = %value + let value (value : ConfirmedEpochId) : int = %value + let parse (value : int) : ConfirmedEpochId = %value let next (value : ConfirmedEpochId) : ConfirmedEpochId = % (%value + 1) let toString (value : ConfirmedEpochId) : string = string %value + +type [] confirmedCheckpoint +type ConfirmedCheckpoint = int64 +module ConfirmedCheckpoint = + + let initial : ConfirmedCheckpoint = %0L + let factor = 1_000_000L + + let ofEpochAndOffset (epoch : ConfirmedEpochId) offset : ConfirmedCheckpoint = + int64 (ConfirmedEpochId.value epoch) * factor + int64 offset |> UMX.tag + + let ofEpochContent (epoch : ConfirmedEpochId) isClosed count : ConfirmedCheckpoint = + let epoch, offset = + if isClosed then ConfirmedEpochId.next epoch, 0 + else epoch, count + ofEpochAndOffset epoch offset + + let toEpochAndOffset (value : ConfirmedCheckpoint) : ConfirmedEpochId * int = + let d, r = Math.DivRem(%value, factor) + (ConfirmedEpochId.parse (int d)), int r