Skip to content

Commit

Permalink
Add archive_batch function to ext api
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 24, 2023
1 parent 09071ca commit daed5dc
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 22 deletions.
47 changes: 34 additions & 13 deletions core/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,17 +109,7 @@
},
"query": "SELECT * from pgmq_create($1::text);"
},
"3a3440841fba7d0f8744f5873cdb93a64b6e2f6481c3b14af701c5189f5e73f0": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;"
},
"595dd830c53f4560378cfadcf8a1cc6f566307e3d76fea4c84e35e70649a7f17": {
"36b2ecea83484248d99c8d091f8d9b7fa6ce368d7db8fe14e3ab491eb52c723c": {
"describe": {
"columns": [
{
Expand All @@ -134,11 +124,21 @@
"parameters": {
"Left": [
"Text",
"Int8"
"Int8Array"
]
}
},
"query": "SELECT * from pgmq_archive($1::text, $2)"
"query": "SELECT * from pgmq_archive($1::text, $2::bigint[])"
},
"3a3440841fba7d0f8744f5873cdb93a64b6e2f6481c3b14af701c5189f5e73f0": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": []
}
},
"query": "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;"
},
"8e4f6635dc4cfb5ed42ddb87930b985793e81243b8a5609dec4ab566aaab4e9c": {
"describe": {
Expand Down Expand Up @@ -273,6 +273,27 @@
},
"query": "SELECT * from pgmq_read($1::text, $2, $3)"
},
"cb7072cc0f81a187953b6210a40d799b0ff301b73ad873f767c29fd383724a32": {
"describe": {
"columns": [
{
"name": "pgmq_archive",
"ordinal": 0,
"type_info": "Bool"
}
],
"nullable": [
null
],
"parameters": {
"Left": [
"Text",
"Int8"
]
}
},
"query": "SELECT * from pgmq_archive($1::text, $2::bigint)"
},
"cd7b28f6bd348038e7c26d89292f2445841e7c13143b453277a010390699d1fc": {
"describe": {
"columns": [
Expand Down
19 changes: 18 additions & 1 deletion core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl PGMQueueExt {
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<bool, PgmqError> {
check_input(queue_name)?;
let arch = sqlx::query!(
"SELECT * from pgmq_archive($1::text, $2)",
"SELECT * from pgmq_archive($1::text, $2::bigint)",
queue_name,
msg_id
)
Expand All @@ -261,6 +261,23 @@ impl PGMQueueExt {
Ok(arch.pgmq_archive.expect("no archive result"))
}

/// Move a message to the archive table.
pub async fn archive_batch(
&self,
queue_name: &str,
msg_ids: &[i64],
) -> Result<bool, PgmqError> {
check_input(queue_name)?;
let arch = sqlx::query!(
"SELECT * from pgmq_archive($1::text, $2::bigint[])",
queue_name,
msg_ids
)
.fetch_one(&self.connection)
.await?;
Ok(arch.pgmq_archive.expect("no archive result"))
}

// Read and message and immediately delete it.
pub async fn pop<T: for<'de> Deserialize<'de>>(
&self,
Expand Down
26 changes: 25 additions & 1 deletion core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,30 @@ pub fn archive(name: &str, msg_id: i64) -> Result<String, PgmqError> {
))
}

pub fn archive_batch(name: &str, msg_ids: &[i64]) -> Result<String, PgmqError> {
check_input(name)?;
let mut msg_id_list: String = "".to_owned();
for msg_id in msg_ids.iter() {
let id_str = format!("{msg_id},");
msg_id_list.push_str(&id_str)
}
// drop trailing comma from constructed string
msg_id_list.pop();

Ok(format!(
"
WITH archived AS (
DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE msg_id in ({msg_id_list})
RETURNING msg_id, vt, read_ct, enqueued_at, message
)
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message)
SELECT msg_id, vt, read_ct, enqueued_at, message
FROM archived;
"
))
}

pub fn pop(name: &str) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
Expand Down Expand Up @@ -428,7 +452,7 @@ BEGIN
WHERE has_table_privilege('pg_monitor', 'pgmq_meta', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON pgmq_meta TO pg_monitor';
END IF;
END;
$$ LANGUAGE plpgsql;
Expand Down
19 changes: 17 additions & 2 deletions core/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ async fn test_set_vt() {
#[tokio::test]
async fn test_extension_api() {
let test_queue = format!("test_ext_api_{}", rand::thread_rng().gen_range(0..100000));
let test_queue_archive = format!("{}_archive", test_queue);

let queue = init_queue_ext(&test_queue).await;
let msg = MyMessage::default();
Expand Down Expand Up @@ -879,9 +880,23 @@ async fn test_extension_api() {
.delete_batch(&test_queue, &[m1, m2, m3])
.await
.expect("delete batch error");
let rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(rowcount, 0);
let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_delete_rowcount, 0);
assert_eq!(delete_result, true);

// archive a batch of messages
let m4 = queue.send(&test_queue, &del_msg).await.unwrap();
let m5 = queue.send(&test_queue, &del_msg).await.unwrap();
let m6 = queue.send(&test_queue, &del_msg).await.unwrap();
let archive_result = queue
.archive_batch(&test_queue, &[m4, m5, m6])
.await
.expect("archive batch error");
let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await;
assert_eq!(post_archive_rowcount, 0);
assert_eq!(archive_result, true);
let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await;
assert_eq!(post_archive_archive_rowcount, 4);
}

#[tokio::test]
Expand Down
23 changes: 18 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ pub mod partition;

use pgmq_crate::errors::PgmqError;
use pgmq_crate::query::{
archive, check_input, delete, delete_batch, init_queue, pop, read, PGMQ_SCHEMA, TABLE_PREFIX,
archive, archive_batch, check_input, delete, delete_batch, init_queue, pop, read, PGMQ_SCHEMA,
TABLE_PREFIX,
};

use errors::PgmqExtError;
Expand Down Expand Up @@ -217,18 +218,16 @@ fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtErr

#[pg_extern(name = "pgmq_delete")]
fn pgmq_delete_batch(queue_name: &str, msg_ids: Vec<i64>) -> Result<Option<bool>, PgmqExtError> {
let mut num_deleted = 0;
let query = delete_batch(queue_name, &msg_ids)?;
Spi::connect(|mut client| {
let tup_table = client.update(&query, None, None);
match tup_table {
Ok(tup_table) => num_deleted = tup_table.len(),
Ok(_) => Ok(Some(true)),
Err(e) => {
error!("error deleting message: {}", e);
}
}
});
Ok(Some(true))
})
}

/// archive a message forever instead of deleting it
Expand Down Expand Up @@ -257,6 +256,20 @@ fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result<Option<bool>, PgmqExtEr
}
}

#[pg_extern(name = "pgmq_archive")]
fn pgmq_archive_batch(queue_name: &str, msg_ids: Vec<i64>) -> Result<Option<bool>, PgmqExtError> {
let query = archive_batch(queue_name, &msg_ids)?;
Spi::connect(|mut client| {
let tup_table = client.update(&query, None, None);
match tup_table {
Ok(_) => Ok(Some(true)),
Err(e) => {
error!("error deleting message: {}", e);
}
}
})
}

// reads and deletes at same time
#[pg_extern]
fn pgmq_pop(
Expand Down

0 comments on commit daed5dc

Please sign in to comment.