diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index c56f1f238..04b894fd5 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -141,7 +141,13 @@ and [] QueryParameters = | Console -> "Also emit the JSON to the console. Default: Gather statistics (but only write to a File if specified)" | Cosmos _ -> "Parameters for CosmosDB." and [] Mode = Default | SnapOnly | SnapWithStream | ReadOnly | ReadWithStream | Raw -and [] Criteria = SingleStream of string | CatName of string | CatLike of string | Unfiltered +and [] Criteria = + | SingleStream of string | CatName of string | CatLike of string | Unfiltered + member x.Sql = x |> function + | Criteria.SingleStream sn -> $"c.p = \"{sn}\"" + | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\"" + | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\"" + | Criteria.Unfiltered -> "1=1" and QueryArguments(p: ParseResults) = member val Mode = p.GetResult(QueryParameters.Mode, if p.Contains QueryParameters.File then Mode.Raw else Mode.Default) member val Pretty = p.Contains QueryParameters.Pretty @@ -380,25 +386,23 @@ module CosmosStats = let prettySerdes = lazy FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Options.Create(indent = true)) +type System.Text.Json.JsonElement with + member x.Timestamp = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds + member x.TryProp(name: string) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty(name, &p) then ValueSome p else ValueNone +module StreamName = + let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn + module CosmosQuery = open Equinox.CosmosStore.Linq.Internal open FSharp.Control - type System.Text.Json.JsonDocument with - member x.Cast<'T>() = System.Text.Json.JsonSerializer.Deserialize<'T>(x.RootElement) - member x.Timestamp = - let ok, p = x.RootElement.TryGetProperty("_ts") - if ok then p.GetDouble() |> DateTime.UnixEpoch.AddSeconds |> Some else None + let private composeSql (a: QueryArguments) = - let inline warnOnUnfiltered () = + match a.Criteria with + | Criteria.Unfiltered -> let lel = if a.Mode = Mode.Raw then LogEventLevel.Debug elif a.Filepath = None then LogEventLevel.Warning else LogEventLevel.Information Log.Write(lel, "No StreamName or CategoryName/CategoryLike specified - Unfold Criteria better be unambiguous") - let partitionKeyCriteria = - match a.Criteria with - | Criteria.SingleStream sn -> $"c.p = \"{sn}\"" - | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\"" - | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\"" - | Criteria.Unfiltered -> warnOnUnfiltered (); "1=1" + | _ -> () let selectedFields = match a.Mode with | Mode.Default -> "c._etag, c.p, c.u[0].d" @@ -414,7 +418,7 @@ module CosmosQuery = | [||] -> "1=1" | [| x |] -> x |> exists | xs -> String.Join(" AND ", xs) |> exists - $"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter}" + $"SELECT {selectedFields} FROM c WHERE {a.Criteria.Sql} AND {unfoldFilter}" let private queryDef (a: QueryArguments) = let sql = composeSql a Log.Information("Querying {mode}: {q}", a.Mode, sql) @@ -431,14 +435,14 @@ module CosmosQuery = let mutable accI, accE, accU, accRus, accBytesRead = 0L, 0L, 0L, 0., 0L let it = container.GetItemQueryIterator(queryDef a, requestOptions = qo) try for rtt, rc, items, rdc, rds, ods in it |> Query.enum__ do - let mutable newestTs = None - let items = [| for x in items -> newestTs <- max newestTs x.Timestamp - x.Cast() |] + let mutable newestTs = DateTime.MinValue + let items = [| for x in items -> newestTs <- max newestTs x.RootElement.Timestamp + System.Text.Json.JsonSerializer.Deserialize(x.RootElement) |] let inline arrayLen x = if isNull x then 0 else Array.length x pageStreams.Clear(); for x in items do if x.p <> null && pageStreams.Add x.p then accStreams.Add x.p |> ignore let pageI, pageE, pageU = items.Length, items |> Seq.sumBy (_.e >> arrayLen), items |> Seq.sumBy (_.u >> arrayLen) Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}MiB{rc,7:f2}RU{s,5:N1}s age {age:dddd\.hh\:mm\:ss}", - rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, newestTs.Value - DateTime.UtcNow) + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, DateTime.UtcNow - newestTs) maybeFileStream |> Option.iter (fun stream -> for x in items do serdes.SerializeToStream(x, stream) @@ -450,8 +454,7 @@ module CosmosQuery = finally let fileSize = maybeFileStream |> Option.map _.Position |> Option.defaultValue 0 maybeFileStream |> Option.iter _.Close() // Before we log so time includes flush time and no confusion - let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn - let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map categoryName).Count + let accCategories = System.Collections.Generic.HashSet(accStreams |> Seq.map StreamName.categoryName).Count Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u R/W {rmib:N1}/{wmib:N1}MiB {ru:N2}RU {s:N1}s", accI, accCategories, accStreams.Count, accE, accU, miB accBytesRead, miB fileSize, accRus, sw.Elapsed.TotalSeconds) } @@ -460,6 +463,7 @@ module CosmosTop = open Equinox.CosmosStore.Linq.Internal open FSharp.Control open System.Text.Json + module private Parser = let scratch = new System.IO.MemoryStream() let inline utf8Size (x: JsonElement) = @@ -470,9 +474,6 @@ module CosmosTop = scratch.Position <- 0L if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position else utf8Size x - let inline tryProp (x: JsonElement) (id: string): ValueOption = - let mutable p = Unchecked.defaultof<_> - if x.TryGetProperty(id, &p) then ValueSome p else ValueNone // using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0 let _e = Unchecked.defaultof // Or Unfold - both share field names @@ -480,11 +481,11 @@ module CosmosTop = (struct (0, 0L), x.EnumerateArray()) ||> Seq.fold (fun struct (c, i) x -> let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0 - struct (c + (tryProp x (nameof _e.correlationId) |> stringLen) + (tryProp x (nameof _e.causationId) |> stringLen), - i + (tryProp x (nameof _e.d) |> infSize) + (tryProp x (nameof _e.m) |> infSize))) + struct (c + (x.TryProp(nameof _e.correlationId) |> stringLen) + (x.TryProp(nameof _e.causationId) |> stringLen), + i + (x.TryProp(nameof _e.d) |> infSize) + (x.TryProp(nameof _e.m) |> infSize))) let _t = Unchecked.defaultof - let inline tryEquinoxStreamName x = - match tryProp x (nameof _t.p) with + let inline tryEquinoxStreamName (x: JsonElement) = + match x.TryProp(nameof _t.p) with | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String -> je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome | _ -> ValueNone @@ -500,42 +501,33 @@ module CosmosTop = override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false static Create(key, x: JsonElement) = - let struct (e, eb, struct (ec, ei)) = tryProp x (nameof _t.e) |> tryParseEventOrUnfold - let struct (u, ub, struct (uc, ui)) = tryProp x (nameof _t.u) |> tryParseEventOrUnfold + let struct (e, eb, struct (ec, ei)) = x.TryProp(nameof _t.e) |> tryParseEventOrUnfold + let struct (u, ub, struct (uc, ui)) = x.TryProp(nameof _t.u) |> tryParseEventOrUnfold { key = key; count = 1; events = e; unfolds = u bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui } let [] OrderByTs = " ORDER BY c._ts" - let private composeSql (a: TopArguments) = - let partitionKeyCriteria = - match a.Criteria with - | Criteria.SingleStream sn -> $"c.p = \"{sn}\"" - | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\"" - | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\"" - | Criteria.Unfiltered -> "1=1" - $"SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}" - let inline cosmosTimeStamp (x: JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds + let private sql (a: TopArguments) = $"SELECT * FROM c WHERE {a.Criteria.Sql}{if a.TsOrder then OrderByTs else null}" let run (a: TopArguments) = task { let sw = System.Diagnostics.Stopwatch.StartNew() let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet() let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L let s = System.Collections.Generic.HashSet() - let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn - let group = if a.StreamLevel then id else categoryName - try for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do + let group = if a.StreamLevel then id else StreamName.categoryName + try for rtt, rc, items, rdc, rds, ods in a.Execute(sql a) |> Query.enum__ do let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew() for x in items do - newestTs <- max newestTs (cosmosTimeStamp x) + newestTs <- max newestTs x.Timestamp match Parser.tryEquinoxStreamName x with | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}" | ValueSome sn -> if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore let x = Parser.Stat.Create(group sn, x) let mutable v = Unchecked.defaultof<_> - if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore - else s.Add x |> ignore - pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes - Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}", - rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow) + s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore + pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds + pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes + Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}", + rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs) pageStreams.Clear() accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB @@ -544,9 +536,10 @@ module CosmosTop = let accCats = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map group |> System.Collections.Generic.HashSet |> _.Count let accStreams = if a.StreamLevel then s.Count else accStreams.Count let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes - let giB x = miB x / 1024. + let inline giB x = miB x / 1024. Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s", accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds) + let sort: Parser.Stat seq -> Parser.Stat seq = a.Order |> function | Order.Name -> Seq.sortBy _.key | Order.Size -> Seq.sortByDescending _.bytes @@ -561,7 +554,7 @@ module CosmosTop = Log.Information("{count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}", x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key) if a.StreamLevel then - let collapsed = s |> Seq.groupBy (_.key >> categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat }) + let collapsed = s |> Seq.groupBy (_.key >> StreamName.categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat }) sort collapsed |> Seq.truncate a.Count |> Seq.iter render sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render } @@ -675,7 +668,7 @@ type Arguments(p: ParseResults) = member _.CreateDomainLog() = createDomainLog verbose verboseConsole maybeSeq member _.ExecuteSubCommand() = async { match p.GetSubCommand() with - | Init a -> (CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None).Wait() + | Init a -> do! CosmosInit.containerAndOrDb Log.Logger a CancellationToken.None |> Async.AwaitTaskCorrect | InitAws a -> do! DynamoInit.table Log.Logger a | InitSql a -> do! SqlInit.databaseOrSchema Log.Logger a | Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a