From edb77397756a4cffdc16a57d917e1c723cbad518 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADtor=20De=20Ara=C3=BAjo?= Date: Thu, 22 Aug 2024 11:27:20 +0100 Subject: [PATCH] chore(ci_visibility): add endpoint metrics telemetry for CI visibility (#10278) This PR adds the `endpoint_payload.*` metrics to CI visibility telemetry. ### Testing strategy - Clone https://github.com/pallets/flask as a test project - Install it and `ddtrace` in editable mode in an environment (and also `pytest` and `coverage`) - Run `_DD_CIVISIBILITY_ITR_FORCE_ENABLE_COVERAGE=1 DD_TRACE_DEBUG=1 DD_SITE=datad0g.com DD_API_KEY=... DD_CIVISIBILITY_AGENTLESS_ENABLED=true DD_SERVICE=flask.service pytest --ddtrace -s` - Watch the telemetry data show up in the [Test Visibility Libraries Telemetry](https://ddstaging.datadoghq.com/dashboard/r2j-7jz-66i/test-visibility-libraries-telemetry?fromUser=false&refresh_mode=sliding&tpl_var_language_name%5B0%5D=python&view=spans&from_ts=1721550410463&to_ts=1724142410463&live=true) dashboard under "TestCycle Endpoint" and "Code Coverage Endpoint" - Check that the `Recording endpoint payload ...` messages match the expected data to be sent. ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --- ddtrace/internal/ci_visibility/encoder.py | 12 +++- .../ci_visibility/telemetry/coverage.py | 4 +- .../ci_visibility/telemetry/payload.py | 68 +++++++++++++++++++ ddtrace/internal/ci_visibility/writer.py | 36 ++++++++++ ddtrace/internal/telemetry/writer.py | 2 +- tests/ci_visibility/test_ci_visibility.py | 3 + 6 files changed, 121 insertions(+), 4 deletions(-) create mode 100644 ddtrace/internal/ci_visibility/telemetry/payload.py diff --git a/ddtrace/internal/ci_visibility/encoder.py b/ddtrace/internal/ci_visibility/encoder.py index d2b095c7ed6..43f54d09414 100644 --- a/ddtrace/internal/ci_visibility/encoder.py +++ b/ddtrace/internal/ci_visibility/encoder.py @@ -15,7 +15,11 @@ from ddtrace.internal.ci_visibility.constants import SESSION_TYPE from ddtrace.internal.ci_visibility.constants import SUITE_ID from ddtrace.internal.ci_visibility.constants import SUITE_TYPE +from ddtrace.internal.ci_visibility.telemetry.payload import ENDPOINT +from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_count +from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_serialization_time from ddtrace.internal.encoding import JSONEncoderV2 +from ddtrace.internal.utils.time import StopWatch from ddtrace.internal.writer.writer import NoEncodableSpansError @@ -34,6 +38,7 @@ class CIVisibilityEncoderV01(BufferedEncoder): PAYLOAD_FORMAT_VERSION = 1 TEST_SUITE_EVENT_VERSION = 1 TEST_EVENT_VERSION = 2 + ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE def __init__(self, *args): super(CIVisibilityEncoderV01, self).__init__() @@ -62,7 +67,9 @@ def encode_traces(self, traces): def encode(self): with self._lock: - payload = self._build_payload(self.buffer) + with StopWatch() as sw: + payload = self._build_payload(self.buffer) + record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed()) self._init_buffer() return payload @@ -70,6 +77,7 @@ def _build_payload(self, traces): normalized_spans = [self._convert_span(span, trace[0].context.dd_origin) for trace in traces for span in trace] if not normalized_spans: return None + record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans)) self._metadata = {k: v for k, v in self._metadata.items() if k in self.ALLOWED_METADATA_KEYS} # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size. return CIVisibilityEncoderV01._pack_payload( @@ -141,6 +149,7 @@ def _filter_ids(sp): class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01): PAYLOAD_FORMAT_VERSION = 2 + ENDPOINT_TYPE = ENDPOINT.CODE_COVERAGE boundary = uuid4().hex content_type = "multipart/form-data; boundary=%s" % boundary itr_suite_skipping_mode = False @@ -189,6 +198,7 @@ def _build_data(self, traces): ] if not normalized_covs: return None + record_endpoint_payload_events_count(endpoint=ENDPOINT.CODE_COVERAGE, count=len(normalized_covs)) # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size. return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs}) diff --git a/ddtrace/internal/ci_visibility/telemetry/coverage.py b/ddtrace/internal/ci_visibility/telemetry/coverage.py index febecb54e94..e3370fbee6e 100644 --- a/ddtrace/internal/ci_visibility/telemetry/coverage.py +++ b/ddtrace/internal/ci_visibility/telemetry/coverage.py @@ -15,7 +15,7 @@ class COVERAGE_TELEMETRY(str, Enum): STARTED = "code_coverage_started" FINISHED = "code_coverage_finished" - IS_EMTPY = "code_coverage.is_empty" + IS_EMPTY = "code_coverage.is_empty" FILES = "code_coverage.files" ERRORS = "code_coverage.errors" @@ -43,7 +43,7 @@ def record_code_coverage_finished(coverage_library: COVERAGE_LIBRARY, test_frame def record_code_coverage_empty(): log.debug("Recording code coverage empty telemetry") - telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMTPY, 1) + telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMPTY, 1) def record_code_coverage_files(count_files: int): diff --git a/ddtrace/internal/ci_visibility/telemetry/payload.py b/ddtrace/internal/ci_visibility/telemetry/payload.py new file mode 100644 index 00000000000..1cf41d306ff --- /dev/null +++ b/ddtrace/internal/ci_visibility/telemetry/payload.py @@ -0,0 +1,68 @@ +from enum import Enum + +from ddtrace.internal.ci_visibility.telemetry.constants import CIVISIBILITY_TELEMETRY_NAMESPACE as _NAMESPACE +from ddtrace.internal.logger import get_logger +from ddtrace.internal.telemetry import telemetry_writer + + +log = get_logger(__name__) + + +class ENDPOINT(str, Enum): + TEST_CYCLE = "test_cycle" + CODE_COVERAGE = "code_coverage" + + +class ENDPOINT_PAYLOAD_TELEMETRY(str, Enum): + BYTES = "endpoint_payload.bytes" + REQUESTS_COUNT = "endpoint_payload.requests" + REQUESTS_MS = "endpoint_payload.requests_ms" + REQUESTS_ERRORS = "endpoint_payload.requests_errors" + EVENTS_COUNT = "endpoint_payload.events_count" + EVENTS_SERIALIZATION_MS = "endpoint_payload.events_serialization_ms" + + +class REQUEST_ERROR_TYPE(str, Enum): + TIMEOUT = "timeout" + NETWORK = "network" + STATUS_CODE = "status_code" + + +def record_endpoint_payload_bytes(endpoint: ENDPOINT, nbytes: int) -> None: + log.debug("Recording endpoint payload bytes: %s, %s", endpoint, nbytes) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.BYTES.value, nbytes, tags) + + +def record_endpoint_payload_request(endpoint: ENDPOINT) -> None: + log.debug("Recording endpoint payload request: %s", endpoint) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_COUNT.value, 1, tags) + + +def record_endpoint_payload_request_time(endpoint: ENDPOINT, seconds: float) -> None: + log.debug("Recording endpoint payload request time: %s, %s seconds", endpoint, seconds) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric( + _NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_MS.value, seconds * 1000, tags + ) + + +def record_endpoint_payload_request_error(endpoint: ENDPOINT, error_type: REQUEST_ERROR_TYPE) -> None: + log.debug("Recording endpoint payload request error: %s, %s", endpoint, error_type) + tags = (("endpoint", endpoint.value), ("error_type", error_type)) + telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_ERRORS.value, 1, tags) + + +def record_endpoint_payload_events_count(endpoint: ENDPOINT, count: int) -> None: + log.debug("Recording endpoint payload events count: %s, %s", endpoint, count) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_COUNT.value, count, tags) + + +def record_endpoint_payload_events_serialization_time(endpoint: ENDPOINT, seconds: float) -> None: + log.debug("Recording endpoint payload serialization time: %s, %s seconds", endpoint, seconds) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric( + _NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_SERIALIZATION_MS.value, seconds * 1000, tags + ) diff --git a/ddtrace/internal/ci_visibility/writer.py b/ddtrace/internal/ci_visibility/writer.py index 6746e3c8b34..4071b0ce8e7 100644 --- a/ddtrace/internal/ci_visibility/writer.py +++ b/ddtrace/internal/ci_visibility/writer.py @@ -1,9 +1,12 @@ +from http.client import RemoteDisconnected import os +import socket from typing import TYPE_CHECKING # noqa:F401 from typing import Optional # noqa:F401 import ddtrace from ddtrace import config +from ddtrace.internal.utils.time import StopWatch from ddtrace.vendor.dogstatsd import DogStatsd # noqa:F401 from .. import agent @@ -22,12 +25,19 @@ from .constants import EVP_SUBDOMAIN_HEADER_NAME from .encoder import CIVisibilityCoverageEncoderV02 from .encoder import CIVisibilityEncoderV01 +from .telemetry.payload import REQUEST_ERROR_TYPE +from .telemetry.payload import record_endpoint_payload_bytes +from .telemetry.payload import record_endpoint_payload_request +from .telemetry.payload import record_endpoint_payload_request_error +from .telemetry.payload import record_endpoint_payload_request_time if TYPE_CHECKING: # pragma: no cover from typing import Dict # noqa:F401 from typing import List # noqa:F401 + from ddtrace.internal.utils.http import Response # noqa:F401 + class CIVisibilityEventClient(WriterClientBase): def __init__(self): @@ -146,3 +156,29 @@ def recreate(self): dogstatsd=self.dogstatsd, sync_mode=self._sync_mode, ) + + def _put(self, data, headers, client, no_trace): + # type: (bytes, Dict[str, str], WriterClientBase, bool) -> Response + request_error = None # type: Optional[REQUEST_ERROR_TYPE] + + with StopWatch() as sw: + try: + response = super()._put(data, headers, client, no_trace) + if response.status >= 400: + request_error = REQUEST_ERROR_TYPE.STATUS_CODE + except (TimeoutError, socket.timeout): + request_error = REQUEST_ERROR_TYPE.TIMEOUT + raise + except RemoteDisconnected: + request_error = REQUEST_ERROR_TYPE.NETWORK + raise + finally: + if isinstance(client.encoder, CIVisibilityEncoderV01): + endpoint = client.encoder.ENDPOINT_TYPE + record_endpoint_payload_bytes(endpoint, nbytes=len(data)) + record_endpoint_payload_request(endpoint) + record_endpoint_payload_request_time(endpoint, seconds=sw.elapsed()) + if request_error: + record_endpoint_payload_request_error(endpoint, request_error) + + return response diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 42a7e5eaa48..d325643bdde 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -337,7 +337,7 @@ def add_event(self, payload, payload_type): Adds a Telemetry event to the TelemetryWriter event buffer :param Dict payload: stores a formatted telemetry event - :param str payload_type: The payload_type denotes the type of telmetery request. + :param str payload_type: The payload_type denotes the type of telemetry request. Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change """ if self.enable(): diff --git a/tests/ci_visibility/test_ci_visibility.py b/tests/ci_visibility/test_ci_visibility.py index 141d9d760fb..78d670a62ce 100644 --- a/tests/ci_visibility/test_ci_visibility.py +++ b/tests/ci_visibility/test_ci_visibility.py @@ -505,6 +505,7 @@ def test_civisibilitywriter_coverage_agentless_url(): assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0) @@ -524,6 +525,7 @@ def test_civisibilitywriter_coverage_agentless_with_intake_url_param(): assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0) @@ -542,6 +544,7 @@ def test_civisibilitywriter_coverage_evp_proxy_url(): assert cov_client.ENDPOINT == "/evp_proxy/v2/api/v2/citestcov" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("http://localhost:9126", 2.0)