Skip to content

Commit

Permalink
feat(postgres): remove lifetime from PgAdvisoryLockGuard
Browse files Browse the repository at this point in the history
Unlike with CPU synchronization primitives, there is no semantic requirement
that the guard borrows the lock object.

We preserve the optimization of memoizing the release query by wrapping it in
an Arc. This way all instances of `PgAdvisoryLockGuard` that originate from the
same lock will share a single instance of the release query.
  • Loading branch information
bonsairobo committed Sep 9, 2024
1 parent c597a22 commit 78754ac
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions sqlx-postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

/// A mutex-like type utilizing [Postgres advisory locks].
///
Expand Down Expand Up @@ -37,7 +38,7 @@ use std::ops::{Deref, DerefMut};
pub struct PgAdvisoryLock {
key: PgAdvisoryLockKey,
/// The query to execute to release this lock.
release_query: OnceCell<String>,
release_query: Arc<OnceCell<String>>,
}

/// A key type natively used by Postgres advisory locks.
Expand Down Expand Up @@ -77,8 +78,8 @@ pub enum PgAdvisoryLockKey {
///
/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
lock: &'lock PgAdvisoryLock,
pub struct PgAdvisoryLockGuard<C: AsMut<PgConnection>> {
lock: PgAdvisoryLock,
conn: Option<C>,
}

Expand Down Expand Up @@ -163,7 +164,7 @@ impl PgAdvisoryLock {
pub fn with_key(key: PgAdvisoryLockKey) -> Self {
Self {
key,
release_query: OnceCell::new(),
release_query: Arc::new(OnceCell::new()),
}
}

Expand Down Expand Up @@ -201,7 +202,7 @@ impl PgAdvisoryLock {
pub async fn acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<'_, C>> {
) -> Result<PgAdvisoryLockGuard<C>> {
match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
Expand All @@ -218,7 +219,7 @@ impl PgAdvisoryLock {
}
}

Ok(PgAdvisoryLockGuard::new(self, conn))
Ok(PgAdvisoryLockGuard::new(self.clone(), conn))
}

/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
Expand All @@ -244,7 +245,7 @@ impl PgAdvisoryLock {
pub async fn try_acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
) -> Result<Either<PgAdvisoryLockGuard<C>, C>> {
let locked: bool = match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
Expand All @@ -262,7 +263,7 @@ impl PgAdvisoryLock {
};

if locked {
Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
Ok(Either::Left(PgAdvisoryLockGuard::new(self.clone(), conn)))
} else {
Ok(Either::Right(conn))
}
Expand Down Expand Up @@ -322,8 +323,8 @@ impl PgAdvisoryLockKey {

const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";

impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
impl<C: AsMut<PgConnection>> PgAdvisoryLockGuard<C> {
fn new(lock: PgAdvisoryLock, conn: C) -> Self {
PgAdvisoryLockGuard {
lock,
conn: Some(conn),
Expand Down Expand Up @@ -362,7 +363,7 @@ impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<C> {
type Target = PgConnection;

fn deref(&self) -> &Self::Target {
Expand All @@ -376,17 +377,13 @@ impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLo
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
for PgAdvisoryLockGuard<'lock, C>
{
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut for PgAdvisoryLockGuard<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
for PgAdvisoryLockGuard<'lock, C>
{
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection> for PgAdvisoryLockGuard<C> {
fn as_ref(&self) -> &PgConnection {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
Expand All @@ -398,7 +395,7 @@ impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<C> {
fn as_mut(&mut self) -> &mut PgConnection {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
Expand All @@ -407,7 +404,7 @@ impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<
/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<C> {
fn drop(&mut self) {
if let Some(mut conn) = self.conn.take() {
// Queue a simple query message to execute next time the connection is used.
Expand Down

0 comments on commit 78754ac

Please sign in to comment.