Skip to content

Commit

Permalink
PublisherSync: support for private queues (#70)
Browse files Browse the repository at this point in the history
* add stop() method to PublisherSync
* support message TTL for private queue name
  • Loading branch information
mackenzie-grimes-noaa authored Jul 25, 2024
1 parent 4412130 commit e83485c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 3 deletions.
7 changes: 6 additions & 1 deletion python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,15 @@ def _initialize_exchange_and_queue(
if queue.name.startswith('amq.rabbitmq.'):
return queue.name

# If we have a 'private' queue, i.e. one used to support message publishing, not consumed
# Set message time-to-live (TTL) to 10 seconds
arguments = {'x-message-ttl': 10 * 1000} if queue.name.startswith('_') else None
frame: Method = channel.queue_declare(
queue=queue.name,
exclusive=queue.exclusive,
durable=queue.durable,
auto_delete=queue.auto_delete
auto_delete=queue.auto_delete,
arguments=arguments
)

# Bind queue to exchange with routing_key. May need to support multiple keys in the future
Expand Down Expand Up @@ -211,6 +215,7 @@ def __init__(
conn_params: Conn,
rmq_params: RabbitMqParams,
channel: Channel | None = None,

) -> tuple[BlockingConnection, Channel]:
# save params
self._conn_params = conn_params
Expand Down
35 changes: 33 additions & 2 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,16 @@ def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock
_channel.exchange_declare.assert_called_once_with(
exchange=RMQ_PARAMS.exchange.name,
exchange_type=RMQ_PARAMS.exchange.type,
durable=RMQ_PARAMS.exchange.durable
durable=RMQ_PARAMS.exchange.durable,
)

# assert queue was declared and bound
_channel.queue_declare.assert_called_once_with(
queue=RMQ_PARAMS.queue.name,
exclusive=RMQ_PARAMS.queue.exclusive,
durable=RMQ_PARAMS.queue.durable,
auto_delete=RMQ_PARAMS.queue.auto_delete
auto_delete=RMQ_PARAMS.queue.auto_delete,
arguments=None
)

_channel.queue_bind.assert_called_once_with(
Expand All @@ -119,6 +120,35 @@ def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock
)



def test_private_queue_sets_ttl(monkeypatch: MonkeyPatch, mock_connection: Mock):
mock_blocking_connection = Mock(return_value=mock_connection)
monkeypatch.setattr(
'idsse.common.rabbitmq_utils.BlockingConnection', mock_blocking_connection
)
example_queue = Queue('_my_private_queue', 'route_key', True, False, True)

# run method
mock_callback_function = Mock()
_connection, _channel = subscribe_to_queue(
CONN,
RabbitMqParams(RMQ_PARAMS.exchange, example_queue),
mock_callback_function)

# assert correct (mocked) pika calls were made
mock_blocking_connection.assert_called_once()
_channel.basic_consume.assert_called_once()

# assert queue was declared with message time-to-live of 10 seconds
_channel.queue_declare.assert_called_once_with(
queue=example_queue.name,
exclusive=example_queue.exclusive,
durable=example_queue.durable,
auto_delete=example_queue.auto_delete,
arguments={'x-message-ttl': 10000}
)


def test_passing_connection_does_not_create_new(mock_connection):
mock_connection.__class__ = BlockingConnection # set mock type to pika.BlockingConnection
mock_callback_function = Mock(name='on_message_callback')
Expand Down Expand Up @@ -175,6 +205,7 @@ def test_direct_reply_does_not_declare_queue(
new_channel.queue_bind.assert_not_called()
new_channel.basic_consume.assert_called_once()


def test_default_exchange_does_not_declare_exchange(
monkeypatch: MonkeyPatch, mock_connection: Mock
):
Expand Down

0 comments on commit e83485c

Please sign in to comment.