From 155ec78e0233e357b386d60144e9679eba44857c Mon Sep 17 00:00:00 2001
From: Ruben Bartelink
Date: Fri, 4 Oct 2019 18:03:38 +0100
Subject: [PATCH] Helper naming consistency

---
 equinox-testbed/Infrastructure.fs             |  4 +--
 equinox-testbed/Services.fs                   |  5 +++-
 equinox-web/Domain/Aggregate.fs               |  8 +++--
 equinox-web/Domain/Infrastructure.fs          |  2 +-
 equinox-web/Domain/Todo.fs                    |  3 +-
 propulsion-consumer/Examples.fs               | 12 ++++----
 propulsion-consumer/Program.fs                |  4 +--
 propulsion-consumer/Publisher.fs              |  4 +--
 propulsion-projector/Program.fs               |  4 +--
 propulsion-summary-consumer/Infrastructure.fs |  2 +-
 .../{SumaryIngester.fs => Ingester.fs}        |  6 ++--
 propulsion-summary-consumer/Program.fs        |  6 ++--
 .../SummaryConsumer.fsproj                    |  2 +-
 propulsion-summary-consumer/TodoSummary.fs    | 24 ++++++++-------
 .../Infrastructure.fs                         |  1 -
 propulsion-summary-projector/Producer.fs      |  2 +-
 propulsion-summary-projector/Program.fs       |  4 +--
 propulsion-summary-projector/Todo.fs          | 16 +++++-----
 propulsion-sync/Program.fs                    |  4 +--
 .../Infrastructure.fs                         | 18 ++++++++++--
 .../{SkuIngester.fs => Ingester.fs}           | 29 +++++--------------
 propulsion-tracking-consumer/Program.fs       |  6 ++--
 propulsion-tracking-consumer/SkuSummary.fs    | 16 +++++-----
 .../TrackingConsumer.fsproj                   |  2 +-
 24 files changed, 96 insertions(+), 88 deletions(-)
 rename propulsion-summary-consumer/{SumaryIngester.fs => Ingester.fs} (96%)
 rename propulsion-tracking-consumer/{SkuIngester.fs => Ingester.fs} (64%)

diff --git a/equinox-testbed/Infrastructure.fs b/equinox-testbed/Infrastructure.fs
index 814341c58..bfc12f98b 100644
--- a/equinox-testbed/Infrastructure.fs
+++ b/equinox-testbed/Infrastructure.fs
@@ -16,10 +16,10 @@ module Guid =
 
 /// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
 type ClientId = Guid<clientId>
 and [<Measure>] clientId
-module ClientId = let toStringN (value : ClientId) : string = Guid.toStringN %value
+module ClientId = let toString (value : ClientId) : string = Guid.toStringN %value
 
 /// SkuId strongly typed id; represented internally as a Guid
 // NB Perf is suboptimal as a key, see Equinox's samples/Store for expanded version
 type SkuId = Guid<skuId>
 and [<Measure>] skuId
-module SkuId = let toStringN (value : SkuId) : string = Guid.toStringN %value
\ No newline at end of file
+module SkuId = let toString (value : SkuId) : string = Guid.toStringN %value
\ No newline at end of file
diff --git a/equinox-testbed/Services.fs b/equinox-testbed/Services.fs
index 4fe5fa19f..aaf7b5446 100644
--- a/equinox-testbed/Services.fs
+++ b/equinox-testbed/Services.fs
@@ -7,6 +7,7 @@ module Domain =
 
     // NB - these schemas reflect the actual storage formats and hence need to be versioned with care
     module Events =
+
        type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
        type Unfavorited = { skuId: SkuId }
        module Compaction =
@@ -20,6 +21,7 @@ module Domain =
        let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
 
     module Folds =
+
        type State = Events.Favorited []
 
        type private InternalState(input: State) =
@@ -61,7 +63,8 @@ module Domain =
            [ Events.Unfavorited { skuId = skuId } ]
 
     type Service(log, resolveStream, ?maxAttempts) =
-        let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toStringN id)
+
+        let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
        let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 2)
 
        let execute (Stream stream) command : Async<unit> = stream.Transact(Commands.interpret command)
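The `toStringN`/`toString` helpers being renamed above sit on top of FSharp.UMX-style tagged Guids. A minimal standalone sketch of that pattern, assuming the FSharp.UMX package these templates reference (names abbreviated; not a verbatim repo excerpt):

```fsharp
// Tagged-id sketch, assuming FSharp.UMX (the %-operator and measure-typed Guid come from that package)
open System
open FSharp.UMX

[<Measure>] type clientId
type ClientId = Guid<clientId>

module Guid =
    let inline toStringN (x : Guid) = x.ToString "N" // 32 hex digits, no hyphens

module ClientId =
    // the renamed helper: callers need not know which Guid rendering is in use
    let toString (value : ClientId) : string = Guid.toStringN %value

// the measure is erased at compile time: zero runtime cost, but a SkuId
// can no longer be passed where a ClientId is expected
let someClient : ClientId = % Guid.NewGuid()
printfn "%s" (ClientId.toString someClient)
```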
diff --git a/equinox-web/Domain/Aggregate.fs b/equinox-web/Domain/Aggregate.fs
index 5ede845f2..cc06883cc 100644
--- a/equinox-web/Domain/Aggregate.fs
+++ b/equinox-web/Domain/Aggregate.fs
@@ -2,13 +2,14 @@
 
 // NB - these types and names reflect the actual storage formats and hence need to be versioned with care
 module Events =
-    type Compacted = { happened: bool }
+
+    type CompactedData = { happened: bool }
 
     type Event =
        | Happened
-        | Compacted of Compacted
+        | Compacted of CompactedData
        interface TypeShape.UnionContract.IUnionContract
-    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases=false)
 
 module Folds =
@@ -33,6 +34,7 @@ module Commands =
 
 type View = { sorted : bool }
 
 type Service(handlerLog, resolve, ?maxAttempts) =
+
    let (|AggregateId|) (id: string) = Equinox.AggregateId("Aggregate", id)
    let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
diff --git a/equinox-web/Domain/Infrastructure.fs b/equinox-web/Domain/Infrastructure.fs
index 02c5697e5..0c0c15dde 100644
--- a/equinox-web/Domain/Infrastructure.fs
+++ b/equinox-web/Domain/Infrastructure.fs
@@ -10,4 +10,4 @@ module Guid =
 type ClientId = Guid<clientId>
 and [<Measure>] clientId
 module ClientId =
-    let toStringN (value : ClientId) : string = Guid.toStringN %value
\ No newline at end of file
+    let toString (value : ClientId) : string = Guid.toStringN %value
\ No newline at end of file
diff --git a/equinox-web/Domain/Todo.fs b/equinox-web/Domain/Todo.fs
index 2fa453323..c311707ae 100644
--- a/equinox-web/Domain/Todo.fs
+++ b/equinox-web/Domain/Todo.fs
@@ -72,8 +72,9 @@ type View = { id: int; order: int; title: string; completed: bool }
 
 /// Defines operations that a Controller can perform on a Todo List
 type Service(handlerLog, resolve, ?maxAttempts) =
+
    /// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
-    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toStringN clientId)
+    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toString clientId)
 
    /// Maps a ClientId to a Handler for the relevant stream
    let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
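The Service constructors throughout the patch lean on nested total active patterns. A self-contained sketch of the mechanism, with plain strings standing in for Equinox's AggregateId/Stream types:

```fsharp
// Nested total active patterns: each (|Name|) always matches, transforming its input
let (|AggregateId|) (clientId : string) = sprintf "Todos-%s" clientId
let (|Stream|) (AggregateId aggregateId) = sprintf "resolved(%s)" aggregateId

// because (|Stream|)'s own argument is destructured by (|AggregateId|), a raw client id
// flows through both in one step - exactly how `let execute (Stream stream) command = ...` works
let describe (Stream stream) = stream
printfn "%s" (describe "client-42") // resolved(Todos-client-42)
```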
diff --git a/propulsion-consumer/Examples.fs b/propulsion-consumer/Examples.fs
index b57c6c9da..8eaece160 100644
--- a/propulsion-consumer/Examples.fs
+++ b/propulsion-consumer/Examples.fs
@@ -57,7 +57,7 @@ module MultiStreams =
            interface TypeShape.UnionContract.IUnionContract
        let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
        let tryDecode = StreamCodec.tryDecode codec
-        let [<Literal>] CategoryId = "SavedForLater"
+        let [<Literal>] categoryId = "SavedForLater"
 
    // NB - these schemas reflect the actual storage formats and hence need to be versioned with care
    module Favorites =
@@ -70,7 +70,7 @@ module MultiStreams =
            interface TypeShape.UnionContract.IUnionContract
        let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
        let tryDecode = StreamCodec.tryDecode codec
-        let [<Literal>] CategoryId = "Favorites"
+        let [<Literal>] categoryId = "Favorites"
 
    type Stat = Faves of int | Saves of int | OtherCategory of string * int | OtherMessage of string
@@ -85,10 +85,10 @@
        let (|FavoritesEvents|SavedForLaterEvents|OtherCategory|UnknownMessage|) (streamName, span : Propulsion.Streams.StreamSpan<_>) =
            let decode tryDecode = span.events |> Seq.choose (tryDecode log streamName) |> Array.ofSeq
            match category streamName with
-            | Category (Favorites.CategoryId, id) ->
+            | Category (Favorites.categoryId, id) ->
                let s = match faves.TryGetValue id with true, value -> value | false, _ -> new HashSet<SkuId>()
                FavoritesEvents (id, s, decode Favorites.tryDecode)
-            | Category (SavedForLater.CategoryId, id) ->
+            | Category (SavedForLater.categoryId, id) ->
                let s = match saves.TryGetValue id with true, value -> value | false, _ -> []
                SavedForLaterEvents (id, s, decode SavedForLater.tryDecode)
            | Category (categoryName, _) -> OtherCategory (categoryName, Seq.length span.events)
@@ -185,8 +185,8 @@
            let span = Propulsion.Codec.NewtonsoftJson.RenderedSpan.Parse spanJson
            let decode tryDecode wrap = RenderedSpan.enum span |> Seq.choose (fun x -> x.event |> tryDecode log streamName |> Option.map wrap)
            match streamName with
-            | Category (Favorites.CategoryId,_) -> yield! decode Favorites.tryDecode Fave
-            | Category (SavedForLater.CategoryId,_) -> yield! decode SavedForLater.tryDecode Save
+            | Category (Favorites.categoryId,_) -> yield! decode Favorites.tryDecode Fave
+            | Category (SavedForLater.categoryId,_) -> yield! decode SavedForLater.tryDecode Save
            | Category (otherCategoryName,_) -> yield OtherCat (otherCategoryName, Seq.length span.e)
            | _ -> yield Unclassified streamName }
diff --git a/propulsion-consumer/Program.fs b/propulsion-consumer/Program.fs
index 75211aa9d..883d4656b 100644
--- a/propulsion-consumer/Program.fs
+++ b/propulsion-consumer/Program.fs
@@ -73,8 +73,8 @@ let start (args : CmdParser.Arguments) =
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try use consumer = argv |> CmdParser.parse |> start
+let run args =
+    try use consumer = args |> CmdParser.parse |> start
        consumer.AwaitCompletion() |> Async.RunSynchronously
        if consumer.RanToCompletion then 0 else 2
    with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
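The CategoryId → categoryId rename only works in the match expressions above because the binding carries `[<Literal>]`: inside a pattern, an identifier that resolves to a literal matches as a constant, whereas a plain name would bind a fresh variable. A runnable sketch (illustrative module, not the repo's):

```fsharp
module Favorites =
    let [<Literal>] categoryId = "Favorites"

// Favorites.categoryId is a constant pattern here, thanks to [<Literal>];
// `other` below is an ordinary variable binding
let classify = function
    | Favorites.categoryId -> "favorites events"
    | other -> sprintf "unhandled category %s" other

printfn "%s" (classify "Favorites") // favorites events
printfn "%s" (classify "Orders")    // unhandled category Orders
```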
diff --git a/propulsion-consumer/Publisher.fs b/propulsion-consumer/Publisher.fs
index 1f5f804d6..f0c92d8e1 100644
--- a/propulsion-consumer/Publisher.fs
+++ b/propulsion-consumer/Publisher.fs
@@ -36,7 +36,7 @@ module Input =
 
    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
    let tryDecode = StreamCodec.tryDecode codec
-    let [<Literal>] CategoryId = "Inventory"
+    let [<Literal>] categoryId = "Inventory"
 
 module Output =
 
@@ -113,7 +113,7 @@ module Processor =
    let private enumStreamEvents(KeyValue (streamName : string, spanJson)) : seq<Propulsion.Streams.StreamEvent<_>> =
        match streamName with
-        | Category (Input.CategoryId,_) -> Propulsion.Codec.NewtonsoftJson.RenderedSpan.parse spanJson
+        | Category (Input.categoryId,_) -> Propulsion.Codec.NewtonsoftJson.RenderedSpan.parse spanJson
        | _ -> Seq.empty
 
    let log = Log.ForContext()
diff --git a/propulsion-projector/Program.fs b/propulsion-projector/Program.fs
index 1155cd00d..ec6c5b743 100644
--- a/propulsion-projector/Program.fs
+++ b/propulsion-projector/Program.fs
@@ -213,8 +213,8 @@ let build (args : CmdParser.Arguments) =
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try let sink,runSourcePipeline = argv |> CmdParser.parse |> build
+let run args =
+    try let sink,runSourcePipeline = args |> CmdParser.parse |> build
        runSourcePipeline |> Async.Start
        sink.AwaitCompletion() |> Async.RunSynchronously
        if sink.RanToCompletion then 0 else 2
diff --git a/propulsion-summary-consumer/Infrastructure.fs b/propulsion-summary-consumer/Infrastructure.fs
index 84e42c046..a1fe8455e 100644
--- a/propulsion-summary-consumer/Infrastructure.fs
+++ b/propulsion-summary-consumer/Infrastructure.fs
@@ -32,5 +32,5 @@ module Guid =
 type ClientId = Guid<clientId>
 and [<Measure>] clientId
 module ClientId =
-    let toStringN (value : ClientId) : string = Guid.toStringN %value
+    let toString (value : ClientId) : string = Guid.toStringN %value
    let parse (value : string) : ClientId = let raw = Guid.Parse value in % raw
\ No newline at end of file
diff --git a/propulsion-summary-consumer/SumaryIngester.fs b/propulsion-summary-consumer/Ingester.fs
similarity index 96%
rename from propulsion-summary-consumer/SumaryIngester.fs
rename to propulsion-summary-consumer/Ingester.fs
index a17873d2c..cb3abea33 100644
--- a/propulsion-summary-consumer/SumaryIngester.fs
+++ b/propulsion-summary-consumer/Ingester.fs
@@ -1,6 +1,6 @@
 /// Follows a feed of updates, holding the most recently observed one; each update received is intended to completely supersede all previous updates
 /// Due to this, we should ensure that writes only happen where the update is not redundant and/or a replay of a previous message
-module ConsumerTemplate.SummaryIngester
+module ConsumerTemplate.Ingester
 
 open System
 
@@ -31,7 +31,7 @@ type Outcome = NoRelevantEvents of count : int | Ok of count : int | Skipped of
 type Stats(log, ?statsInterval, ?stateInterval) =
    inherit Propulsion.Kafka.StreamsConsumerStats<Outcome>(log, defaultArg statsInterval (TimeSpan.FromMinutes 1.), defaultArg stateInterval (TimeSpan.FromMinutes 5.))
 
-    let mutable (ok, na, redundant) = 0, 0, 0
+    let mutable ok, na, redundant = 0, 0, 0
    override __.HandleOk res = res |> function
        | Outcome.Ok count -> ok <- ok + 1; redundant <- redundant + count - 1
@@ -51,7 +51,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
            { items =
                [| for x in x.items ->
                    { id = x.id; order = x.order; title = x.title; completed = x.completed } |]}
-    let (|ClientId|) (value : string) = ClientId.parse value
+    let (|ClientId|) = ClientId.parse
    let (|DecodeNewest|_|) (codec : FsCodec.IUnionEncoder<_,_>) (stream, span : Propulsion.Streams.StreamSpan<_>) : 'summary option =
        span.events |> Seq.rev |> Seq.tryPick (StreamCodec.tryDecode codec log stream)
    let ingestIncomingSummaryMessage (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
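The `let (|ClientId|) = ClientId.parse` tweak is the same single-case active pattern as before, just written point-free: any function value can be bound directly under a banana-clip name. A runnable sketch with an illustrative parser:

```fsharp
// Point-free single-case active pattern (illustrative parser, not from the repo)
let parseInts (s : string) = s.Split ',' |> Array.map int

let (|Ints|) = parseInts // same as: let (|Ints|) s = parseInts s

// the pattern then decodes right in an argument position
let sum (Ints xs) = Array.sum xs
printfn "%d" (sum "1,2,3") // 6
```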
diff --git a/propulsion-summary-consumer/Program.fs b/propulsion-summary-consumer/Program.fs
index f840030d6..4844a5e10 100644
--- a/propulsion-summary-consumer/Program.fs
+++ b/propulsion-summary-consumer/Program.fs
@@ -112,12 +112,12 @@ let start (args : CmdParser.Arguments) =
        Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
            appName, args.Broker, [args.Topic], args.Group,
            maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-    SummaryIngester.startConsumer config Log.Logger service args.MaxDop
+    Ingester.startConsumer config Log.Logger service args.MaxDop
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try use consumer = argv |> CmdParser.parse |> start
+let run args =
+    try use consumer = args |> CmdParser.parse |> start
        consumer.AwaitCompletion() |> Async.RunSynchronously
        if consumer.RanToCompletion then 0 else 2
    with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj
index b82a04bec..ce6fbef04 100644
--- a/propulsion-summary-consumer/SummaryConsumer.fsproj
+++ b/propulsion-summary-consumer/SummaryConsumer.fsproj
@@ -10,7 +10,7 @@
-    <Compile Include="SumaryIngester.fs" />
+    <Compile Include="Ingester.fs" />
diff --git a/propulsion-summary-consumer/TodoSummary.fs b/propulsion-summary-consumer/TodoSummary.fs
index f1e6868aa..237e82846 100644
--- a/propulsion-summary-consumer/TodoSummary.fs
+++ b/propulsion-summary-consumer/TodoSummary.fs
@@ -5,9 +5,9 @@ module Events =
 
    type ItemData = { id: int; order: int; title: string; completed: bool }
    type SummaryData = { items : ItemData[] }
-
+    type IngestedData = { version : int64; value : SummaryData }
    type Event =
-        | Ingested of {| version: int64; value : SummaryData |}
+        | Ingested of IngestedData
        interface TypeShape.UnionContract.IUnionContract
    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
@@ -18,7 +18,7 @@ module Folds =
    let evolve _state = function
        | Events.Ingested e -> { version = e.version; value = Some e.value }
    let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
-    let isOrigin = function _ -> true
+    let private isOrigin = function Events.Ingested _ -> true
    // A `transmute` function gets presented with:
    // a) events a command decided to generate (in its `interpret`)
    // b) the state after applying them
@@ -31,18 +31,20 @@ module Folds =
    // and use AccessStrategy.RollingUnfolds with this `transmute` function so we instead convey:
    // a) "don't actually write these events we just decided on in `interpret` [and don't insert a new event batch document]"
    // b) "can you treat these events as snapshots please"
-    let transmute events _state : Events.Event list * Events.Event list =
+    let private transmute events _state : Events.Event list * Events.Event list =
        [],events
+    // We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
+    let accessStrategy = Equinox.Cosmos.AccessStrategy.RollingUnfolds (isOrigin,transmute)
 
 module Commands =
 
    type Command =
-        | Consume of version : int64 * Events.SummaryData
+        | Consume of version : int64 * value : Events.SummaryData
    let decide command (state : Folds.State) =
        match command with
        | Consume (version,value) ->
            if state.version >= version then false,[] else
-            true,[Events.Ingested {| version = version; value = value |}]
+            true,[Events.Ingested { version = version; value = value }]
 
 type Item = { id: int; order: int; title: string; completed: bool }
 let render : Folds.State -> Item[] = function
    | { value = Some { items = xs } } ->
        [| for x in xs ->
            {   id = x.id
                order = x.order
                title = x.title
                completed = x.completed } |]
    | _ -> [||]
 
-let [<Literal>]categoryId = "TodoSummary"
+let [<Literal>] categoryId = "TodoSummary"
 
 /// Defines the operations that the Read side of a Controller and/or the Ingester can perform on the 'aggregate'
 type Service(log, resolve, ?maxAttempts) =
-    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toStringN clientId)
+
+    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
    let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
 
    let execute (Stream stream) command : Async<bool> =
@@ -74,10 +77,9 @@ type Service(log, resolve, ?maxAttempts) =
        query clientId render
 
 module Repository =
+
    open Equinox.Cosmos // Everything until now is independent of a concrete store
    let private resolve cache context =
-        // We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
-        let accessStrategy = AccessStrategy.RollingUnfolds (Folds.isOrigin, Folds.transmute)
        let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
-        Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve
+        Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, Folds.accessStrategy).Resolve
 let createService cache context = Service(Serilog.Log.ForContext<Service>(), resolve cache context)
\ No newline at end of file
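This hunk combines two safeguards: the version gate in `decide` makes stale or replayed deliveries write nothing, and the `transmute` pair `[],events` tells `AccessStrategy.RollingUnfolds` to store the event as an unfold in the Tip rather than appending a new batch. A standalone sketch of the version gate (simplified shapes, not the repo's):

```fsharp
// Version-gated idempotent ingestion: each incoming summary supersedes all prior ones
type SummaryData = { items : string list }
type Event = Ingested of version : int64 * value : SummaryData
type State = { version : int64; value : SummaryData option }

let decide (version, value) (state : State) =
    if state.version >= version then false, []   // already holding this version or a newer one
    else true, [ Ingested (version, value) ]     // supersede: exactly one event to record
```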
diff --git a/propulsion-summary-projector/Infrastructure.fs b/propulsion-summary-projector/Infrastructure.fs
index e5ccb3ade..e91564f90 100644
--- a/propulsion-summary-projector/Infrastructure.fs
+++ b/propulsion-summary-projector/Infrastructure.fs
@@ -27,7 +27,6 @@ module StreamNameParser =
        | [| category; id |] -> Category (category, id)
        | _ -> Unknown streamName
 
-
 module Guid =
    let inline toStringN (x : Guid) = x.ToString "N"
diff --git a/propulsion-summary-projector/Producer.fs b/propulsion-summary-projector/Producer.fs
index 1c0f19cb9..cf8ffe720 100644
--- a/propulsion-summary-projector/Producer.fs
+++ b/propulsion-summary-projector/Producer.fs
@@ -23,7 +23,7 @@ module Contract =
    let ofState (state : Todo.Folds.State) : SummaryEvent =
        Summary { items = [| for x in state.items -> render x |]}
 
-let (|ClientId|) (value : string) = ClientId.parse value
+let (|ClientId|) = ClientId.parse
 let (|Decode|) (codec : FsCodec.IUnionEncoder<_,_>) stream (span : Propulsion.Streams.StreamSpan<_>) =
    span.events |> Seq.choose (StreamCodec.tryDecodeSpan codec Serilog.Log.Logger stream)
diff --git a/propulsion-summary-projector/Program.fs b/propulsion-summary-projector/Program.fs
index a1994bc9e..5299fb586 100644
--- a/propulsion-summary-projector/Program.fs
+++ b/propulsion-summary-projector/Program.fs
@@ -177,8 +177,8 @@ let build (args : CmdParser.Arguments) =
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try let projector,runSourcePipeline = argv |> CmdParser.parse |> build
+let run args =
+    try let projector,runSourcePipeline = args |> CmdParser.parse |> build
        runSourcePipeline |> Async.Start
        projector.AwaitCompletion() |> Async.RunSynchronously
        if projector.RanToCompletion then 0 else 2
diff --git a/propulsion-summary-projector/Todo.fs b/propulsion-summary-projector/Todo.fs
index 6fba6e327..d9dbb1c1e 100644
--- a/propulsion-summary-projector/Todo.fs
+++ b/propulsion-summary-projector/Todo.fs
@@ -14,7 +14,7 @@ module Events =
        | Cleared of {| nextId: int |}
        /// For EventStore, AccessStrategy.RollingSnapshots embeds these events every `batchSize` events
        /// For Cosmos, AccessStrategy.Snapshot maintains this as an event in the `u`nfolds list in the Tip-document
-        | Snapshot of {| nextId: int; items: ItemData[] |}
+        | Snapshotted of {| nextId: int; items: ItemData[] |}
        interface TypeShape.UnionContract.IUnionContract
    let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
@@ -31,20 +31,21 @@ module Folds =
        | Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) }
        | Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> x.id <> e.id) }
        | Events.Cleared e -> { nextId = e.nextId; items = [] }
-        | Events.Snapshot s -> { nextId = s.nextId; items = List.ofArray s.items }
+        | Events.Snapshotted s -> { nextId = s.nextId; items = List.ofArray s.items }
    /// Folds a set of events from the store into a given `state`
    let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
    /// Determines whether a given event represents a checkpoint that implies we don't need to see any preceding events
-    let isOrigin = function Events.Cleared _ | Events.Snapshot _ -> true | _ -> false
+    let isOrigin = function Events.Cleared _ | Events.Snapshotted _ -> true | _ -> false
    /// Prepares an Event that encodes all relevant aspects of a State such that `evolve` can rehydrate a complete State from it
-    let snapshot state = Events.Snapshot {| nextId = state.nextId; items = Array.ofList state.items |}
+    let snapshot state = Events.Snapshotted {| nextId = state.nextId; items = Array.ofList state.items |}
-    /// Allows us to slkip producing summaries for events that we know won't result in an externally discernable change to the summary output
+    /// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output
-    let impliesStateChange = function Events.Snapshot _ -> false | _ -> true
+    let impliesStateChange = function Events.Snapshotted _ -> false | _ -> true
 
-let [<Literal>]categoryId = "Todos"
+let [<Literal>] categoryId = "Todos"
 
 /// Defines operations that a Controller or Projector can perform on a Todo List
 type Service(log, resolve, ?maxAttempts) =
+
    /// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
    let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
@@ -60,6 +61,7 @@ type Service(log, resolve, ?maxAttempts) =
        queryEx clientId render
 
 module Repository =
+
    open Equinox.Cosmos // Everything until now is independent of a concrete store
    let private resolve cache context =
        let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot)
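The `impliesStateChange` helper renamed alongside `Snapshotted` above lets the projector suppress summaries for events that cannot alter the externally visible state. A sketch with illustrative event shapes (not the repo's Todo events):

```fsharp
type Event =
    | Added of string
    | Snapshotted of string list // written by the fold machinery, not by users

let impliesStateChange = function Snapshotted _ -> false | _ -> true

let handle (events : Event list) produceSummary =
    // a span consisting solely of snapshot events changes nothing externally observable,
    // so there is no point publishing a fresh summary for it
    if events |> List.exists impliesStateChange then produceSummary ()
```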
diff --git a/propulsion-sync/Program.fs b/propulsion-sync/Program.fs
index 120785c9a..6ea6712e2 100644
--- a/propulsion-sync/Program.fs
+++ b/propulsion-sync/Program.fs
@@ -575,8 +575,8 @@ let build (args : CmdParser.Arguments) =
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try let sink,runSourcePipeline = CmdParser.parse argv |> build
+let run args =
+    try let sink,runSourcePipeline = CmdParser.parse args |> build
        runSourcePipeline |> Async.Start
        sink.AwaitCompletion() |> Async.RunSynchronously
        if sink.RanToCompletion then 0 else 2
diff --git a/propulsion-tracking-consumer/Infrastructure.fs b/propulsion-tracking-consumer/Infrastructure.fs
index 8967933a5..1cc82899d 100644
--- a/propulsion-tracking-consumer/Infrastructure.fs
+++ b/propulsion-tracking-consumer/Infrastructure.fs
@@ -15,8 +15,22 @@ module StreamCodec =
            None
        | x -> x
 
-module Guid =
-    let inline toStringN (x : Guid) = x.ToString "N"
+// TODO use one included in Propulsion.Kafka.Core
+/// StreamsConsumer buffers and deduplicates messages from a contiguous stream with each message bearing an index.
+/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
+type StreamKeyEventSequencer() =
+    // we synthesize a monotonically increasing index to render the deduplication facility inert
+    let indices = System.Collections.Generic.Dictionary<string,int>()
+    let genIndex streamName =
+        match indices.TryGetValue streamName with
+        | true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
+        | false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
+
+    // Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
+    member __.ToStreamEvent(KeyValue (k,v : string), ?eventType) : Propulsion.Streams.StreamEvent<byte[]> seq =
+        let eventType = defaultArg eventType String.Empty
+        let e = FsCodec.Core.IndexedEventData(genIndex k,false,eventType,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
+        Seq.singleton { stream=k; event=e }
 
 /// SkuId strongly typed id; represented internally as a string
 type SkuId = string
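StreamKeyEventSequencer (hoisted here so it can be shared) exists only to feed StreamsConsumer's per-stream deduplication an index that never repeats. The counting core, extracted and runnable on its own:

```fsharp
// Per-key counter handing the dedup machinery a strictly increasing synthetic index
let indices = System.Collections.Generic.Dictionary<string,int>()
let genIndex streamName =
    match indices.TryGetValue streamName with
    | true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
    | false, _ -> indices.[streamName] <- 0; 0L

// indices are independent per stream, so no real message is ever mistaken for a duplicate
printfn "%d %d %d" (genIndex "sku-1") (genIndex "sku-1") (genIndex "sku-2") // 0 1 0
```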
-/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
-type MessagesByArrivalOrder() =
-    // we synthesize a monotonically increasing index to render the deduplication facility inert
-    let indices = System.Collections.Generic.Dictionary<string,int>()
-    let genIndex streamName =
-        match indices.TryGetValue streamName with
-        | true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
-        | false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
-
-    // Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
-    member __.ToStreamEvent (KeyValue (k,v : string)) : Propulsion.Streams.StreamEvent<byte[]> seq =
-        let e = FsCodec.Core.IndexedEventData(genIndex k,false,String.Empty,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
-        Seq.singleton { stream=k; event=e }
-
-let (|SkuId|) (value : string) = SkuId.parse value
+let (|SkuId|) = SkuId.parse
 
 /// Starts a processing loop accumulating messages by stream - each time we handle all the incoming updates for a given Sku as a single transaction
 let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log : Serilog.ILogger) (service : SkuSummary.Service) maxDop =
    let ingestIncomingSummaryMessage(SkuId skuId, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
        let items =
            [ for e in span.events do
-                let x = SkuUpdates.parse e.Data
+                let x = Contract.parse e.Data
                for o in x.purchaseOrderInfo do
                    yield { locationId = x.locationId
                            messageIndex = x.messageIndex
@@ -69,7 +54,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
        return Outcome.Completed(used,List.length items) }
    let stats = Stats(log)
-    // No categorization required, out inputs are all one big family defying categorization
+    // No categorization required, our inputs are all one big family defying categorization
    let category _streamName = "Sku"
-    let sequencer = MessagesByArrivalOrder()
+    let sequencer = StreamKeyEventSequencer()
    Propulsion.Kafka.StreamsConsumer.Start(log, config, sequencer.ToStreamEvent, ingestIncomingSummaryMessage, maxDop, stats, category)
\ No newline at end of file
diff --git a/propulsion-tracking-consumer/Program.fs b/propulsion-tracking-consumer/Program.fs
index bf6b63900..b091e4ac3 100644
--- a/propulsion-tracking-consumer/Program.fs
+++ b/propulsion-tracking-consumer/Program.fs
@@ -112,12 +112,12 @@ let start (args : CmdParser.Arguments) =
        Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
            appName, args.Broker, [args.Topic], args.Group,
            maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-    SkuIngester.startConsumer config Log.Logger service args.MaxDop
+    Ingester.startConsumer config Log.Logger service args.MaxDop
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
-let run argv =
-    try use consumer = argv |> CmdParser.parse |> start
+let run args =
+    try use consumer = args |> CmdParser.parse |> start
        consumer.AwaitCompletion() |> Async.RunSynchronously
        if consumer.RanToCompletion then 0 else 2
    with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
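The Stats subclasses in both consumers reduce each handler Outcome into periodic counters. A dependency-free rendition of the tracking consumer's tallying (in the real code, Propulsion's StreamsConsumerStats base class invokes HandleOk and schedules the dump; names here mirror it but stand alone):

```fsharp
type Outcome = Completed of used : int * total : int

type Stats() =
    let mutable ok, skipped = 0, 0
    member __.HandleOk(Completed (used, total)) =
        ok <- ok + used
        skipped <- skipped + (total - used)
    member __.DumpStats(log : string -> unit) =
        log (sprintf "Used %d Ignored %d" ok skipped)
        ok <- 0; skipped <- 0

let stats = Stats()
stats.HandleOk(Completed (used = 3, total = 5))
stats.DumpStats(printfn "%s") // Used 3 Ignored 2
```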
diff --git a/propulsion-tracking-consumer/SkuSummary.fs b/propulsion-tracking-consumer/SkuSummary.fs
index 1f04ed845..e2f7491c8 100644
--- a/propulsion-tracking-consumer/SkuSummary.fs
+++ b/propulsion-tracking-consumer/SkuSummary.fs
@@ -17,9 +17,7 @@ module Events =
 
 module Folds =
 
-    open Events
-
-    type State = ItemData list
+    type State = Events.ItemData list
 
    module State =
        let equals (x : Events.ItemData) (y : Events.ItemData) =
            x.locationId = y.locationId
@@ -36,10 +34,10 @@ module Folds =
        | Events.Snapshotted _ -> true // Yes, a snapshot is enough info
        | Events.Ingested _ -> false
    let evolve state = function
-        | Ingested e -> e :: state
-        | Snapshotted items -> List.ofArray items
-    let fold (state : State) : Event seq -> State = Seq.fold evolve state
-    let snapshot (x : State) : Event = Snapshotted (Array.ofList x)
+        | Events.Ingested e -> e :: state
+        | Events.Snapshotted items -> List.ofArray items
+    let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
+    let snapshot (x : State) : Events.Event = Events.Snapshotted (Array.ofList x)
 
 module Commands =
 
@@ -51,9 +49,10 @@ module Commands =
        | Consume updates ->
            [for x in updates do if x |> Folds.State.isNewOrUpdated state then yield Events.Ingested x]
 
-let [<Literal>]categoryId = "SkuSummary"
+let [<Literal>] categoryId = "SkuSummary"
 
 type Service(log, resolve, ?maxAttempts) =
+
    let (|AggregateId|) (id : SkuId) = Equinox.AggregateId(categoryId, SkuId.toString id)
    let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
@@ -74,6 +73,7 @@ type Service(log, resolve, ?maxAttempts) =
        query skuId id
 
 module Repository =
+
    open Equinox.Cosmos // Everything until now is independent of a concrete store
    let private resolve cache context =
        // We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
diff --git a/propulsion-tracking-consumer/TrackingConsumer.fsproj b/propulsion-tracking-consumer/TrackingConsumer.fsproj
index 507f99eec..4391c219d 100644
--- a/propulsion-tracking-consumer/TrackingConsumer.fsproj
+++ b/propulsion-tracking-consumer/TrackingConsumer.fsproj
@@ -10,7 +10,7 @@
-    <Compile Include="SkuIngester.fs" />
+    <Compile Include="Ingester.fs" />
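For reference, the shape of the fold the renamed Ingester feeds: Ingested prepends only genuinely new or newer items, while a single Snapshotted event can rehydrate the whole list, which is what qualifies it as an isOrigin event. A simplified, runnable sketch (two illustrative fields standing in for the real ItemData):

```fsharp
type ItemData = { poNumber : string; messageIndex : int64 }
type Event =
    | Ingested of ItemData
    | Snapshotted of ItemData[]

// accept a candidate only if no stored item for the same PO is at least as recent
let isNewOrUpdated (candidate : ItemData) (state : ItemData list) =
    state |> List.forall (fun x ->
        x.poNumber <> candidate.poNumber || x.messageIndex < candidate.messageIndex)

let evolve state = function
    | Ingested e -> e :: state
    | Snapshotted items -> List.ofArray items
let fold (state : ItemData list) : Event seq -> ItemData list = Seq.fold evolve state
let snapshot state = Snapshotted (Array.ofList state) // one event rehydrates everything
```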