From 3f75075334713be37eae33f435ed1f896b1e1235 Mon Sep 17 00:00:00 2001 From: Bradley Grainger Date: Wed, 28 Jun 2017 21:42:54 -0700 Subject: [PATCH] Fix "Aborted connection" error when connection leaks. --- .../MySqlClient/ConnectionPool.cs | 34 ++++++++++--------- .../MySqlClient/MySqlConnection.cs | 2 +- .../Serialization/MySqlSession.cs | 1 + 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/MySqlConnector/MySqlClient/ConnectionPool.cs b/src/MySqlConnector/MySqlClient/ConnectionPool.cs index c578d4c14..8da4cdd14 100644 --- a/src/MySqlConnector/MySqlClient/ConnectionPool.cs +++ b/src/MySqlConnector/MySqlClient/ConnectionPool.cs @@ -10,7 +10,7 @@ namespace MySql.Data.MySqlClient { internal sealed class ConnectionPool { - public async Task GetSessionAsync(IOBehavior ioBehavior, CancellationToken cancellationToken) + public async Task GetSessionAsync(MySqlConnection connection, IOBehavior ioBehavior, CancellationToken cancellationToken) { cancellationToken.ThrowIfCancellationRequested(); @@ -54,8 +54,9 @@ public async Task GetSessionAsync(IOBehavior ioBehavior, Cancellat } // pooled session is ready to be used; return it + session.OwningConnection = new WeakReference(connection); lock (m_leasedSessions) - m_leasedSessions.Add(session.Id, new WeakReference(session)); + m_leasedSessions.Add(session.Id, session); return session; } } @@ -63,8 +64,9 @@ public async Task GetSessionAsync(IOBehavior ioBehavior, Cancellat // create a new session session = new MySqlSession(this, m_generation, Interlocked.Increment(ref m_lastId)); await session.ConnectAsync(m_connectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false); + session.OwningConnection = new WeakReference(connection); lock (m_leasedSessions) - m_leasedSessions.Add(session.Id, new WeakReference(session)); + m_leasedSessions.Add(session.Id, session); return session; } catch @@ -95,6 +97,7 @@ public void Return(MySqlSession session) { lock (m_leasedSessions) m_leasedSessions.Remove(session.Id); + session.OwningConnection = null; if (SessionIsHealthy(session)) lock (m_sessions) m_sessions.AddFirst(session); @@ -111,6 +114,7 @@ public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellati { // increment the generation of the connection pool Interlocked.Increment(ref m_generation); + RecoverLeakedSessions(); await CleanPoolAsync(ioBehavior, session => session.PoolGeneration != m_generation, false, cancellationToken).ConfigureAwait(false); } @@ -123,27 +127,25 @@ public async Task ReapAsync(IOBehavior ioBehavior, CancellationToken cancellatio } /// - /// Examines all the in to determine if any - /// objects have been garbage-collected. If so, assumes that the related - /// was not properly disposed but the associated server connection has been closed (by the finalizer). Releases the semaphore - /// once for each leaked session to allow new client connections to be made. + /// Examines all the objects in to determine if any + /// have an owning that has been garbage-collected. If so, assumes that the connection + /// was not properly disposed and returns the session to the pool. /// private void RecoverLeakedSessions() { - var recoveredIds = new List(); + var recoveredSessions = new List(); lock (m_leasedSessions) { m_lastRecoveryTime = unchecked((uint) Environment.TickCount); foreach (var pair in m_leasedSessions) { - if (!pair.Value.TryGetTarget(out var _)) - recoveredIds.Add(pair.Key); + var session = pair.Value; + if (!session.OwningConnection.TryGetTarget(out var _)) + recoveredSessions.Add(session); } - foreach (var id in recoveredIds) - m_leasedSessions.Remove(id); } - if (recoveredIds.Count > 0) - m_sessionSemaphore.Release(recoveredIds.Count); + foreach (var session in recoveredSessions) + session.ReturnToPool(); } private async Task CleanPoolAsync(IOBehavior ioBehavior, Func shouldCleanFn, bool respectMinPoolSize, CancellationToken cancellationToken) @@ -250,7 +252,7 @@ private ConnectionPool(ConnectionSettings cs) m_cleanSemaphore = new SemaphoreSlim(1); m_sessionSemaphore = new SemaphoreSlim(cs.MaximumPoolSize); m_sessions = new LinkedList(); - m_leasedSessions = new Dictionary>(); + m_leasedSessions = new Dictionary(); } static readonly ConcurrentDictionary s_pools = new ConcurrentDictionary(); @@ -280,7 +282,7 @@ private ConnectionPool(ConnectionSettings cs) readonly SemaphoreSlim m_sessionSemaphore; readonly LinkedList m_sessions; readonly ConnectionSettings m_connectionSettings; - readonly Dictionary> m_leasedSessions; + readonly Dictionary m_leasedSessions; int m_lastId; uint m_lastRecoveryTime; } diff --git a/src/MySqlConnector/MySqlClient/MySqlConnection.cs b/src/MySqlConnector/MySqlClient/MySqlConnection.cs index e4a3caa3c..30798e613 100644 --- a/src/MySqlConnector/MySqlClient/MySqlConnection.cs +++ b/src/MySqlConnector/MySqlClient/MySqlConnection.cs @@ -311,7 +311,7 @@ private async Task CreateSessionAsync(IOBehavior ioBehavior, Cance { var pool = ConnectionPool.GetPool(m_connectionSettings); // this returns an open session - return await pool.GetSessionAsync(ioBehavior, linkedSource.Token).ConfigureAwait(false); + return await pool.GetSessionAsync(this, ioBehavior, linkedSource.Token).ConfigureAwait(false); } else { diff --git a/src/MySqlConnector/Serialization/MySqlSession.cs b/src/MySqlConnector/Serialization/MySqlSession.cs index 17ed8066b..39b510ccc 100644 --- a/src/MySqlConnector/Serialization/MySqlSession.cs +++ b/src/MySqlConnector/Serialization/MySqlSession.cs @@ -41,6 +41,7 @@ public MySqlSession(ConnectionPool pool, int poolGeneration, int id) public DateTime LastReturnedUtc { get; private set; } public string DatabaseOverride { get; set; } public IPAddress IPAddress => (m_tcpClient?.Client.RemoteEndPoint as IPEndPoint)?.Address; + public WeakReference OwningConnection { get; set; } public void ReturnToPool() {