diff --git a/ddtrace/internal/telemetry/writer.py b/ddtrace/internal/telemetry/writer.py index f478a14a874..3ba61430662 100644 --- a/ddtrace/internal/telemetry/writer.py +++ b/ddtrace/internal/telemetry/writer.py @@ -7,6 +7,7 @@ from typing import Dict from typing import List from typing import Optional +from typing import Set from typing import Tuple from typing import Union @@ -44,6 +45,19 @@ log = get_logger(__name__) +class LogData(dict): + def __hash__(self): + return hash((self["message"], self["level"], self.get("tags"), self.get("stack_trace"))) + + def __eq__(self, other): + return ( + self["message"] == other["message"] + and self["level"] == other["level"] + and self.get("tags") == other.get("tags") + and self.get("stack_trace") == other.get("stack_trace") + ) + + def _get_heartbeat_interval_or_default(): # type: () -> float return float(os.getenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", default=60)) @@ -241,7 +255,7 @@ def __init__(self): # type: () -> None super(TelemetryLogsMetricsWriter, self).__init__(interval=_get_telemetry_metrics_interval_or_default()) self._namespace = MetricNamespace() - self._logs = [] # type: List[Dict[str, Any]] + self._logs = set() # type: Set[Dict[str, Any]] def enable(self, start_worker_thread=True): # type: (bool) -> bool @@ -259,16 +273,18 @@ def add_log(self, level, message, stack_trace="", tags={}): This will make support cycles easier and ensure we know about potentially silent issues in libraries. """ if self.enable(): - data = { - "message": message, - "level": level, - "tracer_time": int(time.time()), - } + data = LogData( + { + "message": message, + "level": level, + "tracer_time": int(time.time()), + } + ) if tags: data["tags"] = ",".join(["%s:%s" % (k, str(v).lower()) for k, v in tags.items()]) if stack_trace: data["stack_trace"] = stack_trace - self._logs.append(data) + self._logs.add(data) def add_gauge_metric(self, namespace, name, value, tags=None): # type: (str,str, float, MetricTagType) -> None @@ -342,10 +358,10 @@ def periodic(self): self._client.send_event(telemetry_event) def _flush_log_metrics(self): - # type () -> List[Metric] + # type () -> Set[Metric] with self._lock: log_metrics = self._logs - self._logs = [] + self._logs = set() return log_metrics def _generate_metrics_event(self, namespace_metrics): @@ -364,9 +380,9 @@ def _generate_metrics_event(self, namespace_metrics): self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS) def _generate_logs_event(self, payload): - # type: (List[Dict[str, str]]) -> None + # type: (Set[Dict[str, str]]) -> None log.debug("%s request payload", TELEMETRY_TYPE_LOGS) - self.add_event(payload, TELEMETRY_TYPE_LOGS) + self.add_event(list(payload), TELEMETRY_TYPE_LOGS) def on_shutdown(self): self.periodic() @@ -375,7 +391,7 @@ def reset_queues(self): # type: () -> None super(TelemetryLogsMetricsWriter, self).reset_queues() self._namespace.flush() - self._logs = [] + self._logs = set() class TelemetryWriter(TelemetryBase): diff --git a/releasenotes/notes/telemetry-logs-validate-unique-messages-5a020f0531952e0c.yaml b/releasenotes/notes/telemetry-logs-validate-unique-messages-5a020f0531952e0c.yaml new file mode 100644 index 00000000000..6310521ead9 --- /dev/null +++ b/releasenotes/notes/telemetry-logs-validate-unique-messages-5a020f0531952e0c.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + telemetry: telemetry writer validates unique log messages for each interval. diff --git a/tests/appsec/test_telemety.py b/tests/appsec/test_telemety.py index 4f572b7b0db..bbbe008ed90 100644 --- a/tests/appsec/test_telemety.py +++ b/tests/appsec/test_telemety.py @@ -136,10 +136,11 @@ def test_log_metric_error_ddwaf_init(mock_logs_telemetry_metrics_writer): ): AppSecSpanProcessor() - assert len(mock_logs_telemetry_metrics_writer._logs) == 1 - assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "WAF init error. Invalid rules" - assert mock_logs_telemetry_metrics_writer._logs[0]["stack_trace"].startswith("DDWAF.__init__: invalid rules") - assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"] + list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs) + assert len(list_metrics_logs) == 1 + assert list_metrics_logs[0]["message"] == "WAF init error. Invalid rules" + assert list_metrics_logs[0]["stack_trace"].startswith("DDWAF.__init__: invalid rules") + assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"] def test_log_metric_error_ddwaf_timeout(mock_logs_telemetry_metrics_writer, tracer): @@ -154,11 +155,11 @@ def test_log_metric_error_ddwaf_timeout(mock_logs_telemetry_metrics_writer, trac Config(), ) - print(mock_logs_telemetry_metrics_writer._logs) - assert len(mock_logs_telemetry_metrics_writer._logs) == 2 - assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "WAF run. Timeout errors" - assert mock_logs_telemetry_metrics_writer._logs[0].get("stack_trace") is None - assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"] + list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs) + assert len(list_metrics_logs) == 1 + assert list_metrics_logs[0]["message"] == "WAF run. Timeout errors" + assert list_metrics_logs[0].get("stack_trace") is None + assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"] @pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only") @@ -166,7 +167,9 @@ def test_log_metric_error_ddwaf_update(mock_logs_telemetry_metrics_writer): with override_global_config(dict(_appsec_enabled=True, _telemetry_metrics_enabled=True)): span_processor = AppSecSpanProcessor() span_processor._update_rules("{}") - assert len(mock_logs_telemetry_metrics_writer._logs) == 1 - assert mock_logs_telemetry_metrics_writer._logs[0]["message"] == "Error updating ASM rules. Invalid rules" - assert mock_logs_telemetry_metrics_writer._logs[0].get("stack_trace") is None - assert "waf_version:{}".format(version()) in mock_logs_telemetry_metrics_writer._logs[0]["tags"] + + list_metrics_logs = list(mock_logs_telemetry_metrics_writer._logs) + assert len(list_metrics_logs) == 1 + assert list_metrics_logs[0]["message"] == "Error updating ASM rules. Invalid rules" + assert list_metrics_logs[0].get("stack_trace") is None + assert "waf_version:{}".format(version()) in list_metrics_logs[0]["tags"] diff --git a/tests/telemetry/test_telemetry_metrics.py b/tests/telemetry/test_telemetry_metrics.py index 78463fd3c33..708d5330e4b 100644 --- a/tests/telemetry/test_telemetry_metrics.py +++ b/tests/telemetry/test_telemetry_metrics.py @@ -1,3 +1,9 @@ +import sys +from time import sleep + +from mock.mock import ANY +import pytest + from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE_TAG_APPSEC from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE_TAG_TRACER from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION @@ -53,8 +59,11 @@ def _assert_logs( # Python 2.7 and Python 3.5 fail with dictionaries and lists order expected_body = _get_request_body(expected_payload, TELEMETRY_TYPE_LOGS, seq_id) - expected_body_sorted = expected_body["payload"].sort(key=lambda x: x["message"], reverse=False) - result_event = events[0]["payload"].sort(key=lambda x: x["message"], reverse=False) + expected_body["payload"].sort(key=lambda x: x["message"], reverse=False) + expected_body_sorted = expected_body["payload"] + + events[0]["payload"].sort(key=lambda x: x["message"], reverse=False) + result_event = events[0]["payload"] assert result_event == expected_body_sorted @@ -374,6 +383,7 @@ def test_send_log_metric_simple(telemetry_metrics_writer, test_agent_metrics_ses _assert_logs(test_agent_metrics_session, expected_payload) +@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only, tags order fails on 2.7 and 3.5") def test_send_log_metric_simple_tags(telemetry_metrics_writer, test_agent_metrics_session, mock_time): """Check the queue of metrics is empty after run periodic method of PeriodicService""" with override_global_config(dict(_telemetry_metrics_enabled=True)): @@ -409,3 +419,69 @@ def test_send_multiple_log_metric(telemetry_metrics_writer, test_agent_metrics_s telemetry_metrics_writer.add_log("WARNING", "test error 1", "Traceback:\nValueError", {"a": "b"}) _assert_logs(test_agent_metrics_session, expected_payload, seq_id=2) + + +def test_send_multiple_log_metric_no_duplicates(telemetry_metrics_writer, test_agent_metrics_session, mock_time): + with override_global_config(dict(_telemetry_metrics_enabled=True)): + for _ in range(10): + telemetry_metrics_writer.add_log("WARNING", "test error 1", "Traceback:\nValueError", {"a": "b"}) + + expected_payload = [ + { + "level": "WARNING", + "message": "test error 1", + "stack_trace": "Traceback:\nValueError", + "tracer_time": 1642544540, + "tags": "a:b", + }, + ] + + _assert_logs(test_agent_metrics_session, expected_payload) + + +def test_send_multiple_log_metric_no_duplicates_for_each_interval( + telemetry_metrics_writer, test_agent_metrics_session, mock_time +): + with override_global_config(dict(_telemetry_metrics_enabled=True)): + for _ in range(10): + telemetry_metrics_writer.add_log("WARNING", "test error 1") + + expected_payload = [ + { + "level": "WARNING", + "message": "test error 1", + "tracer_time": 1642544540, + }, + ] + + _assert_logs(test_agent_metrics_session, expected_payload) + + for _ in range(10): + telemetry_metrics_writer.add_log("WARNING", "test error 1") + + _assert_logs(test_agent_metrics_session, expected_payload, seq_id=2) + + +def test_send_multiple_log_metric_no_duplicates_for_each_interval_check_time( + telemetry_metrics_writer, test_agent_metrics_session +): + with override_global_config(dict(_telemetry_metrics_enabled=True)): + for _ in range(3): + sleep(0.1) + telemetry_metrics_writer.add_log("WARNING", "test error 1") + + expected_payload = [ + { + "level": "WARNING", + "message": "test error 1", + "tracer_time": ANY, + }, + ] + + _assert_logs(test_agent_metrics_session, expected_payload) + + for _ in range(3): + sleep(0.1) + telemetry_metrics_writer.add_log("WARNING", "test error 1") + + _assert_logs(test_agent_metrics_session, expected_payload, seq_id=2)