Skip to content

Commit

Permalink
Release 4.0.0-rc.13
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Aug 24, 2023
1 parent ecbf8f1 commit 832d5e9
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 52 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
- `Equinox.Core.Batching`: `BatcherDictionary`, `BatcherCache` to host concurrent `Batchers` [#390](https://github.com/jet/equinox/pull/390)
- `Equinox.Core.Batching`: Add `limiter: SemaphoreSlim` argument to extend linger phase [#427](https://github.com/jet/equinox/pull/427)
- `CosmosStore`: `CosmosStoreConnector.Connect`, `CosmosStoreClient.Connect`, `CosmosStoreContext.Connect` [#421](https://github.com/jet/equinox/pull/421)
- `CosmosStore.CosmosStoreConnector`: `Connect`, `ConnectAsync` [#421](https://github.com/jet/equinox/pull/421)
- `CosmosStore.Exceptions`: Active patterns to simplify classification in the context of Propulsion handlers [#416](https://github.com/jet/equinox/pull/416)
- `CosmosStore.Prometheus`: Add `rut` tag to enable filtering/grouping by Read vs Write activity as per `DynamoStore` [#321](https://github.com/jet/equinox/pull/321)
- `DynamoStore`/`DynamoStore.Prometheus`: Implements the majority of the `CosmosStore` functionality via `FSharp.AWS.DynamoDB` [#321](https://github.com/jet/equinox/pull/321)
Expand Down Expand Up @@ -53,7 +53,6 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `CosmosStore`: Switch to natively using `JsonElement` event bodies [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach)
- `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305)
- `CosmosStore.Connect`: Changed arguments to use `CosmosStoreConnector` (to align with other `Connect` members in same PR) [#421](https://github.com/jet/equinox/pull/421)
- `CosmosStore.AccessStrategy.MultiSnapshot`,`Custom`: Change `list` and `seq` types to `array` [#338](https://github.com/jet/equinox/pull/338)
- `CosmosStore.Core.Initialization.initAux`: Replace hard-coded manual 400 RU with `mode` parameter [#328](https://github.com/jet/equinox/pull/328) :pray: [@brihadish](https://github.com/brihadish)
- `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317)
Expand Down
4 changes: 2 additions & 2 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -1833,8 +1833,8 @@ let connector: Equinox.CosmosStore.CosmosStoreConnector =
maxRetryAttemptsOnRateLimitedRequests = 1,
maxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds 3.)
let! context = CosmosStoreContext.Connect(connector, "databaseName", "containerName")
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, "databaseName", "containerName")
let ctx = EventsContext(context, gatewayLog)
//
Expand Down
8 changes: 4 additions & 4 deletions samples/Infrastructure/Store.fs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ module Cosmos =
let context =
match connect log a with
| (connector, databaseId, containerId), None ->
CosmosStoreContext.Connect(connector, databaseId, containerId, a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength, queryMaxItems = a.QueryMaxItems)
|> Async.RunSynchronously
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
CosmosStoreContext(client, databaseId, containerId, a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength, queryMaxItems = a.QueryMaxItems)
| (connector, databaseId, containerId), Some (aConnector, aDatabaseId, aContainerId) ->
let cosmosClient = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let archiveCosmosClient = aConnector.Connect(aDatabaseId, [| aContainerId |]) |> Async.RunSynchronously
let cosmosClient = connector.CreateAndInitialize(databaseId, [| containerId |]) |> Async.RunSynchronously
let archiveCosmosClient = aConnector.CreateAndInitialize(aDatabaseId, [| aContainerId |]) |> Async.RunSynchronously
let client = CosmosStoreClient(cosmosClient, archiveCosmosClient)
CosmosStoreContext(client, databaseId, containerId, a.TipMaxEvents, tipMaxJsonLength = a.TipMaxJsonLength, queryMaxItems = a.QueryMaxItems,
archiveDatabaseId = aDatabaseId, archiveContainerId = aContainerId)
Expand Down
3 changes: 2 additions & 1 deletion samples/Tutorial/AsAt.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ module Cosmos =
let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION")
let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., Microsoft.Azure.Cosmos.ConnectionMode.Gateway)
let databaseId, containerId = read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER"
let context = CosmosStoreContext.Connect(connector, databaseId, containerId, tipMaxEvents = 10) |> Async.RunSynchronously
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, databaseId, containerId, tipMaxEvents = 10)
let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot)
let cat = CosmosStoreCategory(context, Stream.Category, Events.codecJe, Fold.fold, Fold.initial, accessStrategy, cacheStrategy)
let resolve = Equinox.Decider.forStream Log.log cat
Expand Down
3 changes: 2 additions & 1 deletion samples/Tutorial/Cosmos.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ module Store =
let connector = CosmosStoreConnector(discovery, System.TimeSpan.FromSeconds 5., 2, System.TimeSpan.FromSeconds 5., Microsoft.Azure.Cosmos.ConnectionMode.Gateway)

let databaseId, containerId = read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER"
let context = CosmosStoreContext.Connect(connector, databaseId, containerId, tipMaxEvents = 10) |> Async.RunSynchronously
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, databaseId, containerId, tipMaxEvents = 10)
let cache = Equinox.Cache(appName, 20)

let service = Favorites.Cosmos.category (Store.context, Store.cache) |> Favorites.create
Expand Down
3 changes: 2 additions & 1 deletion samples/Tutorial/FulfilmentCenter.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ module Store =
let appName = "equinox-tutorial"
let connector = CosmosStoreConnector(Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION"), TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5.)
let databaseId, containerId = read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER"
let context = CosmosStoreContext.Connect(connector, databaseId, containerId, tipMaxEvents = 256) |> Async.RunSynchronously
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, databaseId, containerId, tipMaxEvents = 256)
let cache = Equinox.Cache(appName, 20)
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching

Expand Down
3 changes: 2 additions & 1 deletion samples/Tutorial/Todo.fsx
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ module Store =
let discovery = Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION")
let connector = CosmosStoreConnector(discovery, TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5.)
let databaseId, containerId = read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER"
let context = CosmosStoreContext.Connect(connector, databaseId, containerId, tipMaxEvents = 100) |> Async.RunSynchronously // Keep up to 100 events in tip before moving events to a new document
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
let context = CosmosStoreContext(client, databaseId, containerId, tipMaxEvents = 100)
let cacheStrategy = Equinox.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)

let access = AccessStrategy.Snapshot Snapshot.config
Expand Down
3 changes: 2 additions & 1 deletion src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type
cell.TryCompleted() |> ValueOption.exists isValid

/// Gets or asynchronously recomputes value depending on whether value passes the validity check
member _.Await(ct : CancellationToken) = task {
member _.Await(ct: CancellationToken) = task {
// Each concurrent execution takes a copy of the cell, and attempts to reuse the value; later used to ensure only one triggers the workflow
let current = cell
match! current.TryAwaitValid() with
Expand All @@ -30,3 +30,4 @@ type
// If there are concurrent executions, the first through the gate wins; everybody else awaits the instance the winner wrote
let _ = Interlocked.CompareExchange(&cell, newInstance, current)
return! cell.Await() }
member x.Await() = Async.call x.Await
52 changes: 20 additions & 32 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1167,13 +1167,13 @@ type CosmosClientFactory
co

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>CosmosStoreClient.Connect()</c> in preference to this API
/// It's recommended to use <c>CreateAndInitializeAsync</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member x.CreateUninitialized(discovery: Discovery) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> new CosmosClient(string uri, key, x.Options)
| Discovery.ConnectionString cs -> new CosmosClient(cs, x.Options)

/// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitializeAsync(discovery: Discovery, containers, ct) = discovery |> function
| Discovery.AccountUriAndKey (accountUri = uri; key = key) -> CosmosClient.CreateAndInitializeAsync(string uri, key, containers, x.Options, ct)
| Discovery.ConnectionString cs -> CosmosClient.CreateAndInitializeAsync(cs, containers, x.Options, ct)
Expand Down Expand Up @@ -1211,35 +1211,43 @@ type CosmosStoreConnector
member _.Endpoint = discovery.Endpoint

/// Creates an instance of CosmosClient without actually validating or establishing the connection
/// It's recommended to use <c>Connect</c> and/or <c>CosmosStoreClient.Connect()</c> in preference to this API
/// It's recommended to use <c>Connect</c> and/or <c>CreateAndInitialize</c> in preference to this API
/// in order to avoid latency spikes, and/or deferring discovery of connectivity or permission issues.
member _.CreateUninitialized() = factory.CreateUninitialized(discovery)

/// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.CreateAndInitializeAsync(containers, ct): Task<CosmosClient> = factory.CreateAndInitializeAsync(discovery, containers, ct)
/// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseAndContainerIds: struct (string * string)[]) =
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.CreateAndInitialize(databaseAndContainerIds: struct (string * string)[]) =
Async.call (fun ct -> x.CreateAndInitializeAsync(databaseAndContainerIds, ct))
/// Creates and validates a Client [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers within the (single) database
/// Creates and validates a CosmosClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers within the (single) database
member x.CreateAndInitialize(databaseId, containerIds: string[]) =
x.CreateAndInitialize[| for containerId in containerIds -> databaseId, containerId |]

/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member _.ConnectAsync(containers, ct): Task<CosmosStoreClient> = task {
let! cosmosClient = factory.CreateAndInitializeAsync(discovery, containers, ct)
return CosmosStoreClient(cosmosClient) }
/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseAndContainerIds: struct (string * string)[]) =
Async.call (fun ct -> x.ConnectAsync(databaseAndContainerIds, ct))
/// Creates and validates a CosmosStoreClient [including loading metadata](https://devblogs.microsoft.com/cosmosdb/improve-net-sdk-initialization) for the specified containers
member x.Connect(databaseId, containerIds: string[]) =
x.Connect[| for containerId in containerIds -> databaseId, containerId |]

/// Holds all relevant state for a Store within a given CosmosDB Database
/// - The CosmosDB CosmosClient (there should be a single one of these per process, plus an optional fallback one for pruning scenarios)
/// - The (singleton) per Container Stored Procedure initialization state
type CosmosStoreClient
and CosmosStoreClient
( client: CosmosClient,
// Client to use for fallback Containers. Default: use <c>client</c>
// Typically created via <c>CosmoStoreConnector.CreateAndInitialize</c>
[<O; D null>] ?archiveClient: CosmosClient,
// Admits a hook to enable customization of how <c>Equinox.CosmosStore</c> handles the low level interactions with the underlying Cosmos <c>Container</c>.
[<O; D null>] ?customize: Func<Container, Container>,
// Inhibit <c>CreateStoredProcedureIfNotExists</c> when a given Container is used for the first time
[<O; D null>] ?disableInitialization) =
let containerInitGuards = System.Collections.Concurrent.ConcurrentDictionary<struct (string * string), Initialization.ContainerInitializerGuard>()
member val CosmosClient = client
static member Connect(connector: CosmosStoreConnector, databaseId, containerIds: string[], [<O; D null>] ?customize, [<O; D null>] ?disableInitialization) = async {
let! cosmosClient = connector.Connect(databaseId, containerIds)
return CosmosStoreClient(cosmosClient, ?customize = customize, ?disableInitialization = disableInitialization) }
member private x.CreateContainer(client: CosmosClient, struct (databaseId, containerId)) =
let container = client.GetDatabase(databaseId).GetContainer(containerId)
match customize with Some f -> f.Invoke container | None -> container
Expand Down Expand Up @@ -1280,26 +1288,6 @@ type CosmosStoreContext(client: CosmosStoreClient, databaseId, containerId, tipO
| None, Some c -> Some (databaseId, c)
| Some d, c -> Some (d, defaultArg c containerId)
CosmosStoreContext(client, databaseId, containerId, tipOptions, queryOptions, ?archive = archive)
static member Connect(connector: CosmosStoreConnector, databaseId, containerId, tipMaxEvents,
[<O; D null>] ?otherContainersToInitialize,
(* CosmosStoreContext options *)
[<O; D null>] ?tipMaxJsonLength, [<O; D null>] ?ignoreMissingEvents,
[<O; D null>] ?queryMaxItems, [<O; D null>] ?queryMaxRequests,
[<O; D null>] ?archiveDatabaseId, [<O; D null>] ?archiveContainerId,
(* CosmosStoreClient options *)
[<O; D null>] ?customize, [<O; D null>] ?disableInitialization): Async<CosmosStoreContext> = async {
let mainDb c = struct (databaseId, c)
let ids = [| yield mainDb containerId
match archiveDatabaseId, archiveContainerId with
| None, Some c -> yield mainDb c
| Some d, Some c -> yield d, c
| _ -> ()
match otherContainersToInitialize with Some xs when not (isNull xs) -> yield! Seq.map mainDb xs | _ -> () |]
let! cosmosClient = connector.Connect(ids)
let client = CosmosStoreClient(cosmosClient, ?customize = customize, ?disableInitialization = disableInitialization)
return CosmosStoreContext(client, databaseId, containerId, tipMaxEvents, ?tipMaxJsonLength = tipMaxJsonLength,
?queryMaxItems = queryMaxItems, ?queryMaxRequests = queryMaxRequests,
?ignoreMissingEvents = ignoreMissingEvents, ?archiveContainerId = archiveContainerId) }
member val internal StoreClient =
let fallback = archive |> Option.map client.CreateFallbackContainer
StoreClient(containerGuard.Container, fallback, queryOptions, tipOptions)
Expand Down
14 changes: 7 additions & 7 deletions tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,17 @@ let createConnector (log: Serilog.ILogger) =
type DocStoreCollection() =
do ()

let createPrimaryContextIgnoreMissing connector containerId queryMaxItems tipMaxEvents ignoreMissing =
CosmosStoreContext.Connect(connector, databaseId, containerId, tipMaxEvents = tipMaxEvents, queryMaxItems = queryMaxItems, ignoreMissingEvents = ignoreMissing)
|> Async.RunSynchronously
let createPrimaryContextIgnoreMissing (connector: CosmosStoreConnector) containerId queryMaxItems tipMaxEvents ignoreMissing =
let client = connector.Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
CosmosStoreContext(client, databaseId, containerId, tipMaxEvents = tipMaxEvents, queryMaxItems = queryMaxItems, ignoreMissingEvents = ignoreMissing)

let defaultTipMaxEvents = 10
let createArchiveContext log queryMaxItems =
CosmosStoreContext.Connect(createConnector log, databaseId, containerId, defaultTipMaxEvents, queryMaxItems = queryMaxItems)
|> Async.RunSynchronously
let client = (createConnector log).Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
CosmosStoreContext(client, databaseId, containerId, defaultTipMaxEvents, queryMaxItems = queryMaxItems)
let createFallbackContext log queryMaxItems =
CosmosStoreContext.Connect(createConnector log, databaseId, containerId, defaultTipMaxEvents, queryMaxItems = queryMaxItems, archiveContainerId = archiveContainerId)
|> Async.RunSynchronously
let client = (createConnector log).Connect(databaseId, [| containerId |]) |> Async.RunSynchronously
CosmosStoreContext(client, databaseId, containerId, defaultTipMaxEvents, queryMaxItems = queryMaxItems, archiveContainerId = archiveContainerId)

type StoreContext = CosmosStoreContext
type StoreCategory<'E, 'S> = CosmosStoreCategory<'E, 'S, unit>
Expand Down

0 comments on commit 832d5e9

Please sign in to comment.