-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Updates to handle automatic logging to a rabbitmq exchange log. The c… #54
Changes from all commits
58aa5df
721aa61
fd474cd
3b150c4
34d68b6
6cfe25d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,7 +38,7 @@ def set_corr_id_context_var( | |
Args: | ||
originator (str): Function, class, service name, etc. that is using logging module | ||
key (Optional[uuid.UUID]): a UUID. Default: randomly generated UUID. | ||
issue_dt (Optional[Union[str, datettime]]): Datetime when a relevant forecast was issued | ||
issue_dt (Optional[Union[str, datetime]]): Datetime when a relevant forecast was issued | ||
""" | ||
if not key: | ||
key = uuid.uuid4() | ||
|
@@ -88,7 +88,7 @@ class UTCFormatter(logging.Formatter): | |
|
||
def get_default_log_config(level: str, with_corr_id=True): | ||
""" | ||
Get standardized python logging config (formatters, handlers directing to stdout, etc.) | ||
Get standardized python logging config (formatters, handlers directing to stdout, rabbitmq etc.) | ||
as a dictionary. This dictionary can be passed directly to logging.config.dictConfig: | ||
|
||
import logging | ||
|
@@ -137,11 +137,20 @@ def get_default_log_config(level: str, with_corr_id=True): | |
'formatter': 'standard', | ||
'filters': filter_list, | ||
}, | ||
'rabbit': { | ||
'class': 'python_logging_rabbitmq.RabbitMQHandler', | ||
'host': 'localhost', | ||
'port': 5672, | ||
'formatter': 'standard', | ||
'filters': filter_list, | ||
'level': 'ERROR', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder about this being hard coded. Probably fine for now but might want to address in a future log_utils cleanup. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess we could parameterize the function for these elements and set the default values. |
||
'declare_exchange': True, | ||
}, | ||
}, | ||
'loggers': { | ||
'': { | ||
'level': level, | ||
'handlers': ['default', ], | ||
'handlers': ['default', 'rabbit'], | ||
}, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,7 @@ class Exch(NamedTuple): | |
"""An internal data class for holding the RabbitMQ exchange info""" | ||
name: str | ||
type: str | ||
durable: bool = True | ||
|
||
|
||
class Queue(NamedTuple): | ||
|
@@ -89,7 +90,7 @@ def _initialize_exchange_and_queue( | |
|
||
# Do not try to declare the default exchange. It already exists | ||
if exch.name != '': | ||
channel.exchange_declare(exchange=exch.name, exchange_type=exch.type) | ||
channel.exchange_declare(exchange=exch.name, exchange_type=exch.type, durable=exch.durable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there other args that we should be including, ie autoDelete and durable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They haven't been necessary until now so probably not. |
||
|
||
# Do not try to declare or bind built-in queues. They are pseudo-queues that already exist | ||
if queue.name.startswith('amq.rabbitmq.'): | ||
|
@@ -121,7 +122,7 @@ def subscribe_to_queue( | |
queue if needed, and invoking the provided callback when a message is received. | ||
|
||
If an existing BlockingConnection or BlockingChannel are passed, they are used to | ||
setup the subscription, but by default a new connection and channel will be established and | ||
set up the subscription, but by default a new connection and channel will be established and | ||
returned, which the caller can immediately begin doing RabbitMQ operations with. | ||
|
||
For example: start a blocking consume of messages with channel.start_consuming(), or | ||
|
@@ -131,7 +132,7 @@ def subscribe_to_queue( | |
connection (Union[Conn, BlockingConnection]): connection parameters to establish new | ||
RabbitMQ connection, or existing RabbitMQ connection to reuse for this consumer. | ||
params (RabbitMqParams): parameters for the RabbitMQ exchange and queue from which to | ||
to consume messages. | ||
consume messages. | ||
on_message_callback (Callable[ | ||
[BlockingChannel, Basic.Deliver, BasicProperties, bytes], None]): | ||
function to handle messages that are received over the subscribed exchange and queue. | ||
|
@@ -172,4 +173,4 @@ def subscribe_to_queue( | |
_channel.basic_qos(prefetch_count=1) | ||
_channel.basic_consume(queue=queue_name, on_message_callback=on_message_callback, | ||
auto_ack=auto_ack) | ||
return (_connection, _channel) | ||
return _connection, _channel |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will these values work when running on AWS in k8s?