Skip to content

Commit

Permalink
feat: idsse-763: simple pika RMQ publisher (#65)
Browse files Browse the repository at this point in the history
* add PublisherSync class to use built-in pika basic_publish
* add unit tests for PublisherSync, enable delivery confirmations
  • Loading branch information
mackenzie-grimes-noaa authored Jul 3, 2024
1 parent de05739 commit 0993a81
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 23 deletions.
120 changes: 98 additions & 22 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
#
# ----------------------------------------------------------------------------------

import json
import logging
import logging.config
from collections.abc import Callable
from typing import NamedTuple

from pika import BasicProperties, ConnectionParameters, PlainCredentials
from pika.adapters import BlockingConnection
from pika.channel import Channel
from pika.adapters.blocking_connection import BlockingChannel
from pika.frame import Method
from pika.spec import Basic
Expand Down Expand Up @@ -112,6 +114,38 @@ def _initialize_exchange_and_queue(
return frame.method.queue


def _initialize_connection_and_channel(
connection: Conn | BlockingConnection,
params: RabbitMqParams,
channel: BlockingChannel | Channel | None = None,
) -> tuple[BlockingConnection, Channel, str]:
"""Establish (or reuse) RabbitMQ connection, and declare exchange and queue on new Channel"""
if isinstance(connection, Conn):
# Use connection as parameters to establish new connection
_connection = connection.to_connection()
logger.info('Established new RabbitMQ connection to %s on port %i',
connection.host, connection.port)
elif isinstance(connection, BlockingConnection):
# Or existing open connection was provided, so use that
_connection = connection
else:
# connection of unsupported type passed
raise ValueError(
(f'Cannot use or create new RabbitMQ connection using type {type(connection)}. '
'Should be one of: [Conn, pika.BlockingConnection]')
)

if channel is None:
logger.info('Creating new RabbitMQ channel')
_channel = _connection.channel()
else:
_channel = channel

queue_name = _initialize_exchange_and_queue(_channel, params)

return _connection, _channel, queue_name


def subscribe_to_queue(
connection: Conn | BlockingConnection,
params: RabbitMqParams,
Expand Down Expand Up @@ -145,28 +179,9 @@ def subscribe_to_queue(
tuple[BlockingConnection, BlockingChannel]: the connection and channel, which are now open
and subscribed to the provided queue.
"""
if isinstance(connection, Conn):
# Use connection as parameters to establish new connection
_connection = connection.to_connection()
logger.info('Established new RabbitMQ connection to %s on port %i',
connection.host, connection.port)
elif isinstance(connection, BlockingConnection):
# Or existing open connection was provided, so use that
_connection = connection
else:
# connection of unsupported type passed
raise ValueError(
(f'Cannot use or create new RabbitMQ connection using type {type(connection)}. '
'Should be one of: [Conn, pika.BlockingConnection]')
)

if channel is None:
logger.info('Creating new RabbitMQ channel')
_channel = _connection.channel()
else:
_channel = channel

queue_name = _initialize_exchange_and_queue(_channel, params)
_connection, _channel, queue_name = _initialize_connection_and_channel(
connection, params, channel
)

# begin consuming messages
auto_ack = queue_name == DIRECT_REPLY_QUEUE
Expand All @@ -176,3 +191,64 @@ def subscribe_to_queue(
_channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback,
auto_ack=auto_ack)
return _connection, _channel


class PublisherSync:
"""
Uses a synchronous, blocking RabbitMQ connection to publish messages (no thread safety
or multithreading support). It's recommended that you gracefully close the connection when
you're done with it using close().
Args:
connection (Conn | BlockingConnection): connection parameters to establish a new
RabbitMQ connection, or existing RabbitMQ to reuse
params (RabbitMqParams): parameters for RabbitMQ exchange and queue on which to publish
messages
"""
def __init__(
self,
connection: Conn | BlockingConnection,
params: RabbitMqParams,
channel: Channel | None = None,
) -> tuple[BlockingConnection, Channel]:

# establish BlockingConnection and declare exchange and queue on Channel
self._connection, self._channel, self._queue_name = _initialize_connection_and_channel(
connection, params, channel,
)
self._exchange_name = params.exchange.name
self._channel.confirm_delivery() # enable delivery confirmations from RabbitMQ broker

def close(self):
"""Cleanly close any open RabbitMQ connection and channel"""
def _close_connection():
if self._channel:
self._channel.close()
self._connection.close()

self._connection.add_callback_threadsafe(_close_connection)

def publish_message(self, message: dict, routing_key='', corr_id: str | None = None) -> bool:
"""Publish a message to the RabbitMQ queue. Non-blocking, and no delivery confirmation.
Returns False if message is invalid or could not be sent, but otherwise no validation.
Args:
message (dict): message to publish. Must be valid JSON dictionary.
routing_key (str, optional): routing_key to route the message to correct consumer.
Defaults to ''.
corr_id (str | None, optional): correlation_id to tag message. Defaults to None.
Returns:
bool: True if message was published to the queue
"""
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)
self._channel.basic_publish(self._exchange_name, routing_key,
json.dumps(message, ensure_ascii=True), properties)

return True
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error('Publish message problem: [%s] %s', type(exc), str(exc))
return False
27 changes: 26 additions & 1 deletion python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from pika import BlockingConnection
from pika.adapters import blocking_connection

from idsse.common.rabbitmq_utils import Conn, Exch, Queue, RabbitMqParams, subscribe_to_queue
from idsse.common.rabbitmq_utils import (
Conn, Exch, Queue, RabbitMqParams, PublisherSync, subscribe_to_queue
)

# Example data objects
CONN = Conn('localhost', '/', port=5672, username='user', password='password')
Expand Down Expand Up @@ -190,3 +192,26 @@ def test_default_exchange_does_not_try_to_declare_exchange(
new_channel.exchange_declare.assert_not_called()
new_channel.queue_declare.assert_called_once()
new_channel.basic_consume.assert_called_once()


def test_simple_publisher(monkeypatch: MonkeyPatch, mock_connection: Mock):
# add mock to get Connnection callback to invoke immediately
mock_connection.add_callback_threadsafe = Mock(side_effect=lambda callback: callback())
mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr(
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)

publisher = PublisherSync(CONN, RMQ_PARAMS)
mock_blocking_connection.assert_called_once()
_channel = mock_blocking_connection.return_value.channel
_channel.assert_called_once()
assert publisher._connection == mock_connection

result = publisher.publish_message({'data': 123})
assert result
_channel.return_value.basic_publish.assert_called_once()

publisher.close()
_channel.return_value.close.assert_called_once()
mock_blocking_connection.return_value.close.assert_called_once()

0 comments on commit 0993a81

Please sign in to comment.