Skip to content

Commit

Permalink
Run setup when creating extension (#66)
Browse files Browse the repository at this point in the history
* Run setup when creating extension

* Update src/lib.rs

* Bump versions

* Remove unused function
  • Loading branch information
v0idpwn authored Aug 25, 2023
1 parent 7b081bb commit fd7d811
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 18 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.17.0"
version = "0.18.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.18.0"
version = "0.19.0"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
7 changes: 0 additions & 7 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ pub const PGMQ_SCHEMA: &str = "public";
pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
Ok(vec![
create_meta(),
assign_meta(),
create_queue(name)?,
assign_queue(name)?,
create_index(name)?,
create_archive(name)?,
assign_archive(name)?,
create_archive_index(name)?,
insert_meta(name)?,
grant_pgmon_meta(),
grant_pgmon_queue(name)?,
])
}
Expand Down Expand Up @@ -348,10 +345,6 @@ pub fn pop(name: &str) -> Result<String, PgmqError> {
))
}

pub fn assign_meta() -> String {
assign("meta")
}

pub fn assign_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
Ok(assign(&name.to_string()))
}
Expand Down
22 changes: 21 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,29 @@ use pgmq_crate::query::{
};

use errors::PgmqExtError;

use std::time::Duration;

extension_sql!(
"
CREATE TABLE public.pgmq_meta (
queue_name VARCHAR UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
WHERE has_table_privilege('pg_monitor', 'public.pgmq_meta', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON public.pgmq_meta TO pg_monitor';
END IF;
END;
$$ LANGUAGE plpgsql;
",
name = "bootstrap"
);

#[pg_extern]
fn pgmq_create_non_partitioned(queue_name: &str) -> Result<(), PgmqExtError> {
let setup = init_queue(queue_name)?;
Expand Down
31 changes: 26 additions & 5 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ use pgrx::prelude::*;
use pgmq_crate::{
errors::PgmqError,
query::{
assign_archive, assign_meta, assign_queue, create_archive, create_index, create_meta,
grant_pgmon_meta, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta, PGMQ_SCHEMA,
TABLE_PREFIX,
assign_archive, assign_queue, create_archive, create_index, create_meta, grant_pgmon_meta,
grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta, PGMQ_SCHEMA, TABLE_PREFIX,
},
util::CheckedName,
};
Expand All @@ -19,12 +18,34 @@ pub fn init_partitioned_queue(
name: &str,
partition_interval: &str,
retention_interval: &str,
) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
let partition_col = map_partition_col(partition_interval);
Ok(vec![
create_partitioned_queue(name, partition_col)?,
assign_queue(name)?,
create_partitioned_index(name, partition_col)?,
create_index(name)?,
create_archive(name)?,
assign_archive(name)?,
create_partitioned_table(name, partition_col, partition_interval)?,
insert_meta(name)?,
set_retention_config(name, retention_interval)?,
grant_pgmon_queue(name)?,
grant_pgmon_queue_seq(name)?,
])
}

/// partitioned queues require pg_partman to be installed
pub fn init_partitioned_queue_client_only(
name: &str,
partition_interval: &str,
retention_interval: &str,
) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
let partition_col = map_partition_col(partition_interval);
Ok(vec![
create_meta(),
assign_meta(),
grant_pgmon_meta(),
create_partitioned_queue(name, partition_col)?,
assign_queue(name)?,
Expand Down Expand Up @@ -100,7 +121,7 @@ fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result<Strin
Ok(format!(
"
UPDATE {PGMQ_SCHEMA}.part_config
SET
SET
retention = '{retention}',
retention_keep_table = false,
retention_keep_index = true,
Expand Down

0 comments on commit fd7d811

Please sign in to comment.