Skip to content

Commit

Permalink
Merge pull request #260 from openedx/diana/add-monitoring-information
Browse files Browse the repository at this point in the history
fix: Add monitoring information inside the function trace.
  • Loading branch information
dianakhuang authored Aug 1, 2024
2 parents 9961dd8 + 16646df commit 823645f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.8.0] - 2024-08-01
********************
Changed
=======
* Monitoring: Ensure that we have a root span for each iteration of the consume loop; rename the trace to ``consumer.consume``.

[5.7.0] - 2024-03-22
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.7.0'
__version__ = '5.8.0'
50 changes: 25 additions & 25 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,11 @@ def _consume_indefinitely(self):
if CONSECUTIVE_ERRORS_LIMIT and consecutive_errors >= CONSECUTIVE_ERRORS_LIMIT:
raise Exception(f"Too many consecutive errors, exiting ({consecutive_errors} in a row)")

msg = None
try:
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
with function_trace('_consume_indefinitely_consume_single_message'):
with function_trace('consumer.consume'):
msg = None
try:
msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
if msg is not None:
# Before processing, try to make sure our application state is cleaned
# up as would happen at the start of a Django request/response cycle.
# See https://github.com/openedx/openedx-events/issues/236 for details.
Expand All @@ -314,26 +314,26 @@ def _consume_indefinitely(self):
self.emit_signals_from_message(msg, signal)
consecutive_errors = 0

self._add_message_monitoring(run_context=run_context, message=msg)
except Exception as e:
consecutive_errors += 1
self.record_event_consuming_error(run_context, e, msg)
# Kill the infinite loop if the error is fatal for the consumer
_, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
if kafka_error and kafka_error.fatal():
raise e
# Prevent fast error-looping when no event received from broker. Because
# DeserializingConsumer raises rather than returning a Message when it has an
# error() value, this may be triggered even when a Message *was* returned,
# slowing down the queue. This is probably close enough, though.
if msg is None:
time.sleep(POLL_FAILURE_SLEEP)
if msg:
# theoretically we could just call consumer.commit() without passing the specific message
# to commit all this consumer's current offset across all partitions since we only process one
# message at a time, but limit it to just the offset/partition of the specified message
# to be super safe
self.consumer.commit(message=msg)
self._add_message_monitoring(run_context=run_context, message=msg)
except Exception as e:
consecutive_errors += 1
self.record_event_consuming_error(run_context, e, msg)
# Kill the infinite loop if the error is fatal for the consumer
_, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
if kafka_error and kafka_error.fatal():
raise e
# Prevent fast error-looping when no event received from broker. Because
# DeserializingConsumer raises rather than returning a Message when it has an
# error() value, this may be triggered even when a Message *was* returned,
# slowing down the queue. This is probably close enough, though.
if msg is None:
time.sleep(POLL_FAILURE_SLEEP)
if msg:
# theoretically we could just call consumer.commit() without passing the specific message
# to commit all this consumer's current offset across all partitions since we only process one
# message at a time, but limit it to just the offset/partition of the specified message
# to be super safe
self.consumer.commit(message=msg)
finally:
self.consumer.close()

Expand Down

0 comments on commit 823645f

Please sign in to comment.