Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(MessageDb): Replace logging with complete OT metrics #407

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The `Unreleased` section name is replaced by the expected version of next releas

### Removed

- Removed `netstandard2.1`; minimum target not `net6.0` [#407](https://github.com/jet/equinox/pull/407)
bartelink marked this conversation as resolved.
Show resolved Hide resolved
- Remove explicit `net461` handling; minimum target now `net6.0` / `FSharp.Core` v `6.0.0` [#310](https://github.com/jet/equinox/pull/310) [#323](https://github.com/jet/equinox/pull/323) [#354](https://github.com/jet/equinox/pull/354)
- Remove `Equinox.Core.ICache` (there is/was only one impl, and the interface has changed as part of [#386](https://github.com/jet/equinox/pull/386)) [#389](https://github.com/jet/equinox/pull/389)

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
bartelink marked this conversation as resolved.
Show resolved Hide resolved
</PropertyGroup>

<ItemGroup>
Expand Down
16 changes: 16 additions & 0 deletions samples/Store/Integration/AutoDataAttribute.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,22 @@

open System

open Domain
open FsCheck.FSharp
open FSharp.UMX

module Arb =
let generate<'t> = ArbMap.defaults |> ArbMap.generate<'t>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was asking for a blank L10 but this is consistent w L13 so nmm!
Also should name of helper be ArbMap?
Also should helper be called defaultGen
Also should we log a request for FsCheck to put this in the box ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


type FsCheckGenerators =
static member SkuId = Arb.generate |> Gen.map SkuId |> Arb.fromGen
static member ContactPreferencesId =
Arb.generate<Guid>
|> Gen.map (fun x -> sprintf "%[email protected]" (x.ToString("N")))
|> Gen.map ContactPreferences.ClientId
|> Arb.fromGen
static member RequestId = Arb.generate<Guid> |> Gen.map (fun x -> RequestId.parse %x) |> Arb.fromGen

type AutoDataAttribute() =
inherit FsCheck.Xunit.PropertyAttribute(Arbitrary = [|typeof<FsCheckGenerators>|], MaxTest = 1, QuietOnSuccess = true)

Expand Down
16 changes: 0 additions & 16 deletions samples/Store/Integration/Infrastructure.fs

This file was deleted.

1 change: 0 additions & 1 deletion samples/Store/Integration/Integration.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

<ItemGroup>
<Compile Include="TestOutput.fs" />
<Compile Include="Infrastructure.fs" />
<Compile Include="AutoDataAttribute.fs" />
<Compile Include="CodecIntegration.fs" />
<Compile Include="EventStoreIntegration.fs" />
Expand Down
2 changes: 1 addition & 1 deletion samples/TodoBackend/TodoBackend.fsproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
Expand Down
14 changes: 8 additions & 6 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,22 @@ type private CacheEntry<'state>(initialToken: StreamToken, initialState: 'state,
/// Coordinates having a max of one in-flight request across all staleness-tolerant loads at all times
// Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first!
member x.ReadThrough(maxAge: TimeSpan, isStale, load: Func<_, _>) = task {
let act = System.Diagnostics.Activity.Current
let setCacheHit hit = if act <> null then act.SetTag(Tags.cache_hit, hit) |> ignore
let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
let age timestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - timestamp)
if act <> null then act.SetTag(Tags.cache_age, 1000. * age verifiedTimestamp) |> ignore
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't really matter but arguably this should be inside fetchStateConsistently
and probably calc age into a temporary (once) inline there

let isWithinMaxAge cachedValueTimestamp = age cachedValueTimestamp <= maxAge.TotalSeconds
let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp)
match lock x fetchStateConsistently with
| _, ValueSome cachedValue, true ->
return cachedValue
| _, ValueSome cachedValue, true -> setCacheHit true; return cachedValue
| ourInitialCellState, maybeBaseState, _ -> // If it's not good enough for us, trigger a request (though someone may have beaten us to that)

setCacheHit false
// Inspect/await any concurrent attempt to see if it is sufficient for our needs
match! ourInitialCellState.TryAwaitValid() with
| ValueSome (fetchCommencedTimestamp, res) when isWithinMaxAge fetchCommencedTimestamp -> return res
| _ ->

// .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid)
let newInstance = AsyncLazy(fun () -> load.Invoke maybeBaseState)
let _ = Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
Expand Down Expand Up @@ -81,13 +84,12 @@ type Cache private (inner: System.Runtime.Caching.MemoryCache) =
// if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
member internal _.Load(key, maxAge, isStale, policy, loadOrReload, ct) = task {
let loadOrReload maybeBaseState = task {
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
let ts = System.Diagnostics.Stopwatch.GetTimestamp()
let! res = loadOrReload ct maybeBaseState
return struct (ts, res) }
if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
let maybeBaseState = tryLoad key
let act = System.Diagnostics.Activity.Current in act.SetTag(Tags.cache_hit, ValueOption.isSome maybeBaseState) |> ignore
let! timestamp, res = loadOrReload maybeBaseState
addOrMergeCacheEntry isStale key policy timestamp res
return res
Expand Down
8 changes: 2 additions & 6 deletions src/Equinox.Core/Category.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ type ICategory<'event, 'state, 'context> =
// Low level stream impl, used by Store-specific Category types that layer policies such as Caching in
namespace Equinox

open Equinox.Core.Tracing
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

Expand All @@ -42,12 +40,10 @@ type Category<'event, 'state, 'context>
member _.LoadEmpty() =
empty
member _.Load(maxAge, requireLeader, ct) = task {
use act = source.StartActivity("Load", ActivityKind.Client)
if act <> null then act.AddStream(categoryName, streamId, streamName).AddLeader(requireLeader).AddStale(maxAge) |> ignore
Equinox.Core.Tracing.Load.setTags(categoryName, streamId, streamName, requireLeader, maxAge)
return! inner.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) }
member _.TrySync(attempt, (token, originState), events, ct) = task {
use act = source.StartActivity("TrySync", ActivityKind.Client)
if act <> null then act.AddStream(categoryName, streamId, streamName).AddSyncAttempt(attempt) |> ignore
Equinox.Core.Tracing.TrySync.setTags(attempt, events)
let log = if attempt = 1 then log else log.ForContext("attempts", attempt)
return! inner.TrySync(log, categoryName, streamId, streamName, context, init, token, originState, events, ct) } }

Expand Down
1 change: 0 additions & 1 deletion src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Tracing.fs" />
<Compile Include="Category.fs" />
<Compile Include="StopwatchInterval.fs" />
<Compile Include="Infrastructure.fs" />
Expand Down
40 changes: 0 additions & 40 deletions src/Equinox.Core/Tracing.fs

This file was deleted.

4 changes: 0 additions & 4 deletions src/Equinox.MessageDb/Equinox.MessageDb.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
<Compile Include="..\Equinox.Core\Infrastructure.fs">
<Link>Infrastructure.fs</Link>
</Compile>
<Compile Include="..\Equinox.Core\Internal.fs">
<Link>Internal.fs</Link>
</Compile>
<Compile Include="MessageDbClient.fs" />
<Compile Include="Tracing.fs" />
<Compile Include="MessageDb.fs" />
</ItemGroup>

Expand Down
Loading
Loading