Skip to content


Minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 4, 2024
1 parent 58e491b commit d4be677
Show file tree
Hide file tree
Showing 18 changed files with 67 additions and 68 deletions.
5 changes: 3 additions & 2 deletions equinox-patterns/Domain/Types.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
namespace Patterns.Domain

open FSharp.UMX
open System

/// Identifies a single period within a temporally linked chain of periods
/// Each Period commences with a Balance `BroughtForward` based on what the predecessor Period
Expand Down Expand Up @@ -44,6 +43,8 @@ module ListSeriesId =
let wellKnownId: ListSeriesId = UMX.tag "0"
let toString: ListSeriesId -> string = UMX.untag

namespace global

module Guid =

let toStringN (g: Guid) = g.ToString "N"
let toStringN (g: System.Guid) = g.ToString "N"
4 changes: 3 additions & 1 deletion equinox-shipping/Domain/Types.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ module TransactionId =
let parse (x: string): TransactionId = %x
let (|Parse|) = parse

namespace global

module Seq =

let inline chooseV f xs = seq { for x in xs do match f x with ValueSome v -> yield v | ValueNone -> () }

module Guid =

let inline gen () = System.Guid.NewGuid()
let inline toStringN (x: System.Guid) = x.ToString "N"
let generateStringN () = let g = System.Guid.NewGuid() in toStringN g

/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers
type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) =
Expand Down
2 changes: 1 addition & 1 deletion equinox-shipping/Watchdog.Integration/Generators.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ let genDefault<'t> = ArbMap.defaults |> ArbMap.generate<'t>

type Custom =

static member GuidStringN() = genDefault |> (Shipping.Domain.Guid.toStringN >> GuidStringN) |> Arb.fromGen
static member GuidStringN() = genDefault |> (Guid.toStringN >> GuidStringN) |> Arb.fromGen

[<assembly: Properties( Arbitrary = [| typeof<Custom> |] )>] do()
2 changes: 1 addition & 1 deletion equinox-shipping/Watchdog.Integration/ReactorFixture.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ open System
/// See SerilogLogFixture for details of how to expose complete diagnostic messages
type FixtureBase(messageSink, store, dumpStats, createSourceConfig) =
let serilogLog = new SerilogLogFixture(messageSink) // create directly to ensure correct sequencing and no loss of messages
let contextId = Shipping.Domain.Guid.generateStringN ()
let contextId = Guid.gen () |> Guid.toStringN
let manager =
let maxDop = 4
Shipping.Domain.FinalizationProcess.Factory.create maxDop store
Expand Down
2 changes: 1 addition & 1 deletion equinox-web/Domain/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module Fold =
let initial = { items = []; nextId = 0 }
/// Compute State change implied by a given Event
let evolve state = function
| Events.Added item -> { state with items = item :: state.items; nextId = state.nextId + 1 }
| Events.Added item -> { items = item :: state.items; nextId = state.nextId + 1 }
| Events.Updated value -> { state with items = state.items |> (function { id = id } when id = -> value | item -> item) }
| Events.Deleted e -> { state with items = state.items |> List.filter (fun x -> <> }
| Events.Cleared e -> { nextId = e.nextId; items = [] }
Expand Down
2 changes: 1 addition & 1 deletion equinox-web/Web/Startup.fs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ type Startup() =
.UseSerilogRequestLogging() // see
#if todos
// NB Jet does now own, control or audit; it is a third party site; please satisfy yourself that this is a safe thing use in your environment before using it._
.UseCors(fun x -> x.WithOrigins([|""|]).AllowAnyHeader().AllowAnyMethod() |> ignore)
.UseCors(x -> x.WithOrigins([|""|]).AllowAnyHeader().AllowAnyMethod() |> ignore)
.UseEndpoints(fun endpoints ->
endpoints.MapMetrics() |> ignore // Host /metrics for Prometheus
Expand Down
3 changes: 1 addition & 2 deletions feed-source/FeedApi/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ module Args =
| [<AltCommandLine "-V"; Unique>] Verbose
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<CosmosParameters>
interface IArgParserTemplate with
member a.Usage =
match a with
member a.Usage = a |> function
| Verbose -> "request Verbose Logging. Default: off."
| Cosmos _ -> "specify CosmosDB input parameters."
and Arguments(config: Configuration, p: ParseResults<Parameters>) =
Expand Down
4 changes: 2 additions & 2 deletions propulsion-consumer/Examples.fs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ module MultiStreams =

// Dump stats relating to how much information is being held - note it's likely for requests to be in flighht during the call
member _.DumpState(log: ILogger) =
log.Information(" Favorited {total}/{users}", faves.Values |> Seq.sumBy (fun x -> x.Count), faves.Count)
log.Information(" SavedForLater {total}/{users}", saves.Values |> Seq.sumBy (fun x -> x.Length), saves.Count)
log.Information(" Favorited {total}/{users}", faves.Values |> Seq.sumBy _.Count, faves.Count)
log.Information(" SavedForLater {total}/{users}", saves.Values |> Seq.sumBy _.Length, saves.Count)

type Stats(log, statsInterval, stateInterval) =
inherit Propulsion.Streams.Stats<Stat>(log, statsInterval, stateInterval)
Expand Down
1 change: 1 addition & 0 deletions propulsion-hotel/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

<Compile Include="Infrastructure.fs" />
<Compile Include="Store.fs" />
<Compile Include="Types.fs" />
<Compile Include="GuestStay.fs" />
Expand Down
2 changes: 1 addition & 1 deletion propulsion-hotel/Domain/GroupCheckout.fs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ module Fold =
| StaysMerged e ->
{ removePending (seq { for s in e.residuals -> s.stay }) state with
checkedOut = Array.append state.checkedOut e.residuals
balance = state.balance + (e.residuals |> Seq.sumBy (fun x -> x.residual)) }
balance = state.balance + (e.residuals |> Seq.sumBy _.residual) }
| MergesFailed e ->
{ removePending e.stays state with
failed = Array.append state.failed e.stays }
Expand Down
23 changes: 23 additions & 0 deletions propulsion-hotel/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace global

module Guid =
let gen () = System.Guid.NewGuid()
let parse: string -> System.Guid = System.Guid.Parse
let toStringN (x: System.Guid): string = x.ToString "N"

type DateTimeOffset = System.DateTimeOffset
type HashSet<'t> = System.Collections.Generic.HashSet<'t>

/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers
type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) =
member _.StreamName = gen >> FsCodec.StreamName.create name
member _.TryDecode = FsCodec.StreamName.tryFind name >> dec

module DeciderExtensions =

type Equinox.Decider<'E, 'S> with

member x.TransactWithPostVersion(decide: 'S -> Async<'R * 'E[]>): Async<'R * int64> =
x.TransactEx((fun c -> decide c.State),
(fun r (c: Equinox.ISyncContext<'S>) -> (r, c.Version)))
27 changes: 3 additions & 24 deletions propulsion-hotel/Domain/Types.fs
Original file line number Diff line number Diff line change
@@ -1,41 +1,20 @@
namespace Domain

open FSharp.UMX
open System

module Guid =
let toString (x: Guid): string = x.ToString "N"

type GroupCheckoutId = Guid<groupCheckoutId>
and [<Measure>] groupCheckoutId
module GroupCheckoutId =
let toString: GroupCheckoutId -> string = UMX.untag >> Guid.toString
let parse: string -> GroupCheckoutId = Guid.Parse >> UMX.tag
let toString: GroupCheckoutId -> string = UMX.untag >> Guid.toStringN
let parse: string -> GroupCheckoutId = Guid.parse >> UMX.tag

type GuestStayId = Guid<guestStayId>
and [<Measure>] guestStayId
module GuestStayId =
let toString: GuestStayId -> string = UMX.untag >> Guid.toString
let toString: GuestStayId -> string = UMX.untag >> Guid.toStringN

type ChargeId = Guid<chargeId>
and [<Measure>] chargeId

type PaymentId = Guid<paymentId>
and [<Measure>] paymentId

type DateTimeOffset = System.DateTimeOffset
type HashSet<'t> = System.Collections.Generic.HashSet<'t>

/// Handles symmetric generation and decoding of StreamNames composed of a series of elements via the FsCodec.StreamId helpers
type internal CategoryId<'elements>(name, gen: 'elements -> FsCodec.StreamId, dec: FsCodec.StreamId -> 'elements) =
member _.StreamName = gen >> FsCodec.StreamName.create name
member _.TryDecode = FsCodec.StreamName.tryFind name >> dec

module DeciderExtensions =

type Equinox.Decider<'E, 'S> with

member x.TransactWithPostVersion(decide: 'S -> Async<'R * 'E[]>): Async<'R * int64> =
x.TransactEx((fun c -> decide c.State),
(fun r (c: Equinox.ISyncContext<'S>) -> (r, c.Version)))
7 changes: 1 addition & 6 deletions propulsion-hotel/Reactor.Integration/ReactorFixture.fs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
namespace Reactor.Integration

open Infrastructure
open Propulsion.Internal
open Reactor
open System

module Guid =

let generateStringN () = Guid.NewGuid() |> Domain.Guid.toString

/// XUnit Collection Fixture managing setup and disposal of Serilog.Log.Logger, a Reactor instance and the source passed from the concrete fixture
/// See SerilogLogFixture for details of how to expose complete diagnostic messages
type FixtureBase(messageSink, store, dumpStats, createSourceConfig) =
let serilogLog = new SerilogLogFixture(messageSink) // create directly to ensure correct sequencing and no loss of messages
let contextId = Guid.generateStringN ()
let contextId = Guid.gen () |> Guid.toStringN
let handler = Handler.create store
let log = Serilog.Log.Logger
let stats = Handler.Stats(log, statsInterval = TimeSpan.FromMinutes 1, stateInterval = TimeSpan.FromMinutes 2,
Expand Down
16 changes: 8 additions & 8 deletions propulsion-indexer/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ module TimeSpan =

let seconds value = TimeSpan.FromSeconds value

/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
let toString (value: ClientId): string = Guid.toStringN %value
let parse (value: string): ClientId = let raw = Guid.Parse value in % raw
let (|Parse|) = parse

type Equinox.Decider<'e, 's> with

member x.TransactWithPostVersion(decide: 's -> 'r * 'e[]) =
x.TransactEx((fun (c: Equinox.ISyncContext<_>) -> decide c.State),
(fun res (c: Equinox.ISyncContext<_>) -> res, c.Version))

type DataMemberAttribute = System.Runtime.Serialization.DataMemberAttribute

/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
let toString (value: ClientId): string = Guid.toStringN %value
let parse (value: string): ClientId = let raw = Guid.Parse value in % raw
let (|Parse|) = parse
5 changes: 3 additions & 2 deletions propulsion-indexer/Domain/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ module Store

module Metrics =

let log = Serilog.Log.ForContext("isMetric", true)
let [<Literal>] PropertyTag = "isMetric"
let log = Serilog.Log.ForContext(PropertyTag, true)

let createDecider cat = Equinox.Decider.forStream Metrics.log cat

module Codec =

let genJsonElement<'t when 't :> TypeShape.UnionContract.IUnionContract> =
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default
FsCodec.SystemTextJson.CodecJsonElement.Create<'t>() // options = Options.Default

/// Implements a Service with a single method that visits the identified stream, with the following possible outcomes:
/// 1) stream has a 'current' snapshot (per the `isCurrentSnapshot` predicate supplied to `Snapshot.create` and/or `fold'`:-
Expand Down
2 changes: 1 addition & 1 deletion propulsion-indexer/Domain/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ module Fold =

/// Compute State change implied by a given Event
let evolve s = function
| Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 }
| Events.Added item -> { items = item :: s.items; nextId = s.nextId + 1 }
| Events.Updated value -> { s with items = s.items |> (function { id = id } when id = -> value | item -> item) }
| Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> <> }
| Events.Cleared e -> { nextId = e.nextId; items = [] }
Expand Down
25 changes: 12 additions & 13 deletions propulsion-reactor/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ module ReactorTemplate.Todo

open Propulsion.Internal

module private Stream =
let [<Literal>] Category = "Todos"
let id = FsCodec.StreamId.gen ClientId.toString
let decodeId = FsCodec.StreamId.dec ClientId.parse
let tryDecode = FsCodec.StreamName.tryFind Category >> decodeId
let [<Literal>] CategoryName = "Todos"
let private streamId = FsCodec.StreamId.gen ClientId.toString
let private decodeId = FsCodec.StreamId.dec ClientId.parse
let private tryDecode = FsCodec.StreamName.tryFind CategoryName >> decodeId

// NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =
Expand All @@ -26,13 +25,13 @@ module Events =

module Reactions =

let categories = [| Stream.Category |]
let categories = [| CategoryName |]

/// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output
let private impliesStateChange = function Events.Snapshotted _ -> false | _ -> true

let private dec = Streams.Codec.gen<Events.Event>
let [<return: Struct>] (|For|_|) = Stream.tryDecode
let [<return: Struct>] (|For|_|) = tryDecode
let [<return: Struct>] private (|Parse|_|) = function
| struct (For clientId, _) & Streams.Decode dec events -> ValueSome struct (clientId, events)
| _ -> ValueNone
Expand All @@ -51,7 +50,7 @@ module Fold =
let initial = { items = []; nextId = 0 }
/// Compute State change implied by a given Event
let evolve s = function
| Events.Added item -> { s with items = item :: s.items; nextId = s.nextId + 1 }
| Events.Added item -> { items = item :: s.items; nextId = s.nextId + 1 }
| Events.Updated value -> { s with items = s.items |> (function { id = id } when id = -> value | item -> item) }
| Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> <> }
| Events.Cleared e -> { nextId = e.nextId; items = [] }
Expand All @@ -75,10 +74,10 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S
module Factory =

let private (|Category|) = function
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted Stream.Category Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Dynamo (context, cache) -> Store.Dynamo.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
| Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codecJe Fold.initial Fold.fold (Fold.isOrigin, Fold.toSnapshot) (context, cache)
#if !(sourceKafka && kafka)
| Store.Config.Esdb (context, cache) -> Store.Esdb.create Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Sss (context, cache) -> Store.Sss.create Stream.Category Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Esdb (context, cache) -> Store.Esdb.create CategoryName Events.codec Fold.initial Fold.fold (context, cache)
| Store.Config.Sss (context, cache) -> Store.Sss.create CategoryName Events.codec Fold.initial Fold.fold (context, cache)
let create (Category cat) = Service( >> Store.createDecider cat)
let create (Category cat) = Service(streamId >> Store.createDecider cat)
3 changes: 1 addition & 2 deletions propulsion-sync/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ open Propulsion.Kafka
open Serilog
open System
open System.Threading

type Configuration(tryGet) =

Expand Down Expand Up @@ -210,7 +209,7 @@ module Args =
| Gorge _ -> "Request Parallel readers phase during initial catchup, running one chunk (256MB) apart. Default: off"
| StreamReaders _ -> "number of concurrent readers that will fetch a missing stream when in tailing mode. Default: 1. TODO: IMPLEMENT!"
| Tail _ -> "attempt to read from tail at specified interval in Seconds. Default: 1"
| ForceRestart _ -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed."
| ForceRestart -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed."
| BatchSize _ -> "maximum item count to request from feed. Default: 4096"
| MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512"
| Position _ -> "EventStore $all Stream Position to commence from"
Expand Down

0 comments on commit d4be677

Please sign in to comment.