diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index 5100bad..6719ad3 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -14,6 +14,12 @@ Change Log
 Unreleased
 **********
 
+[5.8.0] - 2024-08-01
+********************
+Changed
+=======
+* Monitoring: Ensure that we have a root span for each iteration of the consume loop; renamed the trace name to be ``consumer.consume``.
+
 [5.7.0] - 2024-03-22
 ********************
 Changed
diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py
index 406c57a..6e7d744 100644
--- a/edx_event_bus_kafka/__init__.py
+++ b/edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
 from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
 from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer
 
-__version__ = '5.7.0'
+__version__ = '5.8.0'
diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py
index 6bd4a51..1318220 100644
--- a/edx_event_bus_kafka/internal/consumer.py
+++ b/edx_event_bus_kafka/internal/consumer.py
@@ -299,11 +299,11 @@ def _consume_indefinitely(self):
                 if CONSECUTIVE_ERRORS_LIMIT and consecutive_errors >= CONSECUTIVE_ERRORS_LIMIT:
                     raise Exception(f"Too many consecutive errors, exiting ({consecutive_errors} in a row)")
 
-                msg = None
-                try:
-                    msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
-                    if msg is not None:
-                        with function_trace('_consume_indefinitely_consume_single_message'):
+                with function_trace('consumer.consume'):
+                    msg = None
+                    try:
+                        msg = self.consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
+                        if msg is not None:
                             # Before processing, try to make sure our application state is cleaned
                             # up as would happen at the start of a Django request/response cycle.
                             # See https://github.com/openedx/openedx-events/issues/236 for details.
@@ -314,26 +314,26 @@ def _consume_indefinitely(self):
                             self.emit_signals_from_message(msg, signal)
                             consecutive_errors = 0
 
-                        self._add_message_monitoring(run_context=run_context, message=msg)
-                except Exception as e:
-                    consecutive_errors += 1
-                    self.record_event_consuming_error(run_context, e, msg)
-                    # Kill the infinite loop if the error is fatal for the consumer
-                    _, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
-                    if kafka_error and kafka_error.fatal():
-                        raise e
-                    # Prevent fast error-looping when no event received from broker. Because
-                    # DeserializingConsumer raises rather than returning a Message when it has an
-                    # error() value, this may be triggered even when a Message *was* returned,
-                    # slowing down the queue. This is probably close enough, though.
-                    if msg is None:
-                        time.sleep(POLL_FAILURE_SLEEP)
-
-                if msg:
-                    # theoretically we could just call consumer.commit() without passing the specific message
-                    # to commit all this consumer's current offset across all partitions since we only process one
-                    # message at a time, but limit it to just the offset/partition of the specified message
-                    # to be super safe
-                    self.consumer.commit(message=msg)
+                            self._add_message_monitoring(run_context=run_context, message=msg)
+                    except Exception as e:
+                        consecutive_errors += 1
+                        self.record_event_consuming_error(run_context, e, msg)
+                        # Kill the infinite loop if the error is fatal for the consumer
+                        _, kafka_error = self._get_kafka_message_and_error(message=msg, error=e)
+                        if kafka_error and kafka_error.fatal():
+                            raise e
+                        # Prevent fast error-looping when no event received from broker. Because
+                        # DeserializingConsumer raises rather than returning a Message when it has an
+                        # error() value, this may be triggered even when a Message *was* returned,
+                        # slowing down the queue. This is probably close enough, though.
+                        if msg is None:
+                            time.sleep(POLL_FAILURE_SLEEP)
+
+                    if msg:
+                        # theoretically we could just call consumer.commit() without passing the specific message
+                        # to commit all this consumer's current offset across all partitions since we only process one
+                        # message at a time, but limit it to just the offset/partition of the specified message
+                        # to be super safe
+                        self.consumer.commit(message=msg)
         finally:
             self.consumer.close()