Skip to content

Commit

Permalink
Merge branch 'main' into pg17
Browse files Browse the repository at this point in the history
  • Loading branch information
ChuckHend authored Sep 28, 2024
2 parents 14c8799 + 1e953ef commit 8786a9a
Show file tree
Hide file tree
Showing 9 changed files with 1,201 additions and 248 deletions.
64 changes: 56 additions & 8 deletions tembo-pgmq-python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ Install with `pip` from pypi.org:
pip install tembo-pgmq-python
```

In order to use async version install with the optional dependecies:
To use the async version, install with the optional dependencies:

``` bash
```bash
pip install tembo-pgmq-python[async]
```


Dependencies:

Postgres running the [Tembo PGMQ extension](https://github.com/tembo-io/tembo/tree/main/pgmq).
- Postgres running the [Tembo PGMQ extension](https://github.com/tembo-io/tembo/tree/main/pgmq).

## Usage

Expand Down Expand Up @@ -51,7 +50,7 @@ queue = PGMQueue()

Initialization for the async version requires an explicit call of the initializer:

``` bash
```python
from tembo_pgmq_python.async_queue import PGMQueue

async def main():
Expand Down Expand Up @@ -81,11 +80,12 @@ queue = PGMQueue(
queue.create_queue("my_queue")
```

### or a partitioned queue
### Or create a partitioned queue

```python
queue.create_partitioned_queue("my_partitioned_queue", partition_interval=10000)
```

### List all queues

```python
Expand Down Expand Up @@ -128,14 +128,18 @@ The `read_with_poll` method allows you to repeatedly check for messages in the q
In the following example, the method will check for up to 5 messages in the queue `my_queue`, making the messages invisible for 30 seconds (`vt`), and will poll for a maximum of 5 seconds (`max_poll_seconds`) with intervals of 100 milliseconds (`poll_interval_ms`) between checks.

```python
read_messages: list[Message] = queue.read_with_poll("my_queue", vt=30, qty=5, max_poll_seconds=5, poll_interval_ms=100)
read_messages: list[Message] = queue.read_with_poll(
"my_queue", vt=30, qty=5, max_poll_seconds=5, poll_interval_ms=100
)
for message in read_messages:
print(message)
```

This method will continue polling until it either finds the specified number of messages (`qty`) or the `max_poll_seconds` duration is reached. The `poll_interval_ms` parameter controls the interval between successive polls, allowing you to avoid hammering the database with continuous queries.

### Archive the message after we're done with it. Archived messages are moved to an archive table
### Archive the message after we're done with it

Archived messages are moved to an archive table.

```python
archived: bool = queue.archive("my_queue", read_message.msg_id)
Expand Down Expand Up @@ -238,5 +242,49 @@ for metrics in all_metrics:
print(f"Scrape time: {metrics.scrape_time}")
```

### Optional Logging Configuration

You can enable verbose logging and specify a custom log filename.

```python
queue = PGMQueue(
host="0.0.0.0",
port="5432",
username="postgres",
password="postgres",
database="postgres",
verbose=True,
log_filename="my_custom_log.log"
)
```

# Using Transactions

To perform multiple operations within a single transaction, use the `@transaction` decorator from the `tembo_pgmq_python.decorators` module.
This ensures that all operations within the function are executed within the same transaction and are either committed together or rolled back if an error occurs.

First, import the transaction decorator:

```python
from tembo_pgmq_python.decorators import transaction
```

### Example: Transactional Operation

```python
@transaction
def transactional_operation(queue: PGMQueue, conn=None):
# Perform multiple queue operations within a transaction
queue.create_queue("transactional_queue", conn=conn)
queue.send("transactional_queue", {"message": "Hello, World!"}, conn=conn)

```
To execute the transaction:

```python
try:
transactional_operation(queue)
except Exception as e:
print(f"Transaction failed: {e}")
```
In this example, the transactional_operation function is decorated with `@transaction`, ensuring all operations inside it are part of a single transaction. If an error occurs, the entire transaction is rolled back automatically.
160 changes: 160 additions & 0 deletions tembo-pgmq-python/example/example_app_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import asyncio
from tembo_pgmq_python.async_queue import PGMQueue
from tembo_pgmq_python.decorators import async_transaction as transaction


async def main():
# Initialize the queue
queue = PGMQueue(
host="0.0.0.0",
port="5432",
username="postgres",
password="postgres",
database="postgres",
verbose=True,
log_filename="pgmq_async.log",
)
await queue.init()

test_queue = "transactional_queue_async"

# Clean up if the queue already exists
queues = await queue.list_queues()
if test_queue in queues:
await queue.drop_queue(test_queue)
await queue.create_queue(test_queue)

# Example messages
message1 = {"id": 1, "content": "First message"}
message2 = {"id": 2, "content": "Second message"}

# Transactional operation: send messages within a transaction
@transaction
async def transactional_operation(queue: PGMQueue, conn=None):
# Perform multiple queue operations within a transaction
await queue.send(test_queue, message1, conn=conn)
await queue.send(test_queue, message2, conn=conn)
# Transaction commits if no exception occurs

# Execute the transactional function (Success Case)
try:
await transactional_operation(queue)
print("Transaction committed successfully.")
except Exception as e:
print(f"Transaction failed: {e}")

# Read messages outside of the transaction
read_message1 = await queue.read(test_queue)
read_message2 = await queue.read(test_queue)
print("Messages read after transaction commit:")
if read_message1:
print(f"Message 1: {read_message1.message}")
if read_message2:
print(f"Message 2: {read_message2.message}")

# Purge the queue for the failure case
await queue.purge(test_queue)

# Transactional operation: simulate failure
@transaction
async def transactional_operation_failure(queue: PGMQueue, conn=None):
await queue.send(test_queue, message1, conn=conn)
await queue.send(test_queue, message2, conn=conn)
# Simulate an error to trigger rollback
raise Exception("Simulated failure")

# Execute the transactional function (Failure Case)
try:
await transactional_operation_failure(queue)
except Exception as e:
print(f"Transaction failed: {e}")

# Attempt to read messages after failed transaction
read_message = await queue.read(test_queue)
if read_message:
print("Message read after failed transaction (should not exist):")
print(read_message.message)
else:
print("No messages found after transaction rollback.")

# Simulate conditional rollback
await queue.purge(test_queue) # Clear the queue before the next test

@transaction
async def conditional_failure(queue: PGMQueue, conn=None):
# Send messages
msg_ids = await queue.send_batch(test_queue, [message1, message2], conn=conn)
print(f"Messages sent with IDs: {msg_ids}")
messages_in_queue = await queue.read_batch(test_queue, batch_size=10, conn=conn)
print(
f"Messages currently in queue before conditional failure: {messages_in_queue}"
)

# Conditional rollback based on number of messages
if len(messages_in_queue) > 3:
await queue.delete(
test_queue, msg_id=messages_in_queue[0].msg_id, conn=conn
)
print(
f"Message ID {messages_in_queue[0].msg_id} deleted within transaction."
)
else:
# Simulate failure if queue size is not greater than 3
print(
"Transaction failed: Not enough messages in queue to proceed with deletion."
)
raise Exception("Queue size too small to proceed.")

print("\n=== Executing Conditional Failure Scenario ===")
try:
await conditional_failure(queue)
except Exception as e:
print(f"Conditional Failure Transaction failed: {e}")

# Simulate success for conditional scenario
@transaction
async def conditional_success(queue: PGMQueue, conn=None):
# Send additional messages to ensure queue has more than 3 messages
additional_messages = [
{"id": 3, "content": "Third message"},
{"id": 4, "content": "Fourth message"},
]
msg_ids = await queue.send_batch(test_queue, additional_messages, conn=conn)
print(f"Additional messages sent with IDs: {msg_ids}")

# Read messages in queue
messages_in_queue = await queue.read_batch(test_queue, batch_size=10, conn=conn)
print(
f"Messages currently in queue before successful conditional deletion: {messages_in_queue}"
)

if len(messages_in_queue) > 3:
await queue.delete(
test_queue, msg_id=messages_in_queue[0].msg_id, conn=conn
)
print(
f"Message ID {messages_in_queue[0].msg_id} deleted within transaction."
)

print("\n=== Executing Conditional Success Scenario ===")
try:
await conditional_success(queue)
except Exception as e:
print(f"Conditional Success Transaction failed: {e}")

# Read messages after the conditional scenarios
read_messages = await queue.read_batch(test_queue, batch_size=10)
if read_messages:
print("Messages read after conditional scenarios:")
for msg in read_messages:
print(f"ID: {msg.msg_id}, Content: {msg.message}")
else:
print("No messages found after transactions.")

await queue.drop_queue(test_queue)
await queue.pool.close()


# Run the main function
if __name__ == "__main__":
asyncio.run(main())
Loading

0 comments on commit 8786a9a

Please sign in to comment.