Skip to content

Commit

Permalink
Merge branch 'main' into romain.komorn/SDTEST-225/use_bytearrays_for_…
Browse files Browse the repository at this point in the history
…coverage
  • Loading branch information
romainkomorndatadog authored Aug 22, 2024
2 parents fc7799f + edb7739 commit 149caa0
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 4 deletions.
12 changes: 11 additions & 1 deletion ddtrace/internal/ci_visibility/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
from ddtrace.internal.ci_visibility.constants import SESSION_TYPE
from ddtrace.internal.ci_visibility.constants import SUITE_ID
from ddtrace.internal.ci_visibility.constants import SUITE_TYPE
from ddtrace.internal.ci_visibility.telemetry.payload import ENDPOINT
from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_count
from ddtrace.internal.ci_visibility.telemetry.payload import record_endpoint_payload_events_serialization_time
from ddtrace.internal.encoding import JSONEncoderV2
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.time import StopWatch
from ddtrace.internal.writer.writer import NoEncodableSpansError


Expand All @@ -37,6 +41,7 @@ class CIVisibilityEncoderV01(BufferedEncoder):
PAYLOAD_FORMAT_VERSION = 1
TEST_SUITE_EVENT_VERSION = 1
TEST_EVENT_VERSION = 2
ENDPOINT_TYPE = ENDPOINT.TEST_CYCLE

def __init__(self, *args):
super(CIVisibilityEncoderV01, self).__init__()
Expand Down Expand Up @@ -65,14 +70,17 @@ def encode_traces(self, traces):

def encode(self):
with self._lock:
payload = self._build_payload(self.buffer)
with StopWatch() as sw:
payload = self._build_payload(self.buffer)
record_endpoint_payload_events_serialization_time(endpoint=self.ENDPOINT_TYPE, seconds=sw.elapsed())
self._init_buffer()
return payload

def _build_payload(self, traces):
normalized_spans = [self._convert_span(span, trace[0].context.dd_origin) for trace in traces for span in trace]
if not normalized_spans:
return None
record_endpoint_payload_events_count(endpoint=ENDPOINT.TEST_CYCLE, count=len(normalized_spans))
self._metadata = {k: v for k, v in self._metadata.items() if k in self.ALLOWED_METADATA_KEYS}
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
return CIVisibilityEncoderV01._pack_payload(
Expand Down Expand Up @@ -144,6 +152,7 @@ def _filter_ids(sp):

class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01):
PAYLOAD_FORMAT_VERSION = 2
ENDPOINT_TYPE = ENDPOINT.CODE_COVERAGE
boundary = uuid4().hex
content_type = "multipart/form-data; boundary=%s" % boundary
itr_suite_skipping_mode = False
Expand Down Expand Up @@ -199,6 +208,7 @@ def _build_data(self, traces):
]
if not normalized_covs:
return None
record_endpoint_payload_events_count(endpoint=ENDPOINT.CODE_COVERAGE, count=len(normalized_covs))
# TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
return msgpack_packb({"version": self.PAYLOAD_FORMAT_VERSION, "coverages": normalized_covs})

Expand Down
4 changes: 2 additions & 2 deletions ddtrace/internal/ci_visibility/telemetry/coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
class COVERAGE_TELEMETRY(str, Enum):
STARTED = "code_coverage_started"
FINISHED = "code_coverage_finished"
IS_EMTPY = "code_coverage.is_empty"
IS_EMPTY = "code_coverage.is_empty"
FILES = "code_coverage.files"
ERRORS = "code_coverage.errors"

Expand Down Expand Up @@ -43,7 +43,7 @@ def record_code_coverage_finished(coverage_library: COVERAGE_LIBRARY, test_frame

def record_code_coverage_empty():
log.debug("Recording code coverage empty telemetry")
telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMTPY, 1)
telemetry_writer.add_count_metric(_NAMESPACE, COVERAGE_TELEMETRY.IS_EMPTY, 1)


def record_code_coverage_files(count_files: int):
Expand Down
68 changes: 68 additions & 0 deletions ddtrace/internal/ci_visibility/telemetry/payload.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from enum import Enum

from ddtrace.internal.ci_visibility.telemetry.constants import CIVISIBILITY_TELEMETRY_NAMESPACE as _NAMESPACE
from ddtrace.internal.logger import get_logger
from ddtrace.internal.telemetry import telemetry_writer


log = get_logger(__name__)


class ENDPOINT(str, Enum):
TEST_CYCLE = "test_cycle"
CODE_COVERAGE = "code_coverage"


class ENDPOINT_PAYLOAD_TELEMETRY(str, Enum):
BYTES = "endpoint_payload.bytes"
REQUESTS_COUNT = "endpoint_payload.requests"
REQUESTS_MS = "endpoint_payload.requests_ms"
REQUESTS_ERRORS = "endpoint_payload.requests_errors"
EVENTS_COUNT = "endpoint_payload.events_count"
EVENTS_SERIALIZATION_MS = "endpoint_payload.events_serialization_ms"


class REQUEST_ERROR_TYPE(str, Enum):
TIMEOUT = "timeout"
NETWORK = "network"
STATUS_CODE = "status_code"


def record_endpoint_payload_bytes(endpoint: ENDPOINT, nbytes: int) -> None:
log.debug("Recording endpoint payload bytes: %s, %s", endpoint, nbytes)
tags = (("endpoint", endpoint.value),)
telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.BYTES.value, nbytes, tags)


def record_endpoint_payload_request(endpoint: ENDPOINT) -> None:
log.debug("Recording endpoint payload request: %s", endpoint)
tags = (("endpoint", endpoint.value),)
telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_COUNT.value, 1, tags)


def record_endpoint_payload_request_time(endpoint: ENDPOINT, seconds: float) -> None:
log.debug("Recording endpoint payload request time: %s, %s seconds", endpoint, seconds)
tags = (("endpoint", endpoint.value),)
telemetry_writer.add_distribution_metric(
_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_MS.value, seconds * 1000, tags
)


def record_endpoint_payload_request_error(endpoint: ENDPOINT, error_type: REQUEST_ERROR_TYPE) -> None:
log.debug("Recording endpoint payload request error: %s, %s", endpoint, error_type)
tags = (("endpoint", endpoint.value), ("error_type", error_type))
telemetry_writer.add_count_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.REQUESTS_ERRORS.value, 1, tags)


def record_endpoint_payload_events_count(endpoint: ENDPOINT, count: int) -> None:
log.debug("Recording endpoint payload events count: %s, %s", endpoint, count)
tags = (("endpoint", endpoint.value),)
telemetry_writer.add_distribution_metric(_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_COUNT.value, count, tags)


def record_endpoint_payload_events_serialization_time(endpoint: ENDPOINT, seconds: float) -> None:
log.debug("Recording endpoint payload serialization time: %s, %s seconds", endpoint, seconds)
tags = (("endpoint", endpoint.value),)
telemetry_writer.add_distribution_metric(
_NAMESPACE, ENDPOINT_PAYLOAD_TELEMETRY.EVENTS_SERIALIZATION_MS.value, seconds * 1000, tags
)
36 changes: 36 additions & 0 deletions ddtrace/internal/ci_visibility/writer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from http.client import RemoteDisconnected
import os
import socket
from typing import TYPE_CHECKING # noqa:F401
from typing import Optional # noqa:F401

import ddtrace
from ddtrace import config
from ddtrace.internal.utils.time import StopWatch
from ddtrace.vendor.dogstatsd import DogStatsd # noqa:F401

from .. import agent
Expand All @@ -22,12 +25,19 @@
from .constants import EVP_SUBDOMAIN_HEADER_NAME
from .encoder import CIVisibilityCoverageEncoderV02
from .encoder import CIVisibilityEncoderV01
from .telemetry.payload import REQUEST_ERROR_TYPE
from .telemetry.payload import record_endpoint_payload_bytes
from .telemetry.payload import record_endpoint_payload_request
from .telemetry.payload import record_endpoint_payload_request_error
from .telemetry.payload import record_endpoint_payload_request_time


if TYPE_CHECKING: # pragma: no cover
from typing import Dict # noqa:F401
from typing import List # noqa:F401

from ddtrace.internal.utils.http import Response # noqa:F401


class CIVisibilityEventClient(WriterClientBase):
def __init__(self):
Expand Down Expand Up @@ -146,3 +156,29 @@ def recreate(self):
dogstatsd=self.dogstatsd,
sync_mode=self._sync_mode,
)

def _put(self, data, headers, client, no_trace):
# type: (bytes, Dict[str, str], WriterClientBase, bool) -> Response
request_error = None # type: Optional[REQUEST_ERROR_TYPE]

with StopWatch() as sw:
try:
response = super()._put(data, headers, client, no_trace)
if response.status >= 400:
request_error = REQUEST_ERROR_TYPE.STATUS_CODE
except (TimeoutError, socket.timeout):
request_error = REQUEST_ERROR_TYPE.TIMEOUT
raise
except RemoteDisconnected:
request_error = REQUEST_ERROR_TYPE.NETWORK
raise
finally:
if isinstance(client.encoder, CIVisibilityEncoderV01):
endpoint = client.encoder.ENDPOINT_TYPE
record_endpoint_payload_bytes(endpoint, nbytes=len(data))
record_endpoint_payload_request(endpoint)
record_endpoint_payload_request_time(endpoint, seconds=sw.elapsed())
if request_error:
record_endpoint_payload_request_error(endpoint, request_error)

return response
2 changes: 1 addition & 1 deletion ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ def add_event(self, payload, payload_type):
Adds a Telemetry event to the TelemetryWriter event buffer
:param Dict payload: stores a formatted telemetry event
:param str payload_type: The payload_type denotes the type of telmetery request.
:param str payload_type: The payload_type denotes the type of telemetry request.
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
"""
if self.enable():
Expand Down
3 changes: 3 additions & 0 deletions tests/ci_visibility/test_ci_visibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ def test_civisibilitywriter_coverage_agentless_url():
assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com"

with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection:
_get_connection.return_value.getresponse.return_value.status = 200
dummy_writer._put("", {}, cov_client, no_trace=True)
_get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0)

Expand All @@ -524,6 +525,7 @@ def test_civisibilitywriter_coverage_agentless_with_intake_url_param():
assert cov_client._intake_url == "https://citestcov-intake.datadoghq.com"

with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection:
_get_connection.return_value.getresponse.return_value.status = 200
dummy_writer._put("", {}, cov_client, no_trace=True)
_get_connection.assert_called_once_with("https://citestcov-intake.datadoghq.com", 2.0)

Expand All @@ -542,6 +544,7 @@ def test_civisibilitywriter_coverage_evp_proxy_url():
assert cov_client.ENDPOINT == "/evp_proxy/v2/api/v2/citestcov"

with mock.patch("ddtrace.internal.writer.writer.get_connection") as _get_connection:
_get_connection.return_value.getresponse.return_value.status = 200
dummy_writer._put("", {}, cov_client, no_trace=True)
_get_connection.assert_called_once_with("http://localhost:9126", 2.0)

Expand Down

0 comments on commit 149caa0

Please sign in to comment.