Skip to content

Commit

Permalink
pg_partman as optional dependency (#18)
Browse files Browse the repository at this point in the history
* bring your own pg connection

* bump ver

* pg_partman is optional

* update err statement

* update readme

* Update integration_test.rs

* fix test

* fix some clippy warnings
  • Loading branch information
ChuckHend authored Aug 8, 2023
1 parent e182c90 commit 2f0fe41
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 42 deletions.
57 changes: 30 additions & 27 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.10.2"
version = "0.11.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ A lightweight distributed message queue. Like [AWS SQS](https://aws.amazon.com/s
- API parity with [AWS SQS](https://aws.amazon.com/sqs/) and [RSMQ](https://github.com/smrchy/rsmq)
- Messages stay in the queue until explicitly deleted
- Messages can be archived, instead of deleted, for long-term retention and replayability
- Table (bloat) maintenance automated with [pg_partman](https://github.com/pgpartman/pg_partman)
- High performance operations with index-only scans.

## Table of Contents
Expand Down Expand Up @@ -54,8 +53,8 @@ psql postgres://postgres:[email protected]:5432/postgres
```

```sql
-- create the extension, pg_partman is also required
CREATE EXTENSION pgmq CASCADE;
-- create the extension
CREATE EXTENSION pgmq;
```

### Creating a queue
Expand Down Expand Up @@ -201,7 +200,9 @@ pgmq=# SELECT pgmq_delete('my_queue', 3);

## Partitioned Queues

`pgmq` queue tables can be created as a paritioned table by using pgmq_create_partitioned(). [pg_partman](https://github.com/pgpartman/pg_partman/)
You will need to install [pg_partman](https://github.com/pgpartman/pg_partman/) if you want to use `pgmq` paritioned queues.

`pgmq` queue tables can be created as a partitioned table by using pgmq_create_partitioned(). [pg_partman](https://github.com/pgpartman/pg_partman/)
handles all maintenance of queue tables. This includes creating new partitions and dropping old partitions.

Partitions behavior is configured at the time queues are created, via `pgmq_create_partitioned()`. This function has a three parameters:
Expand Down
3 changes: 1 addition & 2 deletions pgmq.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
comment = 'Distributed message queues'
comment = 'A lightweight message queue. Like AWS SQS and RSMQ but on Postgres.'
default_version = '@CARGO_VERSION@'
module_pathname = '$libdir/pgmq'
relocatable = false
superuser = false
requires = 'pg_partman'
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn pgmq_list_queues() -> Result<
spi::Error,
> {
let results = listit()?;
Ok(TableIterator::new(results.into_iter()))
Ok(TableIterator::new(results))
}

pub fn listit() -> Result<Vec<(String, TimestampWithTimeZone)>, spi::Error> {
Expand Down
30 changes: 27 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ enum PgmqExtError {

#[error("{0} invalid types")]
TypeErrorError(String),

#[error("missing dependency: {0}")]
MissingDependency(String),
}

#[pg_extern]
Expand All @@ -45,6 +48,16 @@ fn pgmq_create_partitioned(
partition_interval: default!(String, "'10000'"),
retention_interval: default!(String, "'100000'"),
) -> Result<(), PgmqExtError> {
// validate pg_partman is installed
match Spi::get_one::<bool>(&partition::partman_installed())?
.expect("could not query extensions table")
{
true => (),
false => {
warning!("pg_partman not installed. Install https://github.com/pgpartman/pg_partman and then run `CREATE EXTENSION pg_partman;`");
return Err(PgmqExtError::MissingDependency("pg_partman".to_owned()));
}
};
validate_same_type(&partition_interval, &retention_interval)?;
let setup =
partition::init_partitioned_queue(queue_name, &partition_interval, &retention_interval)?;
Expand Down Expand Up @@ -110,7 +123,7 @@ fn pgmq_read(
spi::Error,
> {
let results = readit(queue_name, vt, limit)?;
Ok(TableIterator::new(results.into_iter()))
Ok(TableIterator::new(results))
}

fn readit(
Expand Down Expand Up @@ -224,7 +237,7 @@ fn pgmq_pop(
PgmqExtError,
> {
let results = popit(queue_name)?;
Ok(TableIterator::new(results.into_iter()))
Ok(TableIterator::new(results))
}

fn popit(
Expand Down Expand Up @@ -318,7 +331,7 @@ fn pgmq_set_vt(
Ok(())
});
res?;
Ok(TableIterator::new(results.into_iter()))
Ok(TableIterator::new(results))
}

#[cfg(any(test, feature = "pg_test"))]
Expand Down Expand Up @@ -429,6 +442,16 @@ mod tests {
let partition_interval = "2".to_owned();
let retention_interval = "2".to_owned();

let _ = Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("SQL select failed");

let failed = pgmq_create_partitioned(
&qname,
partition_interval.clone(),
retention_interval.clone(),
);
assert!(failed.is_err());

let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed");
let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap();

let queues = api::listit().unwrap();
Expand Down Expand Up @@ -481,6 +504,7 @@ mod tests {
#[pg_test]
fn test_archive() {
let qname = r#"test_archive"#;
let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed");
let _ = pgmq_create_non_partitioned(&qname).unwrap();
// no messages in the queue
let retval = Spi::get_one::<i64>(&format!("SELECT count(*) FROM {TABLE_PREFIX}_{qname}"))
Expand Down
6 changes: 3 additions & 3 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ fn pgmq_metrics(
>,
crate::PgmqExtError,
> {
let results = query_summary(&queue_name)?;
Ok(TableIterator::new(results.into_iter()))
let results = query_summary(queue_name)?;
Ok(TableIterator::new(results))
}

#[pg_extern]
Expand All @@ -58,7 +58,7 @@ fn pgmq_metrics_all() -> Result<
let q_results = query_summary(&q.0)?;
results.extend(q_results);
}
Ok(TableIterator::new(results.into_iter()))
Ok(TableIterator::new(results))
}

fn query_summary(queue_name: &str) -> Result<MetricResult, crate::PgmqExtError> {
Expand Down
12 changes: 12 additions & 0 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use pgmq_crate::{
// for now, put pg_partman in the public PGMQ_SCHEMA
pub const PARTMAN_SCHEMA: &str = "public";

/// partitioned queues require pg_partman to be installed
pub fn init_partitioned_queue(
name: &str,
partition_interval: &str,
Expand Down Expand Up @@ -105,6 +106,17 @@ fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result<Strin
))
}

pub fn partman_installed() -> String {
"
SELECT EXISTS (
SELECT 1
FROM pg_extension
WHERE extname = 'pg_partman'
);
"
.to_owned()
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use sqlx::{FromRow, Pool, Postgres, Row};

async fn connect(url: &str) -> Pool<Postgres> {
let options = conn_options(url).expect("failed to parse url");
println!("URL: {}", url);
PgPoolOptions::new()
.acquire_timeout(std::time::Duration::from_secs(10))
.max_connections(5)
.connect_with(options)
.await
.unwrap()
.expect("failed to connect to pg")
}

#[tokio::test]
Expand Down

0 comments on commit 2f0fe41

Please sign in to comment.