From daed5dce7779462ecc1167fb8b6945658f641576 Mon Sep 17 00:00:00 2001 From: v0idpwn Date: Fri, 25 Aug 2023 00:50:36 +0300 Subject: [PATCH] Add archive_batch function to ext api --- core/sqlx-data.json | 47 ++++++++++++++++++++++++---------- core/src/pg_ext.rs | 19 +++++++++++++- core/src/query.rs | 26 ++++++++++++++++++- core/tests/integration_test.rs | 19 ++++++++++++-- src/lib.rs | 23 +++++++++++++---- 5 files changed, 112 insertions(+), 22 deletions(-) diff --git a/core/sqlx-data.json b/core/sqlx-data.json index 568fa382..16accea9 100644 --- a/core/sqlx-data.json +++ b/core/sqlx-data.json @@ -109,17 +109,7 @@ }, "query": "SELECT * from pgmq_create($1::text);" }, - "3a3440841fba7d0f8744f5873cdb93a64b6e2f6481c3b14af701c5189f5e73f0": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [] - } - }, - "query": "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;" - }, - "595dd830c53f4560378cfadcf8a1cc6f566307e3d76fea4c84e35e70649a7f17": { + "36b2ecea83484248d99c8d091f8d9b7fa6ce368d7db8fe14e3ab491eb52c723c": { "describe": { "columns": [ { @@ -134,11 +124,21 @@ "parameters": { "Left": [ "Text", - "Int8" + "Int8Array" ] } }, - "query": "SELECT * from pgmq_archive($1::text, $2)" + "query": "SELECT * from pgmq_archive($1::text, $2::bigint[])" + }, + "3a3440841fba7d0f8744f5873cdb93a64b6e2f6481c3b14af701c5189f5e73f0": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [] + } + }, + "query": "CREATE EXTENSION IF NOT EXISTS pgmq CASCADE;" }, "8e4f6635dc4cfb5ed42ddb87930b985793e81243b8a5609dec4ab566aaab4e9c": { "describe": { @@ -273,6 +273,27 @@ }, "query": "SELECT * from pgmq_read($1::text, $2, $3)" }, + "cb7072cc0f81a187953b6210a40d799b0ff301b73ad873f767c29fd383724a32": { + "describe": { + "columns": [ + { + "name": "pgmq_archive", + "ordinal": 0, + "type_info": "Bool" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + } + }, + "query": "SELECT * from pgmq_archive($1::text, $2::bigint)" + }, "cd7b28f6bd348038e7c26d89292f2445841e7c13143b453277a010390699d1fc": { "describe": { "columns": [ diff --git a/core/src/pg_ext.rs b/core/src/pg_ext.rs index e33d2a1b..0a00e998 100644 --- a/core/src/pg_ext.rs +++ b/core/src/pg_ext.rs @@ -252,7 +252,7 @@ impl PGMQueueExt { pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result { check_input(queue_name)?; let arch = sqlx::query!( - "SELECT * from pgmq_archive($1::text, $2)", + "SELECT * from pgmq_archive($1::text, $2::bigint)", queue_name, msg_id ) @@ -261,6 +261,23 @@ impl PGMQueueExt { Ok(arch.pgmq_archive.expect("no archive result")) } + /// Move a message to the archive table. + pub async fn archive_batch( + &self, + queue_name: &str, + msg_ids: &[i64], + ) -> Result { + check_input(queue_name)?; + let arch = sqlx::query!( + "SELECT * from pgmq_archive($1::text, $2::bigint[])", + queue_name, + msg_ids + ) + .fetch_one(&self.connection) + .await?; + Ok(arch.pgmq_archive.expect("no archive result")) + } + // Read and message and immediately delete it. pub async fn pop Deserialize<'de>>( &self, diff --git a/core/src/query.rs b/core/src/query.rs index d0727a2a..5b8105af 100644 --- a/core/src/query.rs +++ b/core/src/query.rs @@ -302,6 +302,30 @@ pub fn archive(name: &str, msg_id: i64) -> Result { )) } +pub fn archive_batch(name: &str, msg_ids: &[i64]) -> Result { + check_input(name)?; + let mut msg_id_list: String = "".to_owned(); + for msg_id in msg_ids.iter() { + let id_str = format!("{msg_id},"); + msg_id_list.push_str(&id_str) + } + // drop trailing comma from constructed string + msg_id_list.pop(); + + Ok(format!( + " + WITH archived AS ( + DELETE FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} + WHERE msg_id in ({msg_id_list}) + RETURNING msg_id, vt, read_ct, enqueued_at, message + ) + INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (msg_id, vt, read_ct, enqueued_at, message) + SELECT msg_id, vt, read_ct, enqueued_at, message + FROM archived; + " + )) +} + pub fn pop(name: &str) -> Result { check_input(name)?; Ok(format!( @@ -428,7 +452,7 @@ BEGIN WHERE has_table_privilege('pg_monitor', 'pgmq_meta', 'SELECT') ) THEN EXECUTE 'GRANT SELECT ON pgmq_meta TO pg_monitor'; - + END IF; END; $$ LANGUAGE plpgsql; diff --git a/core/tests/integration_test.rs b/core/tests/integration_test.rs index e4e43318..b196c85d 100644 --- a/core/tests/integration_test.rs +++ b/core/tests/integration_test.rs @@ -766,6 +766,7 @@ async fn test_set_vt() { #[tokio::test] async fn test_extension_api() { let test_queue = format!("test_ext_api_{}", rand::thread_rng().gen_range(0..100000)); + let test_queue_archive = format!("{}_archive", test_queue); let queue = init_queue_ext(&test_queue).await; let msg = MyMessage::default(); @@ -879,9 +880,23 @@ async fn test_extension_api() { .delete_batch(&test_queue, &[m1, m2, m3]) .await .expect("delete batch error"); - let rowcount = rowcount(&test_queue, &queue.connection).await; - assert_eq!(rowcount, 0); + let post_delete_rowcount = rowcount(&test_queue, &queue.connection).await; + assert_eq!(post_delete_rowcount, 0); assert_eq!(delete_result, true); + + // archive a batch of messages + let m4 = queue.send(&test_queue, &del_msg).await.unwrap(); + let m5 = queue.send(&test_queue, &del_msg).await.unwrap(); + let m6 = queue.send(&test_queue, &del_msg).await.unwrap(); + let archive_result = queue + .archive_batch(&test_queue, &[m4, m5, m6]) + .await + .expect("archive batch error"); + let post_archive_rowcount = rowcount(&test_queue, &queue.connection).await; + assert_eq!(post_archive_rowcount, 0); + assert_eq!(archive_result, true); + let post_archive_archive_rowcount = rowcount(&test_queue_archive, &queue.connection).await; + assert_eq!(post_archive_archive_rowcount, 4); } #[tokio::test] diff --git a/src/lib.rs b/src/lib.rs index a5d18053..73935035 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,7 +12,8 @@ pub mod partition; use pgmq_crate::errors::PgmqError; use pgmq_crate::query::{ - archive, check_input, delete, delete_batch, init_queue, pop, read, PGMQ_SCHEMA, TABLE_PREFIX, + archive, archive_batch, check_input, delete, delete_batch, init_queue, pop, read, PGMQ_SCHEMA, + TABLE_PREFIX, }; use errors::PgmqExtError; @@ -217,18 +218,16 @@ fn pgmq_delete(queue_name: &str, msg_id: i64) -> Result, PgmqExtErr #[pg_extern(name = "pgmq_delete")] fn pgmq_delete_batch(queue_name: &str, msg_ids: Vec) -> Result, PgmqExtError> { - let mut num_deleted = 0; let query = delete_batch(queue_name, &msg_ids)?; Spi::connect(|mut client| { let tup_table = client.update(&query, None, None); match tup_table { - Ok(tup_table) => num_deleted = tup_table.len(), + Ok(_) => Ok(Some(true)), Err(e) => { error!("error deleting message: {}", e); } } - }); - Ok(Some(true)) + }) } /// archive a message forever instead of deleting it @@ -257,6 +256,20 @@ fn pgmq_archive(queue_name: &str, msg_id: i64) -> Result, PgmqExtEr } } +#[pg_extern(name = "pgmq_archive")] +fn pgmq_archive_batch(queue_name: &str, msg_ids: Vec) -> Result, PgmqExtError> { + let query = archive_batch(queue_name, &msg_ids)?; + Spi::connect(|mut client| { + let tup_table = client.update(&query, None, None); + match tup_table { + Ok(_) => Ok(Some(true)), + Err(e) => { + error!("error deleting message: {}", e); + } + } + }) +} + // reads and deletes at same time #[pg_extern] fn pgmq_pop(