Skip to content

Commit

Permalink
fix(pgmq-rs): make Message public from the pgmq module (#247)
Browse files Browse the repository at this point in the history
Fix regression from #245
  • Loading branch information
v0idpwn authored Jun 4, 2024
1 parent 352c838 commit b151fa9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pgmq-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pgmq"
version = "0.28.0"
version = "0.28.1"
edition = "2021"
authors = ["Tembo.io"]
description = "A distributed message queue for Rust applications, on Postgres."
Expand Down
25 changes: 13 additions & 12 deletions pgmq-rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
//! ## Minimal example at a glance
//!
//! ```rust
//! use pgmq::{PgmqError, types::Message, PGMQueue};
//! use pgmq::{PgmqError, Message, PGMQueue};
//! use serde::{Deserialize, Serialize};
//! use serde_json::Value;
//!
Expand Down Expand Up @@ -162,6 +162,7 @@ mod query;

pub use errors::PgmqError;
pub use pg_ext::PGMQueueExt;
pub use types::Message;

use std::time::Duration;

Expand Down Expand Up @@ -473,7 +474,7 @@ impl PGMQueue {
/// Example:
///
/// ```rust
/// use pgmq::{types::Message, PgmqError, PGMQueue};
/// use pgmq::{Message, PgmqError, PGMQueue};
/// use serde::{Deserialize, Serialize};
/// use serde_json::Value;
///
Expand Down Expand Up @@ -522,7 +523,7 @@ impl PGMQueue {
&self,
queue_name: &str,
vt: Option<i32>,
) -> Result<Option<types::Message<T>>, PgmqError> {
) -> Result<Option<Message<T>>, PgmqError> {
// map vt or default VT
let vt_ = match vt {
Some(t) => t,
Expand Down Expand Up @@ -551,7 +552,7 @@ impl PGMQueue {
/// Example:
///
/// ```rust
/// use pgmq::{types::Message, PgmqError, PGMQueue};
/// use pgmq::{Message, PgmqError, PGMQueue};
/// use serde::{Deserialize, Serialize};
/// use serde_json::Value;
///
Expand Down Expand Up @@ -604,7 +605,7 @@ impl PGMQueue {
queue_name: &str,
vt: Option<i32>,
num_msgs: i32,
) -> Result<Option<Vec<types::Message<T>>>, PgmqError> {
) -> Result<Option<Vec<Message<T>>>, PgmqError> {
// map vt or default VT
let vt_ = match vt {
Some(t) => t,
Expand All @@ -630,7 +631,7 @@ impl PGMQueue {
max_batch_size: i32,
poll_timeout: Option<Duration>,
poll_interval: Option<Duration>,
) -> Result<Option<Vec<types::Message<T>>>, PgmqError> {
) -> Result<Option<Vec<Message<T>>>, PgmqError> {
let vt_ = vt.unwrap_or(types::VT_DEFAULT);
let poll_timeout_ = poll_timeout.unwrap_or(types::POLL_TIMEOUT_DEFAULT);
let poll_interval_ = poll_interval.unwrap_or(types::POLL_INTERVAL_DEFAULT);
Expand Down Expand Up @@ -866,7 +867,7 @@ impl PGMQueue {
/// Example:
///
/// ```rust
/// use pgmq::{types::Message, PgmqError, PGMQueue};
/// use pgmq::{Message, PgmqError, PGMQueue};
/// use serde::{Deserialize, Serialize};
/// use serde_json::Value;
///
Expand Down Expand Up @@ -903,7 +904,7 @@ impl PGMQueue {
pub async fn pop<T: for<'de> Deserialize<'de>>(
&self,
queue_name: &str,
) -> Result<Option<types::Message<T>>, PgmqError> {
) -> Result<Option<Message<T>>, PgmqError> {
let query = &query::pop(queue_name)?;
let message = util::fetch_one_message::<T>(query, &self.connection).await?;
Ok(message)
Expand Down Expand Up @@ -959,7 +960,7 @@ impl PGMQueue {
queue_name: &str,
msg_id: i64,
vt: chrono::DateTime<Utc>,
) -> Result<Option<types::Message<T>>, PgmqError> {
) -> Result<Option<Message<T>>, PgmqError> {
let query = &query::set_vt(queue_name, msg_id, vt)?;
let updated_message = util::fetch_one_message::<T>(query, &self.connection).await?;
Ok(updated_message)
Expand All @@ -972,8 +973,8 @@ impl PGMQueue {
async fn fetch_messages<T: for<'de> Deserialize<'de>>(
query: &str,
connection: &Pool<Postgres>,
) -> Result<Option<Vec<types::Message<T>>>, PgmqError> {
let mut messages: Vec<types::Message<T>> = Vec::new();
) -> Result<Option<Vec<Message<T>>>, PgmqError> {
let mut messages: Vec<Message<T>> = Vec::new();
let result: Result<Vec<PgRow>, Error> = sqlx::query(query).fetch_all(connection).await;
match result {
Ok(rows) => {
Expand All @@ -987,7 +988,7 @@ async fn fetch_messages<T: for<'de> Deserialize<'de>>(
if let Err(e) = parsed_msg {
return Err(PgmqError::JsonParsingError(e));
} else if let Ok(parsed_msg) = parsed_msg {
messages.push(types::Message {
messages.push(Message {
msg_id: row.get("msg_id"),
vt: row.get("vt"),
read_ct: row.get("read_ct"),
Expand Down

0 comments on commit b151fa9

Please sign in to comment.