From 37643c9c7e77f6f1f9e666c7d084ffe9921ce071 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 27 Feb 2020 07:21:13 +0000 Subject: [PATCH] Tidy wiring --- equinox-fc/Watchdog/Program.fs | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/equinox-fc/Watchdog/Program.fs b/equinox-fc/Watchdog/Program.fs index 5d7cc4219..15d6e5693 100644 --- a/equinox-fc/Watchdog/Program.fs +++ b/equinox-fc/Watchdog/Program.fs @@ -331,6 +331,21 @@ let build (args : CmdParser.Arguments) = let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute) fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale) let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext(), resolveCheckpointStream) + let handle (_stream, _span) : Async = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction) + let sink = + Propulsion.Streams.StreamsProjector.Start( + Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger)) + let connect () = let c = connectEs () in c.ReadConnection + let runPipeline = + EventStoreSource.Run( + Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream, + args.MaxReadAhead, args.StatsInterval) + sink, runPipeline + | Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) -> + let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails() + let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously + let cache = Equinox.Cache(AppName, sizeMb = 10) + let context = Equinox.Cosmos.Context(connection, database, container) let inventoryService = let inventoryId = InventoryId.parse "FC000" let maxTransactionsPerEpoch = 100 @@ -350,18 +365,6 @@ let build (args : CmdParser.Arguments) = let processor = Fc.Inventory.Processor.Service(transactionService, locations, inventoryService) let handle = Handler.handleStreamEvents (Handler.tryHandle processor.Push) - let sink = - Propulsion.Streams.StreamsProjector.Start( - Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger)) - let connect () = let c = connectEs () in c.ReadConnection - let runPipeline = - EventStoreSource.Run( - Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream, - args.MaxReadAhead, args.StatsInterval) - sink, runPipeline - | Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) -> - let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails() - let handle (_stream, _span) : Async = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction) let sink = Propulsion.Streams.StreamsProjector.Start( Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))