diff --git a/CHANGELOG.md b/CHANGELOG.md index 039ebf8ac..edc893cb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- `eqxShipping`: Unit and integration tests [#70](https://github.com/jet/dotnet-templates/pull/70) + ### Changed ### Removed ### Fixed diff --git a/dotnet-templates.sln b/dotnet-templates.sln index 5aa093d84..950b0b6b6 100644 --- a/dotnet-templates.sln +++ b/dotnet-templates.sln @@ -93,6 +93,10 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Watchdog", "equinox-shippin EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Templates.Tests", "tests\Equinox.Templates.Tests\Equinox.Templates.Tests.fsproj", "{26BFE6BC-5887-4E40-8CFD-F15332F5A104}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "equinox-shipping\Domain.Tests\Domain.Tests.fsproj", "{5A45EF21-576B-4B40-86BD-F5960ECD66BF}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Watchdog.Integration", "equinox-shipping\Watchdog.Integration\Watchdog.Integration.fsproj", "{83BA87C3-6288-40F4-BC4F-EC3A54586CDF}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -157,6 +161,14 @@ Global {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Debug|Any CPU.Build.0 = Debug|Any CPU {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Release|Any CPU.ActiveCfg = Release|Any CPU {26BFE6BC-5887-4E40-8CFD-F15332F5A104}.Release|Any CPU.Build.0 = Release|Any CPU + {5A45EF21-576B-4B40-86BD-F5960ECD66BF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5A45EF21-576B-4B40-86BD-F5960ECD66BF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5A45EF21-576B-4B40-86BD-F5960ECD66BF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5A45EF21-576B-4B40-86BD-F5960ECD66BF}.Release|Any CPU.Build.0 = Release|Any CPU + {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {83BA87C3-6288-40F4-BC4F-EC3A54586CDF}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(NestedProjects) = preSolution {F66A5BFE-7C81-44DC-97DE-3FD8C83B8F06} = {B72FFAAE-7801-41B2-86F5-FD90E97A30F7} @@ -172,5 +184,7 @@ Global {36C2D70A-F292-4481-8ADA-5066A80F92B2} = {1F3C9245-F973-43A3-97C9-5E527B93060C} {7B96FCF8-0BB5-4494-A143-628882A6E50A} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} {9AFF6138-B63B-4EBF-B86B-4F626E1F1ADF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} + {5A45EF21-576B-4B40-86BD-F5960ECD66BF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} + {83BA87C3-6288-40F4-BC4F-EC3A54586CDF} = {DAE9E2B9-EDA2-4064-B0CE-FD5294549374} EndGlobalSection EndGlobal diff --git a/equinox-shipping/Domain.Tests/ContainerTests.fs b/equinox-shipping/Domain.Tests/ContainerTests.fs new file mode 100644 index 000000000..d16c88188 --- /dev/null +++ b/equinox-shipping/Domain.Tests/ContainerTests.fs @@ -0,0 +1,12 @@ +module Shipping.Domain.Tests.ContainerTests + +open Shipping.Domain.Container + +open FsCheck.Xunit +open Swensen.Unquote + +let [] ``events roundtrip`` (x : Events.Event) = + let ee = Events.codec.Encode(None, x) + let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + let des = Events.codec.TryDecode e + test <@ des = Some x @> diff --git a/equinox-shipping/Domain.Tests/Domain.Tests.fsproj b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj new file mode 100644 index 000000000..f648e8a71 --- /dev/null +++ b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj @@ -0,0 +1,33 @@ + + + + netcoreapp3.1 + 5 + false + false + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/equinox-shipping/Domain.Tests/FinalizationProcessManagerTests.fs b/equinox-shipping/Domain.Tests/FinalizationProcessManagerTests.fs new file mode 100644 index 000000000..1081d8fa1 --- /dev/null +++ b/equinox-shipping/Domain.Tests/FinalizationProcessManagerTests.fs @@ -0,0 +1,43 @@ +module Shipping.Domain.Tests.FinalizationProcessManagerTests + +open Shipping.Domain + +open FsCheck.Xunit +open Swensen.Unquote + +[] +let ``FinalizationProcessManager properties`` (Id transId1, Id transId2, Id containerId1, Id containerId2, IdsAtLeastOne shipmentIds1, IdsAtLeastOne shipmentIds2, Id shipment3) = + let store = Equinox.MemoryStore.VolatileStore() + let buffer = EventAccumulator() + use __ = store.Committed.Subscribe buffer.Record + let eventTypes = seq { for e in buffer.All() -> e.EventType } + let processManager = createProcessManager 16 store + Async.RunSynchronously <| async { + (* First, run the happy path - should pass through all stages of the lifecycle *) + let requestedShipmentIds = Array.append shipmentIds1 shipmentIds2 + let! res1 = processManager.TryFinalizeContainer(transId1, containerId1, requestedShipmentIds) + let expectedEvents = + [ "FinalizationRequested"; "ReservationCompleted"; "AssignmentCompleted"; "Completed" // Transaction + "Reserved"; "Assigned" // Shipment + "Finalized"] // Container + test <@ res1 && set eventTypes = set expectedEvents @> + let containerEvents = + buffer.Queue(Container.streamName containerId1) + |> Seq.choose Container.Events.codec.TryDecode + |> List.ofSeq + test <@ match containerEvents with + | [ Container.Events.Finalized e ] -> e.shipmentIds = requestedShipmentIds + | xs -> failwithf "Unexpected %A" xs @> + (* Next, we run an overlapping finalize - this should + a) yield a fail result + b) result in triggering of Revert flow with associated Shipment revoke events *) + buffer.Clear() + let! res2 = processManager.TryFinalizeContainer(transId2, containerId2, Array.append shipmentIds2 [|shipment3|]) + let expectedEvents = + [ "FinalizationRequested"; "RevertCommenced"; "Completed" // Transaction + "Reserved"; "Revoked" ] // Shipment + test <@ not res2 + && set eventTypes = set expectedEvents @> + } + +module Dummy = let [] main _argv = 0 diff --git a/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs b/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs new file mode 100644 index 000000000..3baf69562 --- /dev/null +++ b/equinox-shipping/Domain.Tests/FinalizationTransactionTests.fs @@ -0,0 +1,12 @@ +module Shipping.Domain.Tests.FinalizationTransactionTests + +open Shipping.Domain.FinalizationTransaction + +open FsCheck.Xunit +open Swensen.Unquote + +let [] ``events roundtrip`` (x : Events.Event) = + let ee = Events.codec.Encode(None, x) + let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + let des = Events.codec.TryDecode e + test <@ des = Some x @> diff --git a/equinox-shipping/Domain.Tests/Fixtures.fs b/equinox-shipping/Domain.Tests/Fixtures.fs new file mode 100644 index 000000000..86c721942 --- /dev/null +++ b/equinox-shipping/Domain.Tests/Fixtures.fs @@ -0,0 +1,39 @@ +[] +module Shipping.Domain.Tests.Fixtures + +open Shipping.Domain + +module FinalizationTransaction = + open FinalizationTransaction + module MemoryStore = + open Equinox.MemoryStore + let create store = + let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial) + create resolver.Resolve +module Container = + open Container + module MemoryStore = + open Equinox.MemoryStore + let create store = + let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial) + create resolver.Resolve +module Shipment = + open Shipment + module MemoryStore = + open Equinox.MemoryStore + let create store = + let resolver = Resolver(store, Events.codec, Fold.fold, Fold.initial) + create resolver.Resolve + +let createProcessManager maxDop store = + let transactions = FinalizationTransaction.MemoryStore.create store + let containers = Container.MemoryStore.create store + let shipments = Shipment.MemoryStore.create store + FinalizationProcessManager.Service(transactions, containers, shipments, maxDop=maxDop) + +(* Generic FsCheck helpers *) + +let (|Id|) (x : System.Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let (|Ids|) (xs : System.Guid[]) = xs |> Array.map (|Id|) +let (|IdsAtLeastOne|) (Ids xs, Id x) = [| yield x; yield! xs |] +let (|AtLeastOne|) (x, xs) = x::xs diff --git a/equinox-shipping/Domain.Tests/Infrastructure.fs b/equinox-shipping/Domain.Tests/Infrastructure.fs new file mode 100644 index 000000000..55fc39b1f --- /dev/null +++ b/equinox-shipping/Domain.Tests/Infrastructure.fs @@ -0,0 +1,22 @@ +[] +module Shipping.Domain.Tests.Infrastructure + +open System.Collections.Concurrent + +type EventAccumulator<'E>() = + let messages = ConcurrentDictionary>() + + member __.Record(stream, events : 'E seq) = + let initStreamQueue _ = ConcurrentQueue events + let appendToQueue _ (queue : ConcurrentQueue<'E>) = events |> Seq.iter queue.Enqueue; queue + messages.AddOrUpdate(stream, initStreamQueue, appendToQueue) |> ignore + + member __.Queue stream = + match messages.TryGetValue stream with + | false, _ -> Seq.empty<'E> + | true, xs -> xs :> _ + + member __.All() = seq { for KeyValue (_, xs) in messages do yield! xs } + + member __.Clear() = + messages.Clear() diff --git a/equinox-shipping/Domain.Tests/ShipmentTests.fs b/equinox-shipping/Domain.Tests/ShipmentTests.fs new file mode 100644 index 000000000..066d91255 --- /dev/null +++ b/equinox-shipping/Domain.Tests/ShipmentTests.fs @@ -0,0 +1,17 @@ +module Shipping.Domain.Tests.ShipmentTests + +open Shipping.Domain.Shipment + +open FsCheck.Xunit +open Swensen.Unquote + +// We use Optional string values in the event representations +// However, FsCodec.NewtonsoftJson.OptionConverter maps `Some null` to `None`, which does not roundtrip +// In order to avoid having to special case the assertion in the roundtrip test, we stub out such values +let (|ReplaceSomeNullWithNone|) = TypeShape.Generic.map (function Some (null : string) -> None | x -> x) + +let [] ``events roundtrip`` (ReplaceSomeNullWithNone (x : Events.Event)) = + let ee = Events.codec.Encode(None, x) + let e = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + let des = Events.codec.TryDecode e + test <@ des = Some x @> diff --git a/equinox-shipping/Domain/Container.fs b/equinox-shipping/Domain/Container.fs index c41bf55b5..0d6440103 100644 --- a/equinox-shipping/Domain/Container.fs +++ b/equinox-shipping/Domain/Container.fs @@ -7,7 +7,7 @@ let streamName (containerId : ContainerId) = FsCodec.StreamName.create Category module Events = type Event = - | Finalized of {| shipmentIds : ShipmentId[] |} + | Finalized of {| shipmentIds : ShipmentId[] |} | Snapshotted of {| shipmentIds : ShipmentId[] |} interface TypeShape.UnionContract.IUnionContract @@ -28,7 +28,7 @@ module Fold = let toSnapshot (state : State) = Events.Snapshotted {| shipmentIds = state.shipmentIds |} let interpretFinalize shipmentIds (state : Fold.State): Events.Event list = - [ if (not << Array.isEmpty) state.shipmentIds then yield Events.Finalized {| shipmentIds = shipmentIds |} ] + [ if Array.isEmpty state.shipmentIds then yield Events.Finalized {| shipmentIds = shipmentIds |} ] type Service internal (resolve : ContainerId -> Equinox.Stream) = @@ -36,7 +36,7 @@ type Service internal (resolve : ContainerId -> Equinox.Stream(), resolve (streamName id), maxAttempts=3) Service(resolve) diff --git a/equinox-shipping/Domain/FinalizationProcessManager.fs b/equinox-shipping/Domain/FinalizationProcessManager.fs index 5e2a7d6e7..3f6dcb444 100644 --- a/equinox-shipping/Domain/FinalizationProcessManager.fs +++ b/equinox-shipping/Domain/FinalizationProcessManager.fs @@ -37,7 +37,7 @@ type Service | Action.AssignShipments (shipmentIds, containerId) -> let! _ = Async.Parallel(seq { for sId in shipmentIds -> shipments.Assign(sId, containerId, transactionId) }, maxDop) - return! loop Events.Completed + return! loop Events.AssignmentCompleted | Action.FinalizeContainer (containerId, shipmentIds) -> do! containers.Finalize(containerId, shipmentIds) diff --git a/equinox-shipping/Domain/FinalizationTransaction.fs b/equinox-shipping/Domain/FinalizationTransaction.fs index b100635cb..7c4047bd2 100644 --- a/equinox-shipping/Domain/FinalizationTransaction.fs +++ b/equinox-shipping/Domain/FinalizationTransaction.fs @@ -52,8 +52,9 @@ module Fold = // The implementation trusts (does not spend time double checking) that events have passed an isValidTransition check let evolve (state : State) (event : Events.Event) : State = match state, event with - | _, Events.FinalizationRequested event -> State.Assigning {| container = event.container; shipments = event.shipments |} - | State.Assigning state, Events.RevertCommenced event -> State.Reverting {| shipments = event.shipments |} + | _, Events.FinalizationRequested event -> State.Reserving {| container = event.container; shipments = event.shipments |} + | State.Reserving state, Events.ReservationCompleted -> State.Assigning {| container = state.container; shipments = state.shipments |} + | State.Reserving _state, Events.RevertCommenced event -> State.Reverting {| shipments = event.shipments |} | State.Reverting _state, Events.Completed -> State.Completed {| success = false |} | State.Assigning state, Events.AssignmentCompleted -> State.Assigned {| container = state.container; shipments = state.shipments |} | State.Assigned _, Events.Completed -> State.Completed {| success = true |} @@ -99,7 +100,7 @@ type Service internal (resolve : TransactionId -> Equinox.Stream(), resolve (streamName id), maxAttempts=3) Service(resolve) diff --git a/equinox-shipping/Domain/Shipment.fs b/equinox-shipping/Domain/Shipment.fs index cbba0ba85..61551dcda 100644 --- a/equinox-shipping/Domain/Shipment.fs +++ b/equinox-shipping/Domain/Shipment.fs @@ -31,14 +31,12 @@ module Fold = let toSnapshot (state : State) = Events.Snapshotted {| reservation = state.reservation; association = state.association |} let decideReserve transactionId : Fold.State -> bool * Events.Event list = function - | { reservation = r; association = a } -> - // validity is dependent on whether its a) open or b) an idempotent retry by the same transaction - let isValid = Option.forall (fun x -> x = transactionId) r - // if it's already Reserved (and/or assigned), there's no work to do - isValid, if isValid && Option.isNone a then [ Events.Reserved {| transaction = transactionId |} ] else [] + | { reservation = Some r } when r = transactionId -> true, [] + | { reservation = None } -> true, [ Events.Reserved {| transaction = transactionId |} ] + | _ -> false, [] let interpretRevoke transactionId : Fold.State -> Events.Event list = function - | { reservation = Some current; association = None } when current = transactionId -> + | { reservation = Some r; association = None } when r = transactionId -> [ Events.Revoked ] | _ -> [] // Ignore if a) already revoked/never reserved b) not reserved for this transactionId @@ -61,7 +59,7 @@ type Service internal (resolve : ShipmentId -> Equinox.Stream(), resolve (streamName id), maxAttempts=3) Service(resolve) diff --git a/equinox-shipping/Shipping.sln b/equinox-shipping/Shipping.sln index 04eb34a63..d989d1eb9 100644 --- a/equinox-shipping/Shipping.sln +++ b/equinox-shipping/Shipping.sln @@ -9,6 +9,10 @@ ProjectSection(SolutionItems) = preProject README.md = README.md EndProjectSection EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "Domain.Tests\Domain.Tests.fsproj", "{4F3C949A-0D13-4ED1-841E-B74AC500D0E7}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Watchdog.Integration", "Watchdog.Integration\Watchdog.Integration.fsproj", "{DAF9BD44-E012-4986-9820-5A3732845384}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -23,5 +27,13 @@ Global {222D612E-51E0-4ED1-BB34-AF08E3BBF741}.Debug|Any CPU.Build.0 = Debug|Any CPU {222D612E-51E0-4ED1-BB34-AF08E3BBF741}.Release|Any CPU.ActiveCfg = Release|Any CPU {222D612E-51E0-4ED1-BB34-AF08E3BBF741}.Release|Any CPU.Build.0 = Release|Any CPU + {4F3C949A-0D13-4ED1-841E-B74AC500D0E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4F3C949A-0D13-4ED1-841E-B74AC500D0E7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4F3C949A-0D13-4ED1-841E-B74AC500D0E7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4F3C949A-0D13-4ED1-841E-B74AC500D0E7}.Release|Any CPU.Build.0 = Release|Any CPU + {DAF9BD44-E012-4986-9820-5A3732845384}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DAF9BD44-E012-4986-9820-5A3732845384}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DAF9BD44-E012-4986-9820-5A3732845384}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DAF9BD44-E012-4986-9820-5A3732845384}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection EndGlobal diff --git a/equinox-shipping/Watchdog.Integration/PropulsionInfrastructure.fs b/equinox-shipping/Watchdog.Integration/PropulsionInfrastructure.fs new file mode 100644 index 000000000..70d7ae61c --- /dev/null +++ b/equinox-shipping/Watchdog.Integration/PropulsionInfrastructure.fs @@ -0,0 +1,64 @@ +[] +module Shipping.Watchdog.Integration.PropulsionInfrastructure + +open System +open System.Threading + +type Async with + + /// Returns an async computation which runs the argument computation but raises an exception if it doesn't complete + /// by the specified timeout. + static member timeoutAfter (timeout : System.TimeSpan) (c : Async<'a>) = async { + let! r = Async.StartChild(c, int timeout.TotalMilliseconds) + return! r } + +type MemoryStoreSource<'F, 'B>(sink : Propulsion.ProjectorPipeline seq, 'B>>) = + let ingester = sink.StartIngester(Serilog.Log.Logger, 0) + let mutable epoch = -1L + let mutable completed = None + let mutable checkpointed = None + + member __.Submit(stream, events : 'E seq) = + let epoch = Interlocked.Increment &epoch + let markCompleted () = completed <- Some epoch + let checkpoint = async { checkpointed <- Some epoch } + ingester.Submit(epoch, checkpoint, seq { for x in events -> { stream = stream; event = x } }, markCompleted) + |> Async.Ignore + |> Async.RunSynchronously + // TODO use a ProducerConsumerCollection of some kind + + member __.AwaitCompletion(?delay, ?logInterval, ?log) = async { + let delay = defaultArg delay TimeSpan.FromMilliseconds 5. + let maybeLog = + let logInterval = defaultArg logInterval (TimeSpan.FromMinutes 1.) + let logDue = Propulsion.Internal.intervalCheck logInterval + let log = defaultArg log Serilog.Log.Logger + fun () -> + if logDue () then + log.Information("Waiting for epoch {epoch} (completed to {completed}, checkpointed to {checkpoint})", + epoch, completed, checkpointed) + let delayMs = int delay.TotalMilliseconds + while Some (Volatile.Read &epoch) <> Volatile.Read &completed do + maybeLog() + do! Async.Sleep delayMs + sink.Stop() + return! sink.AwaitCompletion() + } + +type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = + let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level}] {Message}{NewLine}{Exception}", null) + let writeSerilogEvent logEvent = + use writer = new System.IO.StringWriter() + formatter.Format(logEvent, writer) + writer |> string |> testOutput.WriteLine + writer |> string |> System.Diagnostics.Debug.Write + interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + +module TestOutputLogger = + + open Serilog + + let create output = + let logger = TestOutputAdapter output + LoggerConfiguration().Destructure.FSharpTypes().WriteTo.Sink(logger).CreateLogger() + diff --git a/equinox-shipping/Watchdog.Integration/Watchdog.Integration.fsproj b/equinox-shipping/Watchdog.Integration/Watchdog.Integration.fsproj new file mode 100644 index 000000000..a64c41a00 --- /dev/null +++ b/equinox-shipping/Watchdog.Integration/Watchdog.Integration.fsproj @@ -0,0 +1,31 @@ + + + + netcoreapp3.1 + 5 + false + false + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/equinox-shipping/Watchdog.Integration/WatchdogIntegrationTests.fs b/equinox-shipping/Watchdog.Integration/WatchdogIntegrationTests.fs new file mode 100644 index 000000000..73116b64b --- /dev/null +++ b/equinox-shipping/Watchdog.Integration/WatchdogIntegrationTests.fs @@ -0,0 +1,44 @@ +namespace Shipping.Watchdog.Integration + +open Shipping.Domain.Tests +open Shipping.Watchdog + +open FsCheck.Xunit +open System + +type WatchdogIntegrationTests(output) = + + let log = TestOutputLogger.create output + + [] + let ``Watchdog.Handler properties`` (AtLeastOne batches) = + let store = Equinox.MemoryStore.VolatileStore() + let processManager = Shipping.Domain.Tests.Fixtures.createProcessManager 4 store + + let runTimeout, processingTimeout = TimeSpan.FromSeconds 0.1, TimeSpan.FromSeconds 1. + let maxReadAhead, maxConcurrentStreams = Int32.MaxValue, 4 + + let stats = Handler.Stats(log, TimeSpan.FromSeconds 10., TimeSpan.FromMinutes 1.) + let watchdogSink = Program.createSink log (processingTimeout, stats) (maxReadAhead, maxConcurrentStreams) processManager.Drive + Async.RunSynchronously <| async { + let source = MemoryStoreSource(watchdogSink) + use __ = + store.Committed + |> Observable.filter (fun (s,_e) -> Handler.isRelevant s) + |> Observable.subscribe source.Submit + + let counts = System.Collections.Generic.Stack() + let mutable timeouts = 0 + for (Id tid, Id cid, IdsAtLeastOne shipmentIds) in batches do + counts.Push shipmentIds.Length + try let! _ = processManager.TryFinalizeContainer(tid, cid, shipmentIds) + |> Async.timeoutAfter runTimeout + () + with :? TimeoutException -> timeouts <- timeouts + 1 + + log.Information("Awaiting batches: {counts} ({timeouts}/{total} timeouts)", counts, timeouts, counts.Count) + do! source.AwaitCompletion(logInterval=TimeSpan.FromSeconds 0.5, log=log) + stats.DumpStats() + } + +module Dummy = let [] main _argv = 0 \ No newline at end of file diff --git a/equinox-shipping/Watchdog/Handler.fs b/equinox-shipping/Watchdog/Handler.fs index 108b7a2a4..26c72cbb3 100644 --- a/equinox-shipping/Watchdog/Handler.fs +++ b/equinox-shipping/Watchdog/Handler.fs @@ -43,7 +43,7 @@ let handle | TransactionWatchdog.Active -> // We don't want to be warming the data center for no purpose; visiting every second is not too expensive do! Async.Sleep 1000 // ms - return Propulsion.Streams.SpanResult.AllProcessed, Outcome.Deferred + return Propulsion.Streams.SpanResult.PartiallyProcessed 0, Outcome.Deferred | TransactionWatchdog.Stuck -> let! success = driveTransaction transId return Propulsion.Streams.SpanResult.AllProcessed, Outcome.Resolved success