Skip to content

Commit

Permalink
Implement deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jun 12, 2024
1 parent 1a42903 commit 8a8c25c
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -241,11 +241,7 @@ and DestroyArguments(p: ParseResults<DestroyParameters>) =
member x.Connect() = match Store.Cosmos.config Log.Logger (None, true) x.CosmosArgs with
| Store.Config.Cosmos (cc, _, _) -> cc.Container
| _ -> failwith "Destroy requires Cosmos"
member x.Execute(sql) = let container = x.Connect()
let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = x.CosmosArgs.QueryMaxItems)
container.GetItemQueryIterator<SnEventsUnfolds>(qd, requestOptions = qo)
and SnEventsUnfolds = { sn: string; events: int; unfolds: int }
and SnEventsUnfolds = { p: string; id: string; es: int; us: int }
and [<NoComparison; NoEquality; RequireSubcommand>] DumpParameters =
| [<AltCommandLine "-s"; MainCommand>] Stream of FsCodec.StreamName
| [<AltCommandLine "-C"; Unique>] Correlation
Expand Down Expand Up @@ -604,28 +600,44 @@ module CosmosDestroy =
open FSharp.Control

let run (a: DestroyArguments) = task {
let sw = System.Diagnostics.Stopwatch.StartNew()
let sql = $"SELECT c.p AS sn, ARRAYLENGTH(c.e) AS events, ARRAYLENGTH(c.u) AS unfolds FROM c WHERE {a.Criteria.Sql}"
let tsw = System.Diagnostics.Stopwatch.StartNew()
let sql = $"SELECT c.p, c.id, ARRAYLENGTH(c.e) AS es, ARRAYLENGTH(c.u) AS us FROM c WHERE {a.Criteria.Sql}"
if a.DryRun then Log.Warning("Dry-run of deleting all Items matching {sql}", sql)
else Log.Warning("DESTROYING all Items matching {sql}", sql)

let container = a.Connect()
let query =
let qd = Microsoft.Azure.Cosmos.QueryDefinition sql
let qo = Microsoft.Azure.Cosmos.QueryRequestOptions(MaxItemCount = a.CosmosArgs.QueryMaxItems)
container.GetItemQueryIterator<SnEventsUnfolds>(qd, requestOptions = qo)
let pageStreams, accStreams = System.Collections.Generic.HashSet(), System.Collections.Generic.HashSet()
let mutable accI, accE, accU, accRus, accRds, accOds = 0L, 0L, 0L, 0., 0L, 0L
try for rtt, rc, items, rdc, rds, ods in a.Execute sql |> Query.enum__ do
let mutable pageI, pageE, pageU, sw = 0, 0, 0, System.Diagnostics.Stopwatch.StartNew()
let mutable accI, accE, accU, accRus, accDelRu, accRds, accOds = 0L, 0L, 0L, 0., 0., 0L, 0L
try for rtt, rc, items, rdc, rds, ods in query |> Query.enum__ do
let mutable pageI, pageE, pageU, pdRu, idRu = 0, 0, 0, 0., 0.
let psw, isw = System.Diagnostics.Stopwatch.StartNew(), System.Diagnostics.Stopwatch.StartNew()
for i in items do
if pageStreams.Add i.sn then accStreams.Add i.sn |> ignore
pageI <- pageI + 1; pageE <- pageE + i.events; pageU <- pageU + i.unfolds
Log.Information("Page{rdc,5}>{count,4}i{streams,5}s{es,5}e{us,5}u{rds,5:f2}>{ods,4:f2}{rc,7:f2}RU{s,5:N1}s {s,5:N1}s",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, sw.Elapsed.TotalSeconds)
if pageStreams.Add i.p then accStreams.Add i.p |> ignore
pageI <- pageI + 1; pageE <- pageE + i.es; pageU <- pageU + i.us
if not a.DryRun then
let! res = container.DeleteItemStreamAsync(i.id, Microsoft.Azure.Cosmos.PartitionKey i.p)
let ru = res.Headers.RequestCharge in idRu <- idRu + ru; pdRu <- pdRu + ru
if not res.IsSuccessStatusCode then
failwith $"Deletion of {i.p}/{i.id} failed with Code: {res.StatusCode} Message: {res.ErrorMessage}\n{res.Diagnostics}"
if isw.Elapsed.TotalSeconds > 30 then
Log.Information(".. Deleted {count,5}i {streams,7}s{es,7}e{us,7}u {ru,6:N2}WRU/s {s,5:N1}s",
pageI, pageStreams.Count, pageE, pageU, idRu / isw.Elapsed.TotalSeconds, psw.Elapsed.TotalSeconds)
isw.Restart()
idRu <- 0
let ps = psw.Elapsed.TotalSeconds
Log.Information("Page{rdc,6}>{count,5}i {streams,7}s{es,7}e{us,7}u{rds,8:f2}>{ods,4:f2} {rc,8:f2}RRU {s,5:N1}s {ru:N2}WRU/s {s,5:N1}s",
rdc, pageI, pageStreams.Count, pageE, pageU, miB rds, miB ods, rc, rtt.TotalSeconds, pdRu / ps, ps)
pageStreams.Clear()
accI <- accI + int64 pageI; accE <- accE + int64 pageE; accU <- accU + int64 pageU
accRus <- accRus + rc; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods
accRus <- accRus + rc; accDelRu <- accDelRu + pdRu; accRds <- accRds + int64 rds; accOds <- accOds + int64 ods
finally

let accCats = accStreams |> Seq.map StreamName.categoryName |> System.Collections.Generic.HashSet |> _.Count
Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {ru:N2}RU {s:N1}s",
accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, sw.Elapsed.TotalSeconds) }
Log.Information("TOTALS {count:N0}i {cats:N0}c {streams:N0}s {es:N0}e {us:N0}u read {rmib:f1}MiB output {omib:f1}MiB {rru:N2}RRU Avg {ru:N2}WRU/s Delete {ru:N2}WRU Total {s:N1}s",
accI, accCats, accStreams.Count, accE, accU, miB accRds, miB accOds, accRus, accDelRu / tsw.Elapsed.TotalSeconds, accDelRu, tsw.Elapsed.TotalSeconds) }

module DynamoInit =

Expand Down

0 comments on commit 8a8c25c

Please sign in to comment.