Skip to content

Commit

Permalink
Merge branch 'main' into max-table-name-len
Browse files Browse the repository at this point in the history
  • Loading branch information
vrmiguel authored Aug 10, 2023
2 parents 073d040 + 12c9643 commit a599051
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 22 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/extension_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ jobs:
git clone https://github.com/pgpartman/pg_partman.git && \
cd pg_partman && \
sudo make install && cd ../
cp /usr/share/postgresql/14/extension/pg_partman* ~/.pgrx/15.3/pgrx-install/share/postgresql/extension/
cp /usr/lib/postgresql/14/lib/pg_partman_bgw.so ~/.pgrx/15.3/pgrx-install/lib/postgresql/
cp /usr/share/postgresql/14/extension/pg_partman* ~/.pgrx/15.4/pgrx-install/share/postgresql/extension/
cp /usr/lib/postgresql/14/lib/pg_partman_bgw.so ~/.pgrx/15.4/pgrx-install/lib/postgresql/
rm -rf ./target/pgrx-test-data-* || true
pg_version=$(stoml Cargo.toml features.default)
cargo pgrx run ${pg_version} --pgcli || true
Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.11.1"
version = "0.11.2"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.14.3"
version = "0.14.4"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
18 changes: 8 additions & 10 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn create_queue(name: CheckedName<'_>) -> Result<String, PgmqError> {
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
Expand All @@ -49,8 +49,8 @@ pub fn create_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}_archive (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
deleted_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
);
Expand All @@ -63,7 +63,7 @@ pub fn create_meta() -> String {
"
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_meta (
queue_name VARCHAR UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL
created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL
);
"
)
Expand Down Expand Up @@ -183,9 +183,7 @@ pub fn enqueue(
check_input(name)?;
let mut values = "".to_owned();
for message in messages.iter() {
let full_msg = format!(
"((now() at time zone 'utc' + interval '{delay} seconds'), '{message}'::json),"
);
let full_msg = format!("((now() + interval '{delay} seconds'), '{message}'::json),");
values.push_str(&full_msg)
}
// drop trailing comma from constructed string
Expand All @@ -207,14 +205,14 @@ pub fn read(name: &str, vt: &i32, limit: &i32) -> Result<String, PgmqError> {
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
WHERE vt <= now()
ORDER BY msg_id ASC
LIMIT {limit}
FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
SET
vt = (now() at time zone 'utc' + interval '{vt} seconds'),
vt = now() + interval '{vt} seconds',
read_ct = read_ct + 1
WHERE msg_id in (select msg_id from cte)
RETURNING *;
Expand Down Expand Up @@ -287,7 +285,7 @@ pub fn pop(name: &str) -> Result<String, PgmqError> {
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE vt <= now() at time zone 'utc'
WHERE vt <= now()
ORDER BY msg_id ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
Expand Down
12 changes: 12 additions & 0 deletions sql/pgmq--0.11.1--0.11.2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
DO $$
DECLARE
table_name TEXT;
BEGIN
FOR table_name IN (SELECT queue_name FROM public.pgmq_meta)
LOOP
EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name);
EXECUTE format('ALTER TABLE %I ALTER COLUMN enqueued_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive');
EXECUTE format('ALTER TABLE %I ALTER COLUMN deleted_at SET DEFAULT now()', 'pgmq_' || table_name || '_archive');

END LOOP;
END $$;
10 changes: 6 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn enqueue_str(name: &str) -> Result<String, PgmqError> {
Ok(format!(
"
INSERT INTO {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name} (vt, message)
VALUES (now() at time zone 'utc', $1)
VALUES (now(), $1)
RETURNING msg_id;
"
))
Expand Down Expand Up @@ -310,7 +310,7 @@ fn pgmq_set_vt(
let query = format!(
"
UPDATE {TABLE_PREFIX}_{queue_name}
SET vt = (now() at time zone 'utc' + interval '{vt_offset} seconds')
SET vt = (now() + interval '{vt_offset} seconds')
WHERE msg_id = $1
RETURNING *;
"
Expand Down Expand Up @@ -442,7 +442,8 @@ mod tests {
let partition_interval = "2".to_owned();
let retention_interval = "2".to_owned();

let _ = Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("SQL select failed");
let _ =
Spi::run("DROP EXTENSION IF EXISTS pg_partman").expect("Failed dropping pg_partman");

let failed = pgmq_create_partitioned(
&qname,
Expand All @@ -451,7 +452,8 @@ mod tests {
);
assert!(failed.is_err());

let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman").expect("SQL select failed");
let _ = Spi::run("CREATE EXTENSION IF NOT EXISTS pg_partman")
.expect("Failed creating pg_partman");
let _ = pgmq_create_partitioned(&qname, partition_interval, retention_interval).unwrap();

let queues = api::listit().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn create_partitioned_queue(
CREATE TABLE IF NOT EXISTS {PGMQ_SCHEMA}.{TABLE_PREFIX}_{queue} (
msg_id BIGSERIAL NOT NULL,
read_ct INT DEFAULT 0 NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT (now() at time zone 'utc') NOT NULL,
enqueued_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
vt TIMESTAMP WITH TIME ZONE NOT NULL,
message JSONB
) PARTITION BY RANGE ({partition_col});
Expand Down
5 changes: 5 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ async fn test_lifecycle() {
.expect("expected message");
assert_eq!(message.msg_id, 1);

let _ = sqlx::query("CREATE EXTENSION IF NOT EXISTS pg_partman")
.execute(&conn)
.await
.expect("failed to create extension");

// CREATE with 5 seconds per partition, 10 seconds retention
let test_duration_queue = format!("test_duration_{test_num}");
let q = format!("SELECT \"pgmq_create_partitioned\"('{test_duration_queue}'::text, '5 seconds'::text, '10 seconds'::text);");
Expand Down

0 comments on commit a599051

Please sign in to comment.