Skip to content

Commit

Permalink
Support batches on pgmq_delete
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 23, 2023
1 parent c4716c7 commit 7194fe4
Showing 1 changed file with 17 additions and 1 deletion.
18 changes: 17 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub mod partition;

use pgmq_crate::errors::PgmqError;
use pgmq_crate::query::{
archive, check_input, delete, init_queue, pop, read, PGMQ_SCHEMA, TABLE_PREFIX,
archive, check_input, delete, delete_batch, init_queue, pop, read, PGMQ_SCHEMA, TABLE_PREFIX,
};

use errors::PgmqExtError;
Expand Down Expand Up @@ -215,6 +215,22 @@ fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtErr
}
}

#[pg_extern(name = "pgmq_delete")]
fn pgmq_delete_batch(queue_name: &str, msg_ids: Vec<i64>) -> Result<Option<bool>, PgmqExtError> {
let mut num_deleted = 0;
let query = delete_batch(queue_name, &msg_ids)?;
Spi::connect(|mut client| {
let tup_table = client.update(&query, None, None);
match tup_table {
Ok(tup_table) => num_deleted = tup_table.len(),
Err(e) => {
error!("error deleting message: {}", e);
}
}
});
Ok(Some(true))
}

/// archive a message forever instead of deleting it
#[pg_extern]
fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtError> {
Expand Down

0 comments on commit 7194fe4

Please sign in to comment.