Skip to content

Commit

Permalink
Merge pull request #74 from NOAA-GSL/chore/idsse-906/thread-safe-ack-…
Browse files Browse the repository at this point in the history
…nack

added thread safe callback
  • Loading branch information
Geary-Layne authored Sep 6, 2024
2 parents aad2df2 + ab850a8 commit 72673c2
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 5 deletions.
10 changes: 5 additions & 5 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def publish_message(self,

is_channel_ready = self._wait_for_channel_to_be_ready()
if not is_channel_ready:
logger.error('RabbitMQ channel not established for some reason. Cannnot publish')
logger.error('RabbitMQ channel not established for some reason. Cannot publish')
return False

logger.debug('DEBUG: channel is ready to publish message')
Expand All @@ -133,7 +133,7 @@ def publish_message(self,
except (AMQPChannelError, AMQPConnectionError) as exc:
# something wrong with RabbitMQ connection; destroy and recreate the daemon Thread
logger.warning('Publish message problem, restarting thread to re-attempt: (%s) %s',
type(exc), str(exc))
type(exc), str(exc))

# create new Thread, abandoning old one (it will shut itself down)
self._create_thread()
Expand Down Expand Up @@ -344,16 +344,16 @@ def _on_channel_open(self, channel: Channel):
self._channel.add_on_close_callback(self._on_channel_closed)

# Declare exchange on our new channel
exch_name, exch_type, exch_durable = self._rmq_params.exchange # pylint: disable=unused-variable
exch_name, exch_type, _ = self._rmq_params.exchange
logger.debug('Declaring exchange %s', exch_name)

# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = functools.partial(self._on_exchange_declareok, userdata=exch_name)
try:
self._channel.exchange_declare(exchange=exch_name,
exchange_type=exch_type,
callback=cb)
exchange_type=exch_type,
callback=cb)
except ValueError as exc:
logger.warning('RabbitMQ failed to declare exchange: (%s) %s', type(exc), str(exc))
if self._is_ready_future:
Expand Down
47 changes: 47 additions & 0 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,53 @@ def subscribe_to_queue(
return _connection, _channel


def threadsafe_call(connection, channel, *partial_functions):
"""This function provides a thread safe way to call pika functions (or functions that call
pika functions) from a thread other than the main. The need for this utility is practice of
executing function/method and separate thread to avoid blocking the rabbitMQ heartbeat
messages send by pika from the main thread.
Note: that `channel` must be the same pika channel instance via which
the message being ACKed was retrieved (AMQP protocol constraint).
Examples:
# Simple ack a message
threadsafe_call(self.connection, self.channel,
partial(self.channel.basic_ack,
delivery_tag=delivery_tag))
# RPC response followed and nack without requeueing
response = {'Error': 'Invalid request'}
threadsafe_call(self.connection, self.channel,
partial(self.channel.basic_publish,
exchange='',
routing_key=response_props.reply_to,
properties=response_props,
body=json.dumps(response)),
partial(channel.basic_nack,
delivery_tag=delivery_tag,
requeue=False))
# Publishing message via the PublishConfirm utility
threadsafe_call(self.connection, self.pub_conf.channel,
partial(self.pub_conf.publish_message,
message=message))
Args:
connection (BlockingConnection): RabbitMQ connection.
channel (BlockingChannel): RabbitMQ channel.
partial_functions (Callable): One or more callable function (typically created via
functools.partial)
"""
def call_if_channel_is_open():
if channel.is_open:
for func in partial_functions:
func()
else:
logger.error('Channel closed before callback could be run')
raise ConnectionError('RabbitMQ Channel is closed')
connection.add_callback_threadsafe(call_if_channel_is_open)


class PublisherSync:
"""
Uses a synchronous, blocking RabbitMQ connection to publish messages (no thread safety
Expand Down

0 comments on commit 72673c2

Please sign in to comment.