From 5376eb5c2c91f153cf72cbf80457a78c398f3312 Mon Sep 17 00:00:00 2001
From: Emmett Butler <723615+emmettbutler@users.noreply.github.com>
Date: Mon, 15 Apr 2024 14:30:02 -0700
Subject: [PATCH] chore(botocore): decouple kinesis integration from tracing and datastreams monitoring (#8989)

This change uses the Core API to decouple the Kinesis integration from the Tracing and Data Streams Monitoring products, improving the separation of concerns between them. Existing tests cover the changed functionality. A minimal, illustrative sketch of the new dispatch/subscribe flow is included after the diff below.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included in the PR
- [x] Risks are described (performance impact, potential for breakage, maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
---
 ddtrace/_trace/trace_handlers.py             |   9 +
 ddtrace/contrib/botocore/services/kinesis.py | 181 +++++++------------
 ddtrace/internal/datastreams/botocore.py     |  11 +-
 3 files changed, 84 insertions(+), 117 deletions(-)

diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py
index b92c53a826d..1cff120b5a4 100644
--- a/ddtrace/_trace/trace_handlers.py
+++ b/ddtrace/_trace/trace_handlers.py
@@ -28,6 +28,7 @@
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.schema.span_attribute_schema import SpanDirection
 from ddtrace.internal.utils import http as http_utils
+from ddtrace.propagation.http import HTTPPropagator
 from ddtrace.vendor import wrapt


@@ -597,6 +598,13 @@ def _on_botocore_trace_context_injection_prepared(
         log.warning("Unable to inject trace context", exc_info=True)


+def _on_botocore_kinesis_update_record(ctx, stream, data_obj: Dict, record, inject_trace_context):
+    if inject_trace_context:
+        if "_datadog" not in data_obj:
+            data_obj["_datadog"] = {}
+        HTTPPropagator.inject(ctx[ctx["call_key"]].context, data_obj["_datadog"])
+
+
 def listen():
     core.on("wsgi.block.started", _wsgi_make_block_content, "status_headers_content")
     core.on("asgi.block.started", _asgi_make_block_content, "status_headers_content")
@@ -629,6 +637,7 @@ def listen():
     core.on("botocore.prep_context_injection.post", _on_botocore_trace_context_injection_prepared)
     core.on("botocore.patched_api_call.started", _on_botocore_patched_api_call_started)
core.on("botocore.patched_kinesis_api_call.started", _on_botocore_patched_api_call_started) + core.on("botocore.kinesis.update_record", _on_botocore_kinesis_update_record) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/botocore/services/kinesis.py b/ddtrace/contrib/botocore/services/kinesis.py index 1c55e203437..58d361f8b93 100644 --- a/ddtrace/contrib/botocore/services/kinesis.py +++ b/ddtrace/contrib/botocore/services/kinesis.py @@ -1,14 +1,13 @@ from datetime import datetime import json -from typing import Any # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 +from typing import Any +from typing import Dict +from typing import List +from typing import Tuple import botocore.client import botocore.exceptions -from ddtrace import Span # noqa:F401 from ddtrace import config from ddtrace.internal import core from ddtrace.internal.schema.span_attribute_schema import SpanDirection @@ -18,8 +17,6 @@ from ....internal.logger import get_logger from ....internal.schema import schematize_cloud_messaging_operation from ....internal.schema import schematize_service_name -from ....pin import Pin # noqa:F401 -from ....propagation.http import HTTPPropagator from ..utils import extract_DD_context from ..utils import get_kinesis_data_object from ..utils import set_patched_api_call_span_tags @@ -29,76 +26,46 @@ log = get_logger(__name__) -MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB +ONE_MB = 1 << 20 +MAX_KINESIS_DATA_SIZE = ONE_MB class TraceInjectionSizeExceed(Exception): pass -def inject_trace_to_kinesis_stream_data(record, span, stream, inject_trace_context=True): - # type: (Dict[str, Any], Span, str, bool) -> None - """ - :record: contains args for the current botocore action, Kinesis record is at index 1 - :span: the span which provides the trace context to be propagated - :inject_trace_context: whether to inject DataDog trace context - Inject trace headers and DSM headers into the Kinesis record's Data field in addition to the existing - data. Only possible if the existing data is JSON string or base64 encoded JSON string - Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/) - """ - if "Data" not in record: - log.warning("Unable to inject context. 
The kinesis stream has no data") - return - - # always inject if Data Stream is enabled, otherwise only inject if distributed tracing is enabled and this is the - # first record in the payload - if (config.botocore["distributed_tracing"] and inject_trace_context) or config._data_streams_enabled: - data = record["Data"] - line_break, data_obj = get_kinesis_data_object(data) - if data_obj is not None: - data_obj["_datadog"] = {} - - if config.botocore["distributed_tracing"] and inject_trace_context: - HTTPPropagator.inject(span.context, data_obj["_datadog"]) - - core.dispatch("botocore.kinesis.start", [stream, data_obj["_datadog"], record]) +def update_record(ctx, record: Dict[str, Any], stream: str, inject_trace_context: bool = True) -> None: + line_break, data_obj = get_kinesis_data_object(record["Data"]) + if data_obj is not None: + core.dispatch( + "botocore.kinesis.update_record", + [ctx, stream, data_obj, record, inject_trace_context], + ) + try: data_json = json.dumps(data_obj) + except Exception: + log.warning("Unable to update kinesis record", exc_info=True) - # if original string had a line break, add it back - if line_break is not None: - data_json += line_break + if line_break is not None: + data_json += line_break + + data_size = len(data_json) + if data_size >= MAX_KINESIS_DATA_SIZE: + log.warning("Data including trace injection (%d) exceeds (%d)", data_size, MAX_KINESIS_DATA_SIZE) + + record["Data"] = data_json - # check if data size will exceed max size with headers - data_size = len(data_json) - if data_size >= MAX_KINESIS_DATA_SIZE: - raise TraceInjectionSizeExceed( - "Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE) - ) - record["Data"] = data_json - - -def inject_trace_to_kinesis_stream(params, span, inject_trace_context=True): - # type: (List[Any], Span, bool) -> None - """ - :params: contains the params for the current botocore action - :span: the span which provides the trace context to be propagated - :inject_trace_context: whether to inject DataDog trace context - Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/) - """ - stream = params.get("StreamARN", params.get("StreamName", "")) - if "Records" in params: - records = params["Records"] - - if records: - for i in range(0, len(records)): - inject_to_trace = inject_trace_context and i == 0 - inject_trace_to_kinesis_stream_data( - records[i], span, stream, inject_trace_context=inject_to_trace - ) # only inject trace data to first record +def select_records_for_injection(params: List[Any], inject_trace_context: bool) -> List[Tuple[Any, bool]]: + records_to_inject_into = [] + if "Records" in params and params["Records"]: + for i, record in enumerate(params["Records"]): + if "Data" in record: + records_to_inject_into.append((record, inject_trace_context and i == 0)) elif "Data" in params: - inject_trace_to_kinesis_stream_data(params, span, stream, inject_trace_context=inject_trace_context) + records_to_inject_into.append((params, inject_trace_context)) + return records_to_inject_into def patched_kinesis_api_call(original_func, instance, args, kwargs, function_vars): @@ -109,8 +76,8 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var operation = function_vars.get("operation") message_received = False - func_run = False - func_run_err = None + is_getrecords_call = False + getrecords_error = None child_of = None start_ns = None result = None @@ -118,13 +85,12 @@ def patched_kinesis_api_call(original_func, instance, 
args, kwargs, function_var if operation == "GetRecords": try: start_ns = time_ns() - func_run = True + is_getrecords_call = True core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params]) result = original_func(*args, **kwargs) records = result["Records"] - # dispatch to DSM to set checkpoints for record in records: _, data_obj = get_kinesis_data_object(record["Data"]) time_estimate = record.get("ApproximateArrivalTimestamp", datetime.now()).timestamp() @@ -134,20 +100,31 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var ) except Exception as e: - func_run_err = e + getrecords_error = e if result is not None and "Records" in result and len(result["Records"]) >= 1: message_received = True if config.botocore.propagation_enabled: child_of = extract_DD_context(result["Records"]) - """ - We only want to create a span for the following cases: - - not func_run: The function is not `getRecords` and we need to run it - - func_run and message_received: Received a message when polling - - config.empty_poll_enabled: We want to trace empty poll operations - - func_run_err: There was an error when calling the `getRecords` function - """ - if (func_run and message_received) or config.botocore.empty_poll_enabled or not func_run or func_run_err: + if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}: + span_name = schematize_cloud_messaging_operation( + trace_operation, + cloud_provider="aws", + cloud_service="kinesis", + direction=SpanDirection.OUTBOUND, + ) + else: + span_name = trace_operation + stream_arn = params.get("StreamARN", params.get("StreamName", "")) + function_is_not_getrecords = not is_getrecords_call + received_message_when_polling = is_getrecords_call and message_received + instrument_empty_poll_calls = config.botocore.empty_poll_enabled + should_instrument = ( + received_message_when_polling or instrument_empty_poll_calls or function_is_not_getrecords or getrecords_error + ) + is_kinesis_put_operation = endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"} + + if should_instrument: with core.context_with_data( "botocore.patched_kinesis_api_call", instance=instance, @@ -159,35 +136,29 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var call_trace=False, context_started_callback=set_patched_api_call_span_tags, pin=pin, - span_name=trace_operation, + span_name=span_name, span_type=SpanTypes.HTTP, child_of=child_of if child_of is not None else pin.tracer.context_provider.active(), activate=True, - func_run=func_run, + func_run=is_getrecords_call, start_ns=start_ns, call_key="patched_kinesis_api_call", - ) as ctx, ctx.get_item(ctx.get_item("call_key")) as span: + ) as ctx, ctx.get_item(ctx.get_item("call_key")): core.dispatch("botocore.patched_kinesis_api_call.started", [ctx]) - if config.botocore["distributed_tracing"] or config._data_streams_enabled: - try_inject_DD_context( - endpoint_name, - operation, - params, - span, - trace_operation, - inject_trace_context=bool(config.botocore["distributed_tracing"]), - ) + if is_kinesis_put_operation: + records_to_process = select_records_for_injection(params, bool(config.botocore["distributed_tracing"])) + for record, should_inject_trace_context in records_to_process: + update_record(ctx, record, stream_arn, inject_trace_context=should_inject_trace_context) try: - if not func_run: + if not is_getrecords_call: core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params]) result = original_func(*args, **kwargs) 
core.dispatch(f"botocore.{endpoint_name}.{operation}.post", [params, result]) - # raise error if it was encountered before the span was started - if func_run_err: - raise func_run_err + if getrecords_error: + raise getrecords_error core.dispatch("botocore.patched_kinesis_api_call.success", [ctx, result, set_response_metadata_tags]) return result @@ -198,23 +169,7 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var [ctx, e.response, botocore.exceptions.ClientError, set_response_metadata_tags], ) raise - # return results in the case that we ran the function, but no records were returned and empty - # poll spans are disabled - elif func_run: - if func_run_err: - raise func_run_err + elif is_getrecords_call: + if getrecords_error: + raise getrecords_error return result - - -def try_inject_DD_context(endpoint_name, operation, params, span, trace_operation, inject_trace_context): - try: - if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}: - inject_trace_to_kinesis_stream(params, span, inject_trace_context=inject_trace_context) - span.name = schematize_cloud_messaging_operation( - trace_operation, - cloud_provider="aws", - cloud_service="kinesis", - direction=SpanDirection.OUTBOUND, - ) - except Exception: - log.warning("Unable to inject trace context", exc_info=True) diff --git a/ddtrace/internal/datastreams/botocore.py b/ddtrace/internal/datastreams/botocore.py index 74b5f60ec3c..39bb7a55633 100644 --- a/ddtrace/internal/datastreams/botocore.py +++ b/ddtrace/internal/datastreams/botocore.py @@ -107,9 +107,12 @@ def calculate_kinesis_payload_size(message, trace_data=None): return payload_size -def handle_kinesis_produce(stream, dd_ctx_json, record): - if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param) - inject_context(dd_ctx_json, "kinesis", stream, record) +def handle_kinesis_produce(ctx, stream, dd_ctx_json, record, *args): + if config._data_streams_enabled: + if "_datadog" not in dd_ctx_json: + dd_ctx_json["_datadog"] = {} + if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param) + inject_context(dd_ctx_json["_datadog"], "kinesis", stream, record) def handle_sqs_sns_produce(endpoint_service, trace_data, params, message=None): @@ -211,7 +214,7 @@ def handle_kinesis_receive(params, time_estimate, context_json, record): if config._data_streams_enabled: - core.on("botocore.kinesis.start", handle_kinesis_produce) + core.on("botocore.kinesis.update_record", handle_kinesis_produce) core.on("botocore.sqs_sns.start", handle_sqs_sns_produce) core.on("botocore.sqs.ReceiveMessage.pre", handle_sqs_prepare) core.on("botocore.sqs.ReceiveMessage.post", handle_sqs_receive)