feat(llmobs): add ML App override option to LLMObs SDK span methods (#8721)

This PR follows up on #8530 by adding an `ml_app` override option to the
LLMObs SDK span start methods, for services that serve more than one ML app
and therefore cannot rely on the single default configured in
`DD_LLMOBS_APP_NAME`.

Users can now pass an `ml_app: str` keyword argument to the
`LLMObs.{llm/agent/workflow/task/tool}` methods and the
`LLMObs.decorators.{llm/agent/workflow/task/tool}` decorators; when provided,
it overrides the tagged `ml_app` value instead of the default taken from
`DD_LLMOBS_APP_NAME`.
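
A minimal usage sketch of the new option (not part of the PR itself), assuming
the decorators are importable from `ddtrace.llmobs.decorators` and that LLM
Observability has already been enabled; the span and application names below
are illustrative:

```python
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs.decorators import workflow  # decorator import path assumed

# If LLMObs is disabled, the span methods log a warning and return None, so
# enable LLM Observability first; spans without an override keep the default
# ml_app configured via DD_LLMOBS_APP_NAME.

# Span-method form: this one span is tagged with ml_app="support-bot".
with LLMObs.workflow(name="summarize-ticket", ml_app="support-bot"):
    summary = "..."  # application logic goes here

# Decorator form: calls to this function are traced under ml_app="search-assistant".
@workflow(name="rank-results", ml_app="search-assistant")
def rank_results(query):
    return sorted(query.split())

rank_results("open tickets sorted by priority")
```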

This PR also moves the `LLMObsTraceProcessor` class into its own
`ddtrace/llmobs/_trace_processor.py` module.
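
The processor keeps its behavior and only changes location; library code now
imports it from the new module (the same import added at the top of
`_llmobs.py` in this diff), and the leading underscore marks it as internal
rather than public API:

```python
# New home of the processor after this PR (internal module, not public API).
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
```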

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.
- [x] If change touches code that signs or publishes builds or packages,
or handles credentials of any kind, I've requested a review from
`@DataDog/security-design-and-guidance`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
Yun-Kim authored Mar 19, 2024
1 parent b819636 commit 48615e2
Showing 7 changed files with 289 additions and 131 deletions.
1 change: 1 addition & 0 deletions ddtrace/llmobs/_constants.py
@@ -2,6 +2,7 @@
SESSION_ID = "_ml_obs.session_id"
METRICS = "_ml_obs.metrics"
TAGS = "_ml_obs.tags"
ML_APP = "_ml_obs.meta.ml_app"

MODEL_NAME = "_ml_obs.meta.model_name"
MODEL_PROVIDER = "_ml_obs.meta.model_provider"
160 changes: 33 additions & 127 deletions ddtrace/llmobs/_llmobs.py
@@ -9,10 +9,6 @@
import ddtrace
from ddtrace import Span
from ddtrace import config
from ddtrace._trace.processor import TraceProcessor
from ddtrace.constants import ERROR_MSG
from ddtrace.constants import ERROR_STACK
from ddtrace.constants import ERROR_TYPE
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal.logger import get_logger
@@ -21,13 +17,15 @@
from ddtrace.llmobs._constants import INPUT_PARAMETERS
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import ML_APP
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import SESSION_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
from ddtrace.llmobs._writer import LLMObsWriter


@@ -106,6 +104,7 @@ def _start_span(
session_id: Optional[str] = None,
model_name: Optional[str] = None,
model_provider: Optional[str] = None,
ml_app: Optional[str] = None,
) -> Span:
if name is None:
name = operation_kind
@@ -117,6 +116,8 @@
span.set_tag_str(MODEL_NAME, model_name)
if model_provider is not None:
span.set_tag_str(MODEL_PROVIDER, model_provider)
if ml_app is not None:
span.set_tag_str(ML_APP, ml_app)
return span

@classmethod
@@ -126,6 +127,7 @@ def llm(
name: Optional[str] = None,
model_provider: Optional[str] = None,
session_id: Optional[str] = None,
ml_app: Optional[str] = None,
) -> Optional[Span]:
"""
Trace an interaction with a large language model (LLM).
@@ -135,6 +137,8 @@ def llm(
:param str model_provider: The name of the invoked LLM provider (ex: openai, bedrock).
If not provided, a default value of "custom" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
value DD_LLMOBS_APP_NAME will be set.
:returns: The Span object representing the traced operation.
"""
@@ -147,68 +151,84 @@ def llm(
if model_provider is None:
model_provider = "custom"
return cls._instance._start_span(
"llm", name, model_name=model_name, model_provider=model_provider, session_id=session_id
"llm", name, model_name=model_name, model_provider=model_provider, session_id=session_id, ml_app=ml_app
)

@classmethod
def tool(cls, name: Optional[str] = None, session_id: Optional[str] = None) -> Optional[Span]:
def tool(
cls, name: Optional[str] = None, session_id: Optional[str] = None, ml_app: Optional[str] = None
) -> Optional[Span]:
"""
Trace an operation of an interface/software used for interacting with or supporting an LLM.
:param str name: The name of the traced operation. If not provided, a default value of "tool" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
value DD_LLMOBS_APP_NAME will be set.
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.tool() cannot be used while LLMObs is disabled.")
return None
return cls._instance._start_span("tool", name=name, session_id=session_id)
return cls._instance._start_span("tool", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
def task(cls, name: Optional[str] = None, session_id: Optional[str] = None) -> Optional[Span]:
def task(
cls, name: Optional[str] = None, session_id: Optional[str] = None, ml_app: Optional[str] = None
) -> Optional[Span]:
"""
Trace an operation of a function/task that is part of a larger workflow involving an LLM.
:param str name: The name of the traced operation. If not provided, a default value of "task" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
value DD_LLMOBS_APP_NAME will be set.
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.task() cannot be used while LLMObs is disabled.")
return None
return cls._instance._start_span("task", name=name, session_id=session_id)
return cls._instance._start_span("task", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
def agent(cls, name: Optional[str] = None, session_id: Optional[str] = None) -> Optional[Span]:
def agent(
cls, name: Optional[str] = None, session_id: Optional[str] = None, ml_app: Optional[str] = None
) -> Optional[Span]:
"""
Trace a workflow orchestrated by an LLM agent.
:param str name: The name of the traced operation. If not provided, a default value of "agent" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
value DD_LLMOBS_APP_NAME will be set.
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.agent() cannot be used while LLMObs is disabled.")
return None
return cls._instance._start_span("agent", name=name, session_id=session_id)
return cls._instance._start_span("agent", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
def workflow(cls, name: Optional[str] = None, session_id: Optional[str] = None) -> Optional[Span]:
def workflow(
cls, name: Optional[str] = None, session_id: Optional[str] = None, ml_app: Optional[str] = None
) -> Optional[Span]:
"""
Trace a sequence of operations that are part of a larger workflow involving an LLM.
:param str name: The name of the traced operation. If not provided, a default value of "workflow" will be set.
:param str session_id: The ID of the underlying user session. Required for tracking sessions.
:param str ml_app: The name of the ML application that the agent is orchestrating. If not provided, the default
value DD_LLMOBS_APP_NAME will be set.
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.workflow() cannot be used while LLMObs is disabled.")
return None
return cls._instance._start_span("workflow", name=name, session_id=session_id)
return cls._instance._start_span("workflow", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
def annotate(
@@ -327,117 +347,3 @@ def _tag_metrics(span: Span, metrics: Dict[str, Any]) -> None:
log.warning("metrics must be a dictionary of string key - numeric value pairs.")
return
span.set_tag_str(METRICS, json.dumps(metrics))


class LLMObsTraceProcessor(TraceProcessor):
"""
Processor that extracts LLM-type spans in a trace to submit as separate LLMObs span events to LLM Observability.
"""

def __init__(self, llmobs_writer):
self._writer = llmobs_writer

def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
if not trace:
return None
for span in trace:
if span.span_type == SpanTypes.LLM:
self.submit_llmobs_span(span)
return trace

def submit_llmobs_span(self, span: Span) -> None:
"""Generate and submit an LLMObs span event to be sent to LLMObs."""
span_event = self._llmobs_span_event(span)
self._writer.enqueue(span_event)

def _llmobs_span_event(self, span: Span) -> Dict[str, Any]:
"""Span event object structure."""
tags = self._llmobs_tags(span)
meta: Dict[str, Any] = {"span.kind": span._meta.pop(SPAN_KIND), "input": {}, "output": {}}
if span.get_tag(MODEL_NAME):
meta["model_name"] = span._meta.pop(MODEL_NAME)
meta["model_provider"] = span._meta.pop(MODEL_PROVIDER, "custom").lower()
if span.get_tag(INPUT_PARAMETERS):
meta["input"]["parameters"] = json.loads(span._meta.pop(INPUT_PARAMETERS))
if span.get_tag(INPUT_MESSAGES):
meta["input"]["messages"] = json.loads(span._meta.pop(INPUT_MESSAGES))
if span.get_tag(INPUT_VALUE):
meta["input"]["value"] = span._meta.pop(INPUT_VALUE)
if span.get_tag(OUTPUT_MESSAGES):
meta["output"]["messages"] = json.loads(span._meta.pop(OUTPUT_MESSAGES))
if span.get_tag(OUTPUT_VALUE):
meta["output"]["value"] = span._meta.pop(OUTPUT_VALUE)
if span.error:
meta["error.message"] = span.get_tag(ERROR_MSG)
meta["error.stack"] = span.get_tag(ERROR_STACK)
if not meta["input"]:
meta.pop("input")
if not meta["output"]:
meta.pop("output")
metrics = json.loads(span._meta.pop(METRICS, "{}"))

return {
"trace_id": "{:x}".format(span.trace_id),
"span_id": str(span.span_id),
"parent_id": str(self._get_llmobs_parent_id(span) or "undefined"),
"session_id": self._get_session_id(span),
"name": span.name,
"tags": tags,
"start_ns": span.start_ns,
"duration": span.duration_ns,
"error": span.error,
"meta": meta,
"metrics": metrics,
}

def _llmobs_tags(self, span: Span) -> List[str]:
tags = [
"version:{}".format(config.version or ""),
"env:{}".format(config.env or ""),
"service:{}".format(span.service or ""),
"source:integration",
"ml_app:{}".format(config._llmobs_ml_app or "unnamed-ml-app"),
"ddtrace.version:{}".format(ddtrace.__version__),
"error:%d" % span.error,
]
err_type = span.get_tag(ERROR_TYPE)
if err_type:
tags.append("error_type:%s" % err_type)
existing_tags = span.get_tag(TAGS)
if existing_tags is not None:
span_tags = json.loads(existing_tags)
tags.extend(["{}:{}".format(k, v) for k, v in span_tags.items()])
return tags

def _get_session_id(self, span: Span) -> str:
"""
Return the session ID for a given span, in priority order:
1) Span's session ID tag (if set manually)
2) Session ID from the span's nearest LLMObs span ancestor
3) Span's trace ID if no session ID is found
"""
session_id = span._meta.pop(SESSION_ID, None)
if not session_id:
nearest_llmobs_ancestor = self._get_nearest_llmobs_ancestor(span)
if nearest_llmobs_ancestor:
session_id = nearest_llmobs_ancestor.get_tag(SESSION_ID)
return session_id or "{:x}".format(span.trace_id)

def _get_llmobs_parent_id(self, span: Span) -> Optional[int]:
"""Return the span ID of the nearest LLMObs-type span in the span's ancestor tree."""
nearest_llmobs_ancestor = self._get_nearest_llmobs_ancestor(span)
if nearest_llmobs_ancestor:
return nearest_llmobs_ancestor.span_id
return None

@staticmethod
def _get_nearest_llmobs_ancestor(span: Span) -> Optional[Span]:
"""Return the nearest LLMObs-type ancestor span of a given span."""
if span.span_type != SpanTypes.LLM:
return None
parent = span._parent
while parent:
if parent.span_type == SpanTypes.LLM:
return parent
parent = parent._parent
return None