Skip to content

Commit

Permalink
Merge pull request #261 from openedx/diana/add-monitoring-information
Browse files Browse the repository at this point in the history
feat: Add custom metric to track whether a message processed.
  • Loading branch information
dianakhuang authored Aug 5, 2024
2 parents 823645f + e007901 commit aaffd52
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.8.1] - 2024-08-02
********************
Changed
=======
* Monitoring: Add a custom attribute, ``kafka_received_message`` to track whether a message was processed or not.

[5.8.0] - 2024-08-01
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.8.0'
__version__ = '5.8.1'
8 changes: 8 additions & 0 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
set_custom_attribute('kafka_topic', run_context['full_topic'])

if kafka_message:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span, False otherwise.
set_custom_attribute('kafka_received_message', True)

# .. custom_attribute_name: kafka_partition
# .. custom_attribute_description: The partition of the message.
set_custom_attribute('kafka_partition', kafka_message.partition())
Expand All @@ -594,6 +598,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
# .. custom_attribute_description: The event type of the message. Note that the header in the logs
# will use 'ce_type'.
set_custom_attribute('kafka_event_type', ",".join(event_types))
else:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span.
set_custom_attribute('kafka_received_message', False)

if kafka_error:
# .. custom_attribute_name: kafka_error_fatal
Expand Down
7 changes: 7 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ def raise_exception():
call("kafka_partition", 2),
call("kafka_offset", 12345),
call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
call("kafka_received_message", True),
] * len(mock_emit_side_effects),
any_order=True,
)
Expand Down Expand Up @@ -431,7 +432,13 @@ def poll_side_effect(*args, **kwargs):
expected_custom_attribute_calls += [
call("kafka_message_id", "1111-1111"),
call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
call("kafka_received_message", True),
]
else:
expected_custom_attribute_calls += [
call("kafka_received_message", False),
]

if has_kafka_error:
expected_custom_attribute_calls += [
call('kafka_error_fatal', is_fatal),
Expand Down

0 comments on commit aaffd52

Please sign in to comment.