Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to handle automatic logging to a rabbitmq exchange log. The c… #54

Merged
merged 6 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions python/idsse_common/idsse/common/log_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def set_corr_id_context_var(
Args:
originator (str): Function, class, service name, etc. that is using logging module
key (Optional[uuid.UUID]): a UUID. Default: randomly generated UUID.
issue_dt (Optional[Union[str, datettime]]): Datetime when a relevant forecast was issued
issue_dt (Optional[Union[str, datetime]]): Datetime when a relevant forecast was issued
"""
if not key:
key = uuid.uuid4()
Expand Down Expand Up @@ -88,7 +88,7 @@ class UTCFormatter(logging.Formatter):

def get_default_log_config(level: str, with_corr_id=True):
"""
Get standardized python logging config (formatters, handlers directing to stdout, etc.)
Get standardized python logging config (formatters, handlers directing to stdout, rabbitmq etc.)
as a dictionary. This dictionary can be passed directly to logging.config.dictConfig:

import logging
Expand Down Expand Up @@ -137,11 +137,20 @@ def get_default_log_config(level: str, with_corr_id=True):
'formatter': 'standard',
'filters': filter_list,
},
'rabbit': {
'class': 'python_logging_rabbitmq.RabbitMQHandler',
'host': 'localhost',
'port': 5672,
Comment on lines +142 to +143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will these values work when running on AWS in k8s?

'formatter': 'standard',
'filters': filter_list,
'level': 'ERROR',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about this being hard coded. Probably fine for now but might want to address in a future log_utils cleanup.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we could parameterize the function for these elements and set the default values.

'declare_exchange': True,
},
},
'loggers': {
'': {
'level': level,
'handlers': ['default', ],
'handlers': ['default', 'rabbit'],
},
}
}
23 changes: 13 additions & 10 deletions python/idsse_common/idsse/common/publish_confirm.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class PublishConfirm:
be closed, which usually are tied to permission related issues or
socket timeouts.
"""

def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
"""Setup the example publisher object, passing in the RabbitMqUtils we will use to
connect to RabbitMQ.
Expand All @@ -74,7 +75,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):
a "private queue", i.e. not intended for consumers, and all published messages
will have a 10-second TTL.
"""
self._thread = Thread(name=f'PublishConfirm-{randint(0,9)}',
self._thread = Thread(name=f'PublishConfirm-{randint(0, 9)}',
daemon=True,
target=self._run)

Expand All @@ -89,7 +90,7 @@ def __init__(self, conn: Conn, exchange: Exch, queue: Queue):

def publish_message(self,
message: Dict,
routing_key = '',
routing_key='',
corr_id: Optional[str] = None) -> bool:
"""If the class is not stopping, publish a message to RabbitMQ,
appending a list of deliveries with the message number that was sent.
Expand All @@ -113,8 +114,8 @@ def publish_message(self,
# We expect a JSON message format, do a check here...
try:
properties = BasicProperties(content_type='application/json',
content_encoding='utf-8',
correlation_id=corr_id)
content_encoding='utf-8',
correlation_id=corr_id)

logger.info('Publishing message to queue %s, message length: %d',
self._rmq_params.queue.name, len(json.dumps(message)))
Expand All @@ -124,8 +125,8 @@ def publish_message(self,
self._records.message_number += 1
self._records.deliveries[self._records.message_number] = message
logger.debug('Published message # %i to exchange %s, queue %s, routing_key %s',
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)
self._records.message_number, self._rmq_params.exchange.name,
self._rmq_params.queue.name, routing_key)
return True

except Exception as e: # pylint: disable=broad-exception-caught
Expand Down Expand Up @@ -269,14 +270,16 @@ def _on_channel_open(self, channel: Channel):
logger.debug('Adding channel close callback')
self._channel.add_on_close_callback(self._on_channel_closed)

# Decleare exchange on our new channel
exch_name, exch_type = self._rmq_params.exchange
# Declare exchange on our new channel
exch_name, exch_type, exch_durable = self._rmq_params.exchange # pylint: disable=unused-variable
logger.debug('Declaring exchange %s', exch_name)

# Note: using functools.partial is not required, it is demonstrating
# how arbitrary data can be passed to the callback when it is called
cb = functools.partial(self._on_exchange_declareok, userdata=exch_name)
self._channel.exchange_declare(exchange=exch_name, exchange_type=exch_type, callback=cb)
self._channel.exchange_declare(exchange=exch_name,
exchange_type=exch_type,
callback=cb)

def _on_channel_closed(self, channel: Channel, reason: Exception):
"""Invoked by pika when RabbitMQ unexpectedly closes the channel.
Expand Down Expand Up @@ -378,7 +381,7 @@ def _on_delivery_confirmation(self, method_frame: Method):
delivery_tag = method.delivery_tag

logger.info('Received %s for delivery tag: %i (multiple: %s)',
confirmation_type, delivery_tag, ack_multiple)
confirmation_type, delivery_tag, ack_multiple)

if confirmation_type == 'ack':
self._records.acked += 1
Expand Down
9 changes: 5 additions & 4 deletions python/idsse_common/idsse/common/rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class Exch(NamedTuple):
"""An internal data class for holding the RabbitMQ exchange info"""
name: str
type: str
durable: bool = True


class Queue(NamedTuple):
Expand Down Expand Up @@ -89,7 +90,7 @@ def _initialize_exchange_and_queue(

# Do not try to declare the default exchange. It already exists
if exch.name != '':
channel.exchange_declare(exchange=exch.name, exchange_type=exch.type)
channel.exchange_declare(exchange=exch.name, exchange_type=exch.type, durable=exch.durable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there other args that we should be including, ie autoDelete and durable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They haven't been necessary until now so probably not.


# Do not try to declare or bind built-in queues. They are pseudo-queues that already exist
if queue.name.startswith('amq.rabbitmq.'):
Expand Down Expand Up @@ -121,7 +122,7 @@ def subscribe_to_queue(
queue if needed, and invoking the provided callback when a message is received.

If an existing BlockingConnection or BlockingChannel are passed, they are used to
setup the subscription, but by default a new connection and channel will be established and
set up the subscription, but by default a new connection and channel will be established and
returned, which the caller can immediately begin doing RabbitMQ operations with.

For example: start a blocking consume of messages with channel.start_consuming(), or
Expand All @@ -131,7 +132,7 @@ def subscribe_to_queue(
connection (Union[Conn, BlockingConnection]): connection parameters to establish new
RabbitMQ connection, or existing RabbitMQ connection to reuse for this consumer.
params (RabbitMqParams): parameters for the RabbitMQ exchange and queue from which to
to consume messages.
consume messages.
on_message_callback (Callable[
[BlockingChannel, Basic.Deliver, BasicProperties, bytes], None]):
function to handle messages that are received over the subscribed exchange and queue.
Expand Down Expand Up @@ -172,4 +173,4 @@ def subscribe_to_queue(
_channel.basic_qos(prefetch_count=1)
_channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback,
auto_ack=auto_ack)
return (_connection, _channel)
return _connection, _channel
3 changes: 2 additions & 1 deletion python/idsse_common/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
'pint',
'importlib_metadata',
'pika',
'jsonschema'
'jsonschema',
'python-logging-rabbitmq'
],
extras_require={
'develop': [
Expand Down
5 changes: 3 additions & 2 deletions python/idsse_common/test/test_rabbitmq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
# Example data objects
CONN = Conn('localhost', '/', port=5672, username='user', password='password')
RMQ_PARAMS = RabbitMqParams(
Exch('ims_data', 'topic'),
Exch('ims_data', 'topic', True),
Queue('ims_data', '', True, False, True)
)

Expand Down Expand Up @@ -91,7 +91,8 @@ def test_connection_params_works(monkeypatch: MonkeyPatch, mock_connection: Mock
# assert exchange was declared
_channel.exchange_declare.assert_called_once_with(
exchange=RMQ_PARAMS.exchange.name,
exchange_type=RMQ_PARAMS.exchange.type
exchange_type=RMQ_PARAMS.exchange.type,
durable=RMQ_PARAMS.exchange.durable
)

# assert queue was declared and bound
Expand Down
Loading