From 2f0fe412bcf558f74530129325aa45ecc7cda78a Mon Sep 17 00:00:00 2001 From: Adam Hendel Date: Tue, 8 Aug 2023 18:39:47 -0500 Subject: [PATCH] pg_partman as optional dependency (#18) * bring your own pg connection * bump ver * pg_partman is optional * update err statement * update readme * Update integration_test.rs * fix test * fix some clippy warnings --- Cargo.lock | 57 ++++++++++++++++++++------------------ Cargo.toml | 2 +- README.md | 9 +++--- pgmq.control | 3 +- src/api.rs | 2 +- src/lib.rs | 30 ++++++++++++++++++-- src/metrics.rs | 6 ++-- src/partition.rs | 12 ++++++++ tests/integration_tests.rs | 3 +- 9 files changed, 82 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee1b848d..5511a41e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,9 +236,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.79" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" +dependencies = [ + "libc", +] [[package]] name = "cexpr" @@ -524,18 +527,18 @@ dependencies = [ [[package]] name = "enum-map" -version = "2.6.0" +version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "017b207acb4cc917f4c31758ed95c0bc63ddb0f358b22eb38f80a2b2a43f6b1f" +checksum = "9705d8de4776df900a4a0b2384f8b0ab42f775e93b083b42f8ce71bdc32a47e3" dependencies = [ "enum-map-derive", ] [[package]] name = "enum-map-derive" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8560b409800a72d2d7860f8e5f4e0b0bd22bea6a352ea2a9ce30ccdef7f16d2f" +checksum = "ccb14d927583dd5c2eac0f2cf264fc4762aefe1ae14c47a8a20fc1939d3a5fc0" dependencies = [ "proc-macro2", "quote", @@ -1088,9 +1091,9 @@ checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "openssl" -version = "0.10.55" +version = "0.10.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345df152bc43501c5eb9e4654ff05f794effb78d4efe3d53abc158baddc0703d" +checksum = "729b745ad4a5575dd06a3e1af1414bd330ee561c01b3899eb584baeaa8def17e" dependencies = [ "bitflags 1.3.2", "cfg-if", @@ -1120,9 +1123,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-sys" -version = "0.9.90" +version = "0.9.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "374533b0e45f3a7ced10fcaeccca020e66656bc03dac384f852e4e5a7a8104a6" +checksum = "866b5f16f90776b9bb8dc1e1802ac6f0513de3a7a7465867bfbc563dc737faac" dependencies = [ "cc", "libc", @@ -1240,7 +1243,7 @@ dependencies = [ [[package]] name = "pgmq" -version = "0.10.2" +version = "0.11.0" dependencies = [ "chrono", "pgmq 0.14.2", @@ -1404,9 +1407,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c40d25201921e5ff0c862a505c6557ea88568a4e3ace775ab55e93f2f4f9d57" +checksum = "2c516611246607d0c04186886dbb3a754368ef82c79e9827a802c6d836dd111c" [[package]] name = "pin-utils" @@ -1576,9 +1579,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.1" +version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2eae68fc220f7cf2532e4494aded17545fce192d59cd996e0fe7887f4ceb575" +checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ "aho-corasick", "memchr", @@ -1588,9 +1591,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.4" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7b6d6190b7594385f61bd3911cd1be99dfddcfc365a4160cc2ab5bff4aed294" +checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ "aho-corasick", "memchr", @@ -1635,9 +1638,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "172891ebdceb05aa0005f533a6cbfca599ddd7d966f6f5d4d9b2e70478e70399" dependencies = [ "bitflags 2.3.3", "errno", @@ -1728,9 +1731,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea67f183f058fe88a4e3ec6e2788e003840893b91bac4559cabedd00863b3ed" +checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" dependencies = [ "serde_derive", ] @@ -1747,9 +1750,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.180" +version = "1.0.183" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24e744d7782b686ab3b73267ef05697159cc0e5abbed3f47f9933165e5219036" +checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", @@ -2033,9 +2036,9 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.7.0" +version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" +checksum = "dc02fddf48964c42031a0b3fe0428320ecf3a73c401040fc0096f97794310651" dependencies = [ "cfg-if", "fastrand", @@ -2500,9 +2503,9 @@ checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.5.2" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8bd122eb777186e60c3fdf765a58ac76e41c582f1f535fbf3314434c6b58f3f7" +checksum = "acaaa1190073b2b101e15083c38ee8ec891b5e05cbee516521e94ec008f61e64" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 3996c22a..eca45b9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pgmq" -version = "0.10.2" +version = "0.11.0" edition = "2021" authors = ["Tembo.io"] description = "Postgres extension for PGMQ" diff --git a/README.md b/README.md index b28d8b3e..3e9007ac 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,6 @@ A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/s - API parity with [AWS SQS](https://aws.amazon.com/sqs/) and [RSMQ](https://github.com/smrchy/rsmq) - Messages stay in the queue until explicitly deleted - Messages can be archived, instead of deleted, for long-term retention and replayability -- Table (bloat) maintenance automated with [pg_partman](https://github.com/pgpartman/pg_partman) - High performance operations with index-only scans. ## Table of Contents @@ -54,8 +53,8 @@ psql postgres://postgres:postgres@0.0.0.0:5432/postgres ``` ```sql --- create the extension, pg_partman is also required -CREATE EXTENSION pgmq CASCADE; +-- create the extension +CREATE EXTENSION pgmq; ``` ### Creating a queue @@ -201,7 +200,9 @@ pgmq=# SELECT pgmq_delete('my_queue', 3); ## Partitioned Queues -`pgmq` queue tables can be created as a paritioned table by using pgmq_create_partitioned(). [pg_partman](https://github.com/pgpartman/pg_partman/) +You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` paritioned queues. + +`pgmq` queue tables can be created as a partitioned table by using pgmq_create_partitioned(). [pg_partman](https://github.com/pgpartman/pg_partman/) handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions. Partitions behavior is configured at the time queues are created, via `pgmq_create_partitioned()`. This function has a three parameters: diff --git a/pgmq.control b/pgmq.control index 8dc3f5ee..9c57d6d7 100644 --- a/pgmq.control +++ b/pgmq.control @@ -1,6 +1,5 @@ -comment = 'Distributed message queues' +comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.' default_version = '@CARGO_VERSION@' module_pathname = '$libdir/pgmq' relocatable = false superuser = false -requires = 'pg_partman' diff --git a/src/api.rs b/src/api.rs index 08a14bc7..9b45e666 100644 --- a/src/api.rs +++ b/src/api.rs @@ -39,7 +39,7 @@ fn pgmq_list_queues() -> Result< spi::Error, > { let results = listit()?; - Ok(TableIterator::new(results.into_iter())) + Ok(TableIterator::new(results)) } pub fn listit() -> Result, spi::Error> { diff --git a/src/lib.rs b/src/lib.rs index c3f2b3e2..9436aa24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,6 +25,9 @@ enum PgmqExtError { #[error("{0} invalid types")] TypeErrorError(String), + + #[error("missing dependency: {0}")] + MissingDependency(String), } #[pg_extern] @@ -45,6 +48,16 @@ fn pgmq_create_partitioned( partition_interval: default!(String, "'10000'"), retention_interval: default!(String, "'100000'"), ) -> Result<(), PgmqExtError> { + // validate pg_partman is installed + match Spi::get_one::(&partition::partman_installed())? + .expect("could not query extensions table") + { + true => (), + false => { + warning!("pg_partman not installed. Install https://github.com/pgpartman/pg_partman and then run `CREATE EXTENSION pg_partman;`"); + return Err(PgmqExtError::MissingDependency("pg_partman".to_owned())); + } + }; validate_same_type(&partition_interval, &retention_interval)?; let setup = partition::init_partitioned_queue(queue_name, &partition_interval, &retention_interval)?; @@ -110,7 +123,7 @@ fn pgmq_read( spi::Error, > { let results = readit(queue_name, vt, limit)?; - Ok(TableIterator::new(results.into_iter())) + Ok(TableIterator::new(results)) } fn readit( @@ -224,7 +237,7 @@ fn pgmq_pop( PgmqExtError, > { let results = popit(queue_name)?; - Ok(TableIterator::new(results.into_iter())) + Ok(TableIterator::new(results)) } fn popit( @@ -318,7 +331,7 @@ fn pgmq_set_vt( Ok(()) }); res?; - Ok(TableIterator::new(results.into_iter())) + Ok(TableIterator::new(results)) } #[cfg(any(test, feature = "pg_test"))] @@ -429,6 +442,16 @@ mod tests { let partition_interval = "2".to_owned(); let retention_interval = "2".to_owned(); + let _ = Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("SQL select failed"); + + let failed = pgmq_create_partitioned( + &qname, + partition_interval.clone(), + retention_interval.clone(), + ); + assert!(failed.is_err()); + + let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed"); let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap(); let queues = api::listit().unwrap(); @@ -481,6 +504,7 @@ mod tests { #[pg_test] fn test_archive() { let qname = r#"test_archive"#; + let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed"); let _ = pgmq_create_non_partitioned(&qname).unwrap(); // no messages in the queue let retval = Spi::get_one::(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}")) diff --git a/src/metrics.rs b/src/metrics.rs index ae086e82..418b5fc9 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -33,8 +33,8 @@ fn pgmq_metrics( >, crate::PgmqExtError, > { - let results = query_summary(&queue_name)?; - Ok(TableIterator::new(results.into_iter())) + let results = query_summary(queue_name)?; + Ok(TableIterator::new(results)) } #[pg_extern] @@ -58,7 +58,7 @@ fn pgmq_metrics_all() -> Result< let q_results = query_summary(&q.0)?; results.extend(q_results); } - Ok(TableIterator::new(results.into_iter())) + Ok(TableIterator::new(results)) } fn query_summary(queue_name: &str) -> Result { diff --git a/src/partition.rs b/src/partition.rs index 55880c9b..affa8fad 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -13,6 +13,7 @@ use pgmq_crate::{ // for now, put pg_partman in the public PGMQ_SCHEMA pub const PARTMAN_SCHEMA: &str = "public"; +/// partitioned queues require pg_partman to be installed pub fn init_partitioned_queue( name: &str, partition_interval: &str, @@ -105,6 +106,17 @@ fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result String { + " + SELECT EXISTS ( + SELECT 1 + FROM pg_extension + WHERE extname = 'pg_partman' + ); + " + .to_owned() +} + #[cfg(any(test, feature = "pg_test"))] #[pg_schema] mod tests { diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 508a579a..955e544b 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -5,12 +5,13 @@ use sqlx::{FromRow, Pool, Postgres, Row}; async fn connect(url: &str) -> Pool { let options = conn_options(url).expect("failed to parse url"); + println!("URL: {}", url); PgPoolOptions::new() .acquire_timeout(std::time::Duration::from_secs(10)) .max_connections(5) .connect_with(options) .await - .unwrap() + .expect("failed to connect to pg") } #[tokio::test]