Skip to content

Commit

Permalink
chore(botocore): decouple kinesis integration from tracing and datast…
Browse files Browse the repository at this point in the history
…reams monitoring (#8989)

This change uses the Core API to decouple the Kinesis integration from
the Tracing and Datastreams Monitoring products, improving the
separation of concerns between them.

Existing tests cover the changed functionality.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
emmettbutler authored Apr 15, 2024
1 parent 3480c5f commit 5376eb5
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 117 deletions.
9 changes: 9 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace.internal.logger import get_logger
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from ddtrace.internal.utils import http as http_utils
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.vendor import wrapt


Expand Down Expand Up @@ -597,6 +598,13 @@ def _on_botocore_trace_context_injection_prepared(
log.warning("Unable to inject trace context", exc_info=True)


def _on_botocore_kinesis_update_record(ctx, stream, data_obj: Dict, record, inject_trace_context):
if inject_trace_context:
if "_datadog" not in data_obj:
data_obj["_datadog"] = {}
HTTPPropagator.inject(ctx[ctx["call_key"]].context, data_obj["_datadog"])


def listen():
core.on("wsgi.block.started", _wsgi_make_block_content, "status_headers_content")
core.on("asgi.block.started", _asgi_make_block_content, "status_headers_content")
Expand Down Expand Up @@ -629,6 +637,7 @@ def listen():
core.on("botocore.prep_context_injection.post", _on_botocore_trace_context_injection_prepared)
core.on("botocore.patched_api_call.started", _on_botocore_patched_api_call_started)
core.on("botocore.patched_kinesis_api_call.started", _on_botocore_patched_api_call_started)
core.on("botocore.kinesis.update_record", _on_botocore_kinesis_update_record)

for context_name in (
"flask.call",
Expand Down
181 changes: 68 additions & 113 deletions ddtrace/contrib/botocore/services/kinesis.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from datetime import datetime
import json
from typing import Any # noqa:F401
from typing import Dict # noqa:F401
from typing import List # noqa:F401
from typing import Optional # noqa:F401
from typing import Any
from typing import Dict
from typing import List
from typing import Tuple

import botocore.client
import botocore.exceptions

from ddtrace import Span # noqa:F401
from ddtrace import config
from ddtrace.internal import core
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
Expand All @@ -18,8 +17,6 @@
from ....internal.logger import get_logger
from ....internal.schema import schematize_cloud_messaging_operation
from ....internal.schema import schematize_service_name
from ....pin import Pin # noqa:F401
from ....propagation.http import HTTPPropagator
from ..utils import extract_DD_context
from ..utils import get_kinesis_data_object
from ..utils import set_patched_api_call_span_tags
Expand All @@ -29,76 +26,46 @@
log = get_logger(__name__)


MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB
ONE_MB = 1 << 20
MAX_KINESIS_DATA_SIZE = ONE_MB


class TraceInjectionSizeExceed(Exception):
pass


def inject_trace_to_kinesis_stream_data(record, span, stream, inject_trace_context=True):
# type: (Dict[str, Any], Span, str, bool) -> None
"""
:record: contains args for the current botocore action, Kinesis record is at index 1
:span: the span which provides the trace context to be propagated
:inject_trace_context: whether to inject DataDog trace context
Inject trace headers and DSM headers into the Kinesis record's Data field in addition to the existing
data. Only possible if the existing data is JSON string or base64 encoded JSON string
Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/)
"""
if "Data" not in record:
log.warning("Unable to inject context. The kinesis stream has no data")
return

# always inject if Data Stream is enabled, otherwise only inject if distributed tracing is enabled and this is the
# first record in the payload
if (config.botocore["distributed_tracing"] and inject_trace_context) or config._data_streams_enabled:
data = record["Data"]
line_break, data_obj = get_kinesis_data_object(data)
if data_obj is not None:
data_obj["_datadog"] = {}

if config.botocore["distributed_tracing"] and inject_trace_context:
HTTPPropagator.inject(span.context, data_obj["_datadog"])

core.dispatch("botocore.kinesis.start", [stream, data_obj["_datadog"], record])
def update_record(ctx, record: Dict[str, Any], stream: str, inject_trace_context: bool = True) -> None:
line_break, data_obj = get_kinesis_data_object(record["Data"])
if data_obj is not None:
core.dispatch(
"botocore.kinesis.update_record",
[ctx, stream, data_obj, record, inject_trace_context],
)

try:
data_json = json.dumps(data_obj)
except Exception:
log.warning("Unable to update kinesis record", exc_info=True)

# if original string had a line break, add it back
if line_break is not None:
data_json += line_break
if line_break is not None:
data_json += line_break

data_size = len(data_json)
if data_size >= MAX_KINESIS_DATA_SIZE:
log.warning("Data including trace injection (%d) exceeds (%d)", data_size, MAX_KINESIS_DATA_SIZE)

record["Data"] = data_json

# check if data size will exceed max size with headers
data_size = len(data_json)
if data_size >= MAX_KINESIS_DATA_SIZE:
raise TraceInjectionSizeExceed(
"Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE)
)

record["Data"] = data_json


def inject_trace_to_kinesis_stream(params, span, inject_trace_context=True):
# type: (List[Any], Span, bool) -> None
"""
:params: contains the params for the current botocore action
:span: the span which provides the trace context to be propagated
:inject_trace_context: whether to inject DataDog trace context
Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/)
"""
stream = params.get("StreamARN", params.get("StreamName", ""))
if "Records" in params:
records = params["Records"]

if records:
for i in range(0, len(records)):
inject_to_trace = inject_trace_context and i == 0
inject_trace_to_kinesis_stream_data(
records[i], span, stream, inject_trace_context=inject_to_trace
) # only inject trace data to first record
def select_records_for_injection(params: List[Any], inject_trace_context: bool) -> List[Tuple[Any, bool]]:
records_to_inject_into = []
if "Records" in params and params["Records"]:
for i, record in enumerate(params["Records"]):
if "Data" in record:
records_to_inject_into.append((record, inject_trace_context and i == 0))
elif "Data" in params:
inject_trace_to_kinesis_stream_data(params, span, stream, inject_trace_context=inject_trace_context)
records_to_inject_into.append((params, inject_trace_context))
return records_to_inject_into


def patched_kinesis_api_call(original_func, instance, args, kwargs, function_vars):
Expand All @@ -109,22 +76,21 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
operation = function_vars.get("operation")

message_received = False
func_run = False
func_run_err = None
is_getrecords_call = False
getrecords_error = None
child_of = None
start_ns = None
result = None

if operation == "GetRecords":
try:
start_ns = time_ns()
func_run = True
is_getrecords_call = True
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params])
result = original_func(*args, **kwargs)

records = result["Records"]

# dispatch to DSM to set checkpoints
for record in records:
_, data_obj = get_kinesis_data_object(record["Data"])
time_estimate = record.get("ApproximateArrivalTimestamp", datetime.now()).timestamp()
Expand All @@ -134,20 +100,31 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
)

except Exception as e:
func_run_err = e
getrecords_error = e
if result is not None and "Records" in result and len(result["Records"]) >= 1:
message_received = True
if config.botocore.propagation_enabled:
child_of = extract_DD_context(result["Records"])

"""
We only want to create a span for the following cases:
- not func_run: The function is not `getRecords` and we need to run it
- func_run and message_received: Received a message when polling
- config.empty_poll_enabled: We want to trace empty poll operations
- func_run_err: There was an error when calling the `getRecords` function
"""
if (func_run and message_received) or config.botocore.empty_poll_enabled or not func_run or func_run_err:
if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}:
span_name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="kinesis",
direction=SpanDirection.OUTBOUND,
)
else:
span_name = trace_operation
stream_arn = params.get("StreamARN", params.get("StreamName", ""))
function_is_not_getrecords = not is_getrecords_call
received_message_when_polling = is_getrecords_call and message_received
instrument_empty_poll_calls = config.botocore.empty_poll_enabled
should_instrument = (
received_message_when_polling or instrument_empty_poll_calls or function_is_not_getrecords or getrecords_error
)
is_kinesis_put_operation = endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}

if should_instrument:
with core.context_with_data(
"botocore.patched_kinesis_api_call",
instance=instance,
Expand All @@ -159,35 +136,29 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
call_trace=False,
context_started_callback=set_patched_api_call_span_tags,
pin=pin,
span_name=trace_operation,
span_name=span_name,
span_type=SpanTypes.HTTP,
child_of=child_of if child_of is not None else pin.tracer.context_provider.active(),
activate=True,
func_run=func_run,
func_run=is_getrecords_call,
start_ns=start_ns,
call_key="patched_kinesis_api_call",
) as ctx, ctx.get_item(ctx.get_item("call_key")) as span:
) as ctx, ctx.get_item(ctx.get_item("call_key")):
core.dispatch("botocore.patched_kinesis_api_call.started", [ctx])

if config.botocore["distributed_tracing"] or config._data_streams_enabled:
try_inject_DD_context(
endpoint_name,
operation,
params,
span,
trace_operation,
inject_trace_context=bool(config.botocore["distributed_tracing"]),
)
if is_kinesis_put_operation:
records_to_process = select_records_for_injection(params, bool(config.botocore["distributed_tracing"]))
for record, should_inject_trace_context in records_to_process:
update_record(ctx, record, stream_arn, inject_trace_context=should_inject_trace_context)

try:
if not func_run:
if not is_getrecords_call:
core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params])
result = original_func(*args, **kwargs)
core.dispatch(f"botocore.{endpoint_name}.{operation}.post", [params, result])

# raise error if it was encountered before the span was started
if func_run_err:
raise func_run_err
if getrecords_error:
raise getrecords_error

core.dispatch("botocore.patched_kinesis_api_call.success", [ctx, result, set_response_metadata_tags])
return result
Expand All @@ -198,23 +169,7 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var
[ctx, e.response, botocore.exceptions.ClientError, set_response_metadata_tags],
)
raise
# return results in the case that we ran the function, but no records were returned and empty
# poll spans are disabled
elif func_run:
if func_run_err:
raise func_run_err
elif is_getrecords_call:
if getrecords_error:
raise getrecords_error
return result


def try_inject_DD_context(endpoint_name, operation, params, span, trace_operation, inject_trace_context):
try:
if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}:
inject_trace_to_kinesis_stream(params, span, inject_trace_context=inject_trace_context)
span.name = schematize_cloud_messaging_operation(
trace_operation,
cloud_provider="aws",
cloud_service="kinesis",
direction=SpanDirection.OUTBOUND,
)
except Exception:
log.warning("Unable to inject trace context", exc_info=True)
11 changes: 7 additions & 4 deletions ddtrace/internal/datastreams/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ def calculate_kinesis_payload_size(message, trace_data=None):
return payload_size


def handle_kinesis_produce(stream, dd_ctx_json, record):
if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param)
inject_context(dd_ctx_json, "kinesis", stream, record)
def handle_kinesis_produce(ctx, stream, dd_ctx_json, record, *args):
if config._data_streams_enabled:
if "_datadog" not in dd_ctx_json:
dd_ctx_json["_datadog"] = {}
if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param)
inject_context(dd_ctx_json["_datadog"], "kinesis", stream, record)


def handle_sqs_sns_produce(endpoint_service, trace_data, params, message=None):
Expand Down Expand Up @@ -211,7 +214,7 @@ def handle_kinesis_receive(params, time_estimate, context_json, record):


if config._data_streams_enabled:
core.on("botocore.kinesis.start", handle_kinesis_produce)
core.on("botocore.kinesis.update_record", handle_kinesis_produce)
core.on("botocore.sqs_sns.start", handle_sqs_sns_produce)
core.on("botocore.sqs.ReceiveMessage.pre", handle_sqs_prepare)
core.on("botocore.sqs.ReceiveMessage.post", handle_sqs_receive)
Expand Down

0 comments on commit 5376eb5

Please sign in to comment.