Skip to content

Commit

Permalink
review changes minus testing
Browse files Browse the repository at this point in the history
  • Loading branch information
sabrenner committed Apr 15, 2024
1 parent 002d706 commit de414d4
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 50 deletions.
46 changes: 22 additions & 24 deletions ddtrace/contrib/langchain/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,10 @@ def traced_embedding(langchain, pin, func, instance, args, kwargs):
def traced_chain_call(langchain, pin, func, instance, args, kwargs):
integration = langchain._datadog_integration
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
pin,
"{}.{}".format(instance.__module__, instance.__class__.__name__),
submit_to_llmobs=True,
interface_type="chain",
)
inputs = None
final_outputs = {}
Expand Down Expand Up @@ -624,11 +627,7 @@ def traced_chain_call(langchain, pin, func, instance, args, kwargs):
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
inputs,
)
integration.llmobs_set_tags("chain", span, inputs, final_outputs)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
if integration.is_pc_sampled_log(span):
Expand All @@ -655,7 +654,10 @@ def traced_chain_call(langchain, pin, func, instance, args, kwargs):
async def traced_chain_acall(langchain, pin, func, instance, args, kwargs):
integration = langchain._datadog_integration
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
pin,
"{}.{}".format(instance.__module__, instance.__class__.__name__),
submit_to_llmobs=True,
interface_type="chain",
)
inputs = None
final_outputs = {}
Expand All @@ -682,11 +684,7 @@ async def traced_chain_acall(langchain, pin, func, instance, args, kwargs):
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
inputs,
)
integration.llmobs_set_tags("chain", span, inputs, final_outputs)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
if integration.is_pc_sampled_log(span):
Expand Down Expand Up @@ -725,9 +723,13 @@ def traced_lcel_runnable_sequence(langchain, pin, func, instance, args, kwargs):
"""
integration = langchain._datadog_integration
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
pin,
"{}.{}".format(instance.__module__, instance.__class__.__name__),
submit_to_llmobs=True,
interface_type="chain",
)
raw_inputs, inputs = None, None
final_output = None
try:
raw_inputs = inputs = get_argument_value(args, kwargs, 0, "input")
if integration.is_pc_sampled_span(span):
Expand All @@ -752,11 +754,7 @@ def traced_lcel_runnable_sequence(langchain, pin, func, instance, args, kwargs):
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
raw_inputs,
)
integration.llmobs_set_tags("chain", span, raw_inputs, final_output)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
return final_output
Expand All @@ -769,9 +767,13 @@ async def traced_lcel_runnable_sequence_async(langchain, pin, func, instance, ar
"""
integration = langchain._datadog_integration
span = integration.trace(
pin, "%s.%s" % (instance.__module__, instance.__class__.__name__), submit_to_llmobs=True, interface_type="chain"
pin,
"{}.{}".format(instance.__module__, instance.__class__.__name__),
submit_to_llmobs=True,
interface_type="chain",
)
raw_inputs, inputs = None, None
final_output = None
try:
raw_inputs = inputs = get_argument_value(args, kwargs, 0, "input")
if integration.is_pc_sampled_span(span):
Expand All @@ -796,11 +798,7 @@ async def traced_lcel_runnable_sequence_async(langchain, pin, func, instance, ar
raise
finally:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
"chain",
span,
raw_inputs,
)
integration.llmobs_set_tags("chain", span, raw_inputs, final_output)
span.finish()
integration.metric(span, "dist", "request.duration", span.duration_ns)
return final_output
Expand Down
42 changes: 16 additions & 26 deletions ddtrace/llmobs/_integrations/langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import SPAN_KIND

from .base import BaseLLMIntegration
Expand All @@ -41,8 +42,8 @@ def llmobs_set_tags(
operation: str, # oneof "llm","chat","chain"
span: Span,
inputs: Any,
response: Optional[Any] = None,
error: Optional[bool] = False,
response: Any = None,
error: bool = False,
) -> None:
"""Sets meta tags and metrics for span events to be sent to LLMObs."""
if not self.llmobs_enabled:
Expand All @@ -51,24 +52,24 @@ def llmobs_set_tags(
span.set_tag_str(MODEL_NAME, span.get_tag(MODEL) or "")
span.set_tag_str(MODEL_PROVIDER, model_provider or "")

input_parameters = self._llmobs_set_input_parameters(span, model_provider)
self._llmobs_set_input_parameters(span, model_provider)

if operation == "llm":
self._llmobs_set_meta_tags_from_llm(span, inputs, response, error)
elif operation == "chat":
self._llmobs_set_meta_tags_from_chat_model(span, inputs, response, error)
elif operation == "chain":
self._llmobs_set_meta_tags_from_chain(span, inputs, input_parameters)
self._llmobs_set_meta_tags_from_chain(span, inputs, response)

span.set_tag_str(METRICS, json.dumps({}))

def _llmobs_set_input_parameters(
self,
span: Span,
model_provider: Optional[str] = None,
) -> dict:
):
if not model_provider:
return {}
return

input_parameters = {}
temperature = span.get_tag(f"langchain.request.{model_provider}.parameters.temperature") or span.get_tag(
Expand All @@ -87,14 +88,12 @@ def _llmobs_set_input_parameters(
if input_parameters:
span.set_tag_str(INPUT_PARAMETERS, json.dumps(input_parameters))

return input_parameters

def _llmobs_set_meta_tags_from_llm(
self,
span: Span,
prompts: List[Any],
completions: Any,
err: Optional[bool] = False,
err: bool = False,
) -> None:
span.set_tag_str(SPAN_KIND, "llm")

Expand All @@ -112,7 +111,7 @@ def _llmobs_set_meta_tags_from_chat_model(
span: Span,
chat_messages: List[List[Any]],
chat_completions: Any,
err: Optional[bool] = False,
err: bool = False,
) -> None:
span.set_tag_str(SPAN_KIND, "llm")

Expand Down Expand Up @@ -144,26 +143,17 @@ def _llmobs_set_meta_tags_from_chat_model(
span.set_tag_str(OUTPUT_MESSAGES, json.dumps(output_messages))

def _llmobs_set_meta_tags_from_chain(
self, span: Span, inputs: Union[str, Dict[str, Any], List[Union[str, Dict[str, Any]]]], input_parameters: dict
self,
span: Span,
inputs: Union[str, Dict[str, Any], List[Union[str, Dict[str, Any]]]],
outputs: Any,
) -> None:
span.set_tag_str(SPAN_KIND, "workflow")

if inputs is not None:
if isinstance(inputs, dict):
input_parameters.update(inputs)
elif isinstance(inputs, list): # batched chain call
input_parameters = {}
for idx, inp in enumerate(inputs):
if isinstance(inp, dict):
for k, v in inp.items():
input_parameters[f"input.{idx}.{k}"] = v
else:
input_parameters[f"input.{idx}"] = inp
elif isinstance(inputs, str):
span.set_tag_str(INPUT_VALUE, inputs)
return

span.set_tag_str(INPUT_PARAMETERS, json.dumps(input_parameters))
span.set_tag_str(INPUT_VALUE, str(inputs))
if outputs is not None:
span.set_tag_str(OUTPUT_VALUE, str(outputs))

def _set_base_span_tags( # type: ignore[override]
self,
Expand Down

0 comments on commit de414d4

Please sign in to comment.