Skip to content

Commit

Permalink
Add pgmq_read_with_poll (#64)
Browse files Browse the repository at this point in the history
* Add pgmq_read_with_poll

For feature parity with SQS. The implementation is a bit naive: it
will simply retry the query for messages at a small interval until
the timeout is reached.

* improvement

* Add test, fix bug

* Add poll_timeout_s and poll_interval_ms arguments to read_with_poll

* Add read_batch_with_poll to client crate

* Code quality

* Test/improve read_batch_with_poll

* Add read_batch_with_queue to queueext

* Add test for ext api read batch with poll, should return a vec

* fmt

* sqlx data

* Fix bug

* Bump version
  • Loading branch information
v0idpwn authored Aug 18, 2023
1 parent df7b396 commit 4ef04f2
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 45 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.13.1"
version = "0.14.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.15.2"
version = "0.16.0"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
56 changes: 52 additions & 4 deletions core/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
"type_info": "Int4"
},
{
"name": "vt",
"name": "enqueued_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "enqueued_at",
"name": "vt",
"ordinal": 3,
"type_info": "Timestamptz"
},
Expand Down Expand Up @@ -220,12 +220,12 @@
"type_info": "Int4"
},
{
"name": "vt",
"name": "enqueued_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "enqueued_at",
"name": "vt",
"ordinal": 3,
"type_info": "Timestamptz"
},
Expand Down Expand Up @@ -273,6 +273,54 @@
},
"query": "SELECT pgmq_send as msg_id from pgmq_send($1::text, $2::jsonb);"
},
"e4c38347b44aed05aa890d3351a362d3b6f81387e98fc564ec922cefa1e96f71": {
"describe": {
"columns": [
{
"name": "msg_id",
"ordinal": 0,
"type_info": "Int8"
},
{
"name": "read_ct",
"ordinal": 1,
"type_info": "Int4"
},
{
"name": "enqueued_at",
"ordinal": 2,
"type_info": "Timestamptz"
},
{
"name": "vt",
"ordinal": 3,
"type_info": "Timestamptz"
},
{
"name": "message",
"ordinal": 4,
"type_info": "Jsonb"
}
],
"nullable": [
null,
null,
null,
null,
null
],
"parameters": {
"Left": [
"Text",
"Int4",
"Int4",
"Int4",
"Int4"
]
}
},
"query": "SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)"
},
"ed8b7aacd0d94fe647899b6d2fe61a29372cd7d6dbc28bf59ac6bb3118e3fe6c": {
"describe": {
"columns": [
Expand Down
88 changes: 67 additions & 21 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,12 @@ pub mod util;
pub use pg_ext::PGMQueueExt;
use util::fetch_one_message;

use std::time::Duration;

const VT_DEFAULT: i32 = 30;
const READ_LIMIT_DEFAULT: i32 = 1;
const POLL_TIMEOUT_DEFAULT: Duration = Duration::from_secs(5);
const POLL_INTERVAL_DEFAULT: Duration = Duration::from_millis(250);

/// Message struct received from the queue
///
Expand Down Expand Up @@ -627,6 +631,45 @@ impl PGMQueue {
Ok(messages)
}

/// Similar to [`read_batch`], but allows waiting until a message is available
///
/// You can specify a maximum duration for polling (defaults to 5 seconds),
/// and an interval between calls (defaults to 250ms). A higher interval
/// implies higher maximum latency, but less load on the database.
///
/// Refer to the [`read_batch`] function for more details.
///
pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
    &self,
    queue_name: &str,
    vt: Option<i32>,
    max_batch_size: i32,
    poll_timeout: Option<Duration>,
    poll_interval: Option<Duration>,
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
    let vt_ = vt.unwrap_or(VT_DEFAULT);
    let poll_timeout_ = poll_timeout.unwrap_or(POLL_TIMEOUT_DEFAULT);
    let poll_interval_ = poll_interval.unwrap_or(POLL_INTERVAL_DEFAULT);
    // The read query depends only on the (fixed) arguments, so build it once
    // instead of reconstructing it on every poll iteration.
    let query = query::read(queue_name, vt_, max_batch_size)?;
    let start_time = std::time::Instant::now();
    loop {
        let messages = fetch_messages::<T>(&query, &self.connection).await?;
        match messages {
            Some(m) => {
                break Ok(Some(m));
            }
            None => {
                // No messages yet: sleep (async, non-blocking) and retry until
                // the poll window has elapsed, then give up with Ok(None).
                if start_time.elapsed() < poll_timeout_ {
                    tokio::time::sleep(poll_interval_).await;
                    continue;
                } else {
                    break Ok(None);
                }
            }
        }
    }
}

/// Delete a message from the queue.
/// This is a permanent delete and cannot be undone. If you want to retain a log of the message,
/// use the [archive](#method.archive) method.
Expand Down Expand Up @@ -886,28 +929,31 @@ async fn fetch_messages<T: for<'de> Deserialize<'de>>(
connection: &Pool<Postgres>,
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
let mut messages: Vec<Message<T>> = Vec::new();
let rows: Result<Vec<PgRow>, Error> = sqlx::query(query).fetch_all(connection).await;
if let Err(sqlx::error::Error::RowNotFound) = rows {
return Ok(None);
} else if let Err(e) = rows {
return Err(e)?;
} else if let Ok(rows) = rows {
// happy path - successfully read messages
for row in rows.iter() {
let raw_msg = row.get("message");
let parsed_msg = serde_json::from_value::<T>(raw_msg);
if let Err(e) = parsed_msg {
return Err(errors::PgmqError::JsonParsingError(e));
} else if let Ok(parsed_msg) = parsed_msg {
messages.push(Message {
msg_id: row.get("msg_id"),
vt: row.get("vt"),
read_ct: row.get("read_ct"),
enqueued_at: row.get("enqueued_at"),
message: parsed_msg,
})
let result: Result<Vec<PgRow>, Error> = sqlx::query(query).fetch_all(connection).await;
match result {
Ok(rows) => {
if rows.is_empty() {
Ok(None)
} else {
// happy path - successfully read messages
for row in rows.iter() {
let raw_msg = row.get("message");
let parsed_msg = serde_json::from_value::<T>(raw_msg);
if let Err(e) = parsed_msg {
return Err(errors::PgmqError::JsonParsingError(e));
} else if let Ok(parsed_msg) = parsed_msg {
messages.push(Message {
msg_id: row.get("msg_id"),
vt: row.get("vt"),
read_ct: row.get("read_ct"),
enqueued_at: row.get("enqueued_at"),
message: parsed_msg,
})
}
}
Ok(Some(messages))
}
}
Err(e) => Err(e)?,
}
Ok(Some(messages))
}
54 changes: 54 additions & 0 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use serde::{Deserialize, Serialize};
use sqlx::types::chrono::Utc;
use sqlx::{Executor, Pool, Postgres};

const DEFAULT_POLL_TIMEOUT_S: i32 = 5;
const DEFAULT_POLL_INTERVAL_MS: i32 = 250;

/// Main controller for interacting with a managed by the PGMQ Postgres extension.
#[derive(Clone, Debug)]
pub struct PGMQueueExt {
Expand Down Expand Up @@ -194,6 +197,57 @@ impl PGMQueueExt {
}
}

/// Read up to `max_batch_size` messages from `queue_name`, delegating the
/// polling to the extension's `pgmq_read_with_poll` SQL function.
///
/// `poll_timeout` (default 5s) and `poll_interval` (default 250ms) are
/// converted to whole seconds / milliseconds before being passed to Postgres.
/// Returns `Ok(None)` only when the driver reports `RowNotFound`; otherwise
/// the (possibly empty) batch is returned as `Ok(Some(..))`.
pub async fn read_batch_with_poll<T: for<'de> Deserialize<'de>>(
    &self,
    queue_name: &str,
    vt: i32,
    max_batch_size: i32,
    poll_timeout: Option<std::time::Duration>,
    poll_interval: Option<std::time::Duration>,
) -> Result<Option<Vec<Message<T>>>, PgmqError> {
    check_input(queue_name)?;
    // Fall back to the crate defaults when the caller passes None.
    let poll_timeout_s = poll_timeout.map_or(DEFAULT_POLL_TIMEOUT_S, |t| t.as_secs() as i32);
    let poll_interval_ms =
        poll_interval.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32);
    let result = sqlx::query!(
        "SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)",
        queue_name,
        vt,
        max_batch_size,
        poll_timeout_s,
        poll_interval_ms
    )
    .fetch_all(&self.connection)
    .await;

    match result {
        Err(sqlx::error::Error::RowNotFound) => Ok(None),
        Err(other) => Err(other)?,
        Ok(rows) => {
            // Deserialize each row's JSON payload; the first parse failure
            // aborts the whole batch with a JsonParsingError.
            let batch: Result<Vec<Message<T>>, PgmqError> = rows
                .iter()
                .map(|row| {
                    let payload = row.message.clone().expect("no message");
                    let body = serde_json::from_value::<T>(payload)
                        .map_err(PgmqError::JsonParsingError)?;
                    Ok(Message {
                        msg_id: row.msg_id.expect("msg_id missing from queue table"),
                        vt: row.vt.expect("vt missing from queue table"),
                        read_ct: row.read_ct.expect("read_ct missing from queue table"),
                        enqueued_at: row
                            .enqueued_at
                            .expect("enqueued_at missing from queue table"),
                        message: body,
                    })
                })
                .collect();
            Ok(Some(batch?))
        }
    }
}

/// Move a message to the archive table.
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<bool, PgmqError> {
check_input(queue_name)?;
Expand Down
17 changes: 9 additions & 8 deletions core/src/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Query constructors

use crate::{errors::PgmqError, util::CheckedName};

use sqlx::types::chrono::Utc;
pub const TABLE_PREFIX: &str = r#"pgmq"#;
pub const PGMQ_SCHEMA: &str = "public";
Expand Down Expand Up @@ -234,14 +235,14 @@ pub fn read(name: &str, vt: i32, limit: i32) -> Result<String, PgmqError> {
(
SELECT msg_id
FROM {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
WHERE vt <= now()
WHERE vt <= clock_timestamp()
ORDER BY msg_id ASC
LIMIT {limit}
FOR UPDATE SKIP LOCKED
)
UPDATE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{name}
SET
vt = now() + interval '{vt} seconds',
vt = clock_timestamp() + interval '{vt} seconds',
read_ct = read_ct + 1
WHERE msg_id in (select msg_id from cte)
RETURNING *;
Expand Down Expand Up @@ -354,20 +355,20 @@ pub fn unassign_archive(name: CheckedName<'_>) -> Result<String, PgmqError> {
pub fn assign(table_name: &str) -> String {
format!(
"
DO $$
DO $$
BEGIN
-- Check if the table is not yet associated with the extension
IF NOT EXISTS (
SELECT 1
FROM pg_depend
SELECT 1
FROM pg_depend
WHERE refobjid = (SELECT oid FROM pg_extension WHERE extname = 'pgmq')
AND objid = (SELECT oid FROM pg_class WHERE relname = '{TABLE_PREFIX}_{table_name}')
) THEN
EXECUTE 'ALTER EXTENSION pgmq ADD TABLE {PGMQ_SCHEMA}.{TABLE_PREFIX}_{table_name}';
END IF;
END $$;
"
)
Expand Down
Loading

0 comments on commit 4ef04f2

Please sign in to comment.