-
Notifications
You must be signed in to change notification settings - Fork 412
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(llmobs): submit langchain tool.invoke tool spans #10410
Changes from 7 commits
3a305a2
4ec344d
93def52
97cfe68
d6a08a0
dc4f63c
839f300
ee940db
e1eb5d2
024ce03
85d5f80
46e4e06
aefc770
09c5ae2
34ba377
f5481cd
26aa89b
34096be
7ec77b8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -965,6 +965,158 @@ def traced_similarity_search(langchain, pin, func, instance, args, kwargs): | |
return documents | ||
|
||
|
||
@with_traced_module
def traced_base_tool_invoke(langchain, pin, func, instance, args, kwargs):
    """Trace a synchronous ``BaseTool.invoke`` call as a "tool" span.

    Tags tool metadata (name, description, config flags, tags) plus the tool
    input/output, and submits the span to LLM Observability when sampled.

    :param langchain: the patched langchain module (via ``with_traced_module``).
    :param pin: the Pin attached to the integration.
    :param func: the original ``invoke`` callable being wrapped.
    :param instance: the ``BaseTool`` instance being invoked.
    :return: whatever the wrapped ``invoke`` returns.
    """
    integration = langchain._datadog_integration
    tool_input = get_argument_value(args, kwargs, 0, "input")
    config = get_argument_value(args, kwargs, 1, "config") if len(args) >= 2 else None

    # Name the span after the tool instance's class: `func` here is the plain
    # wrapped function and has no `__self__`, so the previous
    # `func.__self__.name` lookup would raise AttributeError.
    span = integration.trace(
        pin,
        "%s.%s.%s" % (func.__module__, func.__class__.__name__, instance.__class__.__name__),
        interface_type="tool",
        submit_to_llmobs=True,
    )

    tool_output = None
    try:
        if instance.name:
            # Tool names are short identifiers; truncation is reserved for
            # inputs/outputs only.
            span.set_tag_str("langchain.request.tool.name", str(instance.name))
        if integration.is_pc_sampled_span(span):
            # Scalar tool attributes that are tagged verbatim when truthy.
            tool_attributes = [
                "description",
                "return_direct",
                "verbose",
                "handle_tool_error",
                "handle_validation_error",
                "response_format",
            ]
            for attribute in tool_attributes:
                value = getattr(instance, attribute, None)
                if value:
                    span.set_tag_str("langchain.request.tool.%s" % attribute, integration.trunc(str(value)))
            if getattr(instance, "metadata", None):
                for key, value in instance.metadata.items():
                    span.set_tag_str("langchain.request.tool.metadata.%s" % key, integration.trunc(str(value)))
            if getattr(instance, "tags", None):
                for idx, tag in enumerate(instance.tags):
                    span.set_tag_str("langchain.request.tool.tags.%d" % idx, integration.trunc(str(tag)))
            if tool_input:
                span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
            if config:
                span.set_tag_str("langchain.request.config", integration.trunc(str(config)))
        tool_output = func(*args, **kwargs)
        if tool_output is not None:
            span.set_tag_str("langchain.response.output", integration.trunc(str(tool_output)))
    except Exception:
        span.set_exc_info(*sys.exc_info())
        integration.metric(span, "incr", "request.error", 1)
        raise
    finally:
        # LLMObs tagging happens even on error so the error state is recorded.
        if integration.is_pc_sampled_llmobs(span):
            integration.llmobs_set_tags(
                "tool",
                span,
                tool_input,
                tool_output,
                error=bool(span.error),
            )
        span.finish()
        integration.metric(span, "dist", "request.duration", span.duration_ns)
        # NOTE: the integration.log() emission was removed — log emission is
        # deprecated/discontinued, and the old attrs referenced the
        # nonexistent `instance.__self__.name`, which would have raised.
    return tool_output
|
||
|
||
@with_traced_module
async def traced_base_tool_ainvoke(langchain, pin, func, instance, args, kwargs):
    """Trace an asynchronous ``BaseTool.ainvoke`` call as a "tool" span.

    Async twin of ``traced_base_tool_invoke``; the tagging logic is kept in
    the same loop-based shape as the sync variant for consistency (the
    previous hand-rolled if-chain duplicated the same attribute tagging).

    :return: whatever the wrapped ``ainvoke`` awaits to.
    """
    integration = langchain._datadog_integration
    tool_input = get_argument_value(args, kwargs, 0, "input")
    config = get_argument_value(args, kwargs, 1, "config") if len(args) >= 2 else None

    # Name the span after the tool instance's class, matching the sync path.
    span = integration.trace(
        pin,
        "%s.%s.%s" % (func.__module__, func.__class__.__name__, instance.__class__.__name__),
        interface_type="tool",
        submit_to_llmobs=True,
    )

    tool_output = None
    try:
        if instance.name:
            # Tool names are short identifiers; truncation is reserved for
            # inputs/outputs only.
            span.set_tag_str("langchain.request.tool.name", str(instance.name))
        if integration.is_pc_sampled_span(span):
            # Same scalar attribute set as the sync variant.
            tool_attributes = [
                "description",
                "return_direct",
                "verbose",
                "handle_tool_error",
                "handle_validation_error",
                "response_format",
            ]
            for attribute in tool_attributes:
                value = getattr(instance, attribute, None)
                if value:
                    span.set_tag_str("langchain.request.tool.%s" % attribute, integration.trunc(str(value)))
            if getattr(instance, "metadata", None):
                for key, value in instance.metadata.items():
                    span.set_tag_str("langchain.request.tool.metadata.%s" % key, integration.trunc(str(value)))
            if getattr(instance, "tags", None):
                for idx, tag in enumerate(instance.tags):
                    span.set_tag_str("langchain.request.tool.tags.%d" % idx, integration.trunc(str(tag)))
            if tool_input:
                span.set_tag_str("langchain.request.input", integration.trunc(str(tool_input)))
            if config:
                span.set_tag_str("langchain.request.config", integration.trunc(str(config)))
        tool_output = await func(*args, **kwargs)
        if tool_output is not None:
            span.set_tag_str("langchain.response.output", integration.trunc(str(tool_output)))
    except Exception:
        span.set_exc_info(*sys.exc_info())
        integration.metric(span, "incr", "request.error", 1)
        raise
    finally:
        # LLMObs tagging happens even on error so the error state is recorded.
        if integration.is_pc_sampled_llmobs(span):
            integration.llmobs_set_tags(
                "tool",
                span,
                tool_input,
                tool_output,
                error=bool(span.error),
            )
        span.finish()
        integration.metric(span, "dist", "request.duration", span.duration_ns)
        # NOTE: the integration.log() emission was removed — log emission is
        # deprecated/discontinued, and the old attrs referenced the
        # nonexistent `instance.__self__.name`, which would have raised.
    return tool_output
|
||
|
||
def _patch_embeddings_and_vectorstores(): | ||
""" | ||
Text embedding models override two abstract base methods instead of super calls, | ||
|
@@ -1092,6 +1244,8 @@ def patch(): | |
) | ||
wrap("langchain_core", "runnables.base.RunnableSequence.batch", traced_lcel_runnable_sequence(langchain)) | ||
wrap("langchain_core", "runnables.base.RunnableSequence.abatch", traced_lcel_runnable_sequence_async(langchain)) | ||
wrap("langchain_core", "tools.BaseTool.invoke", traced_base_tool_invoke(langchain)) | ||
wrap("langchain_core", "tools.BaseTool.ainvoke", traced_base_tool_ainvoke(langchain)) | ||
if langchain_openai: | ||
wrap("langchain_openai", "OpenAIEmbeddings.embed_documents", traced_embedding(langchain)) | ||
if langchain_pinecone: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -44,7 +44,7 @@ | |
"system": "system", | ||
} | ||
|
||
SUPPORTED_OPERATIONS = ["llm", "chat", "chain", "embedding"] | ||
SUPPORTED_OPERATIONS = ["llm", "chat", "chain", "embedding", "tool"] | ||
|
||
|
||
class LangChainIntegration(BaseLLMIntegration): | ||
|
@@ -89,6 +89,8 @@ def llmobs_set_tags( | |
self._llmobs_set_meta_tags_from_chain(span, inputs, response, error) | ||
elif operation == "embedding": | ||
self._llmobs_set_meta_tags_from_embedding(span, inputs, response, error, is_workflow=is_workflow) | ||
elif operation == "tool": | ||
self._llmobs_set_meta_tags_from_tool(span, inputs, response, error) | ||
span.set_tag_str(METRICS, json.dumps({})) | ||
|
||
def _llmobs_set_metadata(self, span: Span, model_provider: Optional[str] = None) -> None: | ||
|
@@ -260,6 +262,35 @@ def _llmobs_set_meta_tags_from_embedding( | |
except (TypeError, IndexError): | ||
log.warning("Failed to write output vectors", output_embedding) | ||
|
||
def _llmobs_set_meta_tags_from_tool(
    self,
    span: Span,
    tool_input: Union[str, Dict[str, Any], Any],
    tool_output: Any,
    error: bool,
) -> None:
    """Set LLM Observability meta tags for a tool span.

    Serializes the tool input/output into the span's INPUT_VALUE /
    OUTPUT_VALUE tags — verbatim when ``format_io`` returns a string,
    JSON-encoded otherwise. On error the output tag is set to "".

    :param span: the APM span to tag.
    :param tool_input: the raw input passed to the tool (shape depends on
        the tool; presumably str or dict — left as Any deliberately).
    :param tool_output: the raw tool return value.
    :param error: whether the traced call raised.
    """
    span.set_tag_str(SPAN_KIND, "tool")
    if tool_input is not None:
        try:
            formatted_inputs = self.format_io(tool_input)
            if isinstance(formatted_inputs, str):
                span.set_tag_str(INPUT_VALUE, formatted_inputs)
            else:
                # Reuse the already-formatted value instead of calling
                # format_io a second time.
                span.set_tag_str(INPUT_VALUE, json.dumps(formatted_inputs))
        except TypeError:
            log.warning("Failed to serialize tool input data to JSON")
    if error:
        span.set_tag_str(OUTPUT_VALUE, "")
    elif tool_output is not None:
        try:
            formatted_outputs = self.format_io(tool_output)
            if isinstance(formatted_outputs, str):
                span.set_tag_str(OUTPUT_VALUE, formatted_outputs)
            else:
                span.set_tag_str(OUTPUT_VALUE, json.dumps(formatted_outputs))
        except TypeError:
            log.warning("Failed to serialize tool output data to JSON")
|
||
def _set_base_span_tags( # type: ignore[override] | ||
self, | ||
span: Span, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
features: | ||
- | | ||
    APM: The LangChain integration now submits spans from tool.invoke to APM as tool spans.
    LLM Observability: The LangChain integration now submits tool spans from tool.invoke to LLM Observability as tool spans.
yahya-mouman marked this conversation as resolved.
Show resolved
Hide resolved
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1274,3 +1274,65 @@ def test_faiss_vectorstore_retrieval(langchain_community, langchain_openai, requ | |
retriever = faiss.as_retriever() | ||
with request_vcr.use_cassette("openai_retrieval_embedding.yaml"): | ||
retriever.invoke("What was the message of the last test query?") | ||
|
||
|
||
@pytest.mark.snapshot(
    ignores=["meta.langchain.request.tool.description", "meta.langchain.request.tool.response_format"],
    token="tests.contrib.langchain.test_langchain_community.test_base_tool_invoke",
)
def test_base_tool_invoke(langchain_core, request_vcr):
    """Invoking a langchain tool produces a 1-span trace with a tool span."""
    if langchain_core is None:
        pytest.skip("langchain-core not installed which is required for this test.")

    from math import pi

    from langchain_core.tools import StructuredTool

    def circumference_tool(radius: float) -> float:
        return float(radius) * 2.0 * pi

    # Build a structured tool from the plain function; description and
    # response_format are snapshot-ignored above.
    tool = StructuredTool.from_function(
        func=circumference_tool,
        name="Circumference calculator",
        description="Use this tool when you need to calculate a circumference using the radius of a circle",
        return_direct=True,
        response_format="content",
    )

    tool.invoke("2")
|
||
|
||
@pytest.mark.asyncio
@pytest.mark.snapshot(
    ignores=["meta.langchain.request.tool.description", "meta.langchain.request.tool.response_format"],
    token="tests.contrib.langchain.test_langchain_community.test_base_tool_ainvoke",
)
async def test_base_tool_ainvoke(langchain_core, request_vcr):
    """Async-invoking a langchain tool produces a 1-span trace with a tool span."""
    if langchain_core is None:
        pytest.skip("langchain-core not installed which is required for this test.")

    from math import pi

    from langchain_core.tools import StructuredTool

    def circumference_tool(radius: float) -> float:
        return float(radius) * 2.0 * pi

    # Same tool definition as the sync test; only the invocation differs.
    tool = StructuredTool.from_function(
        func=circumference_tool,
        name="Circumference calculator",
        description="Use this tool when you need to calculate a circumference using the radius of a circle",
        return_direct=True,
        response_format="content",
    )

    await tool.ainvoke("2")
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -147,6 +147,15 @@ def _embed_documents(cls, embedding_model, documents, mock_tracer, cassette_name | |
LLMObs.disable() | ||
return mock_tracer.pop_traces()[0] | ||
|
||
@classmethod
def _invoke_tool(cls, tool, tool_input, mock_tracer, cassette_name):
    """Invoke `tool` under LLMObs with a VCR cassette and return the first span."""
    LLMObs.enable(ml_app=cls.ml_app, integrations_enabled=False, _tracer=mock_tracer)
    vcr = get_request_vcr(subdirectory_name=cls.cassette_subdirectory_name)
    with vcr.use_cassette(cassette_name):
        # NOTE(review): BaseTool.invoke presumably only exists on
        # langchain >= 0.1 — confirm whether this gate is still needed given
        # the callers are already version-gated at the class level.
        if LANGCHAIN_VERSION > (0, 1):
            tool.invoke(tool_input)
    LLMObs.disable()
    return mock_tracer.pop_traces()[0][0]
|
||
|
||
@pytest.mark.skipif(LANGCHAIN_VERSION >= (0, 1), reason="These tests are for langchain < 0.1.0") | ||
class TestLLMObsLangchain(BaseTestLLMObsLangchain): | ||
|
@@ -622,6 +631,41 @@ def test_llmobs_embedding_documents( | |
) | ||
) | ||
|
||
def test_llmobs_base_tool_invoke(self, langchain_core, mock_llmobs_span_writer, mock_tracer):
    """A BaseTool.invoke call enqueues exactly one LLMObs tool span event."""
    if langchain_core is None:
        pytest.skip("langchain-core not installed which is required for this test.")

    from math import pi

    class CircumferenceTool(langchain_core.tools.BaseTool):
        name = "Circumference calculator"
        description = "use this tool when you need to calculate a circumference using the radius of a circle"

        def _run(self, radius):
            return float(radius) * 2.0 * pi

        def _arun(self, radius: int):
            raise NotImplementedError("This tool does not support async")

    span = self._invoke_tool(
        tool=CircumferenceTool(),
        tool_input="2",
        mock_tracer=mock_tracer,
        cassette_name="langchain_tool_invoke_39.yaml" if PY39 else "langchain_tool_invoke.yaml",
    )
    # Exactly one span event, with the tool's computed circumference as output.
    assert mock_llmobs_span_writer.enqueue.call_count == 1
    mock_llmobs_span_writer.enqueue.assert_called_with(
        _expected_llmobs_non_llm_span_event(
            span,
            span_kind="tool",
            input_value="2",
            output_value="12.566370614359172",
            tags={"ml_app": "langchain_test"},
            integration="langchain",
        )
    )
|
||
|
||
@pytest.mark.skipif(LANGCHAIN_VERSION < (0, 1), reason="These tests are for langchain >= 0.1.0") | ||
class TestTraceStructureWithLLMIntegrations(SubprocessTestCase): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I couldn't think of any metrics to emit from this function
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you talking about datadog metrics? If so we can ignore, that's out of scope for our team and this integration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we use any metrics from those emitted from the langchain integration ?