Skip to content

Commit

Permalink
chore(telemetry): ensure metrics are aggregated every 10 secs but sent…
Browse files Browse the repository at this point in the history
… after 60 secs (#6712)

## Description
- Ensure telemetry metrics and logs are aggregated every 10 seconds.
This aggregation interval should not be configurable, because it impacts
how metrics are computed and visualized in the UI.
- Ensure telemetry events are sent AT MOST every 60 seconds. The
telemetry writer interval can be set via
``DD_TELEMETRY_HEARTBEAT_INTERVAL``; this will not change.

- Example: heartbeat interval is 120 seconds and TelemetryWriter.interval
is 10 seconds.
- In a given 120-second window `TelemetryWriter.periodic()` will be
called 12 times.
- For each `TelemetryWriter.periodic()` call, metrics and logs will be
aggregated into a telemetry payload.
- On the 12th (and last) call, all telemetry events will be batched and
sent to the agent.

## Motivation

Align the behavior of the python telemetry client with the .NET tracer.
This behavior will also be implemented in all other tracers.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
mabdinur authored Aug 25, 2023
1 parent b97591c commit b2f35e4
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 70 deletions.
2 changes: 1 addition & 1 deletion ddtrace/internal/processor/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def shutdown(self, timeout):
self._queue_span_count_metrics("spans_finished", "integration_name", None)
# The telemetry metrics writer can be shutdown before the tracer.
# This ensures all tracer metrics are always sent.
telemetry_writer.periodic()
telemetry_writer.periodic(True)

try:
self._writer.stop(timeout)
Expand Down
66 changes: 34 additions & 32 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ def __eq__(self, other):
)


def _get_heartbeat_interval_or_default():
# type: () -> float
return float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", default=60))


class _TelemetryClient:
def __init__(self, endpoint):
# type: (str) -> None
Expand Down Expand Up @@ -162,9 +157,15 @@ class TelemetryWriter(PeriodicService):
# payloads is only used in tests and is not required to process Telemetry events.
_sequence = itertools.count(1)

def __init__(self):
# type: () -> None
super(TelemetryWriter, self).__init__(interval=_get_heartbeat_interval_or_default())
def __init__(self, is_periodic=True):
# type: (bool) -> None
super(TelemetryWriter, self).__init__(interval=min(config._telemetry_heartbeat_interval, 10))
# Decouples the aggregation and sending of the telemetry events
# TelemetryWriter events will only be sent when _periodic_count == _periodic_threshold.
# By default events are sent every 60 seconds (on every 6th call to periodic()).
self._periodic_threshold = int(config._telemetry_heartbeat_interval // self.interval) - 1
self._periodic_count = 0
self._is_periodic = is_periodic
self._integrations_queue = dict() # type: Dict[str, Dict]
# Currently telemetry only supports reporting a single error.
# If we'd like to report multiple errors in the future
Expand All @@ -185,8 +186,8 @@ def __init__(self):

self._client = _TelemetryClient(self.ENDPOINT_V2)

def enable(self, start_worker_thread=True):
# type: (bool) -> bool
def enable(self):
# type: () -> bool
"""
Enable the instrumentation telemetry collection service. If the service has already been
activated before, this method does nothing. Use ``disable`` to turn off the telemetry collection service.
Expand All @@ -197,10 +198,11 @@ def enable(self, start_worker_thread=True):
if self.status == ServiceStatus.RUNNING:
return True

if start_worker_thread:
if self._is_periodic:
self.start()
atexit.register(self.app_shutdown)
return True

self.status = ServiceStatus.RUNNING
return True

Expand All @@ -212,8 +214,7 @@ def disable(self):
"""
self._disabled = True
self.reset_queues()

if self.is_periodic:
if self._is_periodic and self.status is ServiceStatus.RUNNING:
atexit.unregister(self.stop)
self.stop()
else:
Expand Down Expand Up @@ -242,15 +243,6 @@ def add_event(self, payload, payload_type):
}
self._events_queue.append(event)

@property
def is_periodic(self):
# type: () -> bool
"""
Returns true if the telemetry writer is running and was enabled using
telemetry_writer.enable(start_worker_thread=True)
"""
return self.status is ServiceStatus.RUNNING and self._worker and self._worker.is_alive()

def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None):
# type: (str, bool, Optional[bool], Optional[str]) -> None
"""
Expand Down Expand Up @@ -535,15 +527,7 @@ def _generate_logs_event(self, payload):
log.debug("%s request payload", TELEMETRY_TYPE_LOGS)
self.add_event(list(payload), TELEMETRY_TYPE_LOGS)

def periodic(self):
integrations = self._flush_integrations_queue()
if integrations:
self._app_integrations_changed_event(integrations)

configurations = self._flush_configuration_queue()
if configurations:
self._app_client_configuration_changed_event(configurations)

def periodic(self, force_flush=False):
namespace_metrics = self._namespace.flush()
if namespace_metrics:
self._generate_metrics_event(namespace_metrics)
Expand All @@ -552,6 +536,24 @@ def periodic(self):
if logs_metrics:
self._generate_logs_event(logs_metrics)

# Telemetry metrics and logs should be aggregated into payloads every time periodic is called.
# This ensures metrics and logs are submitted in 0 to 10 second time buckets.
# Optimization: All other events should be aggregated using `config._telemetry_heartbeat_interval`.
# Telemetry payloads will be submitted according to `config._telemetry_heartbeat_interval`.
if self._is_periodic and force_flush is False:
if self._periodic_count < self._periodic_threshold:
self._periodic_count += 1
return
self._periodic_count = 0

integrations = self._flush_integrations_queue()
if integrations:
self._app_integrations_changed_event(integrations)

configurations = self._flush_configuration_queue()
if configurations:
self._app_client_configuration_changed_event(configurations)

if not self._events_queue:
# Optimization: only queue heartbeat if no other events are queued
self._app_heartbeat_event()
Expand All @@ -571,7 +573,7 @@ def start(self, *args, **kwargs):

def app_shutdown(self):
self._app_closing_event()
self.periodic()
self.periodic(force_flush=True)

def reset_queues(self):
# type: () -> None
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/settings/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ def __init__(self):
self.health_metrics_enabled = asbool(os.getenv("DD_TRACE_HEALTH_METRICS_ENABLED", default=False))

self._telemetry_enabled = asbool(os.getenv("DD_INSTRUMENTATION_TELEMETRY_ENABLED", True))
self._telemetry_heartbeat_interval = float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "60"))

self._runtime_metrics_enabled = asbool(os.getenv("DD_RUNTIME_METRICS_ENABLED", False))

self._128_bit_trace_id_enabled = asbool(os.getenv("DD_TRACE_128_BIT_TRACEID_GENERATION_ENABLED", False))
Expand Down
4 changes: 2 additions & 2 deletions tests/telemetry/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

@pytest.fixture
def telemetry_writer():
telemetry_writer = TelemetryWriter()
telemetry_writer.enable(start_worker_thread=False)
telemetry_writer = TelemetryWriter(is_periodic=False)
telemetry_writer.enable()
yield telemetry_writer


Expand Down
16 changes: 9 additions & 7 deletions tests/telemetry/test_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess
print(get_runtime_id())
# Call periodic to send heartbeat event
telemetry_writer.periodic()
telemetry_writer.periodic(True)
# Disable telemetry writer to avoid sending app-closed event
telemetry_writer.disable()
"""
Expand All @@ -135,15 +135,17 @@ def test_enable_fork_heartbeat(test_agent_session, run_python_code_in_subprocess

def test_heartbeat_interval_configuration(run_python_code_in_subprocess):
"""assert that DD_TELEMETRY_HEARTBEAT_INTERVAL config sets the telemetry writer interval"""
heartbeat_interval = "1.5"
env = os.environ.copy()
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = heartbeat_interval
env["DD_TELEMETRY_HEARTBEAT_INTERVAL"] = "61"
code = """
from ddtrace import config
assert config._telemetry_heartbeat_interval == 61
from ddtrace.internal.telemetry import telemetry_writer
assert telemetry_writer.interval == {}
""".format(
heartbeat_interval
)
assert telemetry_writer._is_periodic is True
assert telemetry_writer.interval == 10
assert telemetry_writer._periodic_threshold == 5
"""

_, stderr, status, _ = run_python_code_in_subprocess(code, env=env)
assert status == 0, stderr
Expand Down
16 changes: 8 additions & 8 deletions tests/telemetry/test_telemetry_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def test_send_metric_datapoint_with_different_types(telemetry_writer, test_agent
"points": [[1642544540, 1.0]],
"tags": ["a:b"],
"type": "gauge",
"interval": 60,
"interval": 10,
},
]
_assert_metric(test_agent_session, expected_series)
Expand Down Expand Up @@ -234,17 +234,17 @@ def test_send_appsec_rate_metric(telemetry_writer, test_agent_session, mock_time
expected_series = [
{
"common": True,
"interval": 60,
"interval": 10,
"metric": "test-metric",
"points": [[1642544540, 0.1]],
"points": [[1642544540, 0.6]],
"tags": ["hi:hello", "name:candy"],
"type": "rate",
},
{
"common": True,
"interval": 60,
"interval": 10,
"metric": "test-metric",
"points": [[1642544540, 0.2]],
"points": [[1642544540, 1.2]],
"tags": [],
"type": "rate",
},
Expand All @@ -269,23 +269,23 @@ def test_send_appsec_gauge_metric(telemetry_writer, test_agent_session, mock_tim
expected_series = [
{
"common": True,
"interval": 60,
"interval": 10,
"metric": "test-metric",
"points": [[1642544540, 5.0]],
"tags": ["hi:hello", "name:candy"],
"type": "gauge",
},
{
"common": True,
"interval": 60,
"interval": 10,
"metric": "test-metric",
"points": [[1642544540, 5.0]],
"tags": ["a:b"],
"type": "gauge",
},
{
"common": True,
"interval": 60,
"interval": 10,
"metric": "test-metric",
"points": [[1642544540, 6.0]],
"tags": [],
Expand Down
38 changes: 18 additions & 20 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from ddtrace.internal.utils.version import _pep440_to_semver
from ddtrace.settings import _config as config

from .conftest import TelemetryTestSession


def test_add_event(telemetry_writer, test_agent_session, mock_time):
"""asserts that add_event queues a telemetry request with valid headers and payload"""
Expand Down Expand Up @@ -139,7 +137,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
telemetry_writer.enable()
telemetry_writer.reset_queues()
telemetry_writer._app_started_event()
telemetry_writer.periodic()
telemetry_writer.periodic(force_flush=True)
telemetry_writer.disable()
"""

Expand Down Expand Up @@ -186,13 +184,14 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
assert status == 0, stderr

events = test_agent_session.get_events()
events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])

assert len(events) == 1
events[0]["payload"]["configuration"].sort(key=lambda c: c["name"])
assert events[0]["payload"]["configuration"] == [
{"name": "DD_APPSEC_ENABLED", "origin": "unknown", "value": False},
{"name": "DD_CALL_BASIC_CONFIG", "origin": "unknown", "value": True},
{"name": "DD_DATA_STREAMS_ENABLED", "origin": "unknown", "value": False},
{"name": "DD_DYNAMIC_INSTRUMENTATION_ENABLED", "origin": "unknown", "value": False},
{"name": "DD_DYNAMIC_INSTRUMENTATION_ENABLED", "origin": "unknown", "value": True},
{"name": "DD_EXCEPTION_DEBUGGING_ENABLED", "origin": "unknown", "value": True},
{"name": "DD_INSTRUMENTATION_TELEMETRY_ENABLED", "origin": "unknown", "value": True},
{"name": "DD_LOGS_INJECTION", "origin": "unknown", "value": True},
Expand Down Expand Up @@ -370,11 +369,22 @@ def test_app_heartbeat_event_periodic(mock_time, telemetry_writer, test_agent_se
# type: (mock.Mock, Any, TelemetryWriter) -> None
"""asserts that we queue/send app-heartbeat when periodic() is called"""

# Assert default heartbeat interval is 60 seconds
assert telemetry_writer.interval == 60
# Ensure telemetry writer is initialized to send periodic events
telemetry_writer._is_periodic = True
# Assert default telemetry interval is 10 seconds and the expected periodic threshold and counts are set
assert telemetry_writer.interval == 10
assert telemetry_writer._periodic_threshold == 5
assert telemetry_writer._periodic_count == 0

# Assert next flush contains app-heartbeat event
for _ in range(telemetry_writer._periodic_threshold):
telemetry_writer.periodic()
assert len(test_agent_session.get_events()) == 0

telemetry_writer.periodic()
_assert_app_heartbeat_event(1, test_agent_session)
events = test_agent_session.get_events()
heartbeat_events = [event for event in events if event["request_type"] == "app-heartbeat"]
assert len(heartbeat_events) == 1


def test_app_heartbeat_event(mock_time, telemetry_writer, test_agent_session):
Expand All @@ -392,18 +402,6 @@ def test_app_heartbeat_event(mock_time, telemetry_writer, test_agent_session):
assert len(events) == 1


def _assert_app_heartbeat_event(seq_id, test_agent_session):
# type: (int, TelemetryTestSession) -> None
"""used to test heartbeat events received by the testagent"""
events = test_agent_session.get_events()
assert len(events) == seq_id
# The test_agent returns telemetry events in reverse chronological order
# The first event in the list is last event sent by the Telemetry Client
last_event = events[0]
assert last_event["request_type"] == "app-heartbeat"
assert last_event == _get_request_body({}, "app-heartbeat", seq_id=seq_id)


def _get_request_body(payload, payload_type, seq_id=1):
# type: (Dict, str, int) -> Dict
"""used to test the body of requests received by the testagent"""
Expand Down

0 comments on commit b2f35e4

Please sign in to comment.