Skip to content

Commit

Permalink
Add test for ext api read batch with poll, should return a vec
Browse files Browse the repository at this point in the history
  • Loading branch information
v0idpwn committed Aug 18, 2023
1 parent 051517b commit 14f095b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
53 changes: 30 additions & 23 deletions core/src/pg_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,39 +204,46 @@ impl PGMQueueExt {
max_batch_size: i32,
poll_timeout: Option<std::time::Duration>,
poll_interval: Option<std::time::Duration>,
) -> Result<Option<Message<T>>, PgmqError> {
) -> Result<Option<Vec<Message<T>>>, PgmqError> {
check_input(queue_name)?;
let poll_timeout_s = poll_timeout.map_or(DEFAULT_POLL_TIMEOUT_S, |t| t.as_secs() as i32);
let poll_interval_ms =
poll_interval_ms.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32);
let row = sqlx::query!(
poll_interval.map_or(DEFAULT_POLL_INTERVAL_MS, |i| i.as_millis() as i32);
let result = sqlx::query!(
"SELECT * from pgmq_read_with_poll($1::text, $2, $3, $4, $5)",
queue_name,
vt,
max_batch_size,
poll_timeout_s,
poll_interval_ms
)
.fetch_optional(&self.connection)
.await?;
match row {
Some(row) => {
// happy path - successfully read a message
let raw_msg = row.message.expect("no message");
let parsed_msg = serde_json::from_value::<T>(raw_msg)?;
Ok(Some(Message {
msg_id: row.msg_id.expect("msg_id missing from queue table"),
vt: row.vt.expect("vt missing from queue table"),
read_ct: row.read_ct.expect("read_ct missing from queue table"),
enqueued_at: row
.enqueued_at
.expect("enqueued_at missing from queue table"),
message: parsed_msg,
}))
}
None => {
// no message found
Ok(None)
.fetch_all(&self.connection)
.await;

match result {
Err(sqlx::error::Error::RowNotFound) => Ok(None),
Err(e) => Err(e)?,
Ok(rows) => {
// happy path - successfully read messages
let mut messages: Vec<Message<T>> = Vec::new();
for row in rows.iter() {
let raw_msg = row.message.clone().expect("no message");
let parsed_msg = serde_json::from_value::<T>(raw_msg);
if let Err(e) = parsed_msg {
return Err(PgmqError::JsonParsingError(e));
} else if let Ok(parsed_msg) = parsed_msg {
messages.push(Message {
msg_id: row.msg_id.expect("msg_id missing from queue table"),
vt: row.vt.expect("vt missing from queue table"),
read_ct: row.read_ct.expect("read_ct missing from queue table"),
enqueued_at: row
.enqueued_at
.expect("enqueued_at missing from queue table"),
message: parsed_msg,
})
}
}
Ok(Some(messages))
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,16 @@ async fn test_extension_api() {
.expect("error reading message");
assert!(read_message.is_none());

// read with poll, blocks until message visible
let read_messages = queue
.read_batch_with_poll::<MyMessage>(&test_queue, 5, 1, Some(std::time::Duration::from_secs(6)), None)
.await
.expect("error reading message")
.expect("no message");

assert_eq!(read_messages.len(), 1);
assert_eq!(read_messages[0].msg_id, msg_id);

// change the VT to now
let _vt_set = queue
.set_vt::<MyMessage>(&test_queue, msg_id, 0)
Expand Down

0 comments on commit 14f095b

Please sign in to comment.