Skip to content

Commit

Permalink
Destroy skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 12, 2024
1 parent a92f8cb commit 1a42903
Showing 1 changed file with 90 additions and 20 deletions.
110 changes: 90 additions & 20 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,22 @@ type Parameters =
| [<CliPrefix(CliPrefix.None); Last>] Stats of ParseResults<StatsParameters>
| [<CliPrefix(CliPrefix.None); Last>] Query of ParseResults<QueryParameters>
| [<CliPrefix(CliPrefix.None); Last>] Top of ParseResults<TopParameters>
| [<CliPrefix(CliPrefix.None); Last>] Destroy of ParseResults<DestroyParameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| Verbose -> "Include low level logging regarding specific test runs."
| VerboseConsole -> "Include low level test and store actions logging in on-screen output to console."
| LocalSeq -> "Configures writing to a local Seq endpoint at http://localhost:5341, see https://getseq.net"
| LogFile _ -> "specify a log file to write the result breakdown into (default: eqx.log)."
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
| LoadTest _ -> "Run a load test"
| Init _ -> "Initialize Store/Container (supports `cosmos` stores; also handles RU/s provisioning adjustment)."
| InitAws _ -> "Initialize DynamoDB Table (supports `dynamo` stores; also handles RU/s provisioning adjustment)."
| InitSql _ -> "Initialize Database Schema (supports `mssql`/`mysql`/`postgres` SqlStreamStore stores)."
| Stats _ -> "inspect store to determine numbers of streams/documents/events and/or config (supports `cosmos` and `dynamo` stores)."
| Query _ -> "Load/Summarise streams based on Cosmos SQL Queries (supports `cosmos` only)."
| Top _ -> "Scan to determine top categories and streams (supports `cosmos` only)."
| Dump _ -> "Load and show events in a specified stream (supports all stores)."
| Destroy _ -> "DELETE documents for a nominated category and/or stream (includes a dry-run mode). (supports `cosmos` only)."
and [<NoComparison; NoEquality; RequireSubcommand>] InitParameters =
| [<AltCommandLine "-ru"; Unique>] Rus of int
| [<AltCommandLine "-A"; Unique>] Autoscale
Expand Down Expand Up @@ -156,7 +158,7 @@ and QueryArguments(p: ParseResults<QueryParameters>) =
match p.TryGetResult QueryParameters.StreamName, p.TryGetResult QueryParameters.CategoryName, p.TryGetResult QueryParameters.CategoryLike with
| Some sn, None, None -> Criteria.SingleStream sn
| Some _, Some _, _
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive"
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive"
| None, Some cn, None -> Criteria.CatName cn
| None, None, Some cl -> Criteria.CatLike cl
| None, None, None -> Criteria.Unfiltered
Expand Down Expand Up @@ -190,10 +192,10 @@ and [<NoComparison; NoEquality; RequireSubcommand>] TopParameters =
and Order = Name | Items | Events | Unfolds | Size | EventSize | UnfoldSize | InflateSize | CorrCauseSize
and TopArguments(p: ParseResults<TopParameters>) =
member val Criteria =
match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
match p.TryGetResult TopParameters.StreamName, p.TryGetResult TopParameters.CategoryName, p.TryGetResult TopParameters.CategoryLike with
| Some sn, None, None -> Criteria.SingleStream sn
| Some _, Some _, _
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName mutually exclusive"
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive"
| None, Some cn, None -> Criteria.CatName cn
| None, None, Some cl -> Criteria.CatLike cl
| None, None, None -> Criteria.Unfiltered
Expand All @@ -211,7 +213,39 @@ and TopArguments(p: ParseResults<TopParameters>) =
let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems)
container.GetItemQueryIterator<System.Text.Json.JsonElement>(qd, requestOptions = qo)

and [<NoComparison; NoEquality; RequireSubcommand>] DestroyParameters =
| [<AltCommandLine "-sn"; Unique>] StreamName of string
| [<AltCommandLine "-cn"; Unique>] CategoryName of string
| [<AltCommandLine "-cl"; Unique>] CategoryLike of string
| [<AltCommandLine "-f"; Unique>] Force
| [<CliPrefix(CliPrefix.None)>] Cosmos of ParseResults<Store.Cosmos.Parameters>
interface IArgParserTemplate with
member a.Usage = a |> function
| StreamName _ -> "Specify stream name to match against `p`, e.g. `$UserServices-f7c1ce63389a45bdbea1cccebb1b3c8a`."
| CategoryName _ -> "Specify category name to match against `p`, e.g. `$UserServices`."
| CategoryLike _ -> "Specify category name to match against `p` as a Cosmos LIKE expression (with `%` as wildcard, e.g. `$UserServices-%`."
| Force -> "Actually delete the documents (default is a dry run, reporting what would be deleted)"
| Cosmos _ -> "Parameters for CosmosDB."
and DestroyArguments(p: ParseResults<DestroyParameters>) =
member val Criteria =
match p.TryGetResult StreamName, p.TryGetResult CategoryName, p.TryGetResult CategoryLike with
| Some sn, None, None -> Criteria.SingleStream sn
| Some _, Some _, _
| Some _, _, Some _ -> p.Raise "StreamName and CategoryLike/CategoryName are mutually exclusive"
| None, Some cn, None -> Criteria.CatName cn
| None, None, Some cl -> Criteria.CatLike cl
| None, None, None -> failwith "Category or stream name criteria must be supplied"
| None, Some _, Some _ -> p.Raise "CategoryLike and CategoryName are mutually exclusive"
member val CosmosArgs = p.GetResult DestroyParameters.Cosmos |> Store.Cosmos.Arguments
member val DryRun = p.Contains Force |> not
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
| Store.Config.Cosmos (cc, _, _) -> cc.Container
| _ -> failwith "Destroy requires Cosmos"
member x.Execute(sql) = let container = x.Connect()
let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems)
container.GetItemQueryIterator<SnEventsUnfolds>(qd, requestOptions = qo)
and SnEventsUnfolds = { sn: string; events: int; unfolds: int }
and [<NoComparison; NoEquality; RequireSubcommand>] DumpParameters =
| [<AltCommandLine "-s"; MainCommand>] Stream of FsCodec.StreamName
| [<AltCommandLine "-C"; Unique>] Correlation
Expand Down Expand Up @@ -348,6 +382,7 @@ module CosmosStats =

open Equinox.CosmosStore.Linq.Internal
open FSharp.Control

let run (log : ILogger, _verboseConsole, _maybeSeq) (p : ParseResults<StatsParameters>) =
match p.GetSubCommand() with
| StatsParameters.Cosmos sp ->
Expand Down Expand Up @@ -389,6 +424,7 @@ let prettySerdes = lazy FsCodec.SystemTextJson.Serdes(FsCodec.SystemTextJson.Opt
type System.Text.Json.JsonElement with
member x.Timestamp = x.GetProperty("_ts").GetDouble() |> DateTime.UnixEpoch.AddSeconds
member x.TryProp(name: string) = let mutable p = Unchecked.defaultof<_> in if x.TryGetProperty(name, &p) then ValueSome p else ValueNone

module StreamName =
let categoryName = FsCodec.StreamName.parse >> FsCodec.StreamName.split >> fun struct (cn, _sid) -> cn

Expand Down Expand Up @@ -464,6 +500,17 @@ module CosmosTop =
open FSharp.Control
open System.Text.Json

let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
let inline tryEquinoxStreamName (x: JsonElement) =
match x.TryProp(nameof _t.p) with
| ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
| _ -> ValueNone
let inline parseEquinoxStreamName (x: JsonElement) =
match tryEquinoxStreamName x with
| ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
| ValueSome sn -> sn

module private Parser =
let scratch = new System.IO.MemoryStream()
let utf8Size (x: JsonElement) =
Expand All @@ -483,15 +530,10 @@ module CosmosTop =
||> Seq.fold (fun struct (c, i) x ->
struct (c + (x.TryProp(nameof _e.correlationId) |> stringLen) + (x.TryProp(nameof _e.causationId) |> stringLen),
i + (x.TryProp(nameof _e.d) |> infSize) + (x.TryProp(nameof _e.m) |> infSize)))
let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
let inline tryEquinoxStreamName (x: JsonElement) =
match x.TryProp(nameof _t.p) with
| ValueSome (je: JsonElement) when je.ValueKind = JsonValueKind.String ->
je.GetString() |> FsCodec.StreamName.parse |> FsCodec.StreamName.toString |> ValueSome
| _ -> ValueNone
let private tryParseEventOrUnfold = function
| ValueNone -> struct (0, 0L, struct (0, 0L))
| ValueSome (x: JsonElement) -> x.GetArrayLength(), utf8Size x, dmcSize x
let _t = Unchecked.defaultof<Equinox.CosmosStore.Core.Tip>
[<Struct; CustomEquality; NoComparison>]
type Stat =
{ key: string; count: int; events: int; unfolds: int; bytes: int64; eBytes: int64; uBytes: int64; cBytes: int64; iBytes: int64 }
Expand All @@ -517,15 +559,13 @@ module CosmosTop =
let mutable pageI, pageE, pageU, pageB, pageCc, pageDm, newestTs, sw = 0, 0, 0, 0L, 0L, 0L, DateTime.MinValue, System.Diagnostics.Stopwatch.StartNew()
for x in items do
newestTs <- max newestTs x.Timestamp
match Parser.tryEquinoxStreamName x with
| ValueNone -> failwith $"Could not parse document:\n{prettySerdes.Value.Serialize x}"
| ValueSome sn ->
if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
let x = Parser.Stat.Create(group sn, x)
let mutable v = Unchecked.defaultof<_>
s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds
pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
let sn = parseEquinoxStreamName x
if pageStreams.Add sn && not a.StreamLevel then accStreams.Add sn |> ignore
let x = Parser.Stat.Create(group sn, x)
let mutable v = Unchecked.defaultof<_>
s.Add(if s.TryGetValue(x, &v) then s.Remove x |> ignore; v.Merge x else x) |> ignore
pageI <- pageI + 1; pageE <- pageE + x.events; pageU <- pageU + x.unfolds
pageB <- pageB + x.bytes; pageCc <- pageCc + x.cBytes; pageDm <- pageDm + x.iBytes
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}<{jds,4:f2}MiB{rc,7:f2}RU{s,5:N1}s D+M{im,4:f1} C+C{cm,5:f2} {ms,3}ms age {age:dddd\.hh\:mm\:ss}",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, miB pageB, rc, rtt.TotalSeconds, miB pageDm, miB pageCc, sw.ElapsedMilliseconds, DateTime.UtcNow - newestTs)
pageStreams.Clear()
Expand Down Expand Up @@ -558,6 +598,35 @@ module CosmosTop =
sort collapsed |> Seq.truncate a.Count |> Seq.iter render
sort s |> Seq.truncate (if a.StreamLevel then a.StreamCount else a.Count) |> Seq.iter render }

module CosmosDestroy =

open Equinox.CosmosStore.Linq.Internal
open FSharp.Control

let run (a: DestroyArguments) = task {
let sw = System.Diagnostics.Stopwatch.StartNew()
let sql = $"SELECT c.p AS sn, ARRAYLENGTH(c.e) AS events, ARRAYLENGTH(c.u) AS unfolds FROM c WHERE {a.Criteria.Sql}"
if a.DryRun then Log.Warning("Dry-run of deleting all Items matching {sql}", sql)
else Log.Warning("DESTROYING all Items matching {sql}", sql)

let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
let mutable accI, accE, accU, accRus, accRds, accOds = 0L, 0L, 0L, 0., 0L, 0L
try for rtt, rc, items, rdc, rds, ods in a.Execute sql |> Query.enum__ do
let mutable pageI, pageE, pageU, sw = 0, 0, 0, System.Diagnostics.Stopwatch.StartNew()
for i in items do
if pageStreams.Add i.sn then accStreams.Add i.sn |> ignore
pageI <- pageI + 1; pageE <- pageE + i.events; pageU <- pageU + i.unfolds
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}{rc,7:f2}RU{s,5:N1}s {s,5:N1}s",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, sw.Elapsed.TotalSeconds)
pageStreams.Clear()
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods
finally

let accCats = accStreams |> Seq.map StreamName.categoryName |> System.Collections.Generic.HashSet |> _.Count
Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {ru:N2}RU {s:N1}s",
accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds) }

module DynamoInit =

open Equinox.DynamoStore
Expand Down Expand Up @@ -674,6 +743,7 @@ type Arguments(p: ParseResults<Parameters>) =
| Dump a -> do! Dump.run (Log.Logger, verboseConsole, maybeSeq) a
| Query a -> do! CosmosQuery.run (QueryArguments a) |> Async.AwaitTaskCorrect
| Top a -> do! CosmosTop.run (TopArguments a) |> Async.AwaitTaskCorrect
| Destroy a -> do! CosmosDestroy.run (DestroyArguments a) |> Async.AwaitTaskCorrect
| Stats a -> do! CosmosStats.run (Log.Logger, verboseConsole, maybeSeq) a
| LoadTest a -> let n = p.GetResult(LogFile, fun () -> p.ProgramName + ".log")
let reportFilename = System.IO.FileInfo(n).FullName
Expand Down

0 comments on commit 1a42903

Please sign in to comment.