Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: spawn task when opening new connection in acquire. #3516

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 61 additions & 16 deletions sqlx-core/src/pool/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::database::Database;
use crate::error::Error;
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
use crossbeam_queue::ArrayQueue;
use either::Either;

use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};

Expand Down Expand Up @@ -251,7 +252,7 @@ impl<DB: Database> PoolInner<DB> {
}

let acquire_started_at = Instant::now();
let deadline = acquire_started_at + self.options.acquire_timeout;
let connect_deadline = acquire_started_at + self.options.connect_timeout;

let acquired = crate::rt::timeout(
self.options.acquire_timeout,
Expand All @@ -260,21 +261,18 @@ impl<DB: Database> PoolInner<DB> {
// Handles the close-event internally
let permit = self.acquire_permit().await?;

// First attempt to pop a connection from the idle queue and check if we can
// use it.
let guard = match self.get_live_idle(permit).await {
// All good!
Ok(Either::Left(conn)) => return Ok(conn),

// First attempt to pop a connection from the idle queue.
let guard = match self.pop_idle(permit) {
// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => guard,

// Then, check that we can use it...
Ok(conn) => match check_idle_conn(conn, &self.options).await {

// All good!
Ok(live) => return Ok(live),

// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => guard,
},
Err(permit) => if let Ok(guard) = self.try_increment_size(permit) {
// we can open a new connection
Ok(Either::Right(permit)) => if let Ok(guard) = self.try_increment_size(permit) {
// we can open a new connection
guard
} else {
Expand All @@ -290,8 +288,19 @@ impl<DB: Database> PoolInner<DB> {
}
};

// Attempt to connect...
return self.connect(deadline, guard).await;

let pool = self.clone();
let pool2 = self.clone();

// Future that tries to get a live idle connection.
let idle_fut = std::pin::pin!(pool.get_idle_conn(connect_deadline));

// Future that tries to open a new connection.
let conn_fut = crate::rt::spawn(async move {
pool2.connect(connect_deadline, guard).await
});

return future::select(idle_fut, conn_fut).await.factor_first().0;
}
}
)
Expand Down Expand Up @@ -398,6 +407,24 @@ impl<DB: Database> PoolInner<DB> {
}
}

// Tries to get a live connection that was idle in a loop.
async fn get_idle_conn(
self: &Arc<Self>,
deadline: Instant,
) -> Result<Floating<DB, Live<DB>>, Error> {
let mut backoff = Duration::from_millis(10);
let max_backoff = deadline_as_timeout(deadline)? / 5;

loop {
let new_permit = self.acquire_permit().await?;
if let Ok(Either::Left(live)) = self.get_live_idle(new_permit).await {
return Ok(live);
};
crate::rt::sleep(backoff).await;
backoff = cmp::min(backoff * 2, max_backoff);
}
}

/// Try to maintain `min_connections`, returning any errors (including `PoolTimedOut`).
pub async fn try_min_connections(self: &Arc<Self>, deadline: Instant) -> Result<(), Error> {
while self.size() < self.options.min_connections {
Expand All @@ -422,6 +449,24 @@ impl<DB: Database> PoolInner<DB> {
Ok(())
}

// Tries to get a live idle connection.
async fn get_live_idle<'a>(
self: &'a Arc<Self>,
permit: AsyncSemaphoreReleaser<'a>,
) -> Result<Either<Floating<DB, Live<DB>>, AsyncSemaphoreReleaser<'a>>, DecrementSizeGuard<DB>>
{
match self.pop_idle(permit) {
Ok(conn) => match check_idle_conn(conn, &self.options).await {
// All good!
Ok(live) => Ok(Either::Left(live)),
// if the connection isn't usable for one reason or another,
// we get the `DecrementSizeGuard` back to open a new one
Err(guard) => Err(guard),
},
Err(permit) => Ok(Either::Right(permit)),
}
}

/// Attempt to maintain `min_connections`, logging if unable.
pub async fn min_connections_maintenance(self: &Arc<Self>, deadline: Option<Instant>) {
let deadline = deadline.unwrap_or_else(|| {
Expand Down
3 changes: 3 additions & 0 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub struct PoolOptions<DB: Database> {
pub(crate) acquire_slow_level: LevelFilter,
pub(crate) acquire_slow_threshold: Duration,
pub(crate) acquire_timeout: Duration,
pub(crate) connect_timeout: Duration,
pub(crate) min_connections: u32,
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
Expand All @@ -102,6 +103,7 @@ impl<DB: Database> Clone for PoolOptions<DB> {
acquire_slow_threshold: self.acquire_slow_threshold,
acquire_slow_level: self.acquire_slow_level,
acquire_timeout: self.acquire_timeout,
connect_timeout: self.connect_timeout,
min_connections: self.min_connections,
max_lifetime: self.max_lifetime,
idle_timeout: self.idle_timeout,
Expand Down Expand Up @@ -158,6 +160,7 @@ impl<DB: Database> PoolOptions<DB> {
// to not flag typical time to add a new connection to a pool.
acquire_slow_threshold: Duration::from_secs(2),
acquire_timeout: Duration::from_secs(30),
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
fair: true,
Expand Down