Skip to content

Commit

Permalink
Add connection pool metrics. Fixes #491
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Jun 14, 2023
1 parent f53882e commit d02539b
Show file tree
Hide file tree
Showing 15 changed files with 955 additions and 16 deletions.
61 changes: 60 additions & 1 deletion src/MySqlConnector/Core/ConnectionPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Net;
using System.Security.Authentication;
using Microsoft.Extensions.Logging;
Expand All @@ -19,6 +20,8 @@ internal sealed class ConnectionPool : IDisposable

public SslProtocols SslProtocols { get; set; }

public void AddPendingRequestCount(int delta) => s_pendingRequestsCounter.Add(delta, PoolNameTagList);

public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection, int startTickCount, Activity? activity, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Expand Down Expand Up @@ -50,12 +53,14 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
{
if (m_sessions.Count > 0)
{
// NOTE: s_connectionsUsageCounter updated outside lock below
session = m_sessions.First!.Value;
m_sessions.RemoveFirst();
}
}
if (session is not null)
{
s_connectionsUsageCounter.Add(-1, IdleStateTagList);
Log.FoundExistingSession(m_logger, Id);
bool reuseSession;

Expand Down Expand Up @@ -89,8 +94,12 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountPooled = m_leasedSessions.Count;
}
s_connectionsUsageCounter.Add(1, UsedStateTagList);
ActivitySourceHelper.CopyTags(session.ActivityTags, activity);
Log.ReturningPooledSession(m_logger, Id, session.Id, leasedSessionsCountPooled);

session.LastLeasedTicks = unchecked((uint) Environment.TickCount);
s_waitTimeHistory.Record(unchecked(session.LastLeasedTicks - (uint) startTickCount), PoolNameTagList);
return session;
}
}
Expand All @@ -105,7 +114,11 @@ public async ValueTask<ServerSession> GetSessionAsync(MySqlConnection connection
m_leasedSessions.Add(session.Id, session);
leasedSessionsCountNew = m_leasedSessions.Count;
}
s_connectionsUsageCounter.Add(1, UsedStateTagList);
Log.ReturningNewSession(m_logger, Id, session.Id, leasedSessionsCountNew);

session.LastLeasedTicks = unchecked((uint) Environment.TickCount);
s_createTimeHistory.Record(unchecked(session.LastLeasedTicks - (uint) startTickCount), PoolNameTagList);
return session;
}
catch (Exception ex)
Expand Down Expand Up @@ -157,12 +170,14 @@ public async ValueTask ReturnAsync(IOBehavior ioBehavior, ServerSession session)
{
lock (m_leasedSessions)
m_leasedSessions.Remove(session.Id);
s_connectionsUsageCounter.Add(-1, UsedStateTagList);
session.OwningConnection = null;
var sessionHealth = GetSessionHealth(session);
if (sessionHealth == 0)
{
lock (m_sessions)
m_sessions.AddFirst(session);
s_connectionsUsageCounter.Add(1, IdleStateTagList);
}
else
{
Expand Down Expand Up @@ -236,6 +251,10 @@ public void Dispose()
reaperWaitHandle.WaitOne();
}
#endif

s_minIdleConnectionsCounter.Add(-ConnectionSettings.MinimumPoolSize, PoolNameTagList);
s_maxIdleConnectionsCounter.Add(-ConnectionSettings.MaximumPoolSize, PoolNameTagList);
s_maxConnectionsCounter.Add(-ConnectionSettings.MaximumPoolSize, PoolNameTagList);
}

/// <summary>
Expand Down Expand Up @@ -319,12 +338,14 @@ private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<ServerSession, boo
{
if (m_sessions.Count > 0)
{
// NOTE: s_connectionsUsageCounter updated outside lock below
session = m_sessions.Last!.Value;
m_sessions.RemoveLast();
}
}
if (session is null)
return;
s_connectionsUsageCounter.Add(-1, IdleStateTagList);

if (shouldCleanFn(session))
{
Expand All @@ -337,6 +358,7 @@ private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<ServerSession, boo
// session should not be cleaned; put it back in the queue and stop iterating
lock (m_sessions)
m_sessions.AddLast(session);
s_connectionsUsageCounter.Add(1, IdleStateTagList);
return;
}
}
Expand Down Expand Up @@ -382,6 +404,7 @@ private async Task CreateMinimumPooledSessions(MySqlConnection connection, IOBeh
AdjustHostConnectionCount(session, 1);
lock (m_sessions)
m_sessions.AddFirst(session);
s_connectionsUsageCounter.Add(1, IdleStateTagList);
}
finally
{
Expand Down Expand Up @@ -587,8 +610,22 @@ private ConnectionPool(MySqlConnectorLoggingConfiguration loggingConfiguration,
cs.LoadBalance == MySqlLoadBalance.LeastConnections ? new LeastConnectionsLoadBalancer(m_hostSessions!) :
(ILoadBalancer) new RoundRobinLoadBalancer();

// create tag lists for reporting pool metrics
var connectionString = cs.ConnectionStringBuilder.GetConnectionString(includePassword: false);
m_stateTagList = new KeyValuePair<string, object?>[3]
{
new("state", "idle"),
new("pool.name", Name ?? connectionString),
new("state", "used"),
};

// set pool size counters
s_minIdleConnectionsCounter.Add(ConnectionSettings.MinimumPoolSize, PoolNameTagList);
s_maxIdleConnectionsCounter.Add(ConnectionSettings.MaximumPoolSize, PoolNameTagList);
s_maxConnectionsCounter.Add(ConnectionSettings.MaximumPoolSize, PoolNameTagList);

Id = Interlocked.Increment(ref s_poolId);
Log.CreatingNewConnectionPool(m_logger, Id, cs.ConnectionStringBuilder.GetConnectionString(includePassword: false));
Log.CreatingNewConnectionPool(m_logger, Id, connectionString);
}

private void StartReaperTask()
Expand Down Expand Up @@ -734,6 +771,13 @@ private void AdjustHostConnectionCount(ServerSession session, int delta)
}
}

// Provides a slice of m_stateTagList that contains either the 'idle' or 'used' state tag along with the pool name.
private ReadOnlySpan<KeyValuePair<string, object?>> IdleStateTagList => m_stateTagList.AsSpan(0, 2);
private ReadOnlySpan<KeyValuePair<string, object?>> UsedStateTagList => m_stateTagList.AsSpan(1, 2);

// A slice of m_stateTagList that contains only the pool name tag.
public ReadOnlySpan<KeyValuePair<string, object?>> PoolNameTagList => m_stateTagList.AsSpan(1, 1);

private sealed class LeastConnectionsLoadBalancer : ILoadBalancer
{
public LeastConnectionsLoadBalancer(Dictionary<string, int> hostSessions) => m_hostSessions = hostSessions;
Expand Down Expand Up @@ -768,6 +812,20 @@ static ConnectionPool()
private static void OnAppDomainShutDown(object? sender, EventArgs e) =>
ClearPoolsAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();

private static readonly UpDownCounter<int> s_connectionsUsageCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.usage",
unit: "{connection}", description: "The number of connections that are currently in the state described by the state tag.");
private static readonly UpDownCounter<int> s_maxIdleConnectionsCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.idle.max",
unit: "{connection}", description: "The maximum number of idle open connections allowed.");
private static readonly UpDownCounter<int> s_minIdleConnectionsCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.idle.min",
unit: "{connection}", description: "The minimum number of idle open connections allowed.");
private static readonly UpDownCounter<int> s_maxConnectionsCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.max",
unit: "{connection}", description: "The maximum number of open connections allowed.");
private static readonly UpDownCounter<int> s_pendingRequestsCounter = ActivitySourceHelper.Meter.CreateUpDownCounter<int>("db.client.connections.pending_requests",
unit: "{request}", description: "The number of pending requests for an open connection, cumulative for the entire pool.");
private static readonly Histogram<float> s_createTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.create_time",
unit: "ms", description: "The time it took to create a new connection.");
private static readonly Histogram<float> s_waitTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.wait_time",
unit: "ms", description: "The time it took to obtain an open connection from the pool.");
private static readonly ConcurrentDictionary<string, ConnectionPool?> s_pools = new();
private static readonly Action<ILogger, int, string, Exception?> s_createdNewSession = LoggerMessage.Define<int, string>(
LogLevel.Debug, new EventId(EventIds.PoolCreatedNewSession, nameof(EventIds.PoolCreatedNewSession)),
Expand All @@ -780,6 +838,7 @@ private static void OnAppDomainShutDown(object? sender, EventArgs e) =>

private readonly ILogger m_logger;
private readonly ILogger m_connectionLogger;
private readonly KeyValuePair<string, object?>[] m_stateTagList;
private readonly SemaphoreSlim m_cleanSemaphore;
private readonly SemaphoreSlim m_sessionSemaphore;
private readonly LinkedList<ServerSession> m_sessions;
Expand Down
10 changes: 9 additions & 1 deletion src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Buffers.Text;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Globalization;
using System.IO.Pipes;
using System.Net;
Expand Down Expand Up @@ -56,6 +57,7 @@ public ServerSession(ILogger logger, ConnectionPool? pool, int poolGeneration, i
public uint CreatedTicks { get; }
public ConnectionPool? Pool { get; }
public int PoolGeneration { get; }
public uint LastLeasedTicks { get; set; }
public uint LastReturnedTicks { get; private set; }
public string? DatabaseOverride { get; set; }
public string HostName { get; private set; }
Expand All @@ -75,7 +77,11 @@ public ValueTask ReturnToPoolAsync(IOBehavior ioBehavior, MySqlConnection? ownin
{
Log.ReturningToPool(m_logger, Id, Pool?.Id ?? 0);
LastReturnedTicks = unchecked((uint) Environment.TickCount);
return Pool is null ? default : Pool.ReturnAsync(ioBehavior, this);
if (Pool is null)
return default;
s_useTimeHistory.Record(unchecked(LastReturnedTicks - LastLeasedTicks), Pool.PoolNameTagList);
LastLeasedTicks = 0;
return Pool.ReturnAsync(ioBehavior, this);
}

public bool IsConnected
Expand Down Expand Up @@ -1953,6 +1959,8 @@ protected override void OnStatementBegin(int index)
[LoggerMessage(EventIds.ExpectedSessionState6, LogLevel.Error, "Session {SessionId} should have state {ExpectedState1} or {ExpectedState2} or {ExpectedState3} or {ExpectedState4} or {ExpectedState5} or {ExpectedState6} but was {SessionState}")]
private static partial void ExpectedSessionState6(ILogger logger, string sessionId, State expectedState1, State expectedState2, State expectedState3, State expectedState4, State expectedState5, State expectedState6, State sessionState);

private static readonly Histogram<float> s_useTimeHistory = ActivitySourceHelper.Meter.CreateHistogram<float>("db.client.connections.use_time",
unit: "ms", description: "The time between borrowing a connection and returning it to the pool.");
private static ReadOnlySpan<byte> BeginCertificateBytes => "-----BEGIN CERTIFICATE-----"u8;
private static readonly PayloadData s_setNamesUtf8NoAttributesPayload = QueryPayload.Create(false, "SET NAMES utf8;"u8);
private static readonly PayloadData s_setNamesUtf8mb4NoAttributesPayload = QueryPayload.Create(false, "SET NAMES utf8mb4;"u8);
Expand Down
6 changes: 4 additions & 2 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,8 @@ private async ValueTask<bool> PingAsync(IOBehavior ioBehavior, CancellationToken

internal async Task OpenAsync(IOBehavior? ioBehavior, CancellationToken cancellationToken)
{
var openStartTickCount = Environment.TickCount;

VerifyNotDisposed();
cancellationToken.ThrowIfCancellationRequested();
if (State != ConnectionState.Closed)
Expand All @@ -398,8 +400,6 @@ internal async Task OpenAsync(IOBehavior? ioBehavior, CancellationToken cancella
using var activity = ActivitySourceHelper.StartActivity(ActivitySourceHelper.OpenActivityName);
try
{
var openStartTickCount = Environment.TickCount;

SetState(ConnectionState.Connecting);

var pool = m_dataSource?.Pool ??
Expand Down Expand Up @@ -896,6 +896,7 @@ internal void FinishQuerying(bool hasWarnings)

private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool, int startTickCount, Activity? activity, IOBehavior? ioBehavior, CancellationToken cancellationToken)
{
pool?.AddPendingRequestCount(1);
var connectionSettings = GetInitializedConnectionSettings();
var actualIOBehavior = ioBehavior ?? (connectionSettings.ForceSynchronous ? IOBehavior.Synchronous : IOBehavior.Asynchronous);

Expand Down Expand Up @@ -949,6 +950,7 @@ private async ValueTask<ServerSession> CreateSessionAsync(ConnectionPool? pool,
}
finally
{
pool?.AddPendingRequestCount(-1);
linkedSource?.Dispose();
timeoutSource?.Dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion src/MySqlConnector/MySqlConnector.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' != '.NETCoreApp' ">
<ItemGroup Condition=" '$(TargetFrameworkIdentifier)' != '.NETCoreApp' OR '$(TargetFramework)' == 'net6.0' ">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="7.0.0" />
</ItemGroup>

Expand Down
13 changes: 6 additions & 7 deletions src/MySqlConnector/Utilities/ActivitySourceHelper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Globalization;
using System.Reflection;

Expand Down Expand Up @@ -64,12 +65,10 @@ public static void CopyTags(IEnumerable<KeyValuePair<string, object?>> tags, Act
}
}

private static ActivitySource ActivitySource { get; } = CreateActivitySource();
public static Meter Meter { get; } = new("MySqlConnector", GetVersion());

private static ActivitySource CreateActivitySource()
{
var assembly = typeof(ActivitySourceHelper).Assembly;
var version = assembly.GetCustomAttribute<AssemblyFileVersionAttribute>()!.Version;
return new("MySqlConnector", version);
}
private static ActivitySource ActivitySource { get; } = new("MySqlConnector", GetVersion());

private static Version GetVersion() =>
typeof(ActivitySourceHelper).Assembly.GetCustomAttribute<AssemblyFileVersionAttribute>()!.Version;
}
Loading

0 comments on commit d02539b

Please sign in to comment.