Skip to content

Commit

Permalink
Merge branch 'main' into max-table-name-len
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Aug 14, 2023
2 parents a599051 + 977d924 commit ae1f3d4
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 479 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/extension_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,44 @@ jobs:
pgmq_descr=$(stoml Cargo.toml package.description)
pgmq_repo=$(stoml Cargo.toml package.repository)
trunk publish pgmq --version ${pgmq_ver} --file .trunk/pgmq-${pgmq_ver}.tar.gz --description "A lightweight distributed message queue. Like AWS SQS and RSMQ, on Postgres." --homepage "https://github.com/tembo-io/pgmq" --repository "https://github.com/tembo-io/pgmq" --license "PostgreSQL" --category featured --category orchestration
build_and_push:
name: Build and push images
needs:
- publish
runs-on:
- self-hosted
- dind
- large-8x8
outputs:
short_sha: ${{ steps.versions.outputs.SHORT_SHA }}
steps:
- name: Check out the repo
uses: actions/checkout@v3
- name: Install stoml and pg-trunk
shell: bash
run: |
set -xe
wget https://github.com/freshautomations/stoml/releases/download/v0.7.1/stoml_linux_amd64 &> /dev/null
mv stoml_linux_amd64 stoml
chmod +x stoml
sudo mv stoml /usr/local/bin/
- name: Set version strings
id: versions
run: |
echo "SHORT_SHA=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT
echo "TAG_VER=$(/usr/local/bin/stoml Cargo.toml package.version)" >> $GITHUB_OUTPUT
- name: Build and upload image
run: |
docker build --build-arg="PGMQ_VER=${{ steps.versions.outputs.TAG_VER }}" -t pgmq-pg images/pgmq-pg
docker tag pgmq-pg quay.io/tembo/pgmq-pg:v${{ steps.versions.outputs.TAG_VER }}
docker tag pgmq-pg quay.io/tembo/pgmq-pg:latest
- name: Login to Quay
uses: docker/login-action@v2
with:
registry: quay.io/tembo
username: ${{ secrets.QUAY_USER_TEMBO }}
password: ${{ secrets.QUAY_PASSWORD_TEMBO }}
- name: Push image
run: |
docker push quay.io/tembo/pgmq-pg:v${{ steps.versions.outputs.TAG_VER }}
docker push quay.io/tembo/pgmq-pg:latest
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.11.2"
version = "0.12.0"
edition = "2021"
authors = ["Tembo.io"]
description = "Postgres extension for PGMQ"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.14.4"
version = "0.15.0"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
38 changes: 19 additions & 19 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
//!
//! // Read a message
//! let received_message: Message<MyMessage> = queue
//! .read::<MyMessage>(&my_queue, Some(&visibility_timeout_seconds))
//! .read::<MyMessage>(&my_queue, Some(visibility_timeout_seconds))
//! .await
//! .unwrap()
//! .expect("No messages in the queue");
Expand All @@ -106,7 +106,7 @@
//! assert_eq!(received_message.msg_id, message_id);
//!
//! // archive the messages
//! let _ = queue.archive(&my_queue, &received_message.msg_id)
//! let _ = queue.archive(&my_queue, received_message.msg_id)
//! .await
//! .expect("Failed to archive message");
//! println!("archived the messages from the queue");
Expand Down Expand Up @@ -517,13 +517,13 @@ impl PGMQueue {
/// println!("Struct Message ids: {:?}", struct_message_batch_ids);
///
/// let visibility_timeout_seconds = 30;
/// let known_message_structure: Message<MyMessage> = queue.read::<MyMessage>(&my_queue, Some(&visibility_timeout_seconds))
/// let known_message_structure: Message<MyMessage> = queue.read::<MyMessage>(&my_queue, Some(visibility_timeout_seconds))
/// .await
/// .unwrap()
/// .expect("no messages in the queue!");
/// println!("Received known : {known_message_structure:?}");
///
/// let unknown_message_structure: Message = queue.read(&my_queue, Some(&visibility_timeout_seconds))
/// let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
/// .await
/// .unwrap()
/// .expect("no messages in the queue!");
Expand All @@ -533,14 +533,14 @@ impl PGMQueue {
pub async fn read<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<&i32>,
vt: Option<i32>,
) -> Result<Option<Message<T>>, errors::PgmqError> {
// map vt or default VT
let vt_ = match vt {
Some(t) => t,
None => &VT_DEFAULT,
None => VT_DEFAULT,
};
let limit = &READ_LIMIT_DEFAULT;
let limit = READ_LIMIT_DEFAULT;
let query = &query::read(queue_name, vt_, limit)?;
let message = fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
Expand Down Expand Up @@ -597,14 +597,14 @@ impl PGMQueue {
///
/// let visibility_timeout_seconds = 30;
/// let batch_size = 1;
/// let batch: Vec<Message<MyMessage>> = queue.read_batch::<MyMessage>(&my_queue, Some(&visibility_timeout_seconds), &batch_size)
/// let batch: Vec<Message<MyMessage>> = queue.read_batch::<MyMessage>(&my_queue, Some(visibility_timeout_seconds), batch_size)
/// .await
/// .unwrap()
/// .expect("no messages in the queue!");
/// println!("Received a batch of messages: {batch:?}");
///
/// let batch_size = 2;
/// let unknown_message_structure: Message = queue.read(&my_queue, Some(&visibility_timeout_seconds))
/// let unknown_message_structure: Message = queue.read(&my_queue, Some(visibility_timeout_seconds))
/// .await
/// .unwrap()
/// .expect("no messages in the queue!");
Expand All @@ -614,13 +614,13 @@ impl PGMQueue {
pub async fn read_batch<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
vt: Option<&i32>,
num_msgs: &i32,
vt: Option<i32>,
num_msgs: i32,
) -> Result<Option<Vec<Message<T>>>, errors::PgmqError> {
// map vt or default VT
let vt_ = match vt {
Some(t) => t,
None => &VT_DEFAULT,
None => VT_DEFAULT,
};
let query = &query::read(queue_name, vt_, num_msgs)?;
let messages = fetch_messages::<T>(query, &self.connection).await?;
Expand Down Expand Up @@ -667,11 +667,11 @@ impl PGMQueue {
/// .expect("Failed to enqueue message");
/// println!("Struct Message id: {message_id}");
///
/// queue.delete(&my_queue, &message_id).await.expect("failed to delete message");
/// queue.delete(&my_queue, message_id).await.expect("failed to delete message");
///
/// Ok(())
/// }
pub async fn delete(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
pub async fn delete(&self, queue_name: &str, msg_id: i64) -> Result<u64, PgmqError> {
let query = &query::delete(queue_name, msg_id)?;
let row = sqlx::query(query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Expand Down Expand Up @@ -759,11 +759,11 @@ impl PGMQueue {
/// .await
/// .expect("Failed to enqueue message");
///
/// queue.archive(&my_queue, &message_id).await.expect("failed to archive message");
/// queue.archive(&my_queue, message_id).await.expect("failed to archive message");
///
/// Ok(())
/// }
pub async fn archive(&self, queue_name: &str, msg_id: &i64) -> Result<u64, PgmqError> {
pub async fn archive(&self, queue_name: &str, msg_id: i64) -> Result<u64, PgmqError> {
let query = query::archive(queue_name, msg_id)?;
let row = sqlx::query(&query).execute(&self.connection).await?;
let num_deleted = row.rows_affected();
Expand Down Expand Up @@ -862,15 +862,15 @@ impl PGMQueue {
///
/// let utc_24h_from_now = Utc::now() + Duration::hours(24);
///
/// queue.set_vt::<MyMessage>(&my_queue, &message_id, &utc_24h_from_now).await.expect("failed to set vt");
/// queue.set_vt::<MyMessage>(&my_queue, message_id, utc_24h_from_now).await.expect("failed to set vt");
///
/// Ok(())
/// }
pub async fn set_vt<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
msg_id: &i64,
vt: &chrono::DateTime<Utc>,
msg_id: i64,
vt: chrono::DateTime<Utc>,
) -> Result<Option<Message<T>>, errors::PgmqError> {
let query = &query::set_vt(queue_name, msg_id, vt)?;
let updated_message = fetch_one_message::<T>(query, &self.connection).await?;
Expand Down
12 changes: 6 additions & 6 deletions core/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub fn enqueue(
))
}

pub fn read(name: &str, vt: &i32, limit: &i32) -> Result<String, PgmqError> {
pub fn read(name: &str, vt: i32, limit: i32) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
Expand All @@ -220,7 +220,7 @@ pub fn read(name: &str, vt: &i32, limit: &i32) -> Result<String, PgmqError> {
))
}

pub fn delete(name: &str, msg_id: &i64) -> Result<String, PgmqError> {
pub fn delete(name: &str, msg_id: i64) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
Expand All @@ -230,7 +230,7 @@ pub fn delete(name: &str, msg_id: &i64) -> Result<String, PgmqError> {
))
}

pub fn set_vt(name: &str, msg_id: &i64, vt: &chrono::DateTime<Utc>) -> Result<String, PgmqError> {
pub fn set_vt(name: &str, msg_id: i64, vt: chrono::DateTime<Utc>) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
Expand Down Expand Up @@ -261,7 +261,7 @@ pub fn delete_batch(name: &str, msg_ids: &[i64]) -> Result<String, PgmqError> {
))
}

pub fn archive(name: &str, msg_id: &i64) -> Result<String, PgmqError> {
pub fn archive(name: &str, msg_id: i64) -> Result<String, PgmqError> {
check_input(name)?;
Ok(format!(
"
Expand Down Expand Up @@ -353,7 +353,7 @@ mod tests {
let vt: i32 = 20;
let limit: i32 = 1;

let query = read(&qname, &vt, &limit).unwrap();
let query = read(&qname, vt, limit).unwrap();

assert!(query.contains(&qname));
assert!(query.contains(&vt.to_string()));
Expand All @@ -364,7 +364,7 @@ mod tests {
let qname = "myqueue";
let msg_id: i64 = 42;

let query = delete(&qname, &msg_id).unwrap();
let query = delete(&qname, msg_id).unwrap();

assert!(query.contains(&qname));
assert!(query.contains(&msg_id.to_string()));
Expand Down
Loading

0 comments on commit ae1f3d4

Please sign in to comment.