Skip to content

Commit

Permalink
feat(llmobs): truncate LLMObs span events that are >1MB in size (#10148)
Browse files Browse the repository at this point in the history
Events that are >1MB are not supported by the event platform, and will
be truncated automatically in ways that make them no longer processable
by the LLMObs event processor. This change introduces truncation for
span events that exceed that size limitation.

Truncation is performed by dropping the input and output fields.
  • Loading branch information
tomshen authored Aug 13, 2024
1 parent 7112beb commit d7203df
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 4 deletions.
6 changes: 5 additions & 1 deletion ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@
# LLMObs intake path, built on top of EVP_PROXY_AGENT_BASE_PATH.
EVP_PROXY_AGENT_ENDPOINT = "{}/api/v2/llmobs".format(EVP_PROXY_AGENT_BASE_PATH)
# Header name/value pair identifying the LLMObs intake subdomain.
EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain"
EVP_SUBDOMAIN_HEADER_VALUE = "llmobs-intake"
EVP_PAYLOAD_SIZE_LIMIT = 5 << 20 # 5MB
# Upper bound for a whole payload: 5 << 20 == 5,242,880 bytes.
EVP_PAYLOAD_SIZE_LIMIT = 5 << 20 # 5MB (actual limit is 5.1MB)
# Upper bound for a single span event: (1 << 20) - 1024 == 1,047,552 bytes,
# i.e. 1 KiB of headroom below the 1MB limit.
EVP_EVENT_SIZE_LIMIT = (1 << 20) - 1024 # 1023KiB (actual limit is 1MB)

AGENTLESS_BASE_URL = "https://llmobs-intake"
AGENTLESS_ENDPOINT = "api/v2/llmobs"

# Tag appended to a span event whose input/output were dropped for size.
DROPPED_IO_TAG = "dropped_io:1"
# Placeholder stored in place of a dropped input/output value.
DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]"
19 changes: 19 additions & 0 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from ddtrace.internal.writer import WriterClientBase
from ddtrace.llmobs._constants import AGENTLESS_BASE_URL
from ddtrace.llmobs._constants import AGENTLESS_ENDPOINT
from ddtrace.llmobs._constants import DROPPED_IO_TAG
from ddtrace.llmobs._constants import DROPPED_VALUE_TEXT
from ddtrace.llmobs._constants import EVP_EVENT_SIZE_LIMIT
from ddtrace.llmobs._constants import EVP_PAYLOAD_SIZE_LIMIT
from ddtrace.llmobs._constants import EVP_PROXY_AGENT_ENDPOINT
from ddtrace.llmobs._constants import EVP_SUBDOMAIN_HEADER_NAME
Expand Down Expand Up @@ -263,6 +266,14 @@ def stop(self, timeout=None):

def enqueue(self, event: LLMObsSpanEvent) -> None:
event_size = len(json.dumps(event))

if event_size >= EVP_EVENT_SIZE_LIMIT:
logger.warning(
"dropping event input/output because its size (%d) exceeds the event size limit (1MB)",
event_size,
)
event = _truncate_span_event(event)

for client in self._clients:
if isinstance(client, LLMObsEventClient) and isinstance(client.encoder, LLMObsSpanEncoder):
with client.encoder._lock:
Expand All @@ -281,3 +292,11 @@ def recreate(self):
interval=self._interval,
timeout=self._timeout,
)


def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent:
    """Replace an event's input and output with a placeholder and tag it.

    The event is modified in place: ``event["meta"]["input"]`` and
    ``event["meta"]["output"]`` are each overwritten with a fresh
    ``{"value": DROPPED_VALUE_TEXT}`` dict, and ``DROPPED_IO_TAG`` is appended
    to ``event["tags"]``.

    :param event: span event to shrink; must contain "meta" and "tags".
    :returns: the same ``event`` object that was passed in.
    """
    meta = event["meta"]
    for io_field in ("input", "output"):
        # A separate dict per field so the two entries never alias each other.
        meta[io_field] = {"value": DROPPED_VALUE_TEXT}

    tags = event["tags"]
    tags.append(DROPPED_IO_TAG)
    return event
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
features:
- |
LLM Observability: Span events that exceed the event platform event size limit (1 MB) will now have their inputs
and outputs dropped. The `dropped_io:1` tag will be added to these span events.
80 changes: 79 additions & 1 deletion tests/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,89 @@ def _large_event():
"output": {
"messages": [
{
"content": "A" * 3_000_000,
"content": "A" * 900_000,
"role": "assistant",
},
]
},
},
"metrics": {"input_tokens": 64, "output_tokens": 128, "total_tokens": 192},
}


def _oversized_llm_event():
return {
"span_id": "12345678904",
"trace_id": "98765432104",
"parent_id": "",
"session_id": "98765432104",
"name": "oversized_llm_event",
"tags": ["version:", "env:", "service:", "source:integration"],
"start_ns": 1707763310981223936,
"duration": 12345678900,
"error": 0,
"meta": {
"span.kind": "llm",
"model_name": "gpt-3.5-turbo",
"model_provider": "openai",
"input": {
"messages": [
{
"role": "system",
"content": "You are an evil dark lord looking for his one ring to rule them all",
},
{"role": "user", "content": "A" * 700_000},
],
"parameters": {"temperature": 0.9, "max_tokens": 256},
},
"output": {
"messages": [
{
"content": "A" * 700_000,
"role": "assistant",
},
]
},
},
"metrics": {"input_tokens": 64, "output_tokens": 128, "total_tokens": 192},
}


def _oversized_workflow_event():
return {
"span_id": "12345678905",
"trace_id": "98765432105",
"parent_id": "",
"session_id": "98765432105",
"name": "oversized_workflow_event",
"tags": ["version:", "env:", "service:", "source:integration"],
"start_ns": 1707763310981223936,
"duration": 12345678900,
"error": 0,
"meta": {
"span.kind": "workflow",
"input": {"value": "A" * 700_000},
"output": {"value": "A" * 700_000},
},
"metrics": {"input_tokens": 64, "output_tokens": 128, "total_tokens": 192},
}


def _oversized_retrieval_event():
return {
"span_id": "12345678906",
"trace_id": "98765432106",
"parent_id": "",
"session_id": "98765432106",
"name": "oversized_retrieval_event",
"tags": ["version:", "env:", "service:", "source:integration"],
"start_ns": 1707763310981223936,
"duration": 12345678900,
"error": 0,
"meta": {
"span.kind": "retrieval",
"input": {"documents": {"content": "A" * 700_000}},
"output": {"value": "A" * 700_000},
},
"metrics": {"input_tokens": 64, "output_tokens": 128, "total_tokens": 192},
}
23 changes: 22 additions & 1 deletion tests/llmobs/test_llmobs_span_agent_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from tests.llmobs._utils import _chat_completion_event
from tests.llmobs._utils import _completion_event
from tests.llmobs._utils import _large_event
from tests.llmobs._utils import _oversized_llm_event
from tests.llmobs._utils import _oversized_retrieval_event
from tests.llmobs._utils import _oversized_workflow_event


INTAKE_ENDPOINT = agent.get_trace_url()
Expand All @@ -33,10 +36,28 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(
llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
mock_writer_logs.debug.assert_has_calls(
[
mock.call("flushing queue because queuing next event will exceed EVP payload limit"),
mock.call("encode %d LLMObs span events to be sent", 1),
mock.call("encode %d LLMObs span events to be sent", 5),
]
)


def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
    """Enqueueing oversized events through the agent-proxy writer logs one warning per event.

    Each warning carries the JSON-serialized size of the event as it was
    handed to ``enqueue``, in the order the events were enqueued.
    """
    # This module covers the agent-proxy writer, so the writer is built with
    # is_agentless=False like the other test here; the agentless path has its
    # own copy of this test in the agentless writer test module.
    llmobs_span_writer = LLMObsSpanWriter(is_agentless=False, interval=1000, timeout=1)
    llmobs_span_writer.enqueue(_oversized_llm_event())
    llmobs_span_writer.enqueue(_oversized_retrieval_event())
    llmobs_span_writer.enqueue(_oversized_workflow_event())
    mock_writer_logs.warning.assert_has_calls(
        [
            mock.call("dropping event input/output because its size (%d) exceeds the event size limit (1MB)", 1400708),
            mock.call("dropping event input/output because its size (%d) exceeds the event size limit (1MB)", 1400448),
            mock.call("dropping event input/output because its size (%d) exceeds the event size limit (1MB)", 1400429),
        ]
    )

Expand Down
30 changes: 29 additions & 1 deletion tests/llmobs/test_llmobs_span_agentless_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from tests.llmobs._utils import _chat_completion_event
from tests.llmobs._utils import _completion_event
from tests.llmobs._utils import _large_event
from tests.llmobs._utils import _oversized_llm_event
from tests.llmobs._utils import _oversized_retrieval_event
from tests.llmobs._utils import _oversized_workflow_event
from tests.utils import override_global_config


Expand Down Expand Up @@ -39,10 +42,35 @@ def test_flush_queue_when_event_cause_queue_to_exceed_payload_limit(
llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
llmobs_span_writer.enqueue(_large_event())
mock_writer_logs.debug.assert_has_calls(
[
mock.call("flushing queue because queuing next event will exceed EVP payload limit"),
mock.call("encode %d LLMObs span events to be sent", 1),
mock.call("encode %d LLMObs span events to be sent", 5),
]
)


def test_truncating_oversized_events(mock_writer_logs, mock_http_writer_send_payload_response):
    """Enqueueing oversized events through the agentless writer logs one warning per event.

    Each case pairs a fixture builder with the size expected in its warning;
    the warnings must appear in enqueue order.
    """
    warning_template = "dropping event input/output because its size (%d) exceeds the event size limit (1MB)"
    cases = (
        (_oversized_llm_event, 1400708),
        (_oversized_retrieval_event, 1400448),
        (_oversized_workflow_event, 1400429),
    )
    with override_global_config(dict(_dd_api_key="foobar.baz", _dd_site=DATADOG_SITE)):
        llmobs_span_writer = LLMObsSpanWriter(is_agentless=True, interval=1000, timeout=1)
        for build_event, _ in cases:
            llmobs_span_writer.enqueue(build_event())
        expected_warnings = [mock.call(warning_template, size) for _, size in cases]
        mock_writer_logs.warning.assert_has_calls(expected_warnings)

Expand Down
19 changes: 19 additions & 0 deletions tests/llmobs/test_llmobs_span_truncation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import json

from ddtrace.llmobs._constants import EVP_EVENT_SIZE_LIMIT
from ddtrace.llmobs._writer import _truncate_span_event
from tests.llmobs._utils import _oversized_llm_event
from tests.llmobs._utils import _oversized_retrieval_event
from tests.llmobs._utils import _oversized_workflow_event


def test_truncates_oversized_span_values():
    """A truncated workflow event (plain input/output values) serializes below the event size limit."""
    truncated_event = _truncate_span_event(_oversized_workflow_event())
    serialized_size = len(json.dumps(truncated_event))
    assert serialized_size < EVP_EVENT_SIZE_LIMIT


def test_truncates_oversized_span_messages():
    """A truncated LLM event (message-list input/output) serializes below the event size limit."""
    truncated_event = _truncate_span_event(_oversized_llm_event())
    serialized_size = len(json.dumps(truncated_event))
    assert serialized_size < EVP_EVENT_SIZE_LIMIT


def test_truncates_oversized_span_documents():
    """A truncated retrieval event (documents input) serializes below the event size limit."""
    truncated_event = _truncate_span_event(_oversized_retrieval_event())
    serialized_size = len(json.dumps(truncated_event))
    assert serialized_size < EVP_EVENT_SIZE_LIMIT

0 comments on commit d7203df

Please sign in to comment.