Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres): remove lifetime from PgAdvisoryLockGuard #3495

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions sqlx-postgres/src/advisory_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hkdf::Hkdf;
use once_cell::sync::OnceCell;
use sha2::Sha256;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

/// A mutex-like type utilizing [Postgres advisory locks].
///
Expand Down Expand Up @@ -37,7 +38,7 @@ use std::ops::{Deref, DerefMut};
pub struct PgAdvisoryLock {
key: PgAdvisoryLockKey,
/// The query to execute to release this lock.
release_query: OnceCell<String>,
release_query: Arc<OnceCell<String>>,
}

/// A key type natively used by Postgres advisory locks.
Expand Down Expand Up @@ -77,8 +78,8 @@ pub enum PgAdvisoryLockKey {
///
/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
lock: &'lock PgAdvisoryLock,
pub struct PgAdvisoryLockGuard<C: AsMut<PgConnection>> {
lock: PgAdvisoryLock,
conn: Option<C>,
}

Expand Down Expand Up @@ -163,7 +164,7 @@ impl PgAdvisoryLock {
pub fn with_key(key: PgAdvisoryLockKey) -> Self {
Self {
key,
release_query: OnceCell::new(),
release_query: Arc::new(OnceCell::new()),
}
}

Expand Down Expand Up @@ -201,7 +202,7 @@ impl PgAdvisoryLock {
pub async fn acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<PgAdvisoryLockGuard<'_, C>> {
) -> Result<PgAdvisoryLockGuard<C>> {
match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query::query("SELECT pg_advisory_lock($1)")
Expand All @@ -218,7 +219,7 @@ impl PgAdvisoryLock {
}
}

Ok(PgAdvisoryLockGuard::new(self, conn))
Ok(PgAdvisoryLockGuard::new(self.clone(), conn))
}

/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
Expand All @@ -244,7 +245,7 @@ impl PgAdvisoryLock {
pub async fn try_acquire<C: AsMut<PgConnection>>(
&self,
mut conn: C,
) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
) -> Result<Either<PgAdvisoryLockGuard<C>, C>> {
let locked: bool = match &self.key {
PgAdvisoryLockKey::BigInt(key) => {
crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
Expand All @@ -262,7 +263,7 @@ impl PgAdvisoryLock {
};

if locked {
Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
Ok(Either::Left(PgAdvisoryLockGuard::new(self.clone(), conn)))
} else {
Ok(Either::Right(conn))
}
Expand Down Expand Up @@ -322,8 +323,8 @@ impl PgAdvisoryLockKey {

const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";

impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
impl<C: AsMut<PgConnection>> PgAdvisoryLockGuard<C> {
fn new(lock: PgAdvisoryLock, conn: C) -> Self {
PgAdvisoryLockGuard {
lock,
conn: Some(conn),
Expand Down Expand Up @@ -362,7 +363,7 @@ impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<C> {
type Target = PgConnection;

fn deref(&self) -> &Self::Target {
Expand All @@ -376,17 +377,13 @@ impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLo
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
for PgAdvisoryLockGuard<'lock, C>
{
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut for PgAdvisoryLockGuard<C> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
}

impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
for PgAdvisoryLockGuard<'lock, C>
{
impl<C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection> for PgAdvisoryLockGuard<C> {
fn as_ref(&self) -> &PgConnection {
self.conn.as_ref().expect(NONE_ERR).as_ref()
}
Expand All @@ -398,7 +395,7 @@ impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
/// guard attempts to release the lock.
impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<C> {
fn as_mut(&mut self) -> &mut PgConnection {
self.conn.as_mut().expect(NONE_ERR).as_mut()
}
Expand All @@ -407,7 +404,7 @@ impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<
/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
impl<C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<C> {
fn drop(&mut self) {
if let Some(mut conn) = self.conn.take() {
// Queue a simple query message to execute next time the connection is used.
Expand Down