diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index b92c53a826d..1cff120b5a4 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -28,6 +28,7 @@ from ddtrace.internal.logger import get_logger from ddtrace.internal.schema.span_attribute_schema import SpanDirection from ddtrace.internal.utils import http as http_utils +from ddtrace.propagation.http import HTTPPropagator from ddtrace.vendor import wrapt @@ -597,6 +598,13 @@ def _on_botocore_trace_context_injection_prepared( log.warning("Unable to inject trace context", exc_info=True) +def _on_botocore_kinesis_update_record(ctx, stream, data_obj: Dict, record, inject_trace_context): + if inject_trace_context: + if "_datadog" not in data_obj: + data_obj["_datadog"] = {} + HTTPPropagator.inject(ctx[ctx["call_key"]].context, data_obj["_datadog"]) + + def listen(): core.on("wsgi.block.started", _wsgi_make_block_content, "status_headers_content") core.on("asgi.block.started", _asgi_make_block_content, "status_headers_content") @@ -629,6 +637,7 @@ def listen(): core.on("botocore.prep_context_injection.post", _on_botocore_trace_context_injection_prepared) core.on("botocore.patched_api_call.started", _on_botocore_patched_api_call_started) core.on("botocore.patched_kinesis_api_call.started", _on_botocore_patched_api_call_started) + core.on("botocore.kinesis.update_record", _on_botocore_kinesis_update_record) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/botocore/services/kinesis.py b/ddtrace/contrib/botocore/services/kinesis.py index 1c55e203437..58d361f8b93 100644 --- a/ddtrace/contrib/botocore/services/kinesis.py +++ b/ddtrace/contrib/botocore/services/kinesis.py @@ -1,14 +1,13 @@ from datetime import datetime import json -from typing import Any # noqa:F401 -from typing import Dict # noqa:F401 -from typing import List # noqa:F401 -from typing import Optional # noqa:F401 +from typing import Any +from typing import Dict +from typing import List +from typing import Tuple import botocore.client import botocore.exceptions -from ddtrace import Span # noqa:F401 from ddtrace import config from ddtrace.internal import core from ddtrace.internal.schema.span_attribute_schema import SpanDirection @@ -18,8 +17,6 @@ from ....internal.logger import get_logger from ....internal.schema import schematize_cloud_messaging_operation from ....internal.schema import schematize_service_name -from ....pin import Pin # noqa:F401 -from ....propagation.http import HTTPPropagator from ..utils import extract_DD_context from ..utils import get_kinesis_data_object from ..utils import set_patched_api_call_span_tags @@ -29,76 +26,46 @@ log = get_logger(__name__) -MAX_KINESIS_DATA_SIZE = 1 << 20 # 1MB +ONE_MB = 1 << 20 +MAX_KINESIS_DATA_SIZE = ONE_MB class TraceInjectionSizeExceed(Exception): pass -def inject_trace_to_kinesis_stream_data(record, span, stream, inject_trace_context=True): - # type: (Dict[str, Any], Span, str, bool) -> None - """ - :record: contains args for the current botocore action, Kinesis record is at index 1 - :span: the span which provides the trace context to be propagated - :inject_trace_context: whether to inject DataDog trace context - Inject trace headers and DSM headers into the Kinesis record's Data field in addition to the existing - data. Only possible if the existing data is JSON string or base64 encoded JSON string - Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/) - """ - if "Data" not in record: - log.warning("Unable to inject context. The kinesis stream has no data") - return - - # always inject if Data Stream is enabled, otherwise only inject if distributed tracing is enabled and this is the - # first record in the payload - if (config.botocore["distributed_tracing"] and inject_trace_context) or config._data_streams_enabled: - data = record["Data"] - line_break, data_obj = get_kinesis_data_object(data) - if data_obj is not None: - data_obj["_datadog"] = {} - - if config.botocore["distributed_tracing"] and inject_trace_context: - HTTPPropagator.inject(span.context, data_obj["_datadog"]) - - core.dispatch("botocore.kinesis.start", [stream, data_obj["_datadog"], record]) +def update_record(ctx, record: Dict[str, Any], stream: str, inject_trace_context: bool = True) -> None: + line_break, data_obj = get_kinesis_data_object(record["Data"]) + if data_obj is not None: + core.dispatch( + "botocore.kinesis.update_record", + [ctx, stream, data_obj, record, inject_trace_context], + ) + try: data_json = json.dumps(data_obj) + except Exception: + log.warning("Unable to update kinesis record", exc_info=True) - # if original string had a line break, add it back - if line_break is not None: - data_json += line_break + if line_break is not None: + data_json += line_break + + data_size = len(data_json) + if data_size >= MAX_KINESIS_DATA_SIZE: + log.warning("Data including trace injection (%d) exceeds (%d)", data_size, MAX_KINESIS_DATA_SIZE) + + record["Data"] = data_json - # check if data size will exceed max size with headers - data_size = len(data_json) - if data_size >= MAX_KINESIS_DATA_SIZE: - raise TraceInjectionSizeExceed( - "Data including trace injection ({}) exceeds ({})".format(data_size, MAX_KINESIS_DATA_SIZE) - ) - record["Data"] = data_json - - -def inject_trace_to_kinesis_stream(params, span, inject_trace_context=True): - # type: (List[Any], Span, bool) -> None - """ - :params: contains the params for the current botocore action - :span: the span which provides the trace context to be propagated - :inject_trace_context: whether to inject DataDog trace context - Max data size per record is 1MB (https://aws.amazon.com/kinesis/data-streams/faqs/) - """ - stream = params.get("StreamARN", params.get("StreamName", "")) - if "Records" in params: - records = params["Records"] - - if records: - for i in range(0, len(records)): - inject_to_trace = inject_trace_context and i == 0 - inject_trace_to_kinesis_stream_data( - records[i], span, stream, inject_trace_context=inject_to_trace - ) # only inject trace data to first record +def select_records_for_injection(params: List[Any], inject_trace_context: bool) -> List[Tuple[Any, bool]]: + records_to_inject_into = [] + if "Records" in params and params["Records"]: + for i, record in enumerate(params["Records"]): + if "Data" in record: + records_to_inject_into.append((record, inject_trace_context and i == 0)) elif "Data" in params: - inject_trace_to_kinesis_stream_data(params, span, stream, inject_trace_context=inject_trace_context) + records_to_inject_into.append((params, inject_trace_context)) + return records_to_inject_into def patched_kinesis_api_call(original_func, instance, args, kwargs, function_vars): @@ -109,8 +76,8 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var operation = function_vars.get("operation") message_received = False - func_run = False - func_run_err = None + is_getrecords_call = False + getrecords_error = None child_of = None start_ns = None result = None @@ -118,13 +85,12 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var if operation == "GetRecords": try: start_ns = time_ns() - func_run = True + is_getrecords_call = True core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params]) result = original_func(*args, **kwargs) records = result["Records"] - # dispatch to DSM to set checkpoints for record in records: _, data_obj = get_kinesis_data_object(record["Data"]) time_estimate = record.get("ApproximateArrivalTimestamp", datetime.now()).timestamp() @@ -134,20 +100,31 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var ) except Exception as e: - func_run_err = e + getrecords_error = e if result is not None and "Records" in result and len(result["Records"]) >= 1: message_received = True if config.botocore.propagation_enabled: child_of = extract_DD_context(result["Records"]) - """ - We only want to create a span for the following cases: - - not func_run: The function is not `getRecords` and we need to run it - - func_run and message_received: Received a message when polling - - config.empty_poll_enabled: We want to trace empty poll operations - - func_run_err: There was an error when calling the `getRecords` function - """ - if (func_run and message_received) or config.botocore.empty_poll_enabled or not func_run or func_run_err: + if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}: + span_name = schematize_cloud_messaging_operation( + trace_operation, + cloud_provider="aws", + cloud_service="kinesis", + direction=SpanDirection.OUTBOUND, + ) + else: + span_name = trace_operation + stream_arn = params.get("StreamARN", params.get("StreamName", "")) + function_is_not_getrecords = not is_getrecords_call + received_message_when_polling = is_getrecords_call and message_received + instrument_empty_poll_calls = config.botocore.empty_poll_enabled + should_instrument = ( + received_message_when_polling or instrument_empty_poll_calls or function_is_not_getrecords or getrecords_error + ) + is_kinesis_put_operation = endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"} + + if should_instrument: with core.context_with_data( "botocore.patched_kinesis_api_call", instance=instance, @@ -159,35 +136,29 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var call_trace=False, context_started_callback=set_patched_api_call_span_tags, pin=pin, - span_name=trace_operation, + span_name=span_name, span_type=SpanTypes.HTTP, child_of=child_of if child_of is not None else pin.tracer.context_provider.active(), activate=True, - func_run=func_run, + func_run=is_getrecords_call, start_ns=start_ns, call_key="patched_kinesis_api_call", - ) as ctx, ctx.get_item(ctx.get_item("call_key")) as span: + ) as ctx, ctx.get_item(ctx.get_item("call_key")): core.dispatch("botocore.patched_kinesis_api_call.started", [ctx]) - if config.botocore["distributed_tracing"] or config._data_streams_enabled: - try_inject_DD_context( - endpoint_name, - operation, - params, - span, - trace_operation, - inject_trace_context=bool(config.botocore["distributed_tracing"]), - ) + if is_kinesis_put_operation: + records_to_process = select_records_for_injection(params, bool(config.botocore["distributed_tracing"])) + for record, should_inject_trace_context in records_to_process: + update_record(ctx, record, stream_arn, inject_trace_context=should_inject_trace_context) try: - if not func_run: + if not is_getrecords_call: core.dispatch(f"botocore.{endpoint_name}.{operation}.pre", [params]) result = original_func(*args, **kwargs) core.dispatch(f"botocore.{endpoint_name}.{operation}.post", [params, result]) - # raise error if it was encountered before the span was started - if func_run_err: - raise func_run_err + if getrecords_error: + raise getrecords_error core.dispatch("botocore.patched_kinesis_api_call.success", [ctx, result, set_response_metadata_tags]) return result @@ -198,23 +169,7 @@ def patched_kinesis_api_call(original_func, instance, args, kwargs, function_var [ctx, e.response, botocore.exceptions.ClientError, set_response_metadata_tags], ) raise - # return results in the case that we ran the function, but no records were returned and empty - # poll spans are disabled - elif func_run: - if func_run_err: - raise func_run_err + elif is_getrecords_call: + if getrecords_error: + raise getrecords_error return result - - -def try_inject_DD_context(endpoint_name, operation, params, span, trace_operation, inject_trace_context): - try: - if endpoint_name == "kinesis" and operation in {"PutRecord", "PutRecords"}: - inject_trace_to_kinesis_stream(params, span, inject_trace_context=inject_trace_context) - span.name = schematize_cloud_messaging_operation( - trace_operation, - cloud_provider="aws", - cloud_service="kinesis", - direction=SpanDirection.OUTBOUND, - ) - except Exception: - log.warning("Unable to inject trace context", exc_info=True) diff --git a/ddtrace/internal/datastreams/botocore.py b/ddtrace/internal/datastreams/botocore.py index 74b5f60ec3c..39bb7a55633 100644 --- a/ddtrace/internal/datastreams/botocore.py +++ b/ddtrace/internal/datastreams/botocore.py @@ -107,9 +107,12 @@ def calculate_kinesis_payload_size(message, trace_data=None): return payload_size -def handle_kinesis_produce(stream, dd_ctx_json, record): - if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param) - inject_context(dd_ctx_json, "kinesis", stream, record) +def handle_kinesis_produce(ctx, stream, dd_ctx_json, record, *args): + if config._data_streams_enabled: + if "_datadog" not in dd_ctx_json: + dd_ctx_json["_datadog"] = {} + if stream: # If stream ARN / stream name isn't specified, we give up (it is not a required param) + inject_context(dd_ctx_json["_datadog"], "kinesis", stream, record) def handle_sqs_sns_produce(endpoint_service, trace_data, params, message=None): @@ -211,7 +214,7 @@ def handle_kinesis_receive(params, time_estimate, context_json, record): if config._data_streams_enabled: - core.on("botocore.kinesis.start", handle_kinesis_produce) + core.on("botocore.kinesis.update_record", handle_kinesis_produce) core.on("botocore.sqs_sns.start", handle_sqs_sns_produce) core.on("botocore.sqs.ReceiveMessage.pre", handle_sqs_prepare) core.on("botocore.sqs.ReceiveMessage.post", handle_sqs_receive)