Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection pool metrics #1329

Merged
merged 4 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions src/MySqlConnector/Core/ConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Net;
using System.Security.Authentication;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -50,12 +51,14 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
{
if (m_sessions.Count > 0)
{
// NOTE: MetricsReporter updated outside lock below
session = m_sessions.First!.Value;
m_sessions.RemoveFirst();
}
}
if (session is not null)
{
MetricsReporter.RemoveIdle(this);
Log.FoundExistingSession(m_logger, Id);
bool reuseSession;

Expand Down Expand Up @@ -96,8 +99,12 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountPooled = m_leasedSessions.Count;
}
MetricsReporter.AddUsed(this);
ActivitySourceHelper.CopyTags(session.ActivityTags, activity);
Log.ReturningPooledSession(m_logger, Id, session.Id, leasedSessionsCountPooled);

session.LastLeasedTicks = unchecked((uint) Environment.TickCount);
MetricsReporter.RecordWaitTime(this, unchecked(session.LastLeasedTicks - (uint) startTickCount));
return session;
}
}
Expand All @@ -112,7 +119,11 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountNew = m_leasedSessions.Count;
}
MetricsReporter.AddUsed(this);
Log.ReturningNewSession(m_logger, Id, session.Id, leasedSessionsCountNew);

session.LastLeasedTicks = unchecked((uint) Environment.TickCount);
MetricsReporter.RecordCreateTime(this, unchecked(session.LastLeasedTicks - (uint) startTickCount));
return session;
}
catch (Exception ex)
Expand Down Expand Up @@ -164,12 +175,14 @@ public async ValueTask ReturnAsync(IOBehavior ioBehavior, ServerSession session)
{
lock (m_leasedSessions)
m_leasedSessions.Remove(session.Id);
MetricsReporter.RemoveUsed(this);
session.OwningConnection = null;
var sessionHealth = GetSessionHealth(session);
if (sessionHealth == 0)
{
lock (m_sessions)
m_sessions.AddFirst(session);
MetricsReporter.AddIdle(this);
}
else
{
Expand Down Expand Up @@ -224,6 +237,8 @@ public async Task ReapAsync(IOBehavior ioBehavior, CancellationToken cancellatio
public void Dispose()
{
Log.DisposingConnectionPool(m_logger, Id);
lock (s_allPools)
s_allPools.Remove(this);
#if NET6_0_OR_GREATER
m_dnsCheckTimer?.Dispose();
m_dnsCheckTimer = null;
Expand Down Expand Up @@ -326,12 +341,14 @@ private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<ServerSession, boo
{
if (m_sessions.Count > 0)
{
// NOTE: MetricsReporter updated outside lock below
session = m_sessions.Last!.Value;
m_sessions.RemoveLast();
}
}
if (session is null)
return;
MetricsReporter.RemoveIdle(this);

if (shouldCleanFn(session))
{
Expand All @@ -344,6 +361,7 @@ private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<ServerSession, boo
// session should not be cleaned; put it back in the queue and stop iterating
lock (m_sessions)
m_sessions.AddLast(session);
MetricsReporter.AddIdle(this);
return;
}
}
Expand Down Expand Up @@ -389,6 +407,7 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh
AdjustHostConnectionCount(session, 1);
lock (m_sessions)
m_sessions.AddFirst(session);
MetricsReporter.AddIdle(this);
}
finally
{
Expand Down Expand Up @@ -546,17 +565,18 @@ private async ValueTask<ServerSession> ConnectSessionAsync(MySqlConnection conne
else if (pool != newPool)
{
Log.CreatedPoolWillNotBeUsed(newPool.m_logger, newPool.Id);
newPool.Dispose();
}

return pool;
}

public static async Task ClearPoolsAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
foreach (var pool in GetAllPools())
foreach (var pool in GetCachedPools())
await pool.ClearAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

static List<ConnectionPool> GetAllPools()
static List<ConnectionPool> GetCachedPools()
{
var pools = new List<ConnectionPool>(s_pools.Count);
var uniquePools = new HashSet<ConnectionPool>();
Expand Down Expand Up @@ -594,8 +614,19 @@ private ConnectionPool(MySqlConnectorLoggingConfiguration loggingConfiguration,
cs.LoadBalance == MySqlLoadBalance.LeastConnections ? new LeastConnectionsLoadBalancer(m_hostSessions!) :
(ILoadBalancer) new RoundRobinLoadBalancer();

// create tag lists for reporting pool metrics
var connectionString = cs.ConnectionStringBuilder.GetConnectionString(includePassword: false);
m_stateTagList =
[
new("state", "idle"),
new("pool.name", Name ?? connectionString),
new("state", "used"),
];

Id = Interlocked.Increment(ref s_poolId);
Log.CreatingNewConnectionPool(m_logger, Id, cs.ConnectionStringBuilder.GetConnectionString(includePassword: false));
lock (s_allPools)
s_allPools.Add(this);
Log.CreatingNewConnectionPool(m_logger, Id, connectionString);
}

private void StartReaperTask()
Expand Down Expand Up @@ -741,6 +772,19 @@ private void AdjustHostConnectionCount(ServerSession session, int delta)
}
}

// Provides a slice of m_stateTagList that contains either the 'idle' or 'used' state tag along with the pool name.
public ReadOnlySpan<KeyValuePair<string, object?>> IdleStateTagList => m_stateTagList.AsSpan(0, 2);
public ReadOnlySpan<KeyValuePair<string, object?>> UsedStateTagList => m_stateTagList.AsSpan(1, 2);

// A slice of m_stateTagList that contains only the pool name tag.
public ReadOnlySpan<KeyValuePair<string, object?>> PoolNameTagList => m_stateTagList.AsSpan(1, 1);

public static List<ConnectionPool> GetAllPools()
{
lock (s_allPools)
return new(s_allPools);
}

private sealed class LeastConnectionsLoadBalancer(Dictionary<string, int> hostSessions) : ILoadBalancer
{
public IReadOnlyList<string> LoadBalance(IReadOnlyList<string> hosts)
Expand All @@ -766,6 +810,7 @@ private static void OnAppDomainShutDown(object? sender, EventArgs e) =>
ClearPoolsAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

private static readonly ConcurrentDictionary<string, ConnectionPool?> s_pools = new();
private static readonly List<ConnectionPool> s_allPools = new();
private static readonly Action<ILogger, int, string, Exception?> s_createdNewSession = LoggerMessage.Define<int, string>(
LogLevel.Debug, new EventId(EventIds.PoolCreatedNewSession, nameof(EventIds.PoolCreatedNewSession)),
"Pool {PoolId} has no pooled session available; created new session {SessionId}");
Expand All @@ -777,6 +822,7 @@ private static void OnAppDomainShutDown(object? sender, EventArgs e) =>

private readonly ILogger m_logger;
private readonly ILogger m_connectionLogger;
private readonly KeyValuePair<string, object?>[] m_stateTagList;
private readonly SemaphoreSlim m_cleanSemaphore;
private readonly SemaphoreSlim m_sessionSemaphore;
private readonly LinkedList<ServerSession> m_sessions;
Expand Down
57 changes: 57 additions & 0 deletions src/MySqlConnector/Core/MetricsReporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using System.Diagnostics.Metrics;
using MySqlConnector.Utilities;

namespace MySqlConnector.Core;

internal static class MetricsReporter
{
public static void AddIdle(ConnectionPool pool) => s_connectionsUsageCounter.Add(1, pool.IdleStateTagList);
public static void RemoveIdle(ConnectionPool pool) => s_connectionsUsageCounter.Add(-1, pool.IdleStateTagList);
public static void AddUsed(ConnectionPool pool) => s_connectionsUsageCounter.Add(1, pool.UsedStateTagList);
public static void RemoveUsed(ConnectionPool pool) => s_connectionsUsageCounter.Add(-1, pool.UsedStateTagList);
public static void RecordCreateTime(ConnectionPool pool, uint ticks) => s_createTimeHistory.Record(ticks, pool.PoolNameTagList);
public static void RecordUseTime(ConnectionPool pool, uint ticks) => s_useTimeHistory.Record(ticks, pool.PoolNameTagList);
public static void RecordWaitTime(ConnectionPool pool, uint ticks) => s_waitTimeHistory.Record(ticks, pool.PoolNameTagList);

public static void AddPendingRequest(ConnectionPool? pool)
{
if (pool is not null)
s_pendingRequestsCounter.Add(1, pool.PoolNameTagList);
}

public static void RemovePendingRequest(ConnectionPool? pool)
{
if (pool is not null)
s_pendingRequestsCounter.Add(-1, pool.PoolNameTagList);
}

static MetricsReporter()
{
ActivitySourceHelper.Meter.CreateObservableUpDownCounter<int>("db.client.connections.idle.max",
observeValues: GetMaximumConnections, unit: "{connection}",
description: "The maximum number of idle open connections allowed; this corresponds to MaximumPoolSize in the connection string.");
ActivitySourceHelper.Meter.CreateObservableUpDownCounter<int>("db.client.connections.idle.min",
observeValues: GetMinimumConnections, unit: "{connection}",
description: "The minimum number of idle open connections allowed; this corresponds to MinimumPoolSize in the connection string.");
ActivitySourceHelper.Meter.CreateObservableUpDownCounter<int>("db.client.connections.max",
observeValues: GetMaximumConnections, unit: "{connection}",
description: "The maximum number of open connections allowed; this corresponds to MaximumPoolSize in the connection string.");

static IEnumerable<Measurement<int>> GetMaximumConnections() =>
ConnectionPool.GetAllPools().Select(x => new Measurement<int>(x.ConnectionSettings.MaximumPoolSize, x.PoolNameTagList));

static IEnumerable<Measurement<int>> GetMinimumConnections() =>
ConnectionPool.GetAllPools().Select(x => new Measurement<int>(x.ConnectionSettings.MinimumPoolSize, x.PoolNameTagList));
}

private static readonly UpDownCounter<int> s_connectionsUsageCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.usage",
unit: "{connection}", description: "The number of connections that are currently in the state described by the state tag.");
private static readonly UpDownCounter<int> s_pendingRequestsCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.pending_requests",
unit: "{request}", description: "The number of pending requests for an open connection, cumulative for the entire pool.");
private static readonly Histogram<float> s_createTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.create_time",
unit: "ms", description: "The time it took to create a new connection.");
private static readonly Histogram<float> s_useTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.use_time",
unit: "ms", description: "The time between borrowing a connection and returning it to the pool.");
private static readonly Histogram<float> s_waitTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.wait_time",
unit: "ms", description: "The time it took to obtain an open connection from the pool.");
}
8 changes: 7 additions & 1 deletion src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers.Text;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Globalization;
using System.IO.Pipes;
using System.Net;
Expand Down Expand Up @@ -56,6 +57,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
public uint CreatedTicks { get; }
public ConnectionPool? Pool { get; }
public int PoolGeneration { get; }
public uint LastLeasedTicks { get; set; }
public uint LastReturnedTicks { get; private set; }
public string? DatabaseOverride { get; set; }
public string HostName { get; private set; }
Expand All @@ -75,7 +77,11 @@ public ValueTask ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? ownin
{
Log.ReturningToPool(m_logger, Id, Pool?.Id ?? 0);
LastReturnedTicks = unchecked((uint) Environment.TickCount);
return Pool is null ? default : Pool.ReturnAsync(ioBehavior, this);
if (Pool is null)
return default;
MetricsReporter.RecordUseTime(Pool, unchecked(LastReturnedTicks - LastLeasedTicks));
LastLeasedTicks = 0;
return Pool.ReturnAsync(ioBehavior, this);
}

public bool IsConnected
Expand Down
6 changes: 4 additions & 2 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,8 @@ private async ValueTask<bool> PingAsync(IOBehavior ioBehavior, CancellationToken

internal async Task OpenAsync(IOBehavior? ioBehavior, CancellationToken cancellationToken)
{
var openStartTickCount = Environment.TickCount;

VerifyNotDisposed();
cancellationToken.ThrowIfCancellationRequested();
if (State != ConnectionState.Closed)
Expand All @@ -392,8 +394,6 @@ internal async Task OpenAsync(IOBehavior? ioBehavior, CancellationToken cancella
using var activity = ActivitySourceHelper.StartActivity(ActivitySourceHelper.OpenActivityName);
try
{
var openStartTickCount = Environment.TickCount;

SetState(ConnectionState.Connecting);

var pool = m_dataSource?.Pool ??
Expand Down Expand Up @@ -896,6 +896,7 @@ internal void FinishQuerying(bool hasWarnings)

private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool, int startTickCount, Activity? activity, IOBehavior? ioBehavior, CancellationToken cancellationToken)
{
MetricsReporter.AddPendingRequest(pool);
var connectionSettings = GetInitializedConnectionSettings();
var actualIOBehavior = ioBehavior ?? (connectionSettings.ForceSynchronous ? IOBehavior.Synchronous : IOBehavior.Asynchronous);

Expand Down Expand Up @@ -949,6 +950,7 @@ private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool,
}
finally
{
MetricsReporter.RemovePendingRequest(pool);
linkedSource?.Dispose();
timeoutSource?.Dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion src/MySqlConnector/MySqlConnector.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
<PackageReference Include="System.Threading.Tasks.Extensions" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' != '.NETCoreApp' ">
<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' != '.NETCoreApp' OR '$(TargetFramework)' == 'net6.0' ">
<PackageReference Include="System.Diagnostics.DiagnosticSource" />
</ItemGroup>

Expand Down
13 changes: 6 additions & 7 deletions src/MySqlConnector/Utilities/ActivitySourceHelper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Globalization;
using System.Reflection;

Expand Down Expand Up @@ -55,12 +56,10 @@ public static void CopyTags(IEnumerable<KeyValuePair<string, object?>> tags, Act
}
}

private static ActivitySource ActivitySource { get; } = CreateActivitySource();
public static Meter Meter { get; } = new("MySqlConnector", GetVersion());

private static ActivitySource CreateActivitySource()
{
var assembly = typeof(ActivitySourceHelper).Assembly;
var version = assembly.GetCustomAttribute<AssemblyFileVersionAttribute>()!.Version;
return new("MySqlConnector", version);
}
private static ActivitySource ActivitySource { get; } = new("MySqlConnector", GetVersion());

private static string GetVersion() =>
typeof(ActivitySourceHelper).Assembly.GetCustomAttribute<AssemblyFileVersionAttribute>()!.Version;
}
2 changes: 1 addition & 1 deletion tests/MySqlConnector.Tests/ConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void PingWhenClosed()
[Fact]
public void ConnectionTimeout()
{
m_server.BlockOnConnect = true;
m_server.ConnectDelay = TimeSpan.FromSeconds(10);
var csb = new MySqlConnectionStringBuilder(m_csb.ConnectionString);
csb.ConnectionTimeout = 4;
using var connection = new MySqlConnection(csb.ConnectionString);
Expand Down
2 changes: 1 addition & 1 deletion tests/MySqlConnector.Tests/FakeMySqlServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void Stop()

public bool SuppressAuthPluginNameTerminatingNull { get; set; }
public bool SendIncompletePostHandshakeResponse { get; set; }
public bool BlockOnConnect { get; set; }
public TimeSpan? ConnectDelay { get; set; }
public TimeSpan? ResetDelay { get; set; }

internal void CancelQuery(int connectionId)
Expand Down
4 changes: 2 additions & 2 deletions tests/MySqlConnector.Tests/FakeMySqlServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public async Task RunAsync(TcpClient client, CancellationToken token)
using (client)
using (var stream = client.GetStream())
{
if (m_server.BlockOnConnect)
Thread.Sleep(TimeSpan.FromSeconds(10));
if (m_server.ConnectDelay is { } connectDelay)
await Task.Delay(connectDelay);

await SendAsync(stream, 0, WriteInitialHandshake);
await ReadPayloadAsync(stream, token); // handshake response
Expand Down
Loading