Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run setup when creating extension #38

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.11.1"
version = "0.11.2"
v0idpwn marked this conversation as resolved.
Show resolved Hide resolved
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.14.3"
version = "0.14.4"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
19 changes: 0 additions & 19 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ pub const PGMQ_SCHEMA: &str = "public";
pub fn init_queue(name: &str) -> Result<Vec<String>, PgmqError> {
let name = CheckedName::new(name)?;
Ok(vec![
create_meta(),
create_queue(name)?,
create_index(name)?,
create_archive(name)?,
create_archive_index(name)?,
insert_meta(name)?,
grant_pgmon_meta(),
grant_pgmon_queue(name)?,
])
}
Expand Down Expand Up @@ -58,17 +56,6 @@ pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
))
}

pub fn create_meta() -> String {
format!(
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (
queue_name VARCHAR UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL
);
"
)
}

fn grant_stmt(table: &str) -> String {
format!(
"
Expand All @@ -86,12 +73,6 @@ $$ LANGUAGE plpgsql;
)
}

// pg_monitor needs to query queue metadata
pub fn grant_pgmon_meta() -> String {
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_meta");
grant_stmt(&table)
}

// pg_monitor needs to query queue tables
pub fn grant_pgmon_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
let table = format!("{PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}");
Expand Down
22 changes: 22 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,28 @@ use pgmq_crate::query::{
};
use thiserror::Error;

extension_sql!(
"
CREATE TABLE IF NOT EXISTS public.pgmq_meta (
queue_name VARCHAR UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL
);
Comment on lines +20 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we won't be able to use the IF NOT EXISTS part since pgmq does not own the existing tables. Might be tricky, I'll have to think how we could handle that.


DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
WHERE has_table_privilege('pg_monitor', 'public.pgmq_meta', 'SELECT')
) THEN
EXECUTE 'GRANT SELECT ON public.pgmq_meta TO pg_monitor';
END IF;
END;
$$ LANGUAGE plpgsql;

",
name = "pgmq_bootstrap"
);

#[derive(Error, Debug)]
enum PgmqExtError {
#[error("")]
Expand Down
8 changes: 3 additions & 5 deletions src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use pgrx::prelude::*;
use pgmq_crate::{
errors::PgmqError,
query::{
create_archive, create_index, create_meta, grant_pgmon_meta, grant_pgmon_queue,
grant_pgmon_queue_seq, insert_meta, PGMQ_SCHEMA, TABLE_PREFIX,
create_archive, create_index, grant_pgmon_queue, grant_pgmon_queue_seq, insert_meta,
PGMQ_SCHEMA, TABLE_PREFIX,
},
util::CheckedName,
};
Expand All @@ -22,8 +22,6 @@ pub fn init_partitioned_queue(
let name = CheckedName::new(name)?;
let partition_col = map_partition_col(partition_interval);
Ok(vec![
create_meta(),
grant_pgmon_meta(),
create_partitioned_queue(name, partition_col)?,
create_partitioned_index(name, partition_col)?,
create_index(name)?,
Expand Down Expand Up @@ -96,7 +94,7 @@ fn set_retention_config(queue: CheckedName<'_>, retention: &str) -> Result<Strin
Ok(format!(
"
UPDATE {PGMQ_SCHEMA}.part_config
SET
SET
retention = '{retention}',
retention_keep_table = false,
retention_keep_index = true,
Expand Down
6 changes: 6 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ async fn test_lifecycle() {
.await
.expect("failed to create extension");

// pgmq meta was created
let _ = sqlx::query("select 'public.pgmq_meta'::regclass")
.execute(&conn)
.await
.expect("pgmq_meta table doesn't exist");

// CREATE with default retention and partition strategy
let test_default_queue = format!("test_default_{test_num}");
let _ = sqlx::query(&format!("SELECT pgmq_create('{test_default_queue}');"))
Expand Down
Loading