Skip to content

Commit

Permalink
feat(llmobs): allow users to toggle on/off sending llmobs data (#9242)
Browse files Browse the repository at this point in the history
This PR decouples starting/stopping LLMObs traces locally and sending
trace data to the LLMObs backend.

Previously these two concepts were tied together and configured through
the `enable()` function and the `DD_LLMOBS_ENABLED` environment variable.
This is problematic because it causes instrumented code to break if
`DD_LLMOBS_ENABLED` is false or the `enable()` function is not
explicitly called (since starting spans returns `None` if llmobs is not
enabled).

This:
a) is inconvenient when unit testing
b) prevents people from disabling sending LLMObs data unless they strip
out all LLMObs code from their instrumented app

We want to enable LLMObs tracing locally while giving users the ability
to toggle actually sending traces to our backend. The state of
`DD_LLMOBS_ENABLED` and the `LLMObs.enabled` variable should not impact any
of the user’s code at all. It purely configures whether data is sent
to our backend.

This can be done through:
1. Having the LLMObs service always have a default `_instance` with
`ddtrace.tracer` as the internal tracer.
2. Moving the starting of the span and eval metric writers out of the
`__init__` function
3. Having the span and eval metric writers only be started or stopped in
`enable()` and `disable()`

Case to consider:
When DD_LLMOBS_ENABLED is False, APM is enabled, and someone uses the
SDK decorators to start/stop llm spans.
In this case, trace processing for llmobs is turned off and the writers are
turned off, yet llmobs spans are still being made by the SDK and submitted to
APM.


## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: lievan <[email protected]>
Co-authored-by: Yun Kim <[email protected]>
  • Loading branch information
3 people authored May 14, 2024
1 parent f682826 commit ab86515
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 76 deletions.
4 changes: 4 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@
OUTPUT_DOCUMENTS = "_ml_obs.meta.output.documents"
OUTPUT_MESSAGES = "_ml_obs.meta.output.messages"
OUTPUT_VALUE = "_ml_obs.meta.output.value"

SPAN_START_WHILE_DISABLED_WARNING = (
"Span started while LLMObs is disabled." " Spans will not be sent to LLM Observability."
)
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ddtrace.internal.dogstatsd import get_dogstatsd_client
from ddtrace.internal.hostname import get_hostname
from ddtrace.internal.utils.formats import asbool
from ddtrace.llmobs._llmobs import LLMObs
from ddtrace.llmobs._log_writer import V2LogWriter
from ddtrace.sampler import RateSampler
from ddtrace.settings import IntegrationConfig
Expand Down Expand Up @@ -71,7 +72,7 @@ def logs_enabled(self) -> bool:
@property
def llmobs_enabled(self) -> bool:
"""Return whether submitting llmobs payloads is enabled."""
return config._llmobs_enabled
return LLMObs.enabled

def is_pc_sampled_span(self, span: Span) -> bool:
if span.context.sampling_priority is not None:
Expand Down
99 changes: 57 additions & 42 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from ddtrace.internal import atexit
from ddtrace.internal.logger import get_logger
from ddtrace.internal.service import Service
from ddtrace.internal.service import ServiceStatusError
from ddtrace.internal.utils.formats import asbool
from ddtrace.llmobs._constants import INPUT_DOCUMENTS
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PARAMETERS
Expand All @@ -26,6 +28,7 @@
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import SESSION_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs._constants import TAGS
from ddtrace.llmobs._trace_processor import LLMObsTraceProcessor
from ddtrace.llmobs._utils import _get_ml_app
Expand All @@ -41,12 +44,13 @@


class LLMObs(Service):
_instance = None
_instance = None # type: LLMObs
enabled = False

def __init__(self, tracer=None):
super(LLMObs, self).__init__()
self.tracer = tracer or ddtrace.tracer

self._llmobs_span_writer = LLMObsSpanWriter(
site=config._dd_site,
api_key=config._dd_api_key,
Expand All @@ -59,67 +63,80 @@ def __init__(self, tracer=None):
interval=float(os.getenv("_DD_LLMOBS_WRITER_INTERVAL", 1.0)),
timeout=float(os.getenv("_DD_LLMOBS_WRITER_TIMEOUT", 2.0)),
)
self._llmobs_span_writer.start()
self._llmobs_eval_metric_writer.start()

def _start_service(self) -> None:
tracer_filters = self.tracer._filters
if not any(isinstance(tracer_filter, LLMObsTraceProcessor) for tracer_filter in tracer_filters):
tracer_filters += [LLMObsTraceProcessor(self._llmobs_span_writer)]
self.tracer.configure(settings={"FILTERS": tracer_filters})
try:
self._llmobs_span_writer.start()
self._llmobs_eval_metric_writer.start()
except ServiceStatusError:
log.debug("Error starting LLMObs writers")

def _stop_service(self) -> None:
try:
self._llmobs_span_writer.stop()
self._llmobs_eval_metric_writer.stop()
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

try:
self.tracer.shutdown()
except Exception:
log.warning("Failed to shutdown tracer", exc_info=True)

@classmethod
def enable(cls, tracer=None):
if cls._instance is not None:
if cls.enabled:
log.debug("%s already enabled", cls.__name__)
return

if os.getenv("DD_LLMOBS_ENABLED") and not asbool(os.getenv("DD_LLMOBS_ENABLED")):
log.debug("LLMObs.enable() called when DD_LLMOBS_ENABLED is set to false or 0, not starting LLMObs service")
return

if not config._dd_api_key:
cls.enabled = False
raise ValueError(
"DD_API_KEY is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)
if not config._llmobs_ml_app:
cls.enabled = False
raise ValueError(
"DD_LLMOBS_APP_NAME is required for sending LLMObs data. "
"Ensure this configuration is set before running your application."
)

# override the default _instance with a new tracer
cls._instance = cls(tracer=tracer)

cls.enabled = True

# turn on llmobs trace processing
cls._instance.start()

atexit.register(cls.disable)
log.debug("%s enabled", cls.__name__)

@classmethod
def disable(cls) -> None:
if cls._instance is None:
if not cls.enabled:
log.debug("%s not enabled", cls.__name__)
return
log.debug("Disabling %s", cls.__name__)
atexit.unregister(cls.disable)

cls._instance.stop()
cls._instance = None
cls.enabled = False
cls._instance.stop()

log.debug("%s disabled", cls.__name__)

@classmethod
def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan]:
"""Returns a simple representation of a span to export its span and trace IDs.
If no span is provided, the current active LLMObs-type span will be used.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.export_span() requires LLMObs to be enabled.")
return None
if span:
try:
if span.span_type != SpanTypes.LLM:
Expand Down Expand Up @@ -185,14 +202,14 @@ def llm(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.llm() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
if not model_name:
log.warning("model_name must be the specified name of the invoked model.")
return None
log.warning("LLMObs.llm() missing model_name")
if model_provider is None:
model_provider = "custom"
if model_name is None:
model_name = "unknown"
return cls._instance._start_span(
"llm", name, model_name=model_name, model_provider=model_provider, session_id=session_id, ml_app=ml_app
)
Expand All @@ -211,9 +228,8 @@ def tool(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.tool() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return cls._instance._start_span("tool", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
Expand All @@ -230,9 +246,8 @@ def task(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.task() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return cls._instance._start_span("task", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
Expand All @@ -249,9 +264,8 @@ def agent(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.agent() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return cls._instance._start_span("agent", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
Expand All @@ -268,9 +282,8 @@ def workflow(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.workflow() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return cls._instance._start_span("workflow", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
Expand All @@ -295,14 +308,14 @@ def embedding(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.embedding() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
if not model_name:
log.warning("model_name must be the specified name of the invoked model.")
return None
log.warning("LLMObs.embedding() missing model_name")
if model_provider is None:
model_provider = "custom"
if model_name is None:
model_name = "unknown"
return cls._instance._start_span(
"embedding",
name,
Expand All @@ -326,9 +339,8 @@ def retrieval(
:returns: The Span object representing the traced operation.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.retrieval() cannot be used while LLMObs is disabled.")
return None
if cls.enabled is False:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return cls._instance._start_span("retrieval", name=name, session_id=session_id, ml_app=ml_app)

@classmethod
Expand Down Expand Up @@ -369,9 +381,6 @@ def annotate(
:param metrics: Dictionary of JSON serializable key-value metric pairs,
such as `{prompt,completion,total}_tokens`.
"""
if cls.enabled is False or cls._instance is None:
log.warning("LLMObs.annotate() cannot be used while LLMObs is disabled.")
return
if span is None:
span = cls._instance.tracer.current_span()
if span is None:
Expand Down Expand Up @@ -562,8 +571,10 @@ def submit_evaluation(
:param value: The value of the evaluation metric.
Must be a string (categorical), integer (numerical/score), or float (numerical/score).
"""
if cls.enabled is False or cls._instance is None or cls._instance._llmobs_eval_metric_writer is None:
log.warning("LLMObs.submit_evaluation() requires LLMObs to be enabled.")
if cls.enabled is False:
log.warning(
"LLMObs.submit_evaluation() called when LLMObs is not enabled. Evaluation metric data will not be sent."
)
return
if not isinstance(span_context, dict):
log.warning(
Expand Down Expand Up @@ -597,3 +608,7 @@ def submit_evaluation(
"{}_value".format(metric_type): value,
}
)


# initialize the default llmobs instance
LLMObs._instance = LLMObs()
2 changes: 1 addition & 1 deletion ddtrace/llmobs/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _get_ml_app(span: Span) -> str:
nearest_llmobs_ancestor = _get_nearest_llmobs_ancestor(span)
if nearest_llmobs_ancestor:
ml_app = nearest_llmobs_ancestor.get_tag(ML_APP)
return ml_app or config._llmobs_ml_app
return ml_app or config._llmobs_ml_app or "uknown-ml-app"


def _get_session_id(span: Span) -> str:
Expand Down
12 changes: 7 additions & 5 deletions ddtrace/llmobs/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING


log = get_logger(__name__)
Expand All @@ -21,12 +22,13 @@ def decorator(
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not LLMObs.enabled or LLMObs._instance is None:
log.warning("LLMObs.%s() cannot be used while LLMObs is disabled.", operation_kind)
if not LLMObs.enabled:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return func(*args, **kwargs)
traced_model_name = model_name
if traced_model_name is None:
raise TypeError("model_name is required for LLMObs.{}()".format(operation_kind))
log.warning("model_name missing for LLMObs.%s() - default to 'unknown'", operation_kind)
traced_model_name = "unknown"
span_name = name
if span_name is None:
span_name = func.__name__
Expand Down Expand Up @@ -57,8 +59,8 @@ def decorator(
def inner(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not LLMObs.enabled or LLMObs._instance is None:
log.warning("LLMObs.%s() cannot be used while LLMObs is disabled.", operation_kind)
if not LLMObs.enabled:
log.warning(SPAN_START_WHILE_DISABLED_WARNING)
return func(*args, **kwargs)
span_name = name
if span_name is None:
Expand Down
5 changes: 3 additions & 2 deletions tests/llmobs/test_llmobs_decorators.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import mock
import pytest

from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING
from ddtrace.llmobs.decorators import agent
from ddtrace.llmobs.decorators import embedding
from ddtrace.llmobs.decorators import llm
Expand Down Expand Up @@ -29,7 +30,7 @@ def f():

LLMObs.disable()
f()
mock_logs.warning.assert_called_with("LLMObs.%s() cannot be used while LLMObs is disabled.", decorator_name)
mock_logs.warning.assert_called_with(SPAN_START_WHILE_DISABLED_WARNING)
mock_logs.reset_mock()


Expand All @@ -48,7 +49,7 @@ def f():

LLMObs.disable()
f()
mock_logs.warning.assert_called_with("LLMObs.%s() cannot be used while LLMObs is disabled.", decorator_name)
mock_logs.warning.assert_called_with(SPAN_START_WHILE_DISABLED_WARNING)
mock_logs.reset_mock()


Expand Down
Loading

0 comments on commit ab86515

Please sign in to comment.