From c95730901318de8ea1b18d56bbf85dc78fd2ebab Mon Sep 17 00:00:00 2001 From: Benjamin Wohlwend Date: Thu, 30 Jun 2022 09:25:15 +0200 Subject: [PATCH] fix issues when transaction is unsampled in Kafka instrumentation (#1579) * fix issues when transaction is unsampled in Kafka instrumentation closes #1578 * update changelog --- CHANGELOG.asciidoc | 8 ++++++ elasticapm/instrumentation/packages/kafka.py | 23 +++++++++-------- tests/instrumentation/kafka_tests.py | 26 ++++++++++++++++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0ffb14d1c..434358162 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -32,6 +32,14 @@ endif::[] [[release-notes-6.x]] === Python Agent version 6.x +[[release-notes-6.10.1]] +==== 6.10.1 - 2022-06-30 + +[float] +===== Bug fixes + +* Fix an issue with Kafka instrumentation and unsampled transactions {pull}1579[#1579] + [[release-notes-6.10.0]] ==== 6.10.0 - 2022-06-22 diff --git a/elasticapm/instrumentation/packages/kafka.py b/elasticapm/instrumentation/packages/kafka.py index 269c8e6d7..c3bc2d64d 100644 --- a/elasticapm/instrumentation/packages/kafka.py +++ b/elasticapm/instrumentation/packages/kafka.py @@ -36,7 +36,7 @@ from elasticapm.conf import constants from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule from elasticapm.traces import DroppedSpan, capture_span, execution_context -from elasticapm.utils.disttracing import TraceParent +from elasticapm.utils.disttracing import TraceParent, TracingOptions class KafkaInstrumentation(AbstractInstrumentedModule): @@ -48,6 +48,7 @@ class KafkaInstrumentation(AbstractInstrumentedModule): ] provider_name = "kafka" name = "kafka" + creates_transactions = True def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs): topic = args[0] if args else kwargs["topic"] @@ -68,7 +69,10 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs) ) as span: transaction = execution_context.get_transaction() if transaction: - tp = transaction.trace_parent.copy_from(span_id=span.id) + tp = transaction.trace_parent.copy_from( + span_id=span.id if span else transaction.id, + trace_options=None if span else TracingOptions(recorded=False), + ) if headers: headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())) else: @@ -79,22 +83,17 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs) else: kwargs["headers"] = headers result = wrapped(*args, **kwargs) - if instance and instance._metadata.controller and not isinstance(span, DroppedSpan): + if span and instance and instance._metadata.controller and not isinstance(span, DroppedSpan): address = instance._metadata.controller[1] port = instance._metadata.controller[2] span.context["destination"]["address"] = address span.context["destination"]["port"] = port return result - def call_if_sampling(self, module, method, wrapped, instance, args, kwargs): - # Contrasting to the superclass implementation, we *always* want to - # return a proxied connection, even if there is no ongoing elasticapm - # transaction yet. This ensures that we instrument the cursor once - # the transaction started. - return self.call(module, method, wrapped, instance, args, kwargs) - def call(self, module, method, wrapped, instance, args, kwargs): client = get_client() + if client is None: + return wrapped(*args, **kwargs) destination_info = { "service": {"name": "kafka", "resource": "kafka/", "type": "messaging"}, } @@ -118,7 +117,7 @@ def call(self, module, method, wrapped, instance, args, kwargs): "destination": destination_info, }, ) as span: - if not isinstance(span, DroppedSpan) and instance._subscription.subscription: + if span and not isinstance(span, DroppedSpan) and instance._subscription.subscription: span.name += " from " + ", ".join(sorted(instance._subscription.subscription)) results = wrapped(*args, **kwargs) return results @@ -146,7 +145,7 @@ def call(self, module, method, wrapped, instance, args, kwargs): except StopIteration: span.cancel() raise - if not isinstance(span, DroppedSpan): + if span and not isinstance(span, DroppedSpan): topic = result[0] if client.should_ignore_topic(topic): span.cancel() diff --git a/tests/instrumentation/kafka_tests.py b/tests/instrumentation/kafka_tests.py index 690ae18aa..71416c130 100644 --- a/tests/instrumentation/kafka_tests.py +++ b/tests/instrumentation/kafka_tests.py @@ -207,3 +207,29 @@ def delayed_send(): spans = elasticapm_client.events[SPAN] assert len(spans) == 1 assert spans[0]["name"] == "Kafka POLL from bar, foo, test" + + +def test_kafka_no_client(instrument, producer, consumer, topics): + assert elasticapm.get_client() is None + # the following code shouldn't trigger any errors + producer.send("test", key=b"foo", value=b"bar") + for item in consumer: + pass + + +def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics): + transaction_object = elasticapm_client.begin_transaction("transaction") + transaction_object.is_sampled = False + producer.send("test", key=b"foo", value=b"bar") + elasticapm_client.end_transaction("foo") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 0 + + +def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consumer, topics): + transaction_object = elasticapm_client.begin_transaction("transaction") + transaction_object.is_sampled = False + consumer.poll(timeout_ms=50) + elasticapm_client.end_transaction("foo") + spans = elasticapm_client.events[SPAN] + assert len(spans) == 0