Skip to content

Commit

Permalink
fix issues when transaction is unsampled in Kafka instrumentation (#1579
Browse files Browse the repository at this point in the history
)

* fix issues when transaction is unsampled in Kafka instrumentation

closes #1578

* update changelog
  • Loading branch information
beniwohli authored Jun 30, 2022
1 parent 1873b31 commit c957309
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ endif::[]
[[release-notes-6.x]]
=== Python Agent version 6.x
[[release-notes-6.10.1]]
==== 6.10.1 - 2022-06-30
[float]
===== Bug fixes
* Fix an issue with Kafka instrumentation and unsampled transactions {pull}1579[#1579]
[[release-notes-6.10.0]]
==== 6.10.0 - 2022-06-22
Expand Down
23 changes: 11 additions & 12 deletions elasticapm/instrumentation/packages/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from elasticapm.conf import constants
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import DroppedSpan, capture_span, execution_context
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.disttracing import TraceParent, TracingOptions


class KafkaInstrumentation(AbstractInstrumentedModule):
Expand All @@ -48,6 +48,7 @@ class KafkaInstrumentation(AbstractInstrumentedModule):
]
provider_name = "kafka"
name = "kafka"
creates_transactions = True

def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
topic = args[0] if args else kwargs["topic"]
Expand All @@ -68,7 +69,10 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
) as span:
transaction = execution_context.get_transaction()
if transaction:
tp = transaction.trace_parent.copy_from(span_id=span.id)
tp = transaction.trace_parent.copy_from(
span_id=span.id if span else transaction.id,
trace_options=None if span else TracingOptions(recorded=False),
)
if headers:
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
else:
Expand All @@ -79,22 +83,17 @@ def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs)
else:
kwargs["headers"] = headers
result = wrapped(*args, **kwargs)
if instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
if span and instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
address = instance._metadata.controller[1]
port = instance._metadata.controller[2]
span.context["destination"]["address"] = address
span.context["destination"]["port"] = port
return result

def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
# Contrasting to the superclass implementation, we *always* want to
# return a proxied connection, even if there is no ongoing elasticapm
# transaction yet. This ensures that we instrument the cursor once
# the transaction started.
return self.call(module, method, wrapped, instance, args, kwargs)

def call(self, module, method, wrapped, instance, args, kwargs):
client = get_client()
if client is None:
return wrapped(*args, **kwargs)
destination_info = {
"service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
}
Expand All @@ -118,7 +117,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
"destination": destination_info,
},
) as span:
if not isinstance(span, DroppedSpan) and instance._subscription.subscription:
if span and not isinstance(span, DroppedSpan) and instance._subscription.subscription:
span.name += " from " + ", ".join(sorted(instance._subscription.subscription))
results = wrapped(*args, **kwargs)
return results
Expand Down Expand Up @@ -146,7 +145,7 @@ def call(self, module, method, wrapped, instance, args, kwargs):
except StopIteration:
span.cancel()
raise
if not isinstance(span, DroppedSpan):
if span and not isinstance(span, DroppedSpan):
topic = result[0]
if client.should_ignore_topic(topic):
span.cancel()
Expand Down
26 changes: 26 additions & 0 deletions tests/instrumentation/kafka_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,29 @@ def delayed_send():
spans = elasticapm_client.events[SPAN]
assert len(spans) == 1
assert spans[0]["name"] == "Kafka POLL from bar, foo, test"


def test_kafka_no_client(instrument, producer, consumer, topics):
assert elasticapm.get_client() is None
# the following code shouldn't trigger any errors
producer.send("test", key=b"foo", value=b"bar")
for item in consumer:
pass


def test_kafka_send_unsampled_transaction(instrument, elasticapm_client, producer, topics):
transaction_object = elasticapm_client.begin_transaction("transaction")
transaction_object.is_sampled = False
producer.send("test", key=b"foo", value=b"bar")
elasticapm_client.end_transaction("foo")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 0


def test_kafka_poll_unsampled_transaction(instrument, elasticapm_client, consumer, topics):
transaction_object = elasticapm_client.begin_transaction("transaction")
transaction_object.is_sampled = False
consumer.poll(timeout_ms=50)
elasticapm_client.end_transaction("foo")
spans = elasticapm_client.events[SPAN]
assert len(spans) == 0

0 comments on commit c957309

Please sign in to comment.