Skip to content

Commit

Permalink
Tidy wiring
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 27, 2020
1 parent 7bc3e71 commit 37643c9
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions equinox-fc/Watchdog/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,21 @@ let build (args : CmdParser.Arguments) =
let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute)
fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale)
let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext<Checkpoint.CheckpointSeries>(), resolveCheckpointStream)
let handle (_stream, _span) : Async<int64*Handler.Outcome> = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction)
let sink =
Propulsion.Streams.StreamsProjector.Start(
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))
let connect () = let c = connectEs () in c.ReadConnection
let runPipeline =
EventStoreSource.Run(
Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream,
args.MaxReadAhead, args.StatsInterval)
sink, runPipeline
| Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) ->
let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails()
let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously
let cache = Equinox.Cache(AppName, sizeMb = 10)
let context = Equinox.Cosmos.Context(connection, database, container)
let inventoryService =
let inventoryId = InventoryId.parse "FC000"
let maxTransactionsPerEpoch = 100
Expand All @@ -350,18 +365,6 @@ let build (args : CmdParser.Arguments) =
let processor = Fc.Inventory.Processor.Service(transactionService, locations, inventoryService)

let handle = Handler.handleStreamEvents (Handler.tryHandle processor.Push)
let sink =
Propulsion.Streams.StreamsProjector.Start(
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))
let connect () = let c = connectEs () in c.ReadConnection
let runPipeline =
EventStoreSource.Run(
Log.Logger, sink, checkpoints, connect, spec, EventStoreContext.tryMapEvent isTransactionStream,
args.MaxReadAhead, args.StatsInterval)
sink, runPipeline
| Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) ->
let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails()
let handle (_stream, _span) : Async<int64*Handler.Outcome> = failwith "TODO" // Handler.handleStreamEvents (Handler.tryHandle driveTransaction)
let sink =
Propulsion.Streams.StreamsProjector.Start(
Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Handler.Stats(Log.Logger))
Expand Down

0 comments on commit 37643c9

Please sign in to comment.