Skip to content

Commit

Permalink
feat(telemetry): validate unique telemetry log messages (#6213)
Browse files Browse the repository at this point in the history
Telemetry Logs: The telemetry writer now deduplicates log messages within
each flush interval, so identical logs are reported only once per interval.

For example, if an application makes the following calls within a single interval:
```
telemetry_metrics_writer.add_log("WARNING", "test error 1")
telemetry_metrics_writer.add_log("WARNING", "test error 1")
telemetry_metrics_writer.add_log("WARNING", "test error 2")
telemetry_metrics_writer.add_log("WARNING", "test error 2")
```
the telemetry writer reports only two logs for that interval, one per unique message.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](../docs/contributing.rst#release-branch-maintenance))

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](../docs/contributing.rst#release-branch-maintenance)

---------

Co-authored-by: Christophe Papazian <[email protected]>
Co-authored-by: Zachary Groves <[email protected]>
Co-authored-by: Juanjo Alvarez Martinez <[email protected]>
  • Loading branch information
4 people authored Jun 30, 2023
1 parent e264852 commit 0bb7eab
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 27 deletions.
40 changes: 28 additions & 12 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union

Expand Down Expand Up @@ -44,6 +45,19 @@
log = get_logger(__name__)


class LogData(dict):
    """A telemetry log entry that can be de-duplicated by storing it in a ``set``.

    Two entries are considered the same log when their ``message``, ``level``,
    ``tags`` and ``stack_trace`` values match. Any other key (for example
    ``tracer_time``) is deliberately ignored, so the same log emitted at
    different times collapses into a single set member.

    NOTE: the hash is derived from mutable content. Populate every identity
    field *before* adding the entry to a set and do not change it afterwards.
    """

    # Optional-or-required keys that define the identity of a log entry.
    _IDENTITY_KEYS = ("message", "level", "tags", "stack_trace")

    def _identity(self):
        # "message" and "level" are mandatory (KeyError if missing);
        # "tags" and "stack_trace" are optional and default to None.
        return (self["message"], self["level"], self.get("tags"), self.get("stack_trace"))

    def __hash__(self):
        return hash(self._identity())

    def __eq__(self, other):
        # Defer to the other operand for anything that is not dict-like, instead of
        # failing with TypeError when subscripting it.
        if not isinstance(other, dict):
            return NotImplemented
        # ``.get`` keeps the comparison from raising when ``other`` lacks a key.
        return self._identity() == tuple(other.get(key) for key in self._IDENTITY_KEYS)

    def __ne__(self, other):
        # ``dict`` ships its own ``__ne__`` that compares every key, which would
        # disagree with ``__eq__`` above; keep both operators consistent.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result


def _get_heartbeat_interval_or_default():
# type: () -> float
return float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", default=60))
Expand Down Expand Up @@ -241,7 +255,7 @@ def __init__(self):
# type: () -> None
super(TelemetryLogsMetricsWriter, self).__init__(interval=_get_telemetry_metrics_interval_or_default())
self._namespace = MetricNamespace()
self._logs = [] # type: List[Dict[str, Any]]
self._logs = set() # type: Set[Dict[str, Any]]

def enable(self, start_worker_thread=True):
# type: (bool) -> bool
Expand All @@ -259,16 +273,18 @@ def add_log(self, level, message, stack_trace="", tags={}):
This will make support cycles easier and ensure we know about potentially silent issues in libraries.
"""
if self.enable():
data = {
"message": message,
"level": level,
"tracer_time": int(time.time()),
}
data = LogData(
{
"message": message,
"level": level,
"tracer_time": int(time.time()),
}
)
if tags:
data["tags"] = ",".join(["%s:%s" % (k, str(v).lower()) for k, v in tags.items()])
if stack_trace:
data["stack_trace"] = stack_trace
self._logs.append(data)
self._logs.add(data)

def add_gauge_metric(self, namespace, name, value, tags=None):
# type: (str,str, float, MetricTagType) -> None
Expand Down Expand Up @@ -342,10 +358,10 @@ def periodic(self):
self._client.send_event(telemetry_event)

def _flush_log_metrics(self):
# type () -> List[Metric]
# type () -> Set[Metric]
with self._lock:
log_metrics = self._logs
self._logs = []
self._logs = set()
return log_metrics

def _generate_metrics_event(self, namespace_metrics):
Expand All @@ -364,9 +380,9 @@ def _generate_metrics_event(self, namespace_metrics):
self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS)

def _generate_logs_event(self, payload):
# type: (List[Dict[str, str]]) -> None
# type: (Set[Dict[str, str]]) -> None
log.debug("%s request payload", TELEMETRY_TYPE_LOGS)
self.add_event(payload, TELEMETRY_TYPE_LOGS)
self.add_event(list(payload), TELEMETRY_TYPE_LOGS)

def on_shutdown(self):
self.periodic()
Expand All @@ -375,7 +391,7 @@ def reset_queues(self):
# type: () -> None
super(TelemetryLogsMetricsWriter, self).reset_queues()
self._namespace.flush()
self._logs = []
self._logs = set()


class TelemetryWriter(TelemetryBase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
telemetry: the telemetry writer now deduplicates log messages within each interval, so identical logs are reported only once.
29 changes: 16 additions & 13 deletions tests/appsec/test_telemety.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ def test_log_metric_error_ddwaf_init(mock_logs_telemetry_metrics_writer):
):
AppSecSpanProcessor()

assert len(mock_logs_telemetry_metrics_writer._logs) == 1
assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "WAF init error. Invalid rules"
assert mock_logs_telemetry_metrics_writer._logs[0]["stack_trace"].startswith("DDWAF.__init__: invalid rules")
assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"]
list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs)
assert len(list_metrics_logs) == 1
assert list_metrics_logs[0]["message"] == "WAF init error. Invalid rules"
assert list_metrics_logs[0]["stack_trace"].startswith("DDWAF.__init__: invalid rules")
assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"]


def test_log_metric_error_ddwaf_timeout(mock_logs_telemetry_metrics_writer, tracer):
Expand All @@ -154,19 +155,21 @@ def test_log_metric_error_ddwaf_timeout(mock_logs_telemetry_metrics_writer, trac
Config(),
)

print(mock_logs_telemetry_metrics_writer._logs)
assert len(mock_logs_telemetry_metrics_writer._logs) == 2
assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "WAF run. Timeout errors"
assert mock_logs_telemetry_metrics_writer._logs[0].get("stack_trace") is None
assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"]
list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs)
assert len(list_metrics_logs) == 1
assert list_metrics_logs[0]["message"] == "WAF run. Timeout errors"
assert list_metrics_logs[0].get("stack_trace") is None
assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"]


@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_log_metric_error_ddwaf_update(mock_logs_telemetry_metrics_writer):
with override_global_config(dict(_appsec_enabled=True, _telemetry_metrics_enabled=True)):
span_processor = AppSecSpanProcessor()
span_processor._update_rules("{}")
assert len(mock_logs_telemetry_metrics_writer._logs) == 1
assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "Error updating ASM rules. Invalid rules"
assert mock_logs_telemetry_metrics_writer._logs[0].get("stack_trace") is None
assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"]

list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs)
assert len(list_metrics_logs) == 1
assert list_metrics_logs[0]["message"] == "Error updating ASM rules. Invalid rules"
assert list_metrics_logs[0].get("stack_trace") is None
assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"]
80 changes: 78 additions & 2 deletions tests/telemetry/test_telemetry_metrics.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import sys
from time import sleep

from mock.mock import ANY
import pytest

from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE_TAG_APPSEC
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE_TAG_TRACER
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
Expand Down Expand Up @@ -53,8 +59,11 @@ def _assert_logs(

# Python 2.7 and Python 3.5 fail with dictionaries and lists order
expected_body = _get_request_body(expected_payload, TELEMETRY_TYPE_LOGS, seq_id)
expected_body_sorted = expected_body["payload"].sort(key=lambda x: x["message"], reverse=False)
result_event = events[0]["payload"].sort(key=lambda x: x["message"], reverse=False)
expected_body["payload"].sort(key=lambda x: x["message"], reverse=False)
expected_body_sorted = expected_body["payload"]

events[0]["payload"].sort(key=lambda x: x["message"], reverse=False)
result_event = events[0]["payload"]

assert result_event == expected_body_sorted

Expand Down Expand Up @@ -374,6 +383,7 @@ def test_send_log_metric_simple(telemetry_metrics_writer, test_agent_metrics_ses
_assert_logs(test_agent_metrics_session, expected_payload)


@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only, tags order fails on 2.7 and 3.5")
def test_send_log_metric_simple_tags(telemetry_metrics_writer, test_agent_metrics_session, mock_time):
"""Check the queue of metrics is empty after run periodic method of PeriodicService"""
with override_global_config(dict(_telemetry_metrics_enabled=True)):
Expand Down Expand Up @@ -409,3 +419,69 @@ def test_send_multiple_log_metric(telemetry_metrics_writer, test_agent_metrics_s
telemetry_metrics_writer.add_log("WARNING", "test error 1", "Traceback:\nValueError", {"a": "b"})

_assert_logs(test_agent_metrics_session, expected_payload, seq_id=2)


def test_send_multiple_log_metric_no_duplicates(telemetry_metrics_writer, test_agent_metrics_session, mock_time):
    """Ten identical ``add_log`` calls must produce a single entry in the logs payload."""
    with override_global_config(dict(_telemetry_metrics_enabled=True)):
        repetitions = 10
        for _ in range(repetitions):
            telemetry_metrics_writer.add_log("WARNING", "test error 1", "Traceback:\nValueError", {"a": "b"})

        # Only one entry is expected even though the same log was added several times.
        # NOTE(review): the fixed tracer_time presumably comes from the mock_time fixture - confirm.
        deduplicated_log = dict(
            level="WARNING",
            message="test error 1",
            stack_trace="Traceback:\nValueError",
            tracer_time=1642544540,
            tags="a:b",
        )
        expected_payload = [deduplicated_log]

        _assert_logs(test_agent_metrics_session, expected_payload)


def test_send_multiple_log_metric_no_duplicates_for_each_interval(
    telemetry_metrics_writer, test_agent_metrics_session, mock_time
):
    """The same log repeated in two rounds must be reported once per round."""

    def emit_duplicates():
        # Add the very same log ten times in a row.
        for _ in range(10):
            telemetry_metrics_writer.add_log("WARNING", "test error 1")

    with override_global_config(dict(_telemetry_metrics_enabled=True)):
        emit_duplicates()

        # NOTE(review): the fixed tracer_time presumably comes from the mock_time fixture - confirm.
        expected_payload = [dict(level="WARNING", message="test error 1", tracer_time=1642544540)]

        _assert_logs(test_agent_metrics_session, expected_payload)

        # Second round: the log was already reported once, but must be reported again.
        emit_duplicates()

        _assert_logs(test_agent_metrics_session, expected_payload, seq_id=2)


def test_send_multiple_log_metric_no_duplicates_for_each_interval_check_time(
    telemetry_metrics_writer, test_agent_metrics_session
):
    """Identical logs added at different real times must still be reported once per round.

    No time mock is requested here, so ``tracer_time`` is matched with ``ANY``.
    """

    def emit_spaced_duplicates():
        # Add the same log three times, pausing 0.1s before each call.
        for _ in range(3):
            sleep(0.1)
            telemetry_metrics_writer.add_log("WARNING", "test error 1")

    with override_global_config(dict(_telemetry_metrics_enabled=True)):
        emit_spaced_duplicates()

        expected_payload = [dict(level="WARNING", message="test error 1", tracer_time=ANY)]

        _assert_logs(test_agent_metrics_session, expected_payload)

        # Second round: the same log must be reported again, still only once.
        emit_spaced_duplicates()

        _assert_logs(test_agent_metrics_session, expected_payload, seq_id=2)

0 comments on commit 0bb7eab

Please sign in to comment.