diff --git a/python/idsse_common/idsse/common/rabbitmq_utils.py b/python/idsse_common/idsse/common/rabbitmq_utils.py index b9e505d..83d9aae 100644 --- a/python/idsse_common/idsse/common/rabbitmq_utils.py +++ b/python/idsse_common/idsse/common/rabbitmq_utils.py @@ -11,6 +11,7 @@ # # ---------------------------------------------------------------------------------- +import json import logging import logging.config from collections.abc import Callable @@ -18,6 +19,7 @@ from pika import BasicProperties, ConnectionParameters, PlainCredentials from pika.adapters import BlockingConnection +from pika.channel import Channel from pika.adapters.blocking_connection import BlockingChannel from pika.frame import Method from pika.spec import Basic @@ -112,6 +114,38 @@ def _initialize_exchange_and_queue( return frame.method.queue +def _initialize_connection_and_channel( + connection: Conn | BlockingConnection, + params: RabbitMqParams, + channel: BlockingChannel | Channel | None = None, +) -> tuple[BlockingConnection, Channel, str]: + """Establish (or reuse) RabbitMQ connection, and declare exchange and queue on new Channel""" + if isinstance(connection, Conn): + # Use connection as parameters to establish new connection + _connection = connection.to_connection() + logger.info('Established new RabbitMQ connection to %s on port %i', + connection.host, connection.port) + elif isinstance(connection, BlockingConnection): + # Or existing open connection was provided, so use that + _connection = connection + else: + # connection of unsupported type passed + raise ValueError( + (f'Cannot use or create new RabbitMQ connection using type {type(connection)}. ' + 'Should be one of: [Conn, pika.BlockingConnection]') + ) + + if channel is None: + logger.info('Creating new RabbitMQ channel') + _channel = _connection.channel() + else: + _channel = channel + + queue_name = _initialize_exchange_and_queue(_channel, params) + + return _connection, _channel, queue_name + + def subscribe_to_queue( connection: Conn | BlockingConnection, params: RabbitMqParams, @@ -145,28 +179,9 @@ def subscribe_to_queue( tuple[BlockingConnection, BlockingChannel]: the connection and channel, which are now open and subscribed to the provided queue. """ - if isinstance(connection, Conn): - # Use connection as parameters to establish new connection - _connection = connection.to_connection() - logger.info('Established new RabbitMQ connection to %s on port %i', - connection.host, connection.port) - elif isinstance(connection, BlockingConnection): - # Or existing open connection was provided, so use that - _connection = connection - else: - # connection of unsupported type passed - raise ValueError( - (f'Cannot use or create new RabbitMQ connection using type {type(connection)}. ' - 'Should be one of: [Conn, pika.BlockingConnection]') - ) - - if channel is None: - logger.info('Creating new RabbitMQ channel') - _channel = _connection.channel() - else: - _channel = channel - - queue_name = _initialize_exchange_and_queue(_channel, params) + _connection, _channel, queue_name = _initialize_connection_and_channel( + connection, params, channel + ) # begin consuming messages auto_ack = queue_name == DIRECT_REPLY_QUEUE @@ -176,3 +191,64 @@ def subscribe_to_queue( _channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, auto_ack=auto_ack) return _connection, _channel + + +class PublisherSync: + """ + Uses a synchronous, blocking RabbitMQ connection to publish messages (no thread safety + or multithreading support). It's recommended that you gracefully close the connection when + you're done with it using close(). + + Args: + connection (Conn | BlockingConnection): connection parameters to establish a new + RabbitMQ connection, or existing RabbitMQ to reuse + params (RabbitMqParams): parameters for RabbitMQ exchange and queue on which to publish + messages + """ + def __init__( + self, + connection: Conn | BlockingConnection, + params: RabbitMqParams, + channel: Channel | None = None, + ) -> tuple[BlockingConnection, Channel]: + + # establish BlockingConnection and declare exchange and queue on Channel + self._connection, self._channel, self._queue_name = _initialize_connection_and_channel( + connection, params, channel, + ) + self._exchange_name = params.exchange.name + self._channel.confirm_delivery() # enable delivery confirmations from RabbitMQ broker + + def close(self): + """Cleanly close any open RabbitMQ connection and channel""" + def _close_connection(): + if self._channel: + self._channel.close() + self._connection.close() + + self._connection.add_callback_threadsafe(_close_connection) + + def publish_message(self, message: dict, routing_key='', corr_id: str | None = None) -> bool: + """Publish a message to the RabbitMQ queue. Non-blocking, and no delivery confirmation. + Returns False if message is invalid or could not be sent, but otherwise no validation. + + Args: + message (dict): message to publish. Must be valid JSON dictionary. + routing_key (str, optional): routing_key to route the message to correct consumer. + Defaults to ''. + corr_id (str | None, optional): correlation_id to tag message. Defaults to None. + + Returns: + bool: True if message was published to the queue + """ + try: + properties = BasicProperties(content_type='application/json', + content_encoding='utf-8', + correlation_id=corr_id) + self._channel.basic_publish(self._exchange_name, routing_key, + json.dumps(message, ensure_ascii=True), properties) + + return True + except Exception as exc: # pylint: disable=broad-exception-caught + logger.error('Publish message problem: [%s] %s', type(exc), str(exc)) + return False diff --git a/python/idsse_common/test/test_rabbitmq_utils.py b/python/idsse_common/test/test_rabbitmq_utils.py index ba96796..173bde3 100644 --- a/python/idsse_common/test/test_rabbitmq_utils.py +++ b/python/idsse_common/test/test_rabbitmq_utils.py @@ -19,7 +19,9 @@ from pika import BlockingConnection from pika.adapters import blocking_connection -from idsse.common.rabbitmq_utils import Conn, Exch, Queue, RabbitMqParams, subscribe_to_queue +from idsse.common.rabbitmq_utils import ( + Conn, Exch, Queue, RabbitMqParams, PublisherSync, subscribe_to_queue +) # Example data objects CONN = Conn('localhost', '/', port=5672, username='user', password='password') @@ -190,3 +192,26 @@ def test_default_exchange_does_not_try_to_declare_exchange( new_channel.exchange_declare.assert_not_called() new_channel.queue_declare.assert_called_once() new_channel.basic_consume.assert_called_once() + + +def test_simple_publisher(monkeypatch: MonkeyPatch, mock_connection: Mock): + # add mock to get Connnection callback to invoke immediately + mock_connection.add_callback_threadsafe = Mock(side_effect=lambda callback: callback()) + mock_blocking_connection = Mock(return_value=mock_connection) + monkeypatch.setattr( + 'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection + ) + + publisher = PublisherSync(CONN, RMQ_PARAMS) + mock_blocking_connection.assert_called_once() + _channel = mock_blocking_connection.return_value.channel + _channel.assert_called_once() + assert publisher._connection == mock_connection + + result = publisher.publish_message({'data': 123}) + assert result + _channel.return_value.basic_publish.assert_called_once() + + publisher.close() + _channel.return_value.close.assert_called_once() + mock_blocking_connection.return_value.close.assert_called_once()