feat(llmobs): submit span events for chains from langchain #8920

Merged · 24 commits · Apr 18, 2024
Changes from 3 commits
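
This PR opts LangChain chain calls into LLM Observability: chain spans are now created with submit_to_llmobs=True and, when sampled, tagged via llmobs_set_tags("chain", span, inputs) before the span finishes. Below is a minimal sketch of the kind of application code this instruments, assuming langchain patching and LLMObs submission are enabled (the enablement configuration is an assumption, not part of this diff):

from ddtrace import patch

patch(langchain=True)  # wraps chain calls with the traced_* functions in this PR

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# prompt | model builds an LCEL RunnableSequence; its invoke()/ainvoke() calls
# are wrapped by traced_lcel_runnable_sequence(_async) below.
prompt = ChatPromptTemplate.from_template("Can you explain what {topic} is?")
chain = prompt | ChatOpenAI(model="gpt-3.5-turbo", max_tokens=256, temperature=0.0)

# Produces a "chain" span that is now also submitted as an LLMObs span event.
result = chain.invoke({"topic": "an LLM chain"})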
52 changes: 44 additions & 8 deletions ddtrace/contrib/langchain/patch.py
@@ -578,7 +578,10 @@ def traced_embedding(langchain, pin, func, instance, args, kwargs):
@with_traced_module
def traced_chain_call(langchain, pin, func, instance, args, kwargs):
integration = langchain._datadog_integration
span = integration.trace(pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), interface_type="chain")
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
)
inputs = None
final_outputs = {}
try:
if SHOULD_PATCH_LANGCHAIN_COMMUNITY:
@@ -604,6 +607,12 @@ def traced_chain_call(langchain, pin, func, instance, args, kwargs):
integration.metric(span, "incr", "request.error", 1)
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
inputs,
)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
if integration.is_pc_sampled_log(span):
@@ -629,7 +638,10 @@ def traced_chain_call(langchain, pin, func, instance, args, kwargs):
@with_traced_module
async def traced_chain_acall(langchain, pin, func, instance, args, kwargs):
integration = langchain._datadog_integration
span = integration.trace(pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), interface_type="chain")
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
)
inputs = None
final_outputs = {}
try:
if SHOULD_PATCH_LANGCHAIN_COMMUNITY:
@@ -653,6 +665,12 @@ async def traced_chain_acall(langchain, pin, func, instance, args, kwargs):
integration.metric(span, "incr", "request.error", 1)
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
inputs,
)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
if integration.is_pc_sampled_log(span):
@@ -690,11 +708,14 @@ def traced_lcel_runnable_sequence(langchain, pin, func, instance, args, kwargs):
This method captures the initial inputs to the chain, as well as the final outputs, and tags them appropriately.
"""
integration = langchain._datadog_integration
span = integration.trace(pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), interface_type="chain")
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
)
raw_inputs, inputs = None, None
try:
inputs = get_argument_value(args, kwargs, 0, "input")
raw_inputs = inputs = get_argument_value(args, kwargs, 0, "input")
if integration.is_pc_sampled_span(span):
if not isinstance(inputs, list):
if not isinstance(raw_inputs, list):
inputs = [inputs]
for idx, inp in enumerate(inputs):
if isinstance(inp, str):
@@ -714,6 +735,12 @@ def traced_lcel_runnable_sequence(langchain, pin, func, instance, args, kwargs):
integration.metric(span, "incr", "request.error", 1)
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
raw_inputs,
)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
return final_output
@@ -725,11 +752,14 @@ async def traced_lcel_runnable_sequence_async(langchain, pin, func, instance, args, kwargs):
Similar to `traced_lcel_runnable_sequence`, but for async chaining calls.
"""
integration = langchain._datadog_integration
span = integration.trace(pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), interface_type="chain")
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
)
raw_inputs, inputs = None, None
try:
inputs = get_argument_value(args, kwargs, 0, "input")
raw_inputs = inputs = get_argument_value(args, kwargs, 0, "input")
if integration.is_pc_sampled_span(span):
if not isinstance(inputs, list):
if not isinstance(raw_inputs, list):
inputs = [inputs]
for idx, inp in enumerate(inputs):
if isinstance(inp, str):
@@ -749,6 +779,12 @@ async def traced_lcel_runnable_sequence_async(langchain, pin, func, instance, args, kwargs):
integration.metric(span, "incr", "request.error", 1)
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
raw_inputs,
)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
return final_output
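
All four wrappers above (traced_chain_call, traced_chain_acall, and the two LCEL runnable-sequence variants) now share the same shape: create the span with submit_to_llmobs=True, capture the inputs, and emit the LLMObs tags in the finally block before finishing the span. A distilled sketch of that pattern, with the metrics, log sampling, and error bookkeeping of the real wrappers omitted:

def traced_chain_sketch(integration, pin, func, instance, args, kwargs):
    # The span is created with submit_to_llmobs=True so it is eligible for
    # LLMObs span-event submission.
    span = integration.trace(
        pin,
        "%s.%s" % (instance.__module__, instance.__class__.__name__),
        submit_to_llmobs=True,
        interface_type="chain",
    )
    inputs = None
    try:
        inputs = args[0] if args else kwargs.get("inputs")  # exact extraction varies per wrapper
        return func(*args, **kwargs)
    finally:
        # Tags must be set before span.finish(), and only when the span was
        # sampled for LLMObs.
        if integration.is_pc_sampled_llmobs(span):
            integration.llmobs_set_tags("chain", span, inputs)
        span.finish()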
51 changes: 40 additions & 11 deletions ddtrace/llmobs/_integrations/langchain.py
@@ -3,12 +3,14 @@
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from ddtrace import config
from ddtrace._trace.span import Span
from ddtrace.constants import ERROR_TYPE
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import INPUT_PARAMETERS
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
@@ -39,35 +41,34 @@ def llmobs_set_tags(
operation: str, # oneof "llm","chat","chain"
span: Span,
inputs: Any,
response: Any,
error: bool = False,
response: Optional[Any] = None,
error: Optional[bool] = False,
) -> None:
"""Sets meta tags and metrics for span events to be sent to LLMObs."""
if not self.llmobs_enabled:
return
model_provider = span.get_tag(PROVIDER)
span.set_tag_str(SPAN_KIND, "llm")
span.set_tag_str(MODEL_NAME, span.get_tag(MODEL) or "")
span.set_tag_str(MODEL_PROVIDER, model_provider or "")

self._llmobs_set_input_parameters(span, model_provider)
input_parameters = self._llmobs_set_input_parameters(span, model_provider)

if operation == "llm":
self._llmobs_set_meta_tags_from_llm(span, inputs, response, error)
elif operation == "chat":
self._llmobs_set_meta_tags_from_chat_model(span, inputs, response, error)
elif operation == "chain":
pass
self._llmobs_set_meta_tags_from_chain(span, inputs, input_parameters)

span.set_tag_str(METRICS, json.dumps({}))

def _llmobs_set_input_parameters(
self,
span: Span,
model_provider: Optional[str] = None,
) -> None:
) -> dict:
if not model_provider:
return
return {}

input_parameters = {}
temperature = span.get_tag(f"langchain.request.{model_provider}.parameters.temperature") or span.get_tag(
@@ -79,20 +80,24 @@ def _llmobs_set_input_parameters(
or span.get_tag(f"langchain.request.{model_provider}.parameters.model_kwargs.max_tokens") # huggingface
)

if temperature:
if temperature is not None:
input_parameters["temperature"] = float(temperature)
if max_tokens:
if max_tokens is not None:
input_parameters["max_tokens"] = int(max_tokens)
if input_parameters:
span.set_tag_str(INPUT_PARAMETERS, json.dumps(input_parameters))

return input_parameters

def _llmobs_set_meta_tags_from_llm(
self,
span: Span,
prompts: List[Any],
completions: Any,
err: bool = False,
err: Optional[bool] = False,
) -> None:
span.set_tag_str(SPAN_KIND, "llm")

if isinstance(prompts, str):
prompts = [prompts]
span.set_tag_str(INPUT_MESSAGES, json.dumps([{"content": str(prompt)} for prompt in prompts]))
@@ -107,8 +112,10 @@ def _llmobs_set_meta_tags_from_chat_model(
span: Span,
chat_messages: List[List[Any]],
chat_completions: Any,
err: bool = False,
err: Optional[bool] = False,
) -> None:
span.set_tag_str(SPAN_KIND, "llm")

input_messages = []
for message_set in chat_messages:
for message in message_set:
@@ -136,6 +143,28 @@ def _llmobs_set_meta_tags_from_chat_model(
)
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

def _llmobs_set_meta_tags_from_chain(
self, span: Span, inputs: Union[str, Dict[str, Any], List[Union[str, Dict[str, Any]]]], input_parameters: dict
) -> None:
span.set_tag_str(SPAN_KIND, "workflow")

if inputs is not None:
if isinstance(inputs, dict):
input_parameters.update(inputs)
elif isinstance(inputs, list): # batched chain call
input_parameters = {}
for idx, inp in enumerate(inputs):
if isinstance(inp, dict):
for k, v in inp.items():
input_parameters[f"input.{idx}.{k}"] = v
else:
input_parameters[f"input.{idx}"] = inp
elif isinstance(inputs, str):
span.set_tag_str(INPUT_VALUE, inputs)
return

span.set_tag_str(INPUT_PARAMETERS, json.dumps(input_parameters))

def _set_base_span_tags( # type: ignore[override]
self,
span: Span,
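
Two details worth noting in the integration changes: _llmobs_set_input_parameters now returns the parameter dict so the chain branch can reuse it, and the temperature/max_tokens checks use `is not None` so a recorded temperature of 0.0 (as in the cassette below) is no longer dropped as falsy. The chain tagging itself flattens whatever input shape LangChain hands it: a plain string becomes INPUT_VALUE, a dict merges into the input parameters, and a batched list is keyed as input.{idx} or input.{idx}.{k}. A standalone replica of the flattening rules, for illustration only:

import json

def flatten_chain_inputs(inputs, input_parameters):
    # Mirrors the dict/list branches of _llmobs_set_meta_tags_from_chain.
    if isinstance(inputs, dict):
        input_parameters.update(inputs)
    elif isinstance(inputs, list):  # batched chain call
        input_parameters = {}
        for idx, inp in enumerate(inputs):
            if isinstance(inp, dict):
                for k, v in inp.items():
                    input_parameters["input.%d.%s" % (idx, k)] = v
            else:
                input_parameters["input.%d" % idx] = inp
    return json.dumps(input_parameters)

# A batched call like chain.batch([{"topic": "a"}, {"topic": "b"}]) is tagged:
print(flatten_chain_inputs([{"topic": "a"}, {"topic": "b"}], {}))
# {"input.0.topic": "a", "input.1.topic": "b"}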
@@ -0,0 +1,100 @@
interactions:
- request:
body: '{"messages": [{"role": "user", "content": "Can you explain what an LLM
chain is?"}], "model": "gpt-3.5-turbo", "max_tokens": 256, "stream": false,
"n": 1, "temperature": 0.0}'
headers:
Accept:
- '*/*'
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
Content-Length:
- '174'
Content-Type:
- application/json
User-Agent:
- OpenAI/v1 PythonBindings/0.27.8
X-OpenAI-Client-User-Agent:
- '{"bindings_version": "0.27.8", "httplib": "requests", "lang": "python", "lang_version":
"3.10.13", "platform": "macOS-13.6.5-arm64-arm-64bit", "publisher": "openai",
"uname": "Darwin 22.6.0 Darwin Kernel Version 22.6.0: Mon Feb 19 19:45:09
PST 2024; root:xnu-8796.141.3.704.6~1/RELEASE_ARM64_T6000 arm64 arm"}'
method: POST
uri: https://api.openai.com/v1/chat/completions
response:
body:
string: "{\n \"id\": \"chatcmpl-9C8PFrPeDGqajEgFLZaqgkzXID7Px\",\n \"object\":
\"chat.completion\",\n \"created\": 1712679277,\n \"model\": \"gpt-3.5-turbo-0125\",\n
\ \"choices\": [\n {\n \"index\": 0,\n \"message\": {\n \"role\":
\"assistant\",\n \"content\": \"An LLM chain, also known as a Limited
Liability Master Limited Partnership (LLM), is a type of business structure
that combines the benefits of a master limited partnership (MLP) with the
limited liability protection of a limited liability company (LLC). In an LLM
chain, the general partner is typically an LLC, which provides limited liability
protection to the owners or partners of the business. This structure allows
for the flow-through tax benefits of an MLP, while also shielding the owners
from personal liability for the debts and obligations of the business. LLM
chains are commonly used in the energy and natural resources industries, where
they can help to attract investment and manage risk.\"\n },\n \"logprobs\":
null,\n \"finish_reason\": \"stop\"\n }\n ],\n \"usage\": {\n \"prompt_tokens\":
17,\n \"completion_tokens\": 134,\n \"total_tokens\": 151\n },\n \"system_fingerprint\":
\"fp_b28b39ffa8\"\n}\n"
headers:
CF-Cache-Status:
- DYNAMIC
CF-RAY:
- 871bca881ceb176c-EWR
Cache-Control:
- no-cache, must-revalidate
Connection:
- keep-alive
Content-Encoding:
- gzip
Content-Type:
- application/json
Date:
- Tue, 09 Apr 2024 16:14:40 GMT
Server:
- cloudflare
Set-Cookie:
- __cf_bm=P_A3eEpwJvlwNo_lYI5KnWC8I3YCpFU9NLmtseZcxHU-1712679280-1.0.1.1-H_Q93FobYi5ChT1UdnaLy5SqSrXRzslzmbM0G3OtXew9Fa5kjch7zUWAmQwq5mbB3TYMWNHV5yRpOuzTTyQQLA;
path=/; expires=Tue, 09-Apr-24 16:44:40 GMT; domain=.api.openai.com; HttpOnly;
Secure; SameSite=None
- _cfuvid=w7KNhheJeVoLvTJtbGGRMdAw698oHGYkvMTRO6Y_PE4-1712679280070-0.0.1.1-604800000;
path=/; domain=.api.openai.com; HttpOnly; Secure; SameSite=None
Transfer-Encoding:
- chunked
access-control-allow-origin:
- '*'
alt-svc:
- h3=":443"; ma=86400
openai-model:
- gpt-3.5-turbo-0125
openai-organization:
- user-vgqng3jybrjkfe7a1gf2l5ch
openai-processing-ms:
- '3144'
openai-version:
- '2020-10-01'
strict-transport-security:
- max-age=15724800; includeSubDomains
x-ratelimit-limit-requests:
- '200'
x-ratelimit-limit-tokens:
- '40000'
x-ratelimit-remaining-requests:
- '194'
x-ratelimit-remaining-tokens:
- '39733'
x-ratelimit-reset-requests:
- 40m32.016s
x-ratelimit-reset-tokens:
- 400ms
x-request-id:
- req_f05f797b55ec8ff19b979b928dcbb377
status:
code: 200
message: OK
version: 1
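
The new file is a VCR-style cassette capturing a single OpenAI chat-completion round trip, so the chain tests can replay the recorded response offline. A minimal sketch of how such a cassette is typically consumed with vcrpy; the library directory and cassette name here are hypothetical, not taken from this diff:

import vcr

# Hypothetical paths/names, for illustration only.
chain_vcr = vcr.VCR(
    cassette_library_dir="tests/contrib/langchain/cassettes",
    record_mode="once",          # record on first run, replay afterwards
    match_on=["method", "uri"],  # match live requests against the recording
)

def test_chain_invoke_replays_cassette():
    with chain_vcr.use_cassette("openai_chat_completion_sync.yaml"):
        # A POST to https://api.openai.com/v1/chat/completions matching the
        # recording above is answered from the cassette, not the live API.
        ...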