diff --git a/core/Cargo.toml b/core/Cargo.toml index e1212c6d..af912d5f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.16.1" +version = "0.17.0" edition = "2021" authors = ["Tembo.io"] description = "A distributed message queue for Rust applications, on Postgres." diff --git a/core/sqlx-data.json b/core/sqlx-data.json index 37dcb775..568fa382 100644 --- a/core/sqlx-data.json +++ b/core/sqlx-data.json @@ -1,5 +1,26 @@ { "db": "PostgreSQL", + "0c81d8ee07fac75c18d9bdebd9b2ac5e473b4ef43a5d332f5d58ae6a0e01e600": { + "describe": { + "columns": [ + { + "name": "pgmq_delete", + "ordinal": 0, + "type_info": "Bool" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Text", + "Int8" + ] + } + }, + "query": "SELECT * from pgmq_delete($1::text, $2::bigint)" + }, "0e13957cc5c1f96a07ec242ce393f0738d62d79d34182e3c9333e9f9712d4cbb": { "describe": { "columns": [ @@ -119,6 +140,27 @@ }, "query": "SELECT * from pgmq_archive($1::text, $2)" }, + "8e4f6635dc4cfb5ed42ddb87930b985793e81243b8a5609dec4ab566aaab4e9c": { + "describe": { + "columns": [ + { + "name": "pgmq_delete", + "ordinal": 0, + "type_info": "Bool" + } + ], + "nullable": [ + null + ], + "parameters": { + "Left": [ + "Text", + "Int8Array" + ] + } + }, + "query": "SELECT * from pgmq_delete($1::text, $2::bigint[])" + }, "9919286cee87946b387f69e67df94f94eb0acdd3b5f4848faf092c55d484b61a": { "describe": { "columns": [ @@ -185,27 +227,6 @@ }, "query": "SELECT * from pgmq_set_vt($1::text, $2::bigint, $3::integer);" }, - "c38b0cc48a6744bdb28ad1fa637223b6427eda19c0a52dc8881750d6a50ac0ef": { - "describe": { - "columns": [ - { - "name": "pgmq_delete", - "ordinal": 0, - "type_info": "Bool" - } - ], - "nullable": [ - null - ], - "parameters": { - "Left": [ - "Text", - "Int8" - ] - } - }, - "query": "SELECT * from pgmq_delete($1::text, $2)" - }, "c89ad5584222a1f9c0d505fe58bf035d68d033047176b106e289ad6842f49972": { "describe": { "columns": [ diff --git a/core/src/pg_ext.rs b/core/src/pg_ext.rs index c656bc5a..e33d2a1b 100644 --- a/core/src/pg_ext.rs +++ b/core/src/pg_ext.rs @@ -295,7 +295,7 @@ impl PGMQueueExt { // Delete a message by message id. pub async fn delete(&self, queue_name: &str, msg_id: i64) -> Result { let row = sqlx::query!( - "SELECT * from pgmq_delete($1::text, $2)", + "SELECT * from pgmq_delete($1::text, $2::bigint)", queue_name, msg_id ) @@ -304,5 +304,15 @@ impl PGMQueueExt { Ok(row.pgmq_delete.expect("no delete result")) } - // + // Delete with a slice of message ids + pub async fn delete_batch(&self, queue_name: &str, msg_id: &[i64]) -> Result { + let row = sqlx::query!( + "SELECT * from pgmq_delete($1::text, $2::bigint[])", + queue_name, + msg_id + ) + .fetch_one(&self.connection) + .await?; + Ok(row.pgmq_delete.expect("no delete result")) + } }