Skip to content

Commit

Permalink
Tidy
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 8, 2024
1 parent a0e4bb2 commit c0d525a
Showing 1 changed file with 43 additions and 50 deletions.
93 changes: 43 additions & 50 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,13 @@ and [<NoComparison; NoEquality; RequireSubcommand>] QueryParameters =
| Console -> "Also emit the JSON to the console. Default: Gather statistics (but only write to a File if specified)"
| Cosmos _ -> "Parameters for CosmosDB."
and [<RequireQualifiedAccess>] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw
and [<RequireQualifiedAccess>] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered
and [<RequireQualifiedAccess>] Criteria =
| SingleStream of string | CatName of string | CatLike of string | Unfiltered
member x.Sql = x |> function
| Criteria.SingleStream sn -> $"c.p = \"{sn}\""
| Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
| Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
| Criteria.Unfiltered -> "1=1"
and QueryArguments(p: ParseResults<QueryParameters>) =
member val Mode = p.GetResult(QueryParameters.Mode, if p.Contains QueryParameters.File then Mode.Raw else Mode.Default)
member val Pretty = p.Contains QueryParameters.Pretty
Expand Down Expand Up @@ -380,25 +386,23 @@ module CosmosStats =

let prettySerdes = lazy FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Options.Create(indent = true))

type System.Text.Json.JsonElement with
member x.Timestamp = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
member x.TryProp(name: string) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty(name, &p) then ValueSome p else ValueNone
module StreamName =
let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn

module CosmosQuery =

open Equinox.CosmosStore.Linq.Internal
open FSharp.Control
type System.Text.Json.JsonDocument with
member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x.RootElement)
member x.Timestamp =
let ok, p = x.RootElement.TryGetProperty("_ts")
if ok then p.GetDouble() |> DateTime.UnixEpoch.AddSeconds |> Some else None

let private composeSql (a: QueryArguments) =
let inline warnOnUnfiltered () =
match a.Criteria with
| Criteria.Unfiltered ->
let lel = if a.Mode = Mode.Raw then LogEventLevel.Debug elif a.Filepath = None then LogEventLevel.Warning else LogEventLevel.Information
Log.Write(lel, "No StreamName or CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous")
let partitionKeyCriteria =
match a.Criteria with
| Criteria.SingleStream sn -> $"c.p = \"{sn}\""
| Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
| Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
| Criteria.Unfiltered -> warnOnUnfiltered (); "1=1"
| _ -> ()
let selectedFields =
match a.Mode with
| Mode.Default -> "c._etag, c.p, c.u[0].d"
Expand All @@ -414,7 +418,7 @@ module CosmosQuery =
| [||] -> "1=1"
| [| x |] -> x |> exists
| xs -> String.Join(" AND ", xs) |> exists
$"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter}"
$"SELECT {selectedFields} FROM c WHERE {a.Criteria.Sql} AND {unfoldFilter}"
let private queryDef (a: QueryArguments) =
let sql = composeSql a
Log.Information("Querying {mode}: {q}", a.Mode, sql)
Expand All @@ -431,14 +435,14 @@ module CosmosQuery =
let mutable accI, accE, accU, accRus, accBytesRead = 0L, 0L, 0L, 0., 0L
let it = container.GetItemQueryIterator<System.Text.Json.JsonDocument>(queryDef a, requestOptions = qo)
try for rtt, rc, items, rdc, rds, ods in it |> Query.enum__ do
let mutable newestTs = None
let items = [| for x in items -> newestTs <- max newestTs x.Timestamp
x.Cast<Equinox.CosmosStore.Core.Tip>() |]
let mutable newestTs = DateTime.MinValue
let items = [| for x in items -> newestTs <- max newestTs x.RootElement.Timestamp
System.Text.Json.JsonSerializer.Deserialize<Equinox.CosmosStore.Core.Tip>(x.RootElement) |]
let inline arrayLen x = if isNull x then 0 else Array.length x
pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore
let pageI, pageE, pageU = items.Length, items |> Seq.sumBy (_.e >> arrayLen), items |> Seq.sumBy (_.u >> arrayLen)
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, newestTs.Value - DateTime.UtcNow)
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs)
maybeFileStream |> Option.iter (fun stream ->
for x in items do
serdes.SerializeToStream(x, stream)
Expand All @@ -450,8 +454,7 @@ module CosmosQuery =
finally
let fileSize = maybeFileStream |> Option.map _.Position |> Option.defaultValue 0
maybeFileStream |> Option.iter _.Close() // Before we log so time includes flush time and no confusion
let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count
let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map StreamName.categoryName).Count
Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s",
accI, accCategories, accStreams.Count, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) }

Expand All @@ -460,6 +463,7 @@ module CosmosTop =
open Equinox.CosmosStore.Linq.Internal
open FSharp.Control
open System.Text.Json

module private Parser =
let scratch = new System.IO.MemoryStream()
let inline utf8Size (x: JsonElement) =
Expand All @@ -470,21 +474,18 @@ module CosmosTop =
scratch.Position <- 0L
if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position
else utf8Size x
let inline tryProp (x: JsonElement) (id: string): ValueOption<JsonElement> =
let mutable p = Unchecked.defaultof<_>
if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
// using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0
let _e = Unchecked.defaultof<Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
let inline ciSize (x: JsonElement) =
(struct (0, 0L), x.EnumerateArray())
||> Seq.fold (fun struct (c, i) x ->
let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0
struct (c + (tryProp x (nameof _e.correlationId) |> stringLen) + (tryProp x (nameof _e.causationId) |> stringLen),
i + (tryProp x (nameof _e.d) |> infSize) + (tryProp x (nameof _e.m) |> infSize)))
struct (c + (x.TryProp(nameof _e.correlationId) |> stringLen) + (x.TryProp(nameof _e.causationId) |> stringLen),
i + (x.TryProp(nameof _e.d) |> infSize) + (x.TryProp(nameof _e.m) |> infSize)))
let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
let inline tryEquinoxStreamName x =
match tryProp x (nameof _t.p) with
let inline tryEquinoxStreamName (x: JsonElement) =
match x.TryProp(nameof _t.p) with
| ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
| _ -> ValueNone
Expand All @@ -500,42 +501,33 @@ module CosmosTop =
override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
static Create(key, x: JsonElement) =
let struct (e, eb, struct (ec, ei)) = tryProp x (nameof _t.e) |> tryParseEventOrUnfold
let struct (u, ub, struct (uc, ui)) = tryProp x (nameof _t.u) |> tryParseEventOrUnfold
let struct (e, eb, struct (ec, ei)) = x.TryProp(nameof _t.e) |> tryParseEventOrUnfold
let struct (u, ub, struct (uc, ui)) = x.TryProp(nameof _t.u) |> tryParseEventOrUnfold
{ key = key; count = 1; events = e; unfolds = u
bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui }
let [<Literal>] OrderByTs = " ORDER BY c._ts"
let private composeSql (a: TopArguments) =
let partitionKeyCriteria =
match a.Criteria with
| Criteria.SingleStream sn -> $"c.p = \"{sn}\""
| Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
| Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
| Criteria.Unfiltered -> "1=1"
$"SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}"
let inline cosmosTimeStamp (x: JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
let private sql (a: TopArguments) = $"SELECT * FROM c WHERE {a.Criteria.Sql}{if a.TsOrder then OrderByTs else null}"
let run (a: TopArguments) = task {
let sw = System.Diagnostics.Stopwatch.StartNew()
let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L
let s = System.Collections.Generic.HashSet()
let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
let group = if a.StreamLevel then id else categoryName
try for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do
let group = if a.StreamLevel then id else StreamName.categoryName
try for rtt, rc, items, rdc, rds, ods in a.Execute(sql a) |> Query.enum__ do
let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
for x in items do
newestTs <- max newestTs (cosmosTimeStamp x)
newestTs <- max newestTs x.Timestamp
match Parser.tryEquinoxStreamName x with
| ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
| ValueSome sn ->
if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
let x = Parser.Stat.Create(group sn, x)
let mutable v = Unchecked.defaultof<_>
if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore
else s.Add x |> ignore
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow)
s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds
pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs)
pageStreams.Clear()
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
Expand All @@ -544,9 +536,10 @@ module CosmosTop =
let accCats = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map group |> System.Collections.Generic.HashSet |> _.Count
let accStreams = if a.StreamLevel then s.Count else accStreams.Count
let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes
let giB x = miB x / 1024.
let inline giB x = miB x / 1024.
Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s",
accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds)

let sort: Parser.Stat seq -> Parser.Stat seq = a.Order |> function
| Order.Name -> Seq.sortBy _.key
| Order.Size -> Seq.sortByDescending _.bytes
Expand All @@ -561,7 +554,7 @@ module CosmosTop =
Log.Information("{count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}",
x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
if a.StreamLevel then
let collapsed = s |> Seq.groupBy (_.key >> categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat })
let collapsed = s |> Seq.groupBy (_.key >> StreamName.categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat })
sort collapsed |> Seq.truncate a.Count |> Seq.iter render
sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }

Expand Down Expand Up @@ -675,7 +668,7 @@ type Arguments(p: ParseResults<Parameters>) =
member _.CreateDomainLog() = createDomainLog verbose verboseConsole maybeSeq
member _.ExecuteSubCommand() = async {
match p.GetSubCommand() with
| Init a -> (CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None).Wait()
| Init a -> do! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect
| InitAws a -> do! DynamoInit.table Log.Logger a
| InitSql a -> do! SqlInit.databaseOrSchema Log.Logger a
| Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a
Expand Down

0 comments on commit c0d525a

Please sign in to comment.