Skip to content

Commit

Permalink
Close connection pool even if some connections are leased/in creation (
Browse files Browse the repository at this point in the history
…#86)

Currently closing a pool, that has leased connections or currently creates connections fails. Such a close attempt brings the pool in an unrecoverable closing state and may lead to crashes. This patch changes the pool shutdown behavior to allow closing the pool even if the pool is not a predefined state.
  • Loading branch information
fabianfett authored Aug 4, 2023
1 parent 2d626c8 commit 30a43b0
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 60 deletions.
157 changes: 98 additions & 59 deletions Sources/RediStack/ConnectionPool/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ internal final class ConnectionPool {
}

/// Deactivates this connection pool. Once this is called, no further connections can be obtained
/// from the pool. Leased connections are not deactivated and can continue to be used.
/// from the pool. Leased connections are not deactivated and can continue to be used. All waiters
/// are failed with a pool is closed error.
func close(promise: EventLoopPromise<Void>? = nil, logger: Logger) {
if self.loop.inEventLoop {
self.closePool(promise: promise, logger: logger)
Expand Down Expand Up @@ -228,7 +229,7 @@ extension ConnectionPool {
switch self.state {
case .closing:
// We don't want this anymore, drop it.
_ = connection.close()
self.closeConnectionForShutdown(connection)
case .closed:
// This is programmer error, we shouldn't have entered this state.
logger.critical("new connection created on a closed pool", metadata: [
Expand All @@ -250,10 +251,21 @@ extension ConnectionPool {
RedisLogging.MetadataKeys.error: "\(error)"
])

guard case .active = self.state else {
// No point continuing connection creation if we're not active.
logger.trace("not creating new connections due to inactivity")
switch self.state {
case .active:
break // continue further down

case .closing(let remaining, let promise):
if remaining == 1 {
self.state = .closed
promise?.succeed()
} else {
self.state = .closing(remaining: remaining - 1, promise)
}
return

case .closed:
preconditionFailure("Invalid state: \(self.state)")
}

// Ok, we're still active. Before we do anything, we want to check whether anyone is still waiting
Expand Down Expand Up @@ -315,39 +327,42 @@ extension ConnectionPool {
private func closePool(promise: EventLoopPromise<Void>?, logger: Logger) {
self.loop.preconditionInEventLoop()

// Pool closure must be monotonic.
guard case .active = self.state else {
logger.info("received duplicate request to close connection pool")
promise?.succeed(())
return
}
switch self.state {
case .active:
self.state = .closing(remaining: self.activeConnectionCount, promise)

self.state = .closing
case .closing(let count, let existingPromise):
if let existingPromise = existingPromise {
existingPromise.futureResult.cascade(to: promise)
} else {
self.state = .closing(remaining: count, promise)
}
return

// To close the pool we need to drop all active connections.
let connections = self.availableConnections
self.availableConnections = []
let closeFutures = connections.map { $0.close() }
case .closed:
promise?.succeed()
return
}

// We also cancel all pending leases.
while let pendingLease = self.connectionWaiters.popFirst() {
pendingLease.fail(RedisConnectionPoolError.poolClosed)
}

guard self.activeConnectionCount == 0 else {
logger.debug("not closing pool, waiting for all connections to be returned", metadata: [
RedisLogging.MetadataKeys.poolConnectionCount: "\(self.activeConnectionCount)"
])
promise?.fail(RedisConnectionPoolError.poolHasActiveConnections)
if self.activeConnectionCount == 0 {
// That was all the connections, so this is now closed.
logger.trace("pool is now closed")
self.state = .closed
promise?.succeed()
return
}

// That was all the connections, so this is now closed.
logger.trace("pool is now closed")
self.state = .closed
EventLoopFuture<Void>
.andAllSucceed(closeFutures, on: self.loop)
.cascade(to: promise)
// To close the pool we need to drop all active connections.
let connections = self.availableConnections
self.availableConnections = []
for connection in connections {
self.closeConnectionForShutdown(connection)
}
}

/// This is the on-thread implementation for leasing connections out to users. Here we work out how to get a new
Expand Down Expand Up @@ -401,51 +416,66 @@ extension ConnectionPool {
private func _returnLeasedConnection(_ connection: RedisConnection, logger: Logger) {
self.loop.assertInEventLoop()
self.leasedConnectionCount -= 1
self._returnConnection(connection, logger: logger)

switch self.state {
case .active:
self._returnConnection(connection, logger: logger)

case .closing:
return self.closeConnectionForShutdown(connection)

case .closed:
preconditionFailure("Invalid state: \(self.state)")
}
}

/// This is the on-thread implementation for returning connections to the pool. Here we work out what to do with a newly-acquired
/// connection.
private func _returnConnection(_ connection: RedisConnection, logger: Logger) {
self.loop.assertInEventLoop()
precondition(self.state.isActive)

guard connection.isConnected else {
// This connection isn't active anymore. We'll dump it and potentially kick off a reconnection.
self.refillConnections(logger: logger)
return
}

switch self.state {
case .active:
// If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room
// in the pool, we'll put it there. Otherwise, we'll close it.
if let waiter = self.connectionWaiters.popFirst() {
self.leaseConnection(connection, to: waiter)
} else if self.canAddConnectionToPool {
self.availableConnections.append(connection)
} else if let evictable = self.availableConnections.popFirst() {
// We have at least one pooled connection. The returned is more recently active, so kick out the pooled
// connection in favour of this one and close the recently evicted one.
self.availableConnections.append(connection)
_ = evictable.close()
} else {
// We don't need it, close it.
_ = connection.close()
}
case .closed:
// In general we shouldn't see leased connections return in .closed, as we should only be able to
// transition to .closed when all the leases are back. We tolerate this in production builds by just closing the
// connection, but in debug builds we assert to be sure.
logger.warning("connection returned to closed pool", metadata: [
RedisLogging.MetadataKeys.connectionID: "\(connection.id)"
])
assertionFailure("Returned connection to closed pool")
fallthrough
case .closing:
// We don't need this connection, close it.
// If anyone is waiting for a connection, let's give them this one. Otherwise, if there's room
// in the pool, we'll put it there. Otherwise, we'll close it.
if let waiter = self.connectionWaiters.popFirst() {
self.leaseConnection(connection, to: waiter)
} else if self.canAddConnectionToPool {
self.availableConnections.append(connection)
} else if let evictable = self.availableConnections.popFirst() {
// We have at least one pooled connection. The returned is more recently active, so kick out the pooled
// connection in favour of this one and close the recently evicted one.
self.availableConnections.append(connection)
_ = evictable.close()
} else {
// We don't need it, close it.
_ = connection.close()
guard self.leasedConnectionCount == 0 else { return }
self.state = .closed
}
}

private func closeConnectionForShutdown(_ connection: RedisConnection) {
connection.close().whenComplete { _ in
self.loop.preconditionInEventLoop()

switch self.state {
case .closing(let remaining, let promise):
if remaining == 1 {
self.state = .closed
promise?.succeed()
} else {
self.state = .closing(remaining: remaining - 1, promise)
}

case .closed, .active:
// The state must not change if we are closing a connection, while we are
// closing the pool.
preconditionFailure("Invalid state: \(self.state)")
}
}
}
}
Expand All @@ -457,10 +487,19 @@ extension ConnectionPool {

/// The user has requested the connection pool to close, but there are still active connections leased to users
/// and in the pool.
case closing
case closing(remaining: Int, EventLoopPromise<Void>?)

/// The connection pool is closed: no connections are outstanding
case closed

var isActive: Bool {
switch self {
case .active:
return true
case .closing, .closed:
return false
}
}
}
}

Expand Down
1 change: 0 additions & 1 deletion Sources/RediStack/ConnectionPool/RedisConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ extension RedisConnectionPool {
/// Closes all connections in the pool and deactivates the pool from creating new connections.
///
/// This method is safe to call multiple times.
/// - Important: If the pool has connections in active use, the close process will not complete.
/// - Parameters:
/// - promise: A notification promise to resolve once the close process has completed.
/// - logger: An optional logger to use for any log statements generated while closing the pool.
Expand Down
89 changes: 89 additions & 0 deletions Tests/RediStackTests/ConnectionPoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,95 @@ final class ConnectionPoolTests: XCTestCase {
connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason)
}

func testShutdownPoolThatHasConnectionThatFailsInCreation() {
var connectionPromise: EventLoopPromise<RedisConnection>? = nil
let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in
XCTAssertTrue(loop === self.server.loop)
connectionPromise = self.server.loop.makePromise()
return connectionPromise!.futureResult
}
pool.activate()
XCTAssertNoThrow(try self.server.runWhileActive())
XCTAssertEqual(self.server.channels.count, 0)
XCTAssertNotNil(connectionPromise)

var failedLastConnection = false
let closePromise = self.server.loop.makePromise(of: Void.self)

pool.close(promise: closePromise)
closePromise.futureResult.whenComplete { _ in
XCTAssertTrue(failedLastConnection)
}

failedLastConnection = true
connectionPromise?.fail(ConnectionPoolTestError.connectionFailedForSomeReason)
XCTAssertNoThrow(try closePromise.futureResult.wait())
}

func testShutdownPoolThatHasConnectionThatSucceedsInCreation() {
var connectionPromise: EventLoopPromise<RedisConnection>? = nil
let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in
XCTAssertTrue(loop === self.server.loop)
connectionPromise = self.server.loop.makePromise()
return connectionPromise!.futureResult
}
pool.activate()
XCTAssertNoThrow(try self.server.runWhileActive())
XCTAssertEqual(self.server.channels.count, 0)
XCTAssertNotNil(connectionPromise)

var lastConnectionCreated = false
let closePromise = self.server.loop.makePromise(of: Void.self)

pool.close(promise: closePromise)
closePromise.futureResult.whenComplete { _ in
XCTAssertTrue(lastConnectionCreated)
}

let singleConnection = self.createAConnection()
lastConnectionCreated = true
connectionPromise?.succeed(singleConnection)
self.server.loop.run()
XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.")
XCTAssertNoThrow(try closePromise.futureResult.wait())
}

func testShutdownPoolThatHasLeasedConnection() {
var connectionPromise: EventLoopPromise<RedisConnection>? = nil
let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 1, leaky: false) { loop in
XCTAssertTrue(loop === self.server.loop)
connectionPromise = self.server.loop.makePromise()
return connectionPromise!.futureResult
}
pool.activate()
XCTAssertNoThrow(try self.server.runWhileActive())
XCTAssertEqual(self.server.channels.count, 0)
XCTAssertNotNil(connectionPromise)

var leasedConnectionReturned = false
let closePromise = self.server.loop.makePromise(of: Void.self)

let singleConnection = self.createAConnection()
connectionPromise?.succeed(singleConnection)

var leasedConnection: RedisConnection?
XCTAssertNoThrow(leasedConnection = try pool.leaseConnection(deadline: .distantFuture).wait())
XCTAssert(singleConnection === leasedConnection)

pool.close(promise: closePromise)
closePromise.futureResult.whenComplete { _ in
XCTAssertTrue(leasedConnectionReturned)
}

leasedConnectionReturned = true
pool.returnConnection(singleConnection)

self.server.loop.run()
XCTAssertNoThrow(try singleConnection.channel.closeFuture.wait(), "New connection is closed right away.")
XCTAssertNoThrow(try closePromise.futureResult.wait())
}


func testNonLeakyBucketWillKeepConnectingIfThereIsSpaceAndWaiters() throws {
var connectionPromise: EventLoopPromise<RedisConnection>? = nil
let pool = self.createPool(maximumConnectionCount: 1, minimumConnectionCount: 0, leaky: false) { loop in
Expand Down

0 comments on commit 30a43b0

Please sign in to comment.