Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: idsse-795: PublishConfirm setup timeout #67

Merged
merged 4 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions python/idsse_common/idsse/common/log_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ def get_default_log_config(level: str,
'loggers': {
'': {
'level': level,
# 'handlers': ['default', 'rabbit']
'handlers': ['default']
'handlers': ['default'], # , 'rabbit']
},
}
}
113 changes: 69 additions & 44 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import logging.config
import json
import time
from collections.abc import Callable
from concurrent.futures import Future
from dataclasses import dataclass, field
from random import randint
from threading import Thread, Event
from threading import Thread
from typing import NamedTuple, cast

from pika import SelectConnection, BasicProperties
Expand Down Expand Up @@ -87,7 +87,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
self._rmq_params = PublishConfirmParams(conn, exchange, queue)

self._records = PublishConfirmRecords() # data class to track message activity
self._on_ready_callback: Callable[[], None] | None = None
self._is_ready_future: Future | None = None

def publish_message(self,
message: dict,
Expand All @@ -110,10 +110,12 @@ def publish_message(self,
Raises:
RuntimeError: if channel is uninitialized (start() not completed yet) or is closed
"""
self._wait_for_channel_to_be_ready()
logger.info('DEBUG: channel is ready to publish message')
is_channel_ready = self._wait_for_channel_to_be_ready()
if not is_channel_ready:
logger.error('RabbitMQ channel not established for some reason. Cannnot publish')
return False

# We expect a JSON message format, do a check here...
logger.debug('DEBUG: channel is ready to publish message')
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
Expand All @@ -132,22 +134,33 @@ def publish_message(self,
return True

except Exception as e: # pylint: disable=broad-exception-caught
logger.error('Publish message problem : %s', str(e))
logger.error('Publish message problem : (%s) %s', type(e), str(e))
return False

def start(self):
"""Start thread to connect to RabbitMQ queue and prepare to publish messages, invoking
callback when setup complete.
def start(self, is_ready: Future | None = None):
"""
Start thread to handle PublicConfirm operations, such as connect to RabbitMQ, declare
a queue and enable delivery confirmation. If not None, ```is_ready``` will be resolved
to True when setup is complete and messages can be published.

Args:
is_ready (Future | None): optional Python Future that will be resolved once
PublishConfirm connection & channel are ready to publish messages, or raise an
exception if some issue is encountered on setup. Default is None
Raises:
RuntimeError: if PublishConfirm thread is already running
"""
logger.debug('Starting thread')
logger.info('Starting thread with callback %s. is_alive? %s, self._channel: %s',
is_ready, self._thread.is_alive(), self._channel)

if is_ready is not None:
self._is_ready_future = is_ready # to be invoked after all pika setup is done

# not possible to start Thread when it's already running
if self._thread.is_alive() or (self._connection is not None and self._connection.is_open):
raise RuntimeError('PublishConfirm thread already running, cannot be started')
self._start()

self._thread.start()

def stop(self):
"""Stop the example by closing the channel and connection. We
Expand All @@ -162,6 +175,8 @@ def stop(self):
self._close_connection()
self._stopping = False # done stopping



def _run(self):
"""Run a new thread: get a new RMQ connection, and start looping until stop() is called"""
self._connection = self._create_connection()
Expand All @@ -175,20 +190,6 @@ def _run(self):
# Finish closing
self._connection.ioloop.start()

def _start(self, callback: Callable[[], None] | None = None):
"""
Start a thread to handle PublishConfirm operations

Args:
callback (Callable[[], None] | None): callback function to be invoked
once instance is ready to publish messages (all RabbitMQ connection and channel
are set up, delivery confirmation is enabled, etc.). Default to None.
"""
logger.debug('Starting thread with callback')
if callback is not None:
self._on_ready_callback = callback # to be invoked after all pika setup is done
self._thread.start()

def _create_connection(self):
"""This method connects to RabbitMQ, returning the connection handle.
When the connection is established, the on_connection_open method
Expand All @@ -203,29 +204,48 @@ def _create_connection(self):
on_open_error_callback=self._on_connection_open_error,
on_close_callback=self._on_connection_closed)

def _wait_for_channel_to_be_ready(self) -> None:
def _wait_for_channel_to_be_ready(self, timeout: float | None = 6) -> bool:
"""If connection or channel are not open, start the PublishConfirm to do needed
RabbitMQ setup. This method will not return until channel is confirmed ready for use"""
RabbitMQ setup. This method will not return until channel is confirmed ready for use,
or timeout is exceeded.

Args:
timeout (optional, float): Duration of time, in seconds, to wait for RabbitMQ
connection, channel, exchange and queue to be setup and ready to send messages.
If timeout is None, thread will wait indefinitely. Default is 6 seconds.

Returns:
bool: True if channel is ready. False if timed out waiting for RabbitMQ to connect
"""

# validate that PublishConfirm thread has been set up and connected to RabbitMQ
logger.info('DEBUG _wait_for_channel_to_be_ready state')
logger.info(self._connection)
logger.info(self._channel)
logger.info('----------------------')
if not (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
logger.info('Channel is not ready to publish, calling _start() now')
logger.debug('DEBUG _wait_for_channel_to_be_ready state')
logger.debug(self._connection)
logger.debug(self._channel)
logger.debug('----------------------')

# pass callback to flip is_ready flag, and block until flag changes
is_ready = Event()
if (self._connection and self._connection.is_open
and self._channel and self._channel.is_open):
return True # connection and channel already open, no setup needed

logger.info('calling _start() with callback')
self._start(callback=is_ready.set)
logger.info('Channel is not ready to publish, calling start() now')
# pass callback to flip is_ready flag, and block until flag changes
is_ready_future = Future()

logger.info('waiting for is_ready flag to be set')
is_ready.wait()
logger.debug('calling start() with callback')
self.start(is_ready=is_ready_future)

logger.info('waiting for is_ready flag to be set')
try:
is_ready_future.result(timeout=timeout)
logger.info('Connection and channel setup complete, ready to publish message')
return True
except TimeoutError:
logger.error('Timed out waiting for RabbitMQ connection, channel, or exchange')
return False
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error('RabbitMQ rejected connection for some reason: %s', str(exc))
return False

def _on_connection_open(self, connection: SelectConnection):
"""This method is called by pika once the connection to RabbitMQ has been established.
Expand Down Expand Up @@ -287,9 +307,14 @@ def _on_channel_open(self, channel: Channel):
# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = functools.partial(self._on_exchange_declareok, userdata=exch_name)
self._channel.exchange_declare(exchange=exch_name,
try:
self._channel.exchange_declare(exchange=exch_name,
exchange_type=exch_type,
callback=cb)
except ValueError as exc:
logger.warning('RabbitMQ failed to declare exchange: (%s) %s', type(exc), str(exc))
if self._is_ready_future:
self._is_ready_future.set_exception(exc) # notify caller that we could not connect

def _on_channel_closed(self, channel: Channel, reason: Exception):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Expand Down Expand Up @@ -365,8 +390,8 @@ def _on_bindok(self, _unused_frame: Method):
self._channel.confirm_delivery(self._on_delivery_confirmation)

# notify up that channel can now be published to
if self._on_ready_callback:
self._on_ready_callback()
if self._is_ready_future:
self._is_ready_future.set_result(True)

# self.schedule_next_message()

Expand Down
59 changes: 48 additions & 11 deletions python/idsse_common/test/test_publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

from time import sleep
from collections.abc import Callable
from concurrent.futures import Future
from copy import deepcopy
from typing import Any, NamedTuple, Self
from unittest.mock import Mock

Expand Down Expand Up @@ -177,6 +179,19 @@ def mock_confirm_delivery(self: context.Channel, callback: Callable[[Method], No
assert publish_confirm._records.acked == 0


def test_wait_for_channel_to_be_ready_timeout(publish_confirm: PublishConfirm, context: MockPika):
# start() doesn't call its callback in time (at all), so timeout should expire
publish_confirm.start = Mock(side_effect=lambda is_ready: None)

# run wait_for_channel which should timeout waiting for Future to resolve
channel_is_ready = publish_confirm._wait_for_channel_to_be_ready(timeout=0.3)
assert not channel_is_ready
publish_confirm.start.assert_called_once()

# teardown by undoing our hacky mock
publish_confirm.start = PublishConfirm.start


def test_publish_message_success_without_calling_start(monkeypatch: MonkeyPatch, context: MockPika):
monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection)
pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)
Expand Down Expand Up @@ -205,6 +220,9 @@ def test_publish_message_failure_rmq_error(publish_confirm: PublishConfirm, cont
assert publish_confirm._records.message_number == 0
assert len(publish_confirm._records.deliveries) == 0

# teardown our ad-hoc mocking of PublishConfirm instance
publish_confirm.start = PublishConfirm.start


def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika):
publish_confirm._connection = context.SelectConnection(None, Mock(), Mock(), Mock())
Expand All @@ -216,20 +234,38 @@ def test_on_channel_closed(publish_confirm: PublishConfirm, context: MockPika):
assert publish_confirm._connection.is_closed


def test_start_with_callback(publish_confirm: PublishConfirm):
example_message = {'callback_executed': True}
def test_start_with_future(publish_confirm: PublishConfirm):
is_channel_ready = Future()
assert publish_confirm._channel is None

def test_callback():
assert publish_confirm._channel.is_open
success = publish_confirm.publish_message(message=example_message)
assert success
# run test
publish_confirm.start(is_channel_ready)
assert is_channel_ready.result(timeout=5)

assert publish_confirm._channel is None
publish_confirm._start(test_callback)
# teardown
publish_confirm.stop()

sleep(.1) # ensure that our test's callback has time to run and send its message
assert publish_confirm._records.message_number == 1
assert publish_confirm._records.deliveries[1] == example_message

def test_start_future_raises_exception(monkeypatch: MonkeyPatch, context: MockPika):
# set up mock to fail RabbitMQ exchange declare step
monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection)
pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)

original_channel_class = context.Channel
mock_channel = deepcopy(context.Channel)
mock_channel.exchange_declare = Mock(
side_effect=ValueError('Precondition failed: exchange did not match')
)
context.Channel = mock_channel

# run test
is_channel_ready = Future()
pub_conf.start(is_ready=is_channel_ready)
exc = is_channel_ready.exception()
assert isinstance(exc, ValueError) and 'Precondition failed' in str(exc.args[0])

# teardown hacky test mock
context.Channel = original_channel_class


def test_start_without_callback_sleeps(publish_confirm: PublishConfirm, monkeypatch: MonkeyPatch):
Expand Down Expand Up @@ -259,6 +295,7 @@ def test_wait_for_channel_returns_when_ready(monkeypatch: MonkeyPatch, context:
pub_conf._wait_for_channel_to_be_ready()
assert pub_conf._channel is not None and pub_conf._channel.is_open


def test_calling_start_twice_raises_error(monkeypatch: MonkeyPatch, context: MockPika):
monkeypatch.setattr('idsse.common.publish_confirm.SelectConnection', context.SelectConnection)
pub_conf = PublishConfirm(conn=EXAMPLE_CONN, exchange=EXAMPLE_EXCH, queue=EXAMPLE_QUEUE)
Expand Down
Loading