From c91899666046fdb732156638b282e2c5235b99eb Mon Sep 17 00:00:00 2001 From: Mackenzie Grimes - NOAA Affiliate <136493179+mackenzie-grimes-noaa@users.noreply.github.com> Date: Tue, 16 Jul 2024 14:08:45 -0600 Subject: [PATCH] bug: idsse-795: PublishConfirm setup timeout (#67) use Future to notify that PublishConfirm RabbitMQ setup is ready, or pass Exception if thrown --- python/idsse_common/idsse/common/log_util.py | 3 +- .../idsse/common/publish_confirm.py | 113 +++++++++++------- .../idsse_common/test/test_publish_confirm.py | 59 +++++++-- 3 files changed, 118 insertions(+), 57 deletions(-) diff --git a/python/idsse_common/idsse/common/log_util.py b/python/idsse_common/idsse/common/log_util.py index 10be56c..2f0e223 100644 --- a/python/idsse_common/idsse/common/log_util.py +++ b/python/idsse_common/idsse/common/log_util.py @@ -156,8 +156,7 @@ def get_default_log_config(level: str, 'loggers': { '': { 'level': level, - # 'handlers': ['default', 'rabbit'] - 'handlers': ['default'] + 'handlers': ['default'], # , 'rabbit'] }, } } diff --git a/python/idsse_common/idsse/common/publish_confirm.py b/python/idsse_common/idsse/common/publish_confirm.py index 842169f..66881b7 100644 --- a/python/idsse_common/idsse/common/publish_confirm.py +++ b/python/idsse_common/idsse/common/publish_confirm.py @@ -17,10 +17,10 @@ import logging.config import json import time -from collections.abc import Callable +from concurrent.futures import Future from dataclasses import dataclass, field from random import randint -from threading import Thread, Event +from threading import Thread from typing import NamedTuple, cast from pika import SelectConnection, BasicProperties @@ -87,7 +87,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue): self._rmq_params = PublishConfirmParams(conn, exchange, queue) self._records = PublishConfirmRecords() # data class to track message activity - self._on_ready_callback: Callable[[], None] | None = None + self._is_ready_future: Future | None = 
None def publish_message(self, message: dict, @@ -110,10 +110,12 @@ def publish_message(self, Raises: RuntimeError: if channel is uninitialized (start() not completed yet) or is closed """ - self._wait_for_channel_to_be_ready() - logger.info('DEBUG: channel is ready to publish message') + is_channel_ready = self._wait_for_channel_to_be_ready() + if not is_channel_ready: + logger.error('RabbitMQ channel not established for some reason. Cannnot publish') + return False - # We expect a JSON message format, do a check here... + logger.debug('DEBUG: channel is ready to publish message') try: properties = BasicProperties(content_type='application/json', content_encoding='utf-8', @@ -132,22 +134,33 @@ def publish_message(self, return True except Exception as e: # pylint: disable=broad-exception-caught - logger.error('Publish message problem : %s', str(e)) + logger.error('Publish message problem : (%s) %s', type(e), str(e)) return False - def start(self): - """Start thread to connect to RabbitMQ queue and prepare to publish messages, invoking - callback when setup complete. + def start(self, is_ready: Future | None = None): + """ + Start thread to handle PublishConfirm operations, such as connect to RabbitMQ, declare + a queue and enable delivery confirmation. If not None, ``is_ready`` will be resolved + to True when setup is complete and messages can be published. + Args: + is_ready (Future | None): optional Python Future that will be resolved once + PublishConfirm connection & channel are ready to publish messages, or raise an + exception if some issue is encountered on setup. Default is None Raises: RuntimeError: if PublishConfirm thread is already running """ - logger.debug('Starting thread') + logger.info('Starting thread with callback %s. is_alive? 
%s, self._channel: %s', + is_ready, self._thread.is_alive(), self._channel) + + if is_ready is not None: + self._is_ready_future = is_ready # to be invoked after all pika setup is done # not possible to start Thread when it's already running if self._thread.is_alive() or (self._connection is not None and self._connection.is_open): raise RuntimeError('PublishConfirm thread already running, cannot be started') - self._start() + + self._thread.start() def stop(self): """Stop the example by closing the channel and connection. We @@ -162,6 +175,8 @@ def stop(self): self._close_connection() self._stopping = False # done stopping + + def _run(self): """Run a new thread: get a new RMQ connection, and start looping until stop() is called""" self._connection = self._create_connection() @@ -175,20 +190,6 @@ def _run(self): # Finish closing self._connection.ioloop.start() - def _start(self, callback: Callable[[], None] | None = None): - """ - Start a thread to handle PublishConfirm operations - - Args: - callback (Callable[[], None] | None): callback function to be invoked - once instance is ready to publish messages (all RabbitMQ connection and channel - are set up, delivery confirmation is enabled, etc.). Default to None. - """ - logger.debug('Starting thread with callback') - if callback is not None: - self._on_ready_callback = callback # to be invoked after all pika setup is done - self._thread.start() - def _create_connection(self): """This method connects to RabbitMQ, returning the connection handle. When the connection is established, the on_connection_open method @@ -203,29 +204,48 @@ def _create_connection(self): on_open_error_callback=self._on_connection_open_error, on_close_callback=self._on_connection_closed) - def _wait_for_channel_to_be_ready(self) -> None: + def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool: """If connection or channel are not open, start the PublishConfirm to do needed - RabbitMQ setup. 
This method will not return until channel is confirmed ready for use""" + RabbitMQ setup. This method will not return until channel is confirmed ready for use, + or timeout is exceeded. + + Args: + timeout (optional, float): Duration of time, in seconds, to wait for RabbitMQ + connection, channel, exchange and queue to be set up and ready to send messages. + If timeout is None, thread will wait indefinitely. Default is 6 seconds. + + Returns: + bool: True if channel is ready. False if timed out waiting for RabbitMQ to connect, or if setup raised an exception + """ # validate that PublishConfirm thread has been set up and connected to RabbitMQ - logger.info('DEBUG _wait_for_channel_to_be_ready state') - logger.info(self._connection) - logger.info(self._channel) - logger.info('----------------------') - if not (self._connection and self._connection.is_open - and self._channel and self._channel.is_open): - logger.info('Channel is not ready to publish, calling _start() now') + logger.debug('DEBUG _wait_for_channel_to_be_ready state') + logger.debug(self._connection) + logger.debug(self._channel) + logger.debug('----------------------') - # pass callback to flip is_ready flag, and block until flag changes - is_ready = Event() + if (self._connection and self._connection.is_open + and self._channel and self._channel.is_open): + return True # connection and channel already open, no setup needed - logger.info('calling _start() with callback') - self._start(callback=is_ready.set) + logger.info('Channel is not ready to publish, calling start() now') + # pass a Future to start(), and block until it resolves or the timeout expires + is_ready_future = Future() - logger.info('waiting for is_ready flag to be set') - is_ready.wait() + logger.debug('calling start() with callback') + self.start(is_ready=is_ready_future) + logger.info('waiting for is_ready flag to be set') + try: + is_ready_future.result(timeout=timeout) logger.info('Connection and channel setup complete, ready to publish message') + return True + except 
TimeoutError: + logger.error('Timed out waiting for RabbitMQ connection, channel, or exchange') + return False + except Exception as exc: # pylint: disable=broad-exception-caught + logger.error('RabbitMQ rejected connection for some reason: %s', str(exc)) + return False def _on_connection_open(self, connection: SelectConnection): """This method is called by pika once the connection to RabbitMQ has been established. @@ -287,9 +307,14 @@ def _on_channel_open(self, channel: Channel): # Note: using functools.partial is not required, it is demonstrating # how arbitrary data can be passed to the callback when it is called cb = functools.partial(self._on_exchange_declareok, userdata=exch_name) - self._channel.exchange_declare(exchange=exch_name, + try: + self._channel.exchange_declare(exchange=exch_name, exchange_type=exch_type, callback=cb) + except ValueError as exc: + logger.warning('RabbitMQ failed to declare exchange: (%s) %s', type(exc), str(exc)) + if self._is_ready_future: + self._is_ready_future.set_exception(exc) # notify caller that we could not connect def _on_channel_closed(self, channel: Channel, reason: Exception): """Invoked by pika when RabbitMQ unexpectedly closes the channel. 
@@ -365,8 +390,8 @@ def _on_bindok(self, _unused_frame: Method): self._channel.confirm_delivery(self._on_delivery_confirmation) # notify up that channel can now be published to - if self._on_ready_callback: - self._on_ready_callback() + if self._is_ready_future: + self._is_ready_future.set_result(True) # self.schedule_next_message() diff --git a/python/idsse_common/test/test_publish_confirm.py b/python/idsse_common/test/test_publish_confirm.py index e9262e9..9ac892f 100644 --- a/python/idsse_common/test/test_publish_confirm.py +++ b/python/idsse_common/test/test_publish_confirm.py @@ -14,6 +14,8 @@ from time import sleep from collections.abc import Callable +from concurrent.futures import Future +from copy import deepcopy from typing import Any, NamedTuple, Self from unittest.mock import Mock @@ -177,6 +179,19 @@ def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], No assert publish_confirm._records.acked == 0 +def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm, context: MockPika): + # start() doesn't call its callback in time (at all), so timeout should expire + publish_confirm.start = Mock(side_effect=lambda is_ready: None) + + # run wait_for_channel which should timeout waiting for Future to resolve + channel_is_ready = publish_confirm._wait_for_channel_to_be_ready(timeout=0.3) + assert not channel_is_ready + publish_confirm.start.assert_called_once() + + # teardown by undoing our hacky mock + publish_confirm.start = PublishConfirm.start + + def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, context: MockPika): monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) @@ -205,6 +220,9 @@ def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, cont assert publish_confirm._records.message_number == 0 assert 
len(publish_confirm._records.deliveries) == 0 + # teardown our ad-hoc mocking of PublishConfirm instance + publish_confirm.start = PublishConfirm.start + def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika): publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock()) @@ -216,20 +234,38 @@ def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika): assert publish_confirm._connection.is_closed -def test_start_with_callback(publish_confirm: PublishConfirm): - example_message = {'callback_executed': True} +def test_start_with_future(publish_confirm: PublishConfirm): + is_channel_ready = Future() + assert publish_confirm._channel is None - def test_callback(): - assert publish_confirm._channel.is_open - success = publish_confirm.publish_message(message=example_message) - assert success + # run test + publish_confirm.start(is_channel_ready) + assert is_channel_ready.result(timeout=5) - assert publish_confirm._channel is None - publish_confirm._start(test_callback) + # teardown + publish_confirm.stop() - sleep(.1) # ensure that our test's callback has time to run and send its message - assert publish_confirm._records.message_number == 1 - assert publish_confirm._records.deliveries[1] == example_message + +def test_start_future_raises_exception(monkeypatch: MonkeyPatch, context: MockPika): + # set up mock to fail RabbitMQ exchange declare step + monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) + pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE) + + original_channel_class = context.Channel + mock_channel = deepcopy(context.Channel) + mock_channel.exchange_declare = Mock( + side_effect=ValueError('Precondition failed: exchange did not match') + ) + context.Channel = mock_channel + + # run test + is_channel_ready = Future() + pub_conf.start(is_ready=is_channel_ready) + exc = is_channel_ready.exception() + assert 
isinstance(exc, ValueError) and 'Precondition failed' in str(exc.args[0]) + + # teardown hacky test mock + context.Channel = original_channel_class def test_start_without_callback_sleeps(publish_confirm: PublishConfirm, monkeypatch: MonkeyPatch): @@ -259,6 +295,7 @@ def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context: pub_conf._wait_for_channel_to_be_ready() assert pub_conf._channel is not None and pub_conf._channel.is_open + def test_calling_start_twice_raises_error(monkeypatch: MonkeyPatch, context: MockPika): monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection) pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)