Skip to content

Commit

Permalink
Fix "Aborted connection" error when connection leaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Jun 29, 2017
1 parent dd9971e commit 3f75075
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 17 deletions.
34 changes: 18 additions & 16 deletions src/MySqlConnector/MySqlClient/ConnectionPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace MySql.Data.MySqlClient
{
internal sealed class ConnectionPool
{
public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
public async Task<MySqlSession> GetSessionAsync(MySqlConnection connection, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();

Expand Down Expand Up @@ -54,17 +54,19 @@ public async Task<MySqlSession> GetSessionAsync(IOBehavior ioBehavior, Cancellat
}

// pooled session is ready to be used; return it
session.OwningConnection = new WeakReference<MySqlConnection>(connection);
lock (m_leasedSessions)
m_leasedSessions.Add(session.Id, new WeakReference<MySqlSession>(session));
m_leasedSessions.Add(session.Id, session);
return session;
}
}

// create a new session
session = new MySqlSession(this, m_generation, Interlocked.Increment(ref m_lastId));
await session.ConnectAsync(m_connectionSettings, ioBehavior, cancellationToken).ConfigureAwait(false);
session.OwningConnection = new WeakReference<MySqlConnection>(connection);
lock (m_leasedSessions)
m_leasedSessions.Add(session.Id, new WeakReference<MySqlSession>(session));
m_leasedSessions.Add(session.Id, session);
return session;
}
catch
Expand Down Expand Up @@ -95,6 +97,7 @@ public void Return(MySqlSession session)
{
lock (m_leasedSessions)
m_leasedSessions.Remove(session.Id);
session.OwningConnection = null;
if (SessionIsHealthy(session))
lock (m_sessions)
m_sessions.AddFirst(session);
Expand All @@ -111,6 +114,7 @@ public async Task ClearAsync(IOBehavior ioBehavior, CancellationToken cancellati
{
// increment the generation of the connection pool
Interlocked.Increment(ref m_generation);
RecoverLeakedSessions();
await CleanPoolAsync(ioBehavior, session => session.PoolGeneration != m_generation, false, cancellationToken).ConfigureAwait(false);
}

Expand All @@ -123,27 +127,25 @@ public async Task ReapAsync(IOBehavior ioBehavior, CancellationToken cancellatio
}

/// <summary>
/// Examines all the <see cref="WeakReference{MySqlSession}"/> in <see cref="m_leasedSessions"/> to determine if any
/// <see cref="MySqlSession"/> objects have been garbage-collected. If so, assumes that the related <see cref="MySqlConnection"/>
/// was not properly disposed but the associated server connection has been closed (by the finalizer). Releases the semaphore
/// once for each leaked session to allow new client connections to be made.
/// Examines all the <see cref="MySqlSession"/> objects in <see cref="m_leasedSessions"/> to determine if any
/// have an owning <see cref="MySqlConnection"/> that has been garbage-collected. If so, assumes that the connection
/// was not properly disposed and returns the session to the pool.
/// </summary>
private void RecoverLeakedSessions()
{
var recoveredIds = new List<int>();
var recoveredSessions = new List<MySqlSession>();
lock (m_leasedSessions)
{
m_lastRecoveryTime = unchecked((uint) Environment.TickCount);
foreach (var pair in m_leasedSessions)
{
if (!pair.Value.TryGetTarget(out var _))
recoveredIds.Add(pair.Key);
var session = pair.Value;
if (!session.OwningConnection.TryGetTarget(out var _))
recoveredSessions.Add(session);
}
foreach (var id in recoveredIds)
m_leasedSessions.Remove(id);
}
if (recoveredIds.Count > 0)
m_sessionSemaphore.Release(recoveredIds.Count);
foreach (var session in recoveredSessions)
session.ReturnToPool();
}

private async Task CleanPoolAsync(IOBehavior ioBehavior, Func<MySqlSession, bool> shouldCleanFn, bool respectMinPoolSize, CancellationToken cancellationToken)
Expand Down Expand Up @@ -250,7 +252,7 @@ private ConnectionPool(ConnectionSettings cs)
m_cleanSemaphore = new SemaphoreSlim(1);
m_sessionSemaphore = new SemaphoreSlim(cs.MaximumPoolSize);
m_sessions = new LinkedList<MySqlSession>();
m_leasedSessions = new Dictionary<int, WeakReference<MySqlSession>>();
m_leasedSessions = new Dictionary<int, MySqlSession>();
}

static readonly ConcurrentDictionary<string, ConnectionPool> s_pools = new ConcurrentDictionary<string, ConnectionPool>();
Expand Down Expand Up @@ -280,7 +282,7 @@ private ConnectionPool(ConnectionSettings cs)
readonly SemaphoreSlim m_sessionSemaphore;
readonly LinkedList<MySqlSession> m_sessions;
readonly ConnectionSettings m_connectionSettings;
readonly Dictionary<int, WeakReference<MySqlSession>> m_leasedSessions;
readonly Dictionary<int, MySqlSession> m_leasedSessions;
int m_lastId;
uint m_lastRecoveryTime;
}
Expand Down
2 changes: 1 addition & 1 deletion src/MySqlConnector/MySqlClient/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private async Task<MySqlSession> CreateSessionAsync(IOBehavior ioBehavior, Cance
{
var pool = ConnectionPool.GetPool(m_connectionSettings);
// this returns an open session
return await pool.GetSessionAsync(ioBehavior, linkedSource.Token).ConfigureAwait(false);
return await pool.GetSessionAsync(this, ioBehavior, linkedSource.Token).ConfigureAwait(false);
}
else
{
Expand Down
1 change: 1 addition & 0 deletions src/MySqlConnector/Serialization/MySqlSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public MySqlSession(ConnectionPool pool, int poolGeneration, int id)
public DateTime LastReturnedUtc { get; private set; }
public string DatabaseOverride { get; set; }
public IPAddress IPAddress => (m_tcpClient?.Client.RemoteEndPoint as IPEndPoint)?.Address;
public WeakReference<MySqlConnection> OwningConnection { get; set; }

public void ReturnToPool()
{
Expand Down

0 comments on commit 3f75075

Please sign in to comment.