Skip to content

Commit

Permalink
Sorting, C+C, D+M, unsorted
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 8, 2024
1 parent d1fb8e1 commit a0e4bb2
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 55 deletions.
13 changes: 9 additions & 4 deletions src/Equinox.CosmosStore/CosmosStoreSerialization.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ module private Deflate =
compressor.Flush() // Could `Close`, but not required
output.ToArray()

/// Inflates a DEFLATE-compressed payload, copying the decompressed bytes into `output`.
/// `output` belongs to the caller: it is written from its current position and is not closed here.
let inflateTo output (compressedBytes: byte[]) =
    // `use` guarantees the decompressor is disposed as soon as the copy completes, even if it throws
    use input = new MemoryStream(compressedBytes)
    use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress, leaveOpen = true)
    decompressor.CopyTo(output)

/// Inflates a DEFLATE-compressed payload into a freshly allocated byte array.
let inflate compressedBytes =
    use output = new MemoryStream()
    compressedBytes |> inflateTo output
    output.ToArray()

module JsonElement =
Expand All @@ -26,10 +28,13 @@ module JsonElement =

// Avoid introduction of HTML escaping for things like quotes etc (Options.Default uses Options.Create(), which defaults to unsafeRelaxedJsonEscaping=true)
let private optionsNoEscaping = JsonSerializerOptions(Encoder = System.Text.Encodings.Web.JavaScriptEncoder.UnsafeRelaxedJsonEscaping)
/// Renders the element as UTF-8 JSON bytes using the relaxed (non-HTML-escaping) encoder
let private toUtf8Bytes (value: JsonElement) = JsonSerializer.SerializeToUtf8Bytes(value, options = optionsNoEscaping)
/// Compresses a non-null element: its UTF-8 JSON bytes are passed through `Deflate.compress`, and the resulting
/// byte array is serialized back into a JsonElement. A JSON null is returned untouched.
let deflate (value: JsonElement): JsonElement =
    if value.ValueKind = JsonValueKind.Null then value
    else value |> toUtf8Bytes |> Deflate.compress |> JsonSerializer.SerializeToElement
/// Attempts to inflate `x` into `ms`, treating it as a base64-encoded compressed payload.
/// Yields false (writing nothing) unless `x` is a JSON string; yields true once the inflated bytes have been written.
let tryInflateTo ms (x: JsonElement) =
    match x.ValueKind with
    | JsonValueKind.String ->
        Deflate.inflateTo ms (x.GetBytesFromBase64())
        true
    | _ -> false

type CosmosJsonSerializer(options : JsonSerializerOptions) =
inherit Microsoft.Azure.Cosmos.CosmosSerializer()
Expand Down
143 changes: 92 additions & 51 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,21 @@ and [<NoComparison; NoEquality; RequireSubcommand>] TopParameters =
| [<AltCommandLine "-cn"; Unique>] CategoryName of string
| [<AltCommandLine "-cl"; Unique>] CategoryLike of string
| [<AltCommandLine "-S"; Unique>] Streams
| [<MainCommand>] Limit of int
| [<AltCommandLine "-T"; Unique>] TsOrder
| [<AltCommandLine "-c">] Limit of int
| [<MainCommand; AltCommandLine "-s"; Unique>] Sort of Order
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
interface IArgParserTemplate with
    /// Help text shown for each argument
    member a.Usage = a |> function
        | StreamName _ ->   "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
        | CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
        | CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`)."
        | Streams ->        "Stream level stats"
        | TsOrder ->        "Retrieve data in `_ts` ORDER (generally has significant RU impact). Default: Use continuation tokens"
        | Sort _ ->         "Sort order for results"
        // Single Limit case: a second, earlier Limit rule would make this text unreachable
        | Limit _ ->        "Number of categories to limit output to (Streams limit is 10x the category limit). Default: 100"
        | Cosmos _ ->       "Parameters for CosmosDB."
/// Keys by which the rendered stats can be ordered; supplied via the `Sort` argument
and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize
and TopArguments(p: ParseResults<TopParameters>) =
member val Criteria =
match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
Expand All @@ -188,8 +193,11 @@ and TopArguments(p: ParseResults<TopParameters>) =
| None, None, None -> Criteria.Unfiltered
| None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
member val CosmosArgs = p.GetResult TopParameters.Cosmos |> Store.Cosmos.Arguments
/// True when the `Streams` switch was supplied
member val StreamLevel = p.Contains Streams
/// Number of categories to limit output to. Default: 100
member val Count = p.GetResult(Limit, 100)
/// True when the `TsOrder` switch was supplied
member val TsOrder = p.Contains TsOrder
/// Requested sort key. Default: Order.Size
member val Order = p.GetResult(Sort, Order.Size)
/// Number of streams to limit output to: always 10x the category limit, as the `Limit` usage text states.
/// (Reading `Limit` again with a 10x default would only apply the multiplier when no limit was supplied.)
member x.StreamCount = x.Count * 10
/// Builds the store configuration from `CosmosArgs` and returns its `Container`;
/// fails with "Top requires Cosmos" if the configuration is not a Cosmos one
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
| Store.Config.Cosmos (cc, _, _) -> cc.Container
| _ -> failwith "Top requires Cosmos"
Expand Down Expand Up @@ -406,7 +414,7 @@ module CosmosQuery =
| [||] -> "1=1"
| [| x |] -> x |> exists
| xs -> String.Join(" AND ", xs) |> exists
$"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter} ORDER BY c.i"
$"SELECT {selectedFields} FROM c WHERE {partitionKeyCriteria} AND {unfoldFilter}"
let private queryDef (a: QueryArguments) =
let sql = composeSql a
Log.Information("Querying {mode}: {q}", a.Mode, sql)
Expand Down Expand Up @@ -451,78 +459,111 @@ module CosmosTop =

open Equinox.CosmosStore.Linq.Internal
open FSharp.Control

let cosmosTimeStamp (x: System.Text.Json.JsonElement) = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
let tryParseEquinoxBatch (x: System.Text.Json.JsonElement) =
let tryProp (id: string): ValueOption<System.Text.Json.JsonElement> =
open System.Text.Json
/// Helpers that measure and classify raw documents (as JsonElements) for the stats accumulated by `run`
module private Parser =
    // One buffer is reused for every measurement; each helper rewinds it before writing,
    // so a measured size must be read before the next helper call
    let scratch = new System.IO.MemoryStream()
    /// Number of bytes written when `x` is serialized to the scratch buffer as JSON
    let inline utf8Size (x: JsonElement) =
        scratch.Position <- 0L
        JsonSerializer.Serialize(scratch, x)
        scratch.Position
    /// Inflated byte count when `tryInflateTo` accepts `x`; otherwise falls back to `utf8Size x`
    let inline inflatedUtf8Size x =
        scratch.Position <- 0L
        if Equinox.CosmosStore.Core.JsonElement.tryInflateTo scratch x then scratch.Position
        else utf8Size x
    /// ValueSome with the named property of `x` if present, else ValueNone
    let inline tryProp (x: JsonElement) (id: string): ValueOption<JsonElement> =
        let mutable p = Unchecked.defaultof<_>
        if x.TryGetProperty(id, &p) then ValueSome p else ValueNone
    // using the length as a decent proxy for UTF-8 length of corr/causation; if you have messy data in there, you'll have bigger problems to worry about
    let inline stringLen x = match x with ValueSome (x: JsonElement) when x.ValueKind <> JsonValueKind.Null -> x.GetString().Length | _ -> 0
    let _e = Unchecked.defaultof<Equinox.CosmosStore.Core.Event> // Or Unfold - both share field names
    /// Folds over the items of array `x`, yielding struct (total correlationId + causationId string length, total inflated d + m size)
    let inline ciSize (x: JsonElement) =
        (struct (0, 0L), x.EnumerateArray())
        ||> Seq.fold (fun struct (c, i) x ->
            let inline infSize x = match x with ValueSome x -> inflatedUtf8Size x | ValueNone -> 0
            struct (c + (tryProp x (nameof _e.correlationId) |> stringLen) + (tryProp x (nameof _e.causationId) |> stringLen),
                    i + (tryProp x (nameof _e.d) |> infSize) + (tryProp x (nameof _e.m) |> infSize)))
    let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
    /// ValueSome with the stream name (round-tripped through FsCodec.StreamName.parse/toString) when the `p` property
    /// is present and is a JSON string; ValueNone otherwise
    let inline tryEquinoxStreamName x =
        match tryProp x (nameof _t.p) with
        | ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
            je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
        | _ -> ValueNone
    /// For an `e` or `u` array: struct (item count, serialized size, ciSize); all zeros when the property is absent
    let private tryParseEventOrUnfold = function
        | ValueNone -> struct (0, 0L, struct (0, 0L))
        | ValueSome (x: JsonElement) -> x.GetArrayLength(), utf8Size x, ciSize x
    /// Accumulated stats for one key; equality and hashing consider only `key`, so a HashSet holds one entry per key
    [<Struct; CustomEquality; NoComparison>]
    type Stat =
        { key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 }
        /// Sums every counter of `x` and `y`, retaining the key of `x`
        member x.Merge y =
            { key = x.key; count = x.count + y.count; events = x.events + y.events; unfolds = x.unfolds + y.unfolds; bytes = x.bytes + y.bytes
              eBytes = x.eBytes + y.eBytes; uBytes = x.uBytes + y.uBytes; cBytes = x.cBytes + y.cBytes; iBytes = x.iBytes + y.iBytes }
        override x.GetHashCode() = StringComparer.Ordinal.GetHashCode x.key
        override x.Equals y = match y with :? Stat as y -> StringComparer.Ordinal.Equals(x.key, y.key) | _ -> false
        /// Stats for a single document `x`, filed under `key`
        static member Create(key, x: JsonElement) =
            let struct (e, eb, struct (ec, ei)) = tryProp x (nameof _t.e) |> tryParseEventOrUnfold
            let struct (u, ub, struct (uc, ui)) = tryProp x (nameof _t.u) |> tryParseEventOrUnfold
            { key = key; count = 1; events = e; unfolds = u
              bytes = utf8Size x; eBytes = eb; uBytes = ub; cBytes = int64 (ec + uc); iBytes = ei + ui }
let [<Literal>] OrderByTs = " ORDER BY c._ts"
/// Composes the query text: every document whose `p` satisfies the criteria,
/// with the `_ts` ordering clause appended only when `TsOrder` was requested
let private composeSql (a: TopArguments) =
    let partitionKeyCriteria =
        match a.Criteria with
        | Criteria.SingleStream sn -> $"c.p = \"{sn}\""
        | Criteria.CatName n -> $"c.p LIKE \"{n}-%%\""
        | Criteria.CatLike pat -> $"c.p LIKE \"{pat}\""
        | Criteria.Unfiltered -> "1=1"
    // a null interpolation hole renders as the empty string
    $"SELECT * FROM c WHERE {partitionKeyCriteria}{if a.TsOrder then OrderByTs else null}"
/// Reads the document's `_ts` property as seconds since the Unix epoch and converts it to a DateTime
let inline cosmosTimeStamp (x: JsonElement) =
    let epochSeconds = x.GetProperty("_ts").GetDouble()
    DateTime.UnixEpoch.AddSeconds epochSeconds
/// Walks every page of the query, folding each document into a per-key Stat (key = stream name when
/// `StreamLevel`, otherwise category name), logging one line per page; finally logs the totals followed by
/// the stats, sorted per `a.Order` and truncated per `a.Count` / `a.StreamCount`.
/// Fails if a document has no parseable stream name.
let run (a: TopArguments) = task {
    let sw = System.Diagnostics.Stopwatch.StartNew()
    let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
    let mutable accI, accE, accU, accRus, accRds, accOds, accBytes = 0L, 0L, 0L, 0., 0L, 0L, 0L
    let s = System.Collections.Generic.HashSet()
    let categoryName = FsCodec.StreamName.Internal.trust >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn
    let group = if a.StreamLevel then id else categoryName
    try
        for rtt, rc, items, rdc, rds, ods in a.Execute(composeSql a) |> Query.enum__ do
            // NOTE the `sw` bound here shadows the outer one within the loop body: it times this page only
            let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
            for x in items do
                newestTs <- max newestTs (cosmosTimeStamp x)
                match Parser.tryEquinoxStreamName x with
                | ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
                | ValueSome sn ->
                    // in stream-level mode the stats set itself holds one entry per stream, so accStreams is not needed
                    if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
                    let x = Parser.Stat.Create(group sn, x)
                    let mutable v = Unchecked.defaultof<_>
                    // Stat equality is by key only: swap the existing entry for the merged one
                    if s.TryGetValue(x, &v) then s.Remove x |> ignore; s.Add(v.Merge x) |> ignore
                    else s.Add x |> ignore
                    pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds; pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
            Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}>{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
                            rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, newestTs - DateTime.UtcNow)
            pageStreams.Clear()
            accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
            accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods; accBytes <- accBytes + pageB
    finally
        // Both sources hold stream names, so categories must always be derived via categoryName;
        // `group` is `id` in stream-level mode, which would report the stream count as the category count
        let accCats = (if a.StreamLevel then s |> Seq.map _.key else accStreams) |> Seq.map categoryName |> System.Collections.Generic.HashSet |> _.Count
        let accStreams = if a.StreamLevel then s.Count else accStreams.Count
        let iBytes, cBytes = s |> Seq.sumBy _.iBytes, s |> Seq.sumBy _.cBytes
        let giB x = miB x / 1024.
        Log.Information("TOTALS {count:N0}i {cats}c {streams:N0}s {es:N0}e {us:N0}u read {rg:f1}GiB output {og:f1}GiB JSON {tg:f1}GiB D+M(inflated) {ig:f1}GiB C+C {cm:f2}MiB {ru:N2}RU {s:N1}s",
                        accI, accCats, accStreams, accE, accU, giB accRds, giB accOds, giB accBytes, giB iBytes, miB cBytes, accRus, sw.Elapsed.TotalSeconds)
        let sort: Parser.Stat seq -> Parser.Stat seq = a.Order |> function
            | Order.Name -> Seq.sortBy _.key
            | Order.Size -> Seq.sortByDescending _.bytes
            | Order.Items -> Seq.sortByDescending _.count
            | Order.Events -> Seq.sortByDescending _.events
            | Order.Unfolds -> Seq.sortByDescending _.unfolds
            | Order.EventSize -> Seq.sortByDescending _.eBytes
            | Order.UnfoldSize -> Seq.sortByDescending _.uBytes
            | Order.InflateSize -> Seq.sortByDescending _.iBytes
            | Order.CorrCauseSize -> Seq.sortByDescending _.cBytes
        let render (x: Parser.Stat) =
            Log.Information("{count,7}i {tm,6:N2}MiB E{events,7} {em,7:N1} U{unfolds,7} {um,6:N1} D+M{dm,6:N1} C+C{cm,5:N1} {key}",
                            x.count, miB x.bytes, x.events, miB x.eBytes, x.unfolds, miB x.uBytes, miB x.iBytes, miB x.cBytes, x.key)
        // In stream-level mode, first render the per-category rollup of the stream stats, then the streams themselves
        if a.StreamLevel then
            let collapsed = s |> Seq.groupBy (_.key >> categoryName) |> Seq.map (fun (cat, xs) -> { (xs |> Seq.reduce _.Merge) with key = cat })
            sort collapsed |> Seq.truncate a.Count |> Seq.iter render
        sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }

module DynamoInit =

Expand Down

0 comments on commit a0e4bb2

Please sign in to comment.