diff --git a/ddtrace/internal/ci_visibility/encoder.py b/ddtrace/internal/ci_visibility/encoder.py index 6ad13a036a4..f78ed526215 100644 --- a/ddtrace/internal/ci_visibility/encoder.py +++ b/ddtrace/internal/ci_visibility/encoder.py @@ -15,8 +15,12 @@ from ddtrace.internal.ci_visibility.constants import SESSION_TYPE from ddtrace.internal.ci_visibility.constants import SUITE_ID from ddtrace.internal.ci_visibility.constants import SUITE_TYPE +from ddtrace.internal.ci_visibility.telemetry.payload import ENDPOINT +from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_count +from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_serialization_time from ddtrace.internal.encoding import JSONEncoderV2 from ddtrace.internal.logger import get_logger +from ddtrace.internal.utils.time import StopWatch from ddtrace.internal.writer.writer import NoEncodableSpansError @@ -37,6 +41,7 @@ class CIVisibilityEncoderV01(BufferedEncoder): PAYLOAD_FORMAT_VERSION = 1 TEST_SUITE_EVENT_VERSION = 1 TEST_EVENT_VERSION = 2 + ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE def __init__(self, *args): super(CIVisibilityEncoderV01, self).__init__() @@ -65,7 +70,9 @@ def encode_traces(self, traces): def encode(self): with self._lock: - payload = self._build_payload(self.buffer) + with StopWatch() as sw: + payload = self._build_payload(self.buffer) + record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed()) self._init_buffer() return payload @@ -73,6 +80,7 @@ def _build_payload(self, traces): normalized_spans = [self._convert_span(span, trace[0].context.dd_origin) for trace in traces for span in trace] if not normalized_spans: return None + record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans)) self._metadata = {k: v for k, v in self._metadata.items() if k in self.ALLOWED_METADATA_KEYS} # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size. return CIVisibilityEncoderV01._pack_payload( @@ -144,6 +152,7 @@ def _filter_ids(sp): class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01): PAYLOAD_FORMAT_VERSION = 2 + ENDPOINT_TYPE = ENDPOINT.CODE_COVERAGE boundary = uuid4().hex content_type = "multipart/form-data; boundary=%s" % boundary itr_suite_skipping_mode = False @@ -199,6 +208,7 @@ def _build_data(self, traces): ] if not normalized_covs: return None + record_endpoint_payload_events_count(endpoint=ENDPOINT.CODE_COVERAGE, count=len(normalized_covs)) # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size. return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs}) diff --git a/ddtrace/internal/ci_visibility/telemetry/coverage.py b/ddtrace/internal/ci_visibility/telemetry/coverage.py index febecb54e94..e3370fbee6e 100644 --- a/ddtrace/internal/ci_visibility/telemetry/coverage.py +++ b/ddtrace/internal/ci_visibility/telemetry/coverage.py @@ -15,7 +15,7 @@ class COVERAGE_TELEMETRY(str, Enum): STARTED = "code_coverage_started" FINISHED = "code_coverage_finished" - IS_EMTPY = "code_coverage.is_empty" + IS_EMPTY = "code_coverage.is_empty" FILES = "code_coverage.files" ERRORS = "code_coverage.errors" @@ -43,7 +43,7 @@ def record_code_coverage_finished(coverage_library: COVERAGE_LIBRARY, test_frame def record_code_coverage_empty(): log.debug("Recording code coverage empty telemetry") - telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMTPY, 1) + telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMPTY, 1) def record_code_coverage_files(count_files: int): diff --git a/ddtrace/internal/ci_visibility/telemetry/payload.py b/ddtrace/internal/ci_visibility/telemetry/payload.py new file mode 100644 index 00000000000..1cf41d306ff --- /dev/null +++ b/ddtrace/internal/ci_visibility/telemetry/payload.py @@ -0,0 +1,68 @@ +from enum import Enum + +from ddtrace.internal.ci_visibility.telemetry.constants import CIVISIBILITY_TELEMETRY_NAMESPACE as _NAMESPACE +from ddtrace.internal.logger import get_logger +from ddtrace.internal.telemetry import telemetry_writer + + +log = get_logger(__name__) + + +class ENDPOINT(str, Enum): + TEST_CYCLE = "test_cycle" + CODE_COVERAGE = "code_coverage" + + +class ENDPOINT_PAYLOAD_TELEMETRY(str, Enum): + BYTES = "endpoint_payload.bytes" + REQUESTS_COUNT = "endpoint_payload.requests" + REQUESTS_MS = "endpoint_payload.requests_ms" + REQUESTS_ERRORS = "endpoint_payload.requests_errors" + EVENTS_COUNT = "endpoint_payload.events_count" + EVENTS_SERIALIZATION_MS = "endpoint_payload.events_serialization_ms" + + +class REQUEST_ERROR_TYPE(str, Enum): + TIMEOUT = "timeout" + NETWORK = "network" + STATUS_CODE = "status_code" + + +def record_endpoint_payload_bytes(endpoint: ENDPOINT, nbytes: int) -> None: + log.debug("Recording endpoint payload bytes: %s, %s", endpoint, nbytes) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.BYTES.value, nbytes, tags) + + +def record_endpoint_payload_request(endpoint: ENDPOINT) -> None: + log.debug("Recording endpoint payload request: %s", endpoint) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_COUNT.value, 1, tags) + + +def record_endpoint_payload_request_time(endpoint: ENDPOINT, seconds: float) -> None: + log.debug("Recording endpoint payload request time: %s, %s seconds", endpoint, seconds) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric( + _NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_MS.value, seconds * 1000, tags + ) + + +def record_endpoint_payload_request_error(endpoint: ENDPOINT, error_type: REQUEST_ERROR_TYPE) -> None: + log.debug("Recording endpoint payload request error: %s, %s", endpoint, error_type) + tags = (("endpoint", endpoint.value), ("error_type", error_type)) + telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_ERRORS.value, 1, tags) + + +def record_endpoint_payload_events_count(endpoint: ENDPOINT, count: int) -> None: + log.debug("Recording endpoint payload events count: %s, %s", endpoint, count) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_COUNT.value, count, tags) + + +def record_endpoint_payload_events_serialization_time(endpoint: ENDPOINT, seconds: float) -> None: + log.debug("Recording endpoint payload serialization time: %s, %s seconds", endpoint, seconds) + tags = (("endpoint", endpoint.value),) + telemetry_writer.add_distribution_metric( + _NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_SERIALIZATION_MS.value, seconds * 1000, tags + ) diff --git a/ddtrace/internal/ci_visibility/writer.py b/ddtrace/internal/ci_visibility/writer.py index 6746e3c8b34..4071b0ce8e7 100644 --- a/ddtrace/internal/ci_visibility/writer.py +++ b/ddtrace/internal/ci_visibility/writer.py @@ -1,9 +1,12 @@ +from http.client import RemoteDisconnected import os +import socket from typing import TYPE_CHECKING # noqa:F401 from typing import Optional # noqa:F401 import ddtrace from ddtrace import config +from ddtrace.internal.utils.time import StopWatch from ddtrace.vendor.dogstatsd import DogStatsd # noqa:F401 from .. import agent @@ -22,12 +25,19 @@ from .constants import EVP_SUBDOMAIN_HEADER_NAME from .encoder import CIVisibilityCoverageEncoderV02 from .encoder import CIVisibilityEncoderV01 +from .telemetry.payload import REQUEST_ERROR_TYPE +from .telemetry.payload import record_endpoint_payload_bytes +from .telemetry.payload import record_endpoint_payload_request +from .telemetry.payload import record_endpoint_payload_request_error +from .telemetry.payload import record_endpoint_payload_request_time if TYPE_CHECKING: # pragma: no cover from typing import Dict # noqa:F401 from typing import List # noqa:F401 + from ddtrace.internal.utils.http import Response # noqa:F401 + class CIVisibilityEventClient(WriterClientBase): def __init__(self): @@ -146,3 +156,29 @@ def recreate(self): dogstatsd=self.dogstatsd, sync_mode=self._sync_mode, ) + + def _put(self, data, headers, client, no_trace): + # type: (bytes, Dict[str, str], WriterClientBase, bool) -> Response + request_error = None # type: Optional[REQUEST_ERROR_TYPE] + + with StopWatch() as sw: + try: + response = super()._put(data, headers, client, no_trace) + if response.status >= 400: + request_error = REQUEST_ERROR_TYPE.STATUS_CODE + except (TimeoutError, socket.timeout): + request_error = REQUEST_ERROR_TYPE.TIMEOUT + raise + except RemoteDisconnected: + request_error = REQUEST_ERROR_TYPE.NETWORK + raise + finally: + if isinstance(client.encoder, CIVisibilityEncoderV01): + endpoint = client.encoder.ENDPOINT_TYPE + record_endpoint_payload_bytes(endpoint, nbytes=len(data)) + record_endpoint_payload_request(endpoint) + record_endpoint_payload_request_time(endpoint, seconds=sw.elapsed()) + if request_error: + record_endpoint_payload_request_error(endpoint, request_error) + + return response diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index 42a7e5eaa48..d325643bdde 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -337,7 +337,7 @@ def add_event(self, payload, payload_type): Adds a Telemetry event to the TelemetryWriter event buffer :param Dict payload: stores a formatted telemetry event - :param str payload_type: The payload_type denotes the type of telmetery request. + :param str payload_type: The payload_type denotes the type of telemetry request. Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change """ if self.enable(): diff --git a/tests/ci_visibility/test_ci_visibility.py b/tests/ci_visibility/test_ci_visibility.py index 141d9d760fb..78d670a62ce 100644 --- a/tests/ci_visibility/test_ci_visibility.py +++ b/tests/ci_visibility/test_ci_visibility.py @@ -505,6 +505,7 @@ def test_civisibilitywriter_coverage_agentless_url(): assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0) @@ -524,6 +525,7 @@ def test_civisibilitywriter_coverage_agentless_with_intake_url_param(): assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0) @@ -542,6 +544,7 @@ def test_civisibilitywriter_coverage_evp_proxy_url(): assert cov_client.ENDPOINT == "/evp_proxy/v2/api/v2/citestcov" with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection: + _get_connection.return_value.getresponse.return_value.status = 200 dummy_writer._put("", {}, cov_client, no_trace=True) _get_connection.assert_called_once_with("http://localhost:9126", 2.0)