Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pymongo): use bytecode wrapping to trace pymongo clients #10516

Merged
merged 23 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ddtrace/contrib/internal/mongoengine/patch.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TODO(mabdinur): Remove the mongoengine integration; this integration does nothing special,
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
# it just uses the pymongo integration and creates unnecessary pin objects
import mongoengine

from ..pymongo.patch import patch as patch_pymongo_module
from ..pymongo.patch import unpatch as unpatch_pymongo_module
from .trace import WrappedConnect


Expand All @@ -13,10 +17,16 @@ def get_version():


def patch():
    """Enable tracing for mongoengine.

    Replaces ``mongoengine.connect`` with a ``WrappedConnect`` proxy around
    the module-level ``_connect`` and then patches the pymongo module.
    The call is a no-op when ``mongoengine._datadog_patch`` is already truthy.
    """
    already_patched = getattr(mongoengine, "_datadog_patch", False)
    if not already_patched:
        mongoengine.connect = WrappedConnect(_connect)
        # Record the patched state before delegating to the pymongo integration.
        mongoengine._datadog_patch = True
        patch_pymongo_module()


def unpatch():
    """Disable tracing for mongoengine.

    Points ``mongoengine.connect`` back at the module-level ``_connect``,
    clears the ``_datadog_patch`` flag and unpatches the pymongo module.
    The call is a no-op when mongoengine is not currently marked as patched.
    """
    currently_patched = getattr(mongoengine, "_datadog_patch", False)
    if currently_patched:
        mongoengine.connect = _connect
        mongoengine._datadog_patch = False
        unpatch_pymongo_module()
9 changes: 4 additions & 5 deletions ddtrace/contrib/internal/mongoengine/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import wrapt

import ddtrace
from ddtrace.contrib.internal.pymongo.client import TracedMongoClient

# keep the TracedMongoClient import to avoid breaking the public api
from ddtrace.contrib.internal.pymongo.client import TracedMongoClient # noqa: F401
from ddtrace.ext import mongo as mongox
from ddtrace.internal.schema import schematize_service_name

Expand All @@ -13,6 +15,7 @@
_SERVICE = schematize_service_name(mongox.SERVICE)


# TODO(mabdinur): Remove this class when ``ddtrace.contrib.mongoengine.trace`` is removed
class WrappedConnect(wrapt.ObjectProxy):
"""WrappedConnect wraps mongoengine's 'connect' function to ensure
that all returned connections are wrapped for tracing.
Expand All @@ -26,10 +29,6 @@ def __call__(self, *args, **kwargs):
client = self.__wrapped__(*args, **kwargs)
pin = ddtrace.Pin.get_from(self)
if pin:
# mongoengine uses pymongo internally, so we can just piggyback on the
# existing pymongo integration and make sure that the connections it
# uses internally are traced.
client = TracedMongoClient(client)
ddtrace.Pin(service=pin.service, tracer=pin.tracer).onto(client)

return client
29 changes: 29 additions & 0 deletions ddtrace/contrib/internal/pymongo/client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# stdlib
import contextlib
import functools
import json
from typing import Iterable

Expand Down Expand Up @@ -45,6 +46,31 @@
_DEFAULT_SERVICE = schematize_service_name("pymongo")


def trace_mongo_client_init(func, args, kwargs):
    """Wrapper around ``MongoClient.__init__`` that prepares the client for tracing.

    Runs the wrapped initializer, makes sure the client's ``_topology`` is a
    ``TracedTopology``, routes pin storage/lookup on the client to that
    topology, and finally attaches a default ``Pin`` to the client.

    :param func: the wrapped ``MongoClient.__init__``
    :param args: positional arguments of the call (``self`` is expected first)
    :param kwargs: keyword arguments of the call
    """
    # Let MongoClient.__init__ run first so the client is fully constructed.
    func(*args, **kwargs)
    mongo_client = get_argument_value(args, kwargs, 0, "self")

    # The MongoClient attempts to trace all of the network
    # calls in the trace library. This is good because it measures the
    # actual network time. It's bad because it uses a private API which
    # could change. We'll see how this goes.
    current_topology = mongo_client._topology
    if not isinstance(current_topology, TracedTopology):
        mongo_client._topology = TracedTopology(current_topology)

    def _store_pin(target, pin):
        # Pins placed on the client are kept on its topology.
        pin.onto(target._topology)

    def _load_pin(target):
        return ddtrace.Pin.get_from(target._topology)

    # Bind the helpers to this specific client instance.
    mongo_client.__setddpin__ = functools.partial(_store_pin, mongo_client)
    mongo_client.__getddpin__ = functools.partial(_load_pin, mongo_client)

    # Default Pin
    default_pin = ddtrace.Pin(service=_DEFAULT_SERVICE)
    default_pin.onto(mongo_client)


# TODO(<owner>): Remove TracedMongoClient when ddtrace.contrib.pymongo.client is removed from the public API.
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
class TracedMongoClient(ObjectProxy):
def __init__(self, client=None, *args, **kwargs):
# To support the former trace_mongo_client interface, we have to keep this old interface
Expand Down Expand Up @@ -88,6 +114,9 @@ def __getddpin__(self):

@contextlib.contextmanager
def wrapped_validate_session(wrapped, instance, args, kwargs):
# The function is exposed in the public API, but it is not used in the codebase.
# TODO(<owner>): Remove this function when ddtrace.contrib.pymongo.client is removed.
mabdinur marked this conversation as resolved.
Show resolved Hide resolved

# We do this to handle a validation `A is B` in pymongo that
# relies on IDs being equal. Since we are proxying objects, we need
# to ensure we're comparing proxy with proxy or wrapped with wrapped
Expand Down
50 changes: 23 additions & 27 deletions ddtrace/contrib/internal/pymongo/patch.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import contextlib

import pymongo
from wrapt import wrap_function_wrapper as _w

from ddtrace import Pin
from ddtrace import config
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib import trace_utils
from ddtrace.contrib.trace_utils import unwrap as _u
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import mongo
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.wrapping import unwrap as _u
from ddtrace.internal.wrapping import wrap as _w

from .client import TracedMongoClient
# keep TracedMongoClient import to maintain backwards compatibility
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
from .client import TracedMongoClient # noqa: F401
from .client import set_address_tags
from .client import wrapped_validate_session
from .client import trace_mongo_client_init


config._add(
Expand All @@ -31,53 +33,47 @@ def get_version():
return getattr(pymongo, "__version__", "")


# Original Client class
_MongoClient = pymongo.MongoClient

_VERSION = pymongo.version_tuple
_CHECKOUT_FN_NAME = "get_socket" if _VERSION < (4, 5) else "checkout"
_VERIFY_VERSION_CLASS = pymongo.pool.SocketInfo if _VERSION < (4, 5) else pymongo.pool.Connection


def patch():
if getattr(pymongo, "_datadog_patch", False):
return
patch_pymongo_module()
# We should progressively get rid of TracedMongoClient. We now try to
# wrap methods individually. cf #1501
pymongo.MongoClient = TracedMongoClient
_w(pymongo.MongoClient.__init__, trace_mongo_client_init)
pymongo._datadog_patch = True


def unpatch():
if not getattr(pymongo, "_datadog_patch", False):
return
unpatch_pymongo_module()
pymongo.MongoClient = _MongoClient
_u(pymongo.MongoClient, pymongo.MongoClient.__init__)
pymongo._datadog_patch = False


def patch_pymongo_module():
if getattr(pymongo, "_datadog_patch", False):
return
pymongo._datadog_patch = True
Pin().onto(pymongo.server.Server)

# Whenever a pymongo command is invoked, the lib either:
# - Creates a new socket & performs a TCP handshake
# - Grabs a socket already initialized before
_w("pymongo.server", "Server.%s" % _CHECKOUT_FN_NAME, traced_get_socket)
_w("pymongo.pool", f"{_VERIFY_VERSION_CLASS.__name__}.validate_session", wrapped_validate_session)
checkout_fn = getattr(pymongo.server.Server, _CHECKOUT_FN_NAME)
_w(checkout_fn, traced_get_socket)


def unpatch_pymongo_module():
if not getattr(pymongo, "_datadog_patch", False):
return
pymongo._datadog_patch = False

_u(pymongo.server.Server, _CHECKOUT_FN_NAME)
_u(_VERIFY_VERSION_CLASS, "validate_session")
checkout_fn = getattr(pymongo.server.Server, _CHECKOUT_FN_NAME)
_u(checkout_fn, traced_get_socket)


@contextlib.contextmanager
def traced_get_socket(wrapped, instance, args, kwargs):
pin = Pin._find(wrapped, instance)
def traced_get_socket(func, args, kwargs):
instance = get_argument_value(args, kwargs, 0, "self")
pin = Pin._find(func, instance)
if not pin or not pin.enabled():
with wrapped(*args, **kwargs) as sock_info:
with func(*args, **kwargs) as sock_info:
yield sock_info
return

Expand All @@ -92,7 +88,7 @@ def traced_get_socket(wrapped, instance, args, kwargs):
# set span.kind tag equal to type of operation being performed
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)

with wrapped(*args, **kwargs) as sock_info:
with func(*args, **kwargs) as sock_info:
set_address_tags(span, sock_info.address)
span.set_tag(SPAN_MEASURED_KEY)
yield sock_info
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
pymongo: Ensures instances of the ``pymongo.MongoClient`` can be patched after pymongo is imported.
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
10 changes: 4 additions & 6 deletions tests/contrib/mongoengine/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY
from ddtrace.contrib.mongoengine.patch import patch
from ddtrace.contrib.mongoengine.patch import unpatch
from ddtrace.contrib.pymongo.client import TracedMongoClient
from ddtrace.contrib.pymongo.client import TracedTopology
from ddtrace.ext import mongo as mongox
from ddtrace.internal.schema import DEFAULT_SPAN_SERVICE_NAME
Expand Down Expand Up @@ -155,7 +154,10 @@ def test_opentracing(self):

# confirm the parenting
assert ot_span.parent_id is None
assert dd_span.parent_id == ot_span.span_id
# dd_span is a child of the pymongo.checkout span, this span is created by the global tracer
# and is not captured by the DummyTracer. dd_span._parent.parent_id is equal to ot_span.span_id
# TODO: Ensure the Pin used to trace pymongo clients and servers pin onto a common object.
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
# assert dd_span.parent_id == ot_span.span_id

assert ot_span.name == "ot_span"
assert ot_span.service == "my_svc"
Expand Down Expand Up @@ -419,15 +421,11 @@ def test_multiple_connect_no_double_patching(self):
Regression test for https://github.com/DataDog/dd-trace-py/issues/2474
"""
client = mongoengine.connect(port=MONGO_CONFIG["port"])
assert isinstance(client, TracedMongoClient)
assert not isinstance(client.__wrapped__, TracedMongoClient)
assert isinstance(client._topology, TracedTopology)
assert not isinstance(client._topology.__wrapped__, TracedTopology)
client.close()

client = mongoengine.connect(port=MONGO_CONFIG["port"])
assert isinstance(client, TracedMongoClient)
assert not isinstance(client.__wrapped__, TracedMongoClient)
assert isinstance(client._topology, TracedTopology)
assert not isinstance(client._topology.__wrapped__, TracedTopology)
client.close()
Expand Down
19 changes: 18 additions & 1 deletion tests/contrib/pymongo/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# 3p
import pymongo
import pytest

# project
from ddtrace import Pin
Expand Down Expand Up @@ -741,7 +742,7 @@ def setUp(self):
# and choose from to perform classic operations. For the sake of our tests,
# let's limit this number to 1
self.client = pymongo.MongoClient(port=MONGO_CONFIG["port"], maxPoolSize=1)
# Override TracedMongoClient's pin's tracer with our dummy tracer
# Override MongoClient's pin's tracer with our dummy tracer
Pin.override(self.client, tracer=self.tracer, service="testdb")

def tearDown(self):
Expand Down Expand Up @@ -843,3 +844,19 @@ def test_server_info(self):
assert len(spans) == 2
self.check_socket_metadata(spans[0])
assert spans[1].name == "pymongo.cmd"


@pytest.mark.snapshot(
    variants={
        "pre_45": pymongo.version_tuple < (4, 5),
        "post_45": pymongo.version_tuple >= (4, 5),
    }
)
def test_patch_pymongo_client_after_import():
    """A ``MongoClient`` reference imported before ``patch()`` is still traced."""
    # Precondition: pymongo must not be patched when the test starts.
    assert not getattr(pymongo, "_datadog_patch", False)
    # Take the class reference *before* patching on purpose.
    from pymongo import MongoClient

    try:
        patch()
        assert pymongo._datadog_patch

        mongo_client = MongoClient(port=MONGO_CONFIG["port"])
        # Issue a command so the snapshot records the resulting spans.
        mongo_client.server_info()
    finally:
        # Always restore the unpatched state for the other tests.
        unpatch()
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
[[
{
"name": "pymongo.checkout",
"service": "pymongo",
"resource": "pymongo.checkout",
"trace_id": 0,
"span_id": 1,
"parent_id": 0,
"type": "mongodb",
"error": 0,
"meta": {
"_dd.base_service": "",
"_dd.p.dm": "-0",
"_dd.p.tid": "66da429b00000000",
"component": "pymongo",
"db.system": "mongodb",
"language": "python",
"out.host": "localhost",
"runtime-id": "dca37e95aff44f08b61d9f40e9558ca4",
"server.address": "localhost",
"span.kind": "client"
},
"metrics": {
"_dd.measured": 1,
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"network.destination.port": 27017,
"process_id": 96116
},
"duration": 8814000,
"start": 1725579931065857000
},
{
"name": "pymongo.cmd",
"service": "pymongo",
"resource": "buildinfo 1",
"trace_id": 0,
"span_id": 2,
"parent_id": 1,
"type": "mongodb",
"error": 0,
"meta": {
"_dd.base_service": "",
"component": "pymongo",
"db.system": "mongodb",
"mongodb.db": "admin",
"out.host": "localhost",
"server.address": "localhost",
"span.kind": "client"
},
"metrics": {
"_dd.measured": 1,
"mongodb.collection": 1,
"network.destination.port": 27017
},
"duration": 1711000,
"start": 1725579931072848000
}]]
Loading
Loading