From 6f46cc7c0f1721fbf6d980399748b27067888506 Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Fri, 9 Aug 2024 14:33:44 -0400 Subject: [PATCH] chore(integrations): move fastapi,flask,flask_cache,futures to internal (#10127) - Moves all integration internals in ddtrace/contrib/(integration name)/ to ddtrace/contrib/internal/(integration name)/ for fasapi, flask, flask_cache, and futures - Ensures ddtrace/contrib/(integration name)/ and ddtrace/contrib/(integration name)/ continue to expose the same functions, classes, imports, and module level variables (via from ..internal.integration.module import * imports). - Log a deprecation warning if internal modules in ddtrace/contrib/(integration name)/ and ddtrace/contrib/(integration name)/. Only patch and unpack methods should be exposed by these packages. - https://github.com/DataDog/dd-trace-py/pull/9996 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [ ] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: Emmett Butler <723615+emmettbutler@users.noreply.github.com> --- ddtrace/contrib/fastapi/__init__.py | 10 +- ddtrace/contrib/fastapi/patch.py | 103 +--- ddtrace/contrib/flask/__init__.py | 9 +- ddtrace/contrib/flask/patch.py | 565 +----------------- ddtrace/contrib/flask/wrappers.py | 103 +--- ddtrace/contrib/flask_cache/__init__.py | 8 +- ddtrace/contrib/flask_cache/tracers.py | 195 +----- ddtrace/contrib/flask_cache/utils.py | 70 +-- ddtrace/contrib/futures/__init__.py | 10 +- ddtrace/contrib/futures/patch.py | 43 +- ddtrace/contrib/futures/threading.py | 43 +- ddtrace/contrib/internal/fastapi/patch.py | 103 ++++ ddtrace/contrib/internal/flask/patch.py | 564 +++++++++++++++++ ddtrace/contrib/internal/flask/wrappers.py | 95 +++ .../contrib/internal/flask_cache/tracers.py | 188 ++++++ ddtrace/contrib/internal/flask_cache/utils.py | 62 ++ ddtrace/contrib/internal/futures/patch.py | 43 ++ ddtrace/contrib/internal/futures/threading.py | 36 ++ ...-to-internal-fastapi-348259e3fe88465e.yaml | 10 + tests/contrib/flask/test_idempotency.py | 10 +- tests/contrib/flask_cache/test_utils.py | 6 +- 21 files changed, 1183 insertions(+), 1093 deletions(-) create mode 100644 ddtrace/contrib/internal/fastapi/patch.py create mode 100644 ddtrace/contrib/internal/flask/patch.py create mode 100644 ddtrace/contrib/internal/flask/wrappers.py create mode 100644 ddtrace/contrib/internal/flask_cache/tracers.py create mode 100644 ddtrace/contrib/internal/flask_cache/utils.py create mode 100644 ddtrace/contrib/internal/futures/patch.py create mode 100644 ddtrace/contrib/internal/futures/threading.py create mode 100644 releasenotes/notes/move-integrations-to-internal-fastapi-348259e3fe88465e.yaml diff --git a/ddtrace/contrib/fastapi/__init__.py b/ddtrace/contrib/fastapi/__init__.py index 60da03ef28a..12e27ccd5cb 100644 --- a/ddtrace/contrib/fastapi/__init__.py +++ b/ddtrace/contrib/fastapi/__init__.py @@ -57,8 +57,12 @@ with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import get_version - from .patch import patch - from .patch import unpatch + # Required to allow users to import from `ddtrace.contrib.fastapi.patch` directly + from . import patch as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.fastapi.patch import get_version + from ..internal.fastapi.patch import patch + from ..internal.fastapi.patch import unpatch __all__ = ["patch", "unpatch", "get_version"] diff --git a/ddtrace/contrib/fastapi/patch.py b/ddtrace/contrib/fastapi/patch.py index 974dcf64d84..06c723f0939 100644 --- a/ddtrace/contrib/fastapi/patch.py +++ b/ddtrace/contrib/fastapi/patch.py @@ -1,103 +1,4 @@ -import os +from ..internal.fastapi.patch import * # noqa: F401,F403 -import fastapi -import fastapi.routing -from ddtrace import Pin -from ddtrace import config -from ddtrace.contrib.asgi.middleware import TraceMiddleware -from ddtrace.contrib.starlette.patch import _trace_background_tasks -from ddtrace.contrib.starlette.patch import traced_handler -from ddtrace.internal.logger import get_logger -from ddtrace.internal.schema import schematize_service_name -from ddtrace.internal.utils.wrappers import unwrap as _u -from ddtrace.vendor.wrapt import ObjectProxy -from ddtrace.vendor.wrapt import wrap_function_wrapper as _w - - -log = get_logger(__name__) - -config._add( - "fastapi", - dict( - _default_service=schematize_service_name("fastapi"), - request_span_name="fastapi.request", - distributed_tracing=True, - trace_query_string=None, # Default to global config - _trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False), - ), -) - - -def get_version(): - # type: () -> str - return getattr(fastapi, "__version__", "") - - -def wrap_middleware_stack(wrapped, instance, args, kwargs): - return TraceMiddleware(app=wrapped(*args, **kwargs), integration_config=config.fastapi) - - -async def traced_serialize_response(wrapped, instance, args, kwargs): - """Wrapper for fastapi.routing.serialize_response function. - - This function is called on all non-Response objects to - convert them to a serializable form. - - This is the wrapper which calls ``jsonable_encoder``. - - This function does not do the actual encoding from - obj -> json string (e.g. json.dumps()). That is handled - by the Response.render function. - - DEV: We do not wrap ``jsonable_encoder`` because it calls - itself recursively, so there is a chance the overhead - added by creating spans will be higher than desired for - the result. - """ - pin = Pin.get_from(fastapi) - if not pin or not pin.enabled(): - return await wrapped(*args, **kwargs) - - with pin.tracer.trace("fastapi.serialize_response"): - return await wrapped(*args, **kwargs) - - -def patch(): - if getattr(fastapi, "_datadog_patch", False): - return - - fastapi._datadog_patch = True - Pin().onto(fastapi) - _w("fastapi.applications", "FastAPI.build_middleware_stack", wrap_middleware_stack) - _w("fastapi.routing", "serialize_response", traced_serialize_response) - - if not isinstance(fastapi.BackgroundTasks.add_task, ObjectProxy): - _w("fastapi", "BackgroundTasks.add_task", _trace_background_tasks(fastapi)) - - # We need to check that Starlette instrumentation hasn't already patched these - if not isinstance(fastapi.routing.APIRoute.handle, ObjectProxy): - _w("fastapi.routing", "APIRoute.handle", traced_handler) - - if not isinstance(fastapi.routing.Mount.handle, ObjectProxy): - _w("starlette.routing", "Mount.handle", traced_handler) - - -def unpatch(): - if not getattr(fastapi, "_datadog_patch", False): - return - - fastapi._datadog_patch = False - - _u(fastapi.applications.FastAPI, "build_middleware_stack") - _u(fastapi.routing, "serialize_response") - - # We need to check that Starlette instrumentation hasn't already unpatched these - if isinstance(fastapi.routing.APIRoute.handle, ObjectProxy): - _u(fastapi.routing.APIRoute, "handle") - - if isinstance(fastapi.routing.Mount.handle, ObjectProxy): - _u(fastapi.routing.Mount, "handle") - - if isinstance(fastapi.BackgroundTasks.add_task, ObjectProxy): - _u(fastapi.BackgroundTasks, "add_task") +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/flask/__init__.py b/ddtrace/contrib/flask/__init__.py index c3a619a6bc2..e161bac99b7 100644 --- a/ddtrace/contrib/flask/__init__.py +++ b/ddtrace/contrib/flask/__init__.py @@ -104,10 +104,9 @@ def index(): with require_modules(required_modules) as missing_modules: if not missing_modules: # DEV: We do this so we can `@mock.patch('ddtrace.contrib.flask._patch.')` in tests - from . import patch as _patch - - patch = _patch.patch - unpatch = _patch.unpatch - get_version = _patch.get_version + from . import patch as _ # noqa: F401, I001 + from ..internal.flask.patch import patch + from ..internal.flask.patch import unpatch + from ..internal.flask.patch import get_version __all__ = ["patch", "unpatch", "get_version"] diff --git a/ddtrace/contrib/flask/patch.py b/ddtrace/contrib/flask/patch.py index ae017bc5b16..98bf00b2be1 100644 --- a/ddtrace/contrib/flask/patch.py +++ b/ddtrace/contrib/flask/patch.py @@ -1,565 +1,4 @@ -import flask -import werkzeug -from werkzeug.exceptions import BadRequest -from werkzeug.exceptions import NotFound -from werkzeug.exceptions import abort +from ..internal.flask.patch import * # noqa: F401,F403 -from ddtrace._trace.trace_handlers import _ctype_from_headers -from ddtrace.contrib import trace_utils -from ddtrace.ext import SpanTypes -from ddtrace.internal.constants import COMPONENT -from ddtrace.internal.constants import HTTP_REQUEST_BLOCKED -from ddtrace.internal.constants import STATUS_403_TYPE_AUTO -from ddtrace.internal.packages import get_version_for_package -from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ...internal import core -from ...internal.schema import schematize_service_name -from ...internal.schema import schematize_url_operation -from ...internal.utils import http as http_utils - - -# Not all versions of flask/werkzeug have this mixin -try: - from werkzeug.wrappers.json import JSONMixin - - _HAS_JSON_MIXIN = True -except ImportError: - _HAS_JSON_MIXIN = False - -from ddtrace import Pin -from ddtrace import config -from ddtrace.vendor.wrapt import wrap_function_wrapper as _w - -from ...contrib.wsgi.wsgi import _DDWSGIMiddlewareBase -from ...internal.logger import get_logger -from ...internal.utils import get_argument_value -from ...internal.utils.importlib import func_name -from ...internal.utils.version import parse_version -from ..trace_utils import unwrap as _u -from .wrappers import _wrap_call_with_pin_check -from .wrappers import get_current_app -from .wrappers import simple_call_wrapper -from .wrappers import with_instance_pin -from .wrappers import wrap_function -from .wrappers import wrap_view - - -try: - from json import JSONDecodeError -except ImportError: - # handling python 2.X import error - JSONDecodeError = ValueError # type: ignore - - -log = get_logger(__name__) - -FLASK_VERSION = "flask.version" -_BODY_METHODS = {"POST", "PUT", "DELETE", "PATCH"} - -# Configure default configuration -config._add( - "flask", - dict( - # Flask service configuration - _default_service=schematize_service_name("flask"), - collect_view_args=True, - distributed_tracing_enabled=True, - template_default_name="", - trace_signals=True, - ), -) - - -def get_version(): - # type: () -> str - return get_version_for_package("flask") - - -if _HAS_JSON_MIXIN: - - class RequestWithJson(werkzeug.Request, JSONMixin): - pass - - _RequestType = RequestWithJson -else: - _RequestType = werkzeug.Request - -# Extract flask version into a tuple e.g. (0, 12, 1) or (1, 0, 2) -# DEV: This makes it so we can do `if flask_version >= (0, 12, 0):` -# DEV: Example tests: -# (0, 10, 0) > (0, 10) -# (0, 10, 0) >= (0, 10, 0) -# (0, 10, 1) >= (0, 10) -# (0, 11, 1) >= (0, 10) -# (0, 11, 1) >= (0, 10, 2) -# (1, 0, 0) >= (0, 10) -# (0, 9) == (0, 9) -# (0, 9, 0) != (0, 9) -# (0, 8, 5) <= (0, 9) -flask_version_str = get_version() -flask_version = parse_version(flask_version_str) - - -class _FlaskWSGIMiddleware(_DDWSGIMiddlewareBase): - _request_call_name = schematize_url_operation("flask.request", protocol="http", direction=SpanDirection.INBOUND) - _application_call_name = "flask.application" - _response_call_name = "flask.response" - - def _wrapped_start_response(self, start_response, ctx, status_code, headers, exc_info=None): - core.dispatch("flask.start_response.pre", (flask.request, ctx, config.flask, status_code, headers)) - if not core.get_item(HTTP_REQUEST_BLOCKED): - headers_from_context = "" - result_waf = core.dispatch_with_results("flask.start_response", ("Flask",)).waf - if result_waf: - headers_from_context = result_waf.value - if core.get_item(HTTP_REQUEST_BLOCKED): - # response code must be set here, or it will be too late - result_content = core.dispatch_with_results("flask.block.request.content", ()).block_requested - if result_content: - _, status, response_headers = result_content.value - result = start_response(str(status), response_headers) - else: - block_config = core.get_item(HTTP_REQUEST_BLOCKED) - desired_type = block_config.get("type", "auto") - status = block_config.get("status_code", 403) - if desired_type == "none": - response_headers = [] - else: - ctype = _ctype_from_headers(block_config, headers_from_context) - response_headers = [("content-type", ctype)] - result = start_response(str(status), response_headers) - core.dispatch("flask.start_response.blocked", (ctx, config.flask, response_headers, status)) - else: - result = start_response(status_code, headers) - else: - result = start_response(status_code, headers) - return result - - def _request_call_modifier(self, ctx, parsed_headers=None): - environ = ctx.get_item("environ") - # Create a werkzeug request from the `environ` to make interacting with it easier - # DEV: This executes before a request context is created - request = _RequestType(environ) - - req_body = None - result = core.dispatch_with_results( - "flask.request_call_modifier", - ( - ctx, - config.flask, - request, - environ, - _HAS_JSON_MIXIN, - FLASK_VERSION, - flask_version_str, - BadRequest, - ), - ).request_body - if result: - req_body = result.value - core.dispatch("flask.request_call_modifier.post", (ctx, config.flask, request, req_body)) - - -def patch(): - """ - Patch `flask` module for tracing - """ - # Check to see if we have patched Flask yet or not - if getattr(flask, "_datadog_patch", False): - return - flask._datadog_patch = True - - Pin().onto(flask.Flask) - core.dispatch("flask.patch", (flask_version,)) - # flask.app.Flask methods that have custom tracing (add metadata, wrap functions, etc) - _w("flask", "Flask.wsgi_app", patched_wsgi_app) - _w("flask", "Flask.dispatch_request", request_patcher("dispatch_request")) - _w("flask", "Flask.preprocess_request", request_patcher("preprocess_request")) - _w("flask", "Flask.add_url_rule", patched_add_url_rule) - _w("flask", "Flask.endpoint", patched_endpoint) - - _w("flask", "Flask.finalize_request", patched_finalize_request) - - if flask_version >= (2, 0, 0): - _w("flask", "Flask.register_error_handler", patched_register_error_handler) - else: - _w("flask", "Flask._register_error_handler", patched__register_error_handler) - - # flask.blueprints.Blueprint methods that have custom tracing (add metadata, wrap functions, etc) - _w("flask", "Blueprint.register", patched_blueprint_register) - _w("flask", "Blueprint.add_url_rule", patched_blueprint_add_url_rule) - - flask_hooks = [ - "before_request", - "after_request", - "teardown_request", - "teardown_appcontext", - ] - if flask_version < (2, 3, 0): - flask_hooks.append("before_first_request") - - for hook in flask_hooks: - _w("flask", "Flask.{}".format(hook), patched_flask_hook) - _w("flask", "after_this_request", patched_flask_hook) - - flask_app_traces = [ - "process_response", - "handle_exception", - "handle_http_exception", - "handle_user_exception", - "do_teardown_request", - "do_teardown_appcontext", - "send_static_file", - ] - if flask_version < (2, 2, 0): - flask_app_traces.append("try_trigger_before_first_request_functions") - - for name in flask_app_traces: - _w("flask", "Flask.{}".format(name), simple_call_wrapper("flask.{}".format(name))) - # flask static file helpers - _w("flask", "send_file", simple_call_wrapper("flask.send_file")) - - # flask.json.jsonify - _w("flask", "jsonify", patched_jsonify) - - _w("flask.templating", "_render", patched_render) - _w("flask", "render_template", _build_render_template_wrapper("render_template")) - _w("flask", "render_template_string", _build_render_template_wrapper("render_template_string")) - - bp_hooks = [ - "after_app_request", - "after_request", - "before_app_request", - "before_request", - "teardown_request", - "teardown_app_request", - ] - if flask_version < (2, 3, 0): - bp_hooks.append("before_app_first_request") - - for hook in bp_hooks: - _w("flask", "Blueprint.{}".format(hook), patched_flask_hook) - - if config.flask["trace_signals"]: - signals = [ - "template_rendered", - "request_started", - "request_finished", - "request_tearing_down", - "got_request_exception", - "appcontext_tearing_down", - ] - # These were added in 0.11.0 - if flask_version >= (0, 11): - signals.append("before_render_template") - - # These were added in 0.10.0 - if flask_version >= (0, 10): - signals.append("appcontext_pushed") - signals.append("appcontext_popped") - signals.append("message_flashed") - - for signal in signals: - module = "flask" - - # v0.9 missed importing `appcontext_tearing_down` in `flask/__init__.py` - # https://github.com/pallets/flask/blob/0.9/flask/__init__.py#L35-L37 - # https://github.com/pallets/flask/blob/0.9/flask/signals.py#L52 - # DEV: Version 0.9 doesn't have a patch version - if flask_version <= (0, 9) and signal == "appcontext_tearing_down": - module = "flask.signals" - - # DEV: Patch `receivers_for` instead of `connect` to ensure we don't mess with `disconnect` - _w(module, "{}.receivers_for".format(signal), patched_signal_receivers_for(signal)) - - -def unpatch(): - if not getattr(flask, "_datadog_patch", False): - return - flask._datadog_patch = False - - props = [ - # Flask - "Flask.wsgi_app", - "Flask.dispatch_request", - "Flask.add_url_rule", - "Flask.endpoint", - "Flask.preprocess_request", - "Flask.process_response", - "Flask.handle_exception", - "Flask.handle_http_exception", - "Flask.handle_user_exception", - "Flask.do_teardown_request", - "Flask.do_teardown_appcontext", - "Flask.send_static_file", - # Flask Hooks - "Flask.before_request", - "Flask.after_request", - "Flask.teardown_request", - "Flask.teardown_appcontext", - # Blueprint - "Blueprint.register", - "Blueprint.add_url_rule", - # Blueprint Hooks - "Blueprint.after_app_request", - "Blueprint.after_request", - "Blueprint.before_app_request", - "Blueprint.before_request", - "Blueprint.teardown_request", - "Blueprint.teardown_app_request", - # Signals - "template_rendered.receivers_for", - "request_started.receivers_for", - "request_finished.receivers_for", - "request_tearing_down.receivers_for", - "got_request_exception.receivers_for", - "appcontext_tearing_down.receivers_for", - # Top level props - "after_this_request", - "send_file", - "jsonify", - "render_template", - "render_template_string", - "templating._render", - ] - - props.append("Flask.finalize_request") - - if flask_version >= (2, 0, 0): - props.append("Flask.register_error_handler") - else: - props.append("Flask._register_error_handler") - - # These were added in 0.11.0 - if flask_version >= (0, 11): - props.append("before_render_template.receivers_for") - - # These were added in 0.10.0 - if flask_version >= (0, 10): - props.append("appcontext_pushed.receivers_for") - props.append("appcontext_popped.receivers_for") - props.append("message_flashed.receivers_for") - - # These were removed in 2.2.0 - if flask_version < (2, 2, 0): - props.append("Flask.try_trigger_before_first_request_functions") - - # These were removed in 2.3.0 - if flask_version < (2, 3, 0): - props.append("Flask.before_first_request") - props.append("Blueprint.before_app_first_request") - - for prop in props: - # Handle 'flask.request_started.receivers_for' - obj = flask - - # v0.9.0 missed importing `appcontext_tearing_down` in `flask/__init__.py` - # https://github.com/pallets/flask/blob/0.9/flask/__init__.py#L35-L37 - # https://github.com/pallets/flask/blob/0.9/flask/signals.py#L52 - # DEV: Version 0.9 doesn't have a patch version - if flask_version <= (0, 9) and prop == "appcontext_tearing_down.receivers_for": - obj = flask.signals - - if "." in prop: - attr, _, prop = prop.partition(".") - obj = getattr(obj, attr, object()) - _u(obj, prop) - - -@with_instance_pin -def patched_wsgi_app(pin, wrapped, instance, args, kwargs): - # This wrapper is the starting point for all requests. - # DEV: This is safe before this is the args for a WSGI handler - # https://www.python.org/dev/peps/pep-3333/ - environ, start_response = args - middleware = _FlaskWSGIMiddleware(wrapped, pin.tracer, config.flask, pin) - return middleware(environ, start_response) - - -def patched_finalize_request(wrapped, instance, args, kwargs): - """ - Wrapper for flask.app.Flask.finalize_request - """ - rv = wrapped(*args, **kwargs) - response = None - headers = None - if getattr(rv, "is_sequence", False): - response = rv.response - headers = rv.headers - core.dispatch("flask.finalize_request.post", (response, headers)) - return rv - - -def patched_blueprint_register(wrapped, instance, args, kwargs): - """ - Wrapper for flask.blueprints.Blueprint.register - - This wrapper just ensures the blueprint has a pin, either set manually on - itself from the user or inherited from the application - """ - app = get_argument_value(args, kwargs, 0, "app") - # Check if this Blueprint has a pin, otherwise clone the one from the app onto it - pin = Pin.get_from(instance) - if not pin: - pin = Pin.get_from(app) - if pin: - pin.clone().onto(instance) - return wrapped(*args, **kwargs) - - -def patched_blueprint_add_url_rule(wrapped, instance, args, kwargs): - pin = Pin._find(wrapped, instance) - if not pin: - return wrapped(*args, **kwargs) - - def _wrap(rule, endpoint=None, view_func=None, **kwargs): - if view_func: - pin.clone().onto(view_func) - return wrapped(rule, endpoint=endpoint, view_func=view_func, **kwargs) - - return _wrap(*args, **kwargs) - - -def patched_add_url_rule(wrapped, instance, args, kwargs): - """Wrapper for flask.app.Flask.add_url_rule to wrap all views attached to this app""" - - def _wrap(rule, endpoint=None, view_func=None, provide_automatic_options=None, **kwargs): - if view_func: - # TODO: `if hasattr(view_func, 'view_class')` then this was generated from a `flask.views.View` - # should we do something special with these views? Change the name/resource? Add tags? - view_func = wrap_view(instance, view_func, name=endpoint, resource=rule) - - return wrapped( - rule, endpoint=endpoint, view_func=view_func, provide_automatic_options=provide_automatic_options, **kwargs - ) - - return _wrap(*args, **kwargs) - - -def patched_endpoint(wrapped, instance, args, kwargs): - """Wrapper for flask.app.Flask.endpoint to ensure all endpoints are wrapped""" - endpoint = kwargs.get("endpoint", args[0]) - - def _wrapper(func): - return wrapped(endpoint)(wrap_function(instance, func, resource=endpoint)) - - return _wrapper - - -def patched_flask_hook(wrapped, instance, args, kwargs): - func = get_argument_value(args, kwargs, 0, "f") - return wrapped(wrap_function(instance, func)) - - -def traced_render_template(wrapped, instance, args, kwargs): - return _build_render_template_wrapper("render_template")(wrapped, instance, args, kwargs) - - -def traced_render_template_string(wrapped, instance, args, kwargs): - return _build_render_template_wrapper("render_template_string")(wrapped, instance, args, kwargs) - - -def _build_render_template_wrapper(name): - name = "flask.%s" % name - - def traced_render(wrapped, instance, args, kwargs): - pin = Pin._find(wrapped, instance, get_current_app()) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - with core.context_with_data( - "flask.render_template", - span_name=name, - pin=pin, - flask_config=config.flask, - tags={COMPONENT: config.flask.integration_name}, - span_type=SpanTypes.TEMPLATE, - call_key=[name + ".call", "current_span"], - ) as ctx, ctx.get_item(name + ".call"): - return wrapped(*args, **kwargs) - - return traced_render - - -def patched_render(wrapped, instance, args, kwargs): - pin = Pin._find(wrapped, instance, get_current_app()) - - if not pin.enabled: - return wrapped(*args, **kwargs) - - def _wrap(template, context, app): - core.dispatch("flask.render", (template, config.flask)) - return wrapped(*args, **kwargs) - - return _wrap(*args, **kwargs) - - -def patched__register_error_handler(wrapped, instance, args, kwargs): - def _wrap(key, code_or_exception, f): - return wrapped(key, code_or_exception, wrap_function(instance, f)) - - return _wrap(*args, **kwargs) - - -def patched_register_error_handler(wrapped, instance, args, kwargs): - def _wrap(code_or_exception, f): - return wrapped(code_or_exception, wrap_function(instance, f)) - - return _wrap(*args, **kwargs) - - -def _block_request_callable(call): - core.set_item(HTTP_REQUEST_BLOCKED, STATUS_403_TYPE_AUTO) - core.dispatch("flask.blocked_request_callable", (call,)) - ctype = _ctype_from_headers(STATUS_403_TYPE_AUTO, flask.request.headers) - abort(flask.Response(http_utils._get_blocked_template(ctype), content_type=ctype, status=403)) - - -def request_patcher(name): - @with_instance_pin - def _patched_request(pin, wrapped, instance, args, kwargs): - with core.context_with_data( - "flask._patched_request", - span_name=".".join(("flask", name)), - pin=pin, - service=trace_utils.int_service(pin, config.flask, pin), - flask_config=config.flask, - flask_request=flask.request, - block_request_callable=_block_request_callable, - ignored_exception_type=NotFound, - call_key="flask_request_call", - tags={COMPONENT: config.flask.integration_name}, - ) as ctx, ctx.get_item("flask_request_call"): - core.dispatch("flask._patched_request", (ctx,)) - return wrapped(*args, **kwargs) - - return _patched_request - - -def patched_signal_receivers_for(signal): - def outer(wrapped, instance, args, kwargs): - sender = get_argument_value(args, kwargs, 0, "sender") - # See if they gave us the flask.app.Flask as the sender - app = None - if isinstance(sender, flask.Flask): - app = sender - for receiver in wrapped(*args, **kwargs): - yield _wrap_call_with_pin_check(receiver, app, func_name(receiver), signal=signal) - - return outer - - -def patched_jsonify(wrapped, instance, args, kwargs): - pin = Pin._find(wrapped, instance, get_current_app()) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - - with core.context_with_data( - "flask.jsonify", - span_name="flask.jsonify", - flask_config=config.flask, - tags={COMPONENT: config.flask.integration_name}, - pin=pin, - call_key="flask_jsonify_call", - ) as ctx, ctx.get_item("flask_jsonify_call"): - return wrapped(*args, **kwargs) +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/flask/wrappers.py b/ddtrace/contrib/flask/wrappers.py index e1a7e470bca..be71b3cfe5b 100644 --- a/ddtrace/contrib/flask/wrappers.py +++ b/ddtrace/contrib/flask/wrappers.py @@ -1,96 +1,15 @@ -import flask +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate -from ddtrace import config -from ddtrace.contrib import trace_utils -from ddtrace.internal import core -from ddtrace.internal.constants import COMPONENT -from ddtrace.vendor.wrapt import function_wrapper +from ..internal.flask.wrappers import * # noqa: F401,F403 -from ...internal.logger import get_logger -from ...internal.utils.importlib import func_name -from ...pin import Pin +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) -log = get_logger(__name__) - - -def wrap_view(instance, func, name=None, resource=None): - return _wrap_call_with_pin_check(func, instance, name or func_name(func), resource=resource, do_dispatch=True) - - -def get_current_app(): - """Helper to get the flask.app.Flask from the current app context""" - try: - return flask.current_app - except RuntimeError: - # raised if current_app is None: https://github.com/pallets/flask/blob/2.1.3/src/flask/globals.py#L40 - pass - return None - - -def _wrap_call( - wrapped, pin, name, resource=None, signal=None, span_type=None, do_dispatch=False, args=None, kwargs=None -): - args = args or [] - kwargs = kwargs or {} - tags = {COMPONENT: config.flask.integration_name} - if signal: - tags["flask.signal"] = signal - with core.context_with_data( - "flask.call", - span_name=name, - pin=pin, - resource=resource, - service=trace_utils.int_service(pin, config.flask), - span_type=span_type, - tags=tags, - call_key="flask_call", - ) as ctx, ctx.get_item("flask_call"): - if do_dispatch: - result = core.dispatch_with_results("flask.wrapped_view", (kwargs,)).callback_and_args - if result: - callback_block, _kwargs = result.value - if callback_block: - return callback_block() - if _kwargs: - for k in kwargs: - kwargs[k] = _kwargs[k] - return wrapped(*args, **kwargs) - - -def _wrap_call_with_pin_check(func, instance, name, resource=None, signal=None, do_dispatch=False): - @function_wrapper - def patch_func(wrapped, _instance, args, kwargs): - pin = Pin._find(wrapped, _instance, instance, get_current_app()) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - return _wrap_call( - wrapped, pin, name, resource=resource, signal=signal, do_dispatch=do_dispatch, args=args, kwargs=kwargs - ) - - return patch_func(func) - - -def wrap_function(instance, func, name=None, resource=None): - return _wrap_call_with_pin_check(func, instance, name or func_name(func), resource=resource) - - -def simple_call_wrapper(name, span_type=None): - @with_instance_pin - def wrapper(pin, wrapped, instance, args, kwargs): - return _wrap_call(wrapped, pin, name, span_type=span_type, args=args, kwargs=kwargs) - - return wrapper - - -def with_instance_pin(func): - """Helper to wrap a function wrapper and ensure an enabled pin is available for the `instance`""" - - def wrapper(wrapped, instance, args, kwargs): - pin = Pin._find(wrapped, instance, get_current_app()) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - - return func(pin, wrapped, instance, args, kwargs) - - return wrapper + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/ddtrace/contrib/flask_cache/__init__.py b/ddtrace/contrib/flask_cache/__init__.py index a859b533008..895664d2f1c 100644 --- a/ddtrace/contrib/flask_cache/__init__.py +++ b/ddtrace/contrib/flask_cache/__init__.py @@ -51,7 +51,11 @@ def counter(): with require_modules(required_modules) as missing_modules: if len(missing_modules) < len(required_modules): - from .tracers import get_traced_cache - from .tracers import get_version + # Required to allow users to import from `ddtrace.contrib.aiohttp.patch` directly + from . import tracers as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.flask_cache.tracers import get_traced_cache + from ..internal.flask_cache.tracers import get_version __all__ = ["get_traced_cache", "get_version"] diff --git a/ddtrace/contrib/flask_cache/tracers.py b/ddtrace/contrib/flask_cache/tracers.py index aba1b95e25c..fe9e1dc60ff 100644 --- a/ddtrace/contrib/flask_cache/tracers.py +++ b/ddtrace/contrib/flask_cache/tracers.py @@ -1,188 +1,15 @@ -""" -Datadog trace code for flask_cache -""" +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate -import logging -import typing +from ..internal.flask_cache.tracers import * # noqa: F401,F403 -from ddtrace import config -from ddtrace.internal.constants import COMPONENT -from ...constants import ANALYTICS_SAMPLE_RATE_KEY -from ...constants import SPAN_MEASURED_KEY -from ...ext import SpanTypes -from ...ext import db -from ...internal.schema import schematize_cache_operation -from ...internal.schema import schematize_service_name -from .utils import _extract_client -from .utils import _extract_conn_tags -from .utils import _resource_from_cache_prefix +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) - -if typing.TYPE_CHECKING: # pragma: no cover - from ddtrace import Span # noqa:F401 - - -log = logging.Logger(__name__) - -DEFAULT_SERVICE = config.service or schematize_service_name("flask-cache") - -# standard tags -COMMAND_KEY = "flask_cache.key" -CACHE_BACKEND = "flask_cache.backend" -CONTACT_POINTS = "flask_cache.contact_points" - - -def get_version(): - # type: () -> str - try: - import flask_caching - - return getattr(flask_caching, "__version__", "") - except ImportError: - return "" - - -def get_traced_cache(ddtracer, service=DEFAULT_SERVICE, meta=None, cache_cls=None): - """ - Return a traced Cache object that behaves exactly as ``cache_cls``. - - ``cache_cls`` defaults to ``flask.ext.cache.Cache`` if Flask-Cache is installed - or ``flask_caching.Cache`` if flask-caching is installed. - """ - - if cache_cls is None: - # for compatibility reason, first check if flask_cache is present - try: - from flask.ext.cache import Cache - - cache_cls = Cache - except ImportError: - # use flask_caching if flask_cache is not present - from flask_caching import Cache - - cache_cls = Cache - - class TracedCache(cache_cls): - """ - Traced cache backend that monitors any operations done by flask_cache. Observed actions are: - * get, set, add, delete, clear - * all ``many_`` operations - """ - - _datadog_tracer = ddtracer - _datadog_service = service - _datadog_meta = meta - - def __trace(self, cmd): - # type: (str, bool) -> Span - """ - Start a tracing with default attributes and tags - """ - # create a new span - s = self._datadog_tracer.trace( - schematize_cache_operation(cmd, cache_provider="flask_cache"), - span_type=SpanTypes.CACHE, - service=self._datadog_service, - ) - - s.set_tag_str(COMPONENT, config.flask_cache.integration_name) - - s.set_tag(SPAN_MEASURED_KEY) - # set span tags - s.set_tag_str(CACHE_BACKEND, self.config.get("CACHE_TYPE")) - s.set_tags(self._datadog_meta) - # set analytics sample rate - s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.flask_cache.get_analytics_sample_rate()) - # add connection meta if there is one - client = _extract_client(self.cache) - if client is not None: - try: - s.set_tags(_extract_conn_tags(client)) - except Exception: - log.debug("error parsing connection tags", exc_info=True) - - return s - - def get(self, *args, **kwargs): - """ - Track ``get`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("GET", self.config) - if len(args) > 0: - span.set_tag_str(COMMAND_KEY, args[0]) - result = super(TracedCache, self).get(*args, **kwargs) - span.set_metric(db.ROWCOUNT, 1 if result else 0) - return result - - def set(self, *args, **kwargs): - """ - Track ``set`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("SET", self.config) - if len(args) > 0: - span.set_tag_str(COMMAND_KEY, args[0]) - return super(TracedCache, self).set(*args, **kwargs) - - def add(self, *args, **kwargs): - """ - Track ``add`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("ADD", self.config) - if len(args) > 0: - span.set_tag_str(COMMAND_KEY, args[0]) - return super(TracedCache, self).add(*args, **kwargs) - - def delete(self, *args, **kwargs): - """ - Track ``delete`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("DELETE", self.config) - if len(args) > 0: - span.set_tag_str(COMMAND_KEY, args[0]) - return super(TracedCache, self).delete(*args, **kwargs) - - def delete_many(self, *args, **kwargs): - """ - Track ``delete_many`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("DELETE_MANY", self.config) - span.set_tag(COMMAND_KEY, list(args)) - return super(TracedCache, self).delete_many(*args, **kwargs) - - def clear(self, *args, **kwargs): - """ - Track ``clear`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("CLEAR", self.config) - return super(TracedCache, self).clear(*args, **kwargs) - - def get_many(self, *args, **kwargs): - """ - Track ``get_many`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("GET_MANY", self.config) - span.set_tag(COMMAND_KEY, list(args)) - result = super(TracedCache, self).get_many(*args, **kwargs) - # get many returns a list, with either the key value or None if it doesn't exist - span.set_metric(db.ROWCOUNT, sum(1 for val in result if val)) - return result - - def set_many(self, *args, **kwargs): - """ - Track ``set_many`` operation - """ - with self.__trace("flask_cache.cmd") as span: - span.resource = _resource_from_cache_prefix("SET_MANY", self.config) - if len(args) > 0: - span.set_tag(COMMAND_KEY, list(args[0].keys())) - return super(TracedCache, self).set_many(*args, **kwargs) - - return TracedCache + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/ddtrace/contrib/flask_cache/utils.py b/ddtrace/contrib/flask_cache/utils.py index d770b1d065b..9acedddfd85 100644 --- a/ddtrace/contrib/flask_cache/utils.py +++ b/ddtrace/contrib/flask_cache/utils.py @@ -1,63 +1,15 @@ -# project -from ddtrace._trace.utils_redis import _extract_conn_tags as extract_redis_tags +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate -from ...ext import net -from ..pylibmc.addrs import parse_addresses +from ..internal.flask_cache.utils import * # noqa: F401,F403 -def _resource_from_cache_prefix(resource, cache): - """ - Combine the resource name with the cache prefix (if any) - """ - if getattr(cache, "key_prefix", None): - name = "{} {}".format(resource, cache.key_prefix) - else: - name = resource +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) - # enforce lowercase to make the output nicer to read - return name.lower() - - -def _extract_client(cache): - """ - Get the client from the cache instance according to the current operation - """ - client = getattr(cache, "_client", None) - if client is None: - # flask-caching has _read_clients & _write_client for the redis backend - # These use the same connection so just try to get a reference to one of them. - # flask-caching < 2.0.0 uses _read_clients so look for that one too. - for attr in ("_write_client", "_read_client", "_read_clients"): - client = getattr(cache, attr, None) - if client is not None: - break - return client - - -def _extract_conn_tags(client): - """ - For the given client extracts connection tags - """ - tags = {} - - if hasattr(client, "servers"): - # Memcached backend supports an address pool - if isinstance(client.servers, list) and len(client.servers) > 0: - # use the first address of the pool as a host because - # the code doesn't expose more information - contact_point = client.servers[0].address - tags[net.TARGET_HOST] = contact_point[0] - tags[net.TARGET_PORT] = contact_point[1] - elif hasattr(client, "connection_pool"): - # Redis main connection - redis_tags = extract_redis_tags(client.connection_pool.connection_kwargs) - tags.update(**redis_tags) - elif hasattr(client, "addresses"): - # pylibmc - # FIXME[matt] should we memoize this? - addrs = parse_addresses(client.addresses) - if addrs: - _, host, port, _ = addrs[0] - tags[net.TARGET_PORT] = port - tags[net.TARGET_HOST] = host - return tags + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/ddtrace/contrib/futures/__init__.py b/ddtrace/contrib/futures/__init__.py index c11b61b0a5e..686b400c49f 100644 --- a/ddtrace/contrib/futures/__init__.py +++ b/ddtrace/contrib/futures/__init__.py @@ -23,9 +23,13 @@ with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import get_version - from .patch import patch - from .patch import unpatch + # Required to allow users to import from `ddtrace.contrib.futures.patch` directly + from . import patch as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.futures.patch import get_version + from ..internal.futures.patch import patch + from ..internal.futures.patch import unpatch __all__ = [ "get_version", diff --git a/ddtrace/contrib/futures/patch.py b/ddtrace/contrib/futures/patch.py index 86687d7dead..5bae0b6b648 100644 --- a/ddtrace/contrib/futures/patch.py +++ b/ddtrace/contrib/futures/patch.py @@ -1,43 +1,4 @@ -import sys +from ..internal.futures.patch import * # noqa: F401,F403 -from ddtrace.internal.wrapping import unwrap as _u -from ddtrace.internal.wrapping import wrap as _w -from .threading import _wrap_submit - - -def get_version(): - # type: () -> str - return "" - - -def patch(): - """Enables Context Propagation between threads""" - try: - # Ensure that we get hold of the reloaded module if module cleanup was - # performed. - thread = sys.modules["concurrent.futures.thread"] - except KeyError: - import concurrent.futures.thread as thread - - if getattr(thread, "__datadog_patch", False): - return - thread.__datadog_patch = True - - _w(thread.ThreadPoolExecutor.submit, _wrap_submit) - - -def unpatch(): - """Disables Context Propagation between threads""" - try: - # Ensure that we get hold of the reloaded module if module cleanup was - # performed. - thread = sys.modules["concurrent.futures.thread"] - except KeyError: - return - - if not getattr(thread, "__datadog_patch", False): - return - thread.__datadog_patch = False - - _u(thread.ThreadPoolExecutor.submit, _wrap_submit) +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/futures/threading.py b/ddtrace/contrib/futures/threading.py index deea68e2c17..70c20736509 100644 --- a/ddtrace/contrib/futures/threading.py +++ b/ddtrace/contrib/futures/threading.py @@ -1,36 +1,15 @@ -from typing import Optional +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate -import ddtrace -from ddtrace._trace.context import Context +from ..internal.futures.threading import * # noqa: F401,F403 -def _wrap_submit(func, args, kwargs): - """ - Wrap `Executor` method used to submit a work executed in another - thread. This wrapper ensures that a new `Context` is created and - properly propagated using an intermediate function. - """ - # DEV: Be sure to propagate a Context and not a Span since we are crossing thread boundaries - current_ctx: Optional[Context] = ddtrace.tracer.current_trace_context() +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) - # The target function can be provided as a kwarg argument "fn" or the first positional argument - self = args[0] - if "fn" in kwargs: - fn = kwargs.pop("fn") - fn_args = args[1:] - else: - fn, fn_args = args[1], args[2:] - return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs) - - -def _wrap_execution(ctx: Optional[Context], fn, args, kwargs): - """ - Intermediate target function that is executed in a new thread; - it receives the original function with arguments and keyword - arguments, including our tracing `Context`. The current context - provider sets the Active context in a thread local storage - variable because it's outside the asynchronous loop. - """ - if ctx is not None: - ddtrace.tracer.context_provider.activate(ctx) - return fn(*args, **kwargs) + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/ddtrace/contrib/internal/fastapi/patch.py b/ddtrace/contrib/internal/fastapi/patch.py new file mode 100644 index 00000000000..974dcf64d84 --- /dev/null +++ b/ddtrace/contrib/internal/fastapi/patch.py @@ -0,0 +1,103 @@ +import os + +import fastapi +import fastapi.routing + +from ddtrace import Pin +from ddtrace import config +from ddtrace.contrib.asgi.middleware import TraceMiddleware +from ddtrace.contrib.starlette.patch import _trace_background_tasks +from ddtrace.contrib.starlette.patch import traced_handler +from ddtrace.internal.logger import get_logger +from ddtrace.internal.schema import schematize_service_name +from ddtrace.internal.utils.wrappers import unwrap as _u +from ddtrace.vendor.wrapt import ObjectProxy +from ddtrace.vendor.wrapt import wrap_function_wrapper as _w + + +log = get_logger(__name__) + +config._add( + "fastapi", + dict( + _default_service=schematize_service_name("fastapi"), + request_span_name="fastapi.request", + distributed_tracing=True, + trace_query_string=None, # Default to global config + _trace_asgi_websocket=os.getenv("DD_ASGI_TRACE_WEBSOCKET", default=False), + ), +) + + +def get_version(): + # type: () -> str + return getattr(fastapi, "__version__", "") + + +def wrap_middleware_stack(wrapped, instance, args, kwargs): + return TraceMiddleware(app=wrapped(*args, **kwargs), integration_config=config.fastapi) + + +async def traced_serialize_response(wrapped, instance, args, kwargs): + """Wrapper for fastapi.routing.serialize_response function. + + This function is called on all non-Response objects to + convert them to a serializable form. + + This is the wrapper which calls ``jsonable_encoder``. + + This function does not do the actual encoding from + obj -> json string (e.g. json.dumps()). That is handled + by the Response.render function. + + DEV: We do not wrap ``jsonable_encoder`` because it calls + itself recursively, so there is a chance the overhead + added by creating spans will be higher than desired for + the result. + """ + pin = Pin.get_from(fastapi) + if not pin or not pin.enabled(): + return await wrapped(*args, **kwargs) + + with pin.tracer.trace("fastapi.serialize_response"): + return await wrapped(*args, **kwargs) + + +def patch(): + if getattr(fastapi, "_datadog_patch", False): + return + + fastapi._datadog_patch = True + Pin().onto(fastapi) + _w("fastapi.applications", "FastAPI.build_middleware_stack", wrap_middleware_stack) + _w("fastapi.routing", "serialize_response", traced_serialize_response) + + if not isinstance(fastapi.BackgroundTasks.add_task, ObjectProxy): + _w("fastapi", "BackgroundTasks.add_task", _trace_background_tasks(fastapi)) + + # We need to check that Starlette instrumentation hasn't already patched these + if not isinstance(fastapi.routing.APIRoute.handle, ObjectProxy): + _w("fastapi.routing", "APIRoute.handle", traced_handler) + + if not isinstance(fastapi.routing.Mount.handle, ObjectProxy): + _w("starlette.routing", "Mount.handle", traced_handler) + + +def unpatch(): + if not getattr(fastapi, "_datadog_patch", False): + return + + fastapi._datadog_patch = False + + _u(fastapi.applications.FastAPI, "build_middleware_stack") + _u(fastapi.routing, "serialize_response") + + # We need to check that Starlette instrumentation hasn't already unpatched these + if isinstance(fastapi.routing.APIRoute.handle, ObjectProxy): + _u(fastapi.routing.APIRoute, "handle") + + if isinstance(fastapi.routing.Mount.handle, ObjectProxy): + _u(fastapi.routing.Mount, "handle") + + if isinstance(fastapi.BackgroundTasks.add_task, ObjectProxy): + _u(fastapi.BackgroundTasks, "add_task") diff --git a/ddtrace/contrib/internal/flask/patch.py b/ddtrace/contrib/internal/flask/patch.py new file mode 100644 index 00000000000..7645c3aa369 --- /dev/null +++ b/ddtrace/contrib/internal/flask/patch.py @@ -0,0 +1,564 @@ +import flask +import werkzeug +from werkzeug.exceptions import BadRequest +from werkzeug.exceptions import NotFound +from werkzeug.exceptions import abort + +from ddtrace._trace.trace_handlers import _ctype_from_headers +from ddtrace.contrib import trace_utils +from ddtrace.ext import SpanTypes +from ddtrace.internal import core +from ddtrace.internal.constants import COMPONENT +from ddtrace.internal.constants import HTTP_REQUEST_BLOCKED +from ddtrace.internal.constants import STATUS_403_TYPE_AUTO +from ddtrace.internal.packages import get_version_for_package +from ddtrace.internal.schema import schematize_service_name +from ddtrace.internal.schema import schematize_url_operation +from ddtrace.internal.schema.span_attribute_schema import SpanDirection +from ddtrace.internal.utils import http as http_utils + + +# Not all versions of flask/werkzeug have this mixin +try: + from werkzeug.wrappers.json import JSONMixin + + _HAS_JSON_MIXIN = True +except ImportError: + _HAS_JSON_MIXIN = False + +from ddtrace import Pin +from ddtrace import config +from ddtrace.contrib.trace_utils import unwrap as _u +from ddtrace.contrib.wsgi.wsgi import _DDWSGIMiddlewareBase +from ddtrace.internal.logger import get_logger +from ddtrace.internal.utils import get_argument_value +from ddtrace.internal.utils.importlib import func_name +from ddtrace.internal.utils.version import parse_version +from ddtrace.vendor.wrapt import wrap_function_wrapper as _w + +from .wrappers import _wrap_call_with_pin_check +from .wrappers import get_current_app +from .wrappers import simple_call_wrapper +from .wrappers import with_instance_pin +from .wrappers import wrap_function +from .wrappers import wrap_view + + +try: + from json import JSONDecodeError +except ImportError: + # handling python 2.X import error + JSONDecodeError = ValueError # type: ignore + + +log = get_logger(__name__) + +FLASK_VERSION = "flask.version" +_BODY_METHODS = {"POST", "PUT", "DELETE", "PATCH"} + +# Configure default configuration +config._add( + "flask", + dict( + # Flask service configuration + _default_service=schematize_service_name("flask"), + collect_view_args=True, + distributed_tracing_enabled=True, + template_default_name="", + trace_signals=True, + ), +) + + +def get_version(): + # type: () -> str + return get_version_for_package("flask") + + +if _HAS_JSON_MIXIN: + + class RequestWithJson(werkzeug.Request, JSONMixin): + pass + + _RequestType = RequestWithJson +else: + _RequestType = werkzeug.Request + +# Extract flask version into a tuple e.g. (0, 12, 1) or (1, 0, 2) +# DEV: This makes it so we can do `if flask_version >= (0, 12, 0):` +# DEV: Example tests: +# (0, 10, 0) > (0, 10) +# (0, 10, 0) >= (0, 10, 0) +# (0, 10, 1) >= (0, 10) +# (0, 11, 1) >= (0, 10) +# (0, 11, 1) >= (0, 10, 2) +# (1, 0, 0) >= (0, 10) +# (0, 9) == (0, 9) +# (0, 9, 0) != (0, 9) +# (0, 8, 5) <= (0, 9) +flask_version_str = get_version() +flask_version = parse_version(flask_version_str) + + +class _FlaskWSGIMiddleware(_DDWSGIMiddlewareBase): + _request_call_name = schematize_url_operation("flask.request", protocol="http", direction=SpanDirection.INBOUND) + _application_call_name = "flask.application" + _response_call_name = "flask.response" + + def _wrapped_start_response(self, start_response, ctx, status_code, headers, exc_info=None): + core.dispatch("flask.start_response.pre", (flask.request, ctx, config.flask, status_code, headers)) + if not core.get_item(HTTP_REQUEST_BLOCKED): + headers_from_context = "" + result_waf = core.dispatch_with_results("flask.start_response", ("Flask",)).waf + if result_waf: + headers_from_context = result_waf.value + if core.get_item(HTTP_REQUEST_BLOCKED): + # response code must be set here, or it will be too late + result_content = core.dispatch_with_results("flask.block.request.content", ()).block_requested + if result_content: + _, status, response_headers = result_content.value + result = start_response(str(status), response_headers) + else: + block_config = core.get_item(HTTP_REQUEST_BLOCKED) + desired_type = block_config.get("type", "auto") + status = block_config.get("status_code", 403) + if desired_type == "none": + response_headers = [] + else: + ctype = _ctype_from_headers(block_config, headers_from_context) + response_headers = [("content-type", ctype)] + result = start_response(str(status), response_headers) + core.dispatch("flask.start_response.blocked", (ctx, config.flask, response_headers, status)) + else: + result = start_response(status_code, headers) + else: + result = start_response(status_code, headers) + return result + + def _request_call_modifier(self, ctx, parsed_headers=None): + environ = ctx.get_item("environ") + # Create a werkzeug request from the `environ` to make interacting with it easier + # DEV: This executes before a request context is created + request = _RequestType(environ) + + req_body = None + result = core.dispatch_with_results( + "flask.request_call_modifier", + ( + ctx, + config.flask, + request, + environ, + _HAS_JSON_MIXIN, + FLASK_VERSION, + flask_version_str, + BadRequest, + ), + ).request_body + if result: + req_body = result.value + core.dispatch("flask.request_call_modifier.post", (ctx, config.flask, request, req_body)) + + +def patch(): + """ + Patch `flask` module for tracing + """ + # Check to see if we have patched Flask yet or not + if getattr(flask, "_datadog_patch", False): + return + flask._datadog_patch = True + + Pin().onto(flask.Flask) + core.dispatch("flask.patch", (flask_version,)) + # flask.app.Flask methods that have custom tracing (add metadata, wrap functions, etc) + _w("flask", "Flask.wsgi_app", patched_wsgi_app) + _w("flask", "Flask.dispatch_request", request_patcher("dispatch_request")) + _w("flask", "Flask.preprocess_request", request_patcher("preprocess_request")) + _w("flask", "Flask.add_url_rule", patched_add_url_rule) + _w("flask", "Flask.endpoint", patched_endpoint) + + _w("flask", "Flask.finalize_request", patched_finalize_request) + + if flask_version >= (2, 0, 0): + _w("flask", "Flask.register_error_handler", patched_register_error_handler) + else: + _w("flask", "Flask._register_error_handler", patched__register_error_handler) + + # flask.blueprints.Blueprint methods that have custom tracing (add metadata, wrap functions, etc) + _w("flask", "Blueprint.register", patched_blueprint_register) + _w("flask", "Blueprint.add_url_rule", patched_blueprint_add_url_rule) + + flask_hooks = [ + "before_request", + "after_request", + "teardown_request", + "teardown_appcontext", + ] + if flask_version < (2, 3, 0): + flask_hooks.append("before_first_request") + + for hook in flask_hooks: + _w("flask", "Flask.{}".format(hook), patched_flask_hook) + _w("flask", "after_this_request", patched_flask_hook) + + flask_app_traces = [ + "process_response", + "handle_exception", + "handle_http_exception", + "handle_user_exception", + "do_teardown_request", + "do_teardown_appcontext", + "send_static_file", + ] + if flask_version < (2, 2, 0): + flask_app_traces.append("try_trigger_before_first_request_functions") + + for name in flask_app_traces: + _w("flask", "Flask.{}".format(name), simple_call_wrapper("flask.{}".format(name))) + # flask static file helpers + _w("flask", "send_file", simple_call_wrapper("flask.send_file")) + + # flask.json.jsonify + _w("flask", "jsonify", patched_jsonify) + + _w("flask.templating", "_render", patched_render) + _w("flask", "render_template", _build_render_template_wrapper("render_template")) + _w("flask", "render_template_string", _build_render_template_wrapper("render_template_string")) + + bp_hooks = [ + "after_app_request", + "after_request", + "before_app_request", + "before_request", + "teardown_request", + "teardown_app_request", + ] + if flask_version < (2, 3, 0): + bp_hooks.append("before_app_first_request") + + for hook in bp_hooks: + _w("flask", "Blueprint.{}".format(hook), patched_flask_hook) + + if config.flask["trace_signals"]: + signals = [ + "template_rendered", + "request_started", + "request_finished", + "request_tearing_down", + "got_request_exception", + "appcontext_tearing_down", + ] + # These were added in 0.11.0 + if flask_version >= (0, 11): + signals.append("before_render_template") + + # These were added in 0.10.0 + if flask_version >= (0, 10): + signals.append("appcontext_pushed") + signals.append("appcontext_popped") + signals.append("message_flashed") + + for signal in signals: + module = "flask" + + # v0.9 missed importing `appcontext_tearing_down` in `flask/__init__.py` + # https://github.com/pallets/flask/blob/0.9/flask/__init__.py#L35-L37 + # https://github.com/pallets/flask/blob/0.9/flask/signals.py#L52 + # DEV: Version 0.9 doesn't have a patch version + if flask_version <= (0, 9) and signal == "appcontext_tearing_down": + module = "flask.signals" + + # DEV: Patch `receivers_for` instead of `connect` to ensure we don't mess with `disconnect` + _w(module, "{}.receivers_for".format(signal), patched_signal_receivers_for(signal)) + + +def unpatch(): + if not getattr(flask, "_datadog_patch", False): + return + flask._datadog_patch = False + + props = [ + # Flask + "Flask.wsgi_app", + "Flask.dispatch_request", + "Flask.add_url_rule", + "Flask.endpoint", + "Flask.preprocess_request", + "Flask.process_response", + "Flask.handle_exception", + "Flask.handle_http_exception", + "Flask.handle_user_exception", + "Flask.do_teardown_request", + "Flask.do_teardown_appcontext", + "Flask.send_static_file", + # Flask Hooks + "Flask.before_request", + "Flask.after_request", + "Flask.teardown_request", + "Flask.teardown_appcontext", + # Blueprint + "Blueprint.register", + "Blueprint.add_url_rule", + # Blueprint Hooks + "Blueprint.after_app_request", + "Blueprint.after_request", + "Blueprint.before_app_request", + "Blueprint.before_request", + "Blueprint.teardown_request", + "Blueprint.teardown_app_request", + # Signals + "template_rendered.receivers_for", + "request_started.receivers_for", + "request_finished.receivers_for", + "request_tearing_down.receivers_for", + "got_request_exception.receivers_for", + "appcontext_tearing_down.receivers_for", + # Top level props + "after_this_request", + "send_file", + "jsonify", + "render_template", + "render_template_string", + "templating._render", + ] + + props.append("Flask.finalize_request") + + if flask_version >= (2, 0, 0): + props.append("Flask.register_error_handler") + else: + props.append("Flask._register_error_handler") + + # These were added in 0.11.0 + if flask_version >= (0, 11): + props.append("before_render_template.receivers_for") + + # These were added in 0.10.0 + if flask_version >= (0, 10): + props.append("appcontext_pushed.receivers_for") + props.append("appcontext_popped.receivers_for") + props.append("message_flashed.receivers_for") + + # These were removed in 2.2.0 + if flask_version < (2, 2, 0): + props.append("Flask.try_trigger_before_first_request_functions") + + # These were removed in 2.3.0 + if flask_version < (2, 3, 0): + props.append("Flask.before_first_request") + props.append("Blueprint.before_app_first_request") + + for prop in props: + # Handle 'flask.request_started.receivers_for' + obj = flask + + # v0.9.0 missed importing `appcontext_tearing_down` in `flask/__init__.py` + # https://github.com/pallets/flask/blob/0.9/flask/__init__.py#L35-L37 + # https://github.com/pallets/flask/blob/0.9/flask/signals.py#L52 + # DEV: Version 0.9 doesn't have a patch version + if flask_version <= (0, 9) and prop == "appcontext_tearing_down.receivers_for": + obj = flask.signals + + if "." in prop: + attr, _, prop = prop.partition(".") + obj = getattr(obj, attr, object()) + _u(obj, prop) + + +@with_instance_pin +def patched_wsgi_app(pin, wrapped, instance, args, kwargs): + # This wrapper is the starting point for all requests. + # DEV: This is safe before this is the args for a WSGI handler + # https://www.python.org/dev/peps/pep-3333/ + environ, start_response = args + middleware = _FlaskWSGIMiddleware(wrapped, pin.tracer, config.flask, pin) + return middleware(environ, start_response) + + +def patched_finalize_request(wrapped, instance, args, kwargs): + """ + Wrapper for flask.app.Flask.finalize_request + """ + rv = wrapped(*args, **kwargs) + response = None + headers = None + if getattr(rv, "is_sequence", False): + response = rv.response + headers = rv.headers + core.dispatch("flask.finalize_request.post", (response, headers)) + return rv + + +def patched_blueprint_register(wrapped, instance, args, kwargs): + """ + Wrapper for flask.blueprints.Blueprint.register + + This wrapper just ensures the blueprint has a pin, either set manually on + itself from the user or inherited from the application + """ + app = get_argument_value(args, kwargs, 0, "app") + # Check if this Blueprint has a pin, otherwise clone the one from the app onto it + pin = Pin.get_from(instance) + if not pin: + pin = Pin.get_from(app) + if pin: + pin.clone().onto(instance) + return wrapped(*args, **kwargs) + + +def patched_blueprint_add_url_rule(wrapped, instance, args, kwargs): + pin = Pin._find(wrapped, instance) + if not pin: + return wrapped(*args, **kwargs) + + def _wrap(rule, endpoint=None, view_func=None, **kwargs): + if view_func: + pin.clone().onto(view_func) + return wrapped(rule, endpoint=endpoint, view_func=view_func, **kwargs) + + return _wrap(*args, **kwargs) + + +def patched_add_url_rule(wrapped, instance, args, kwargs): + """Wrapper for flask.app.Flask.add_url_rule to wrap all views attached to this app""" + + def _wrap(rule, endpoint=None, view_func=None, provide_automatic_options=None, **kwargs): + if view_func: + # TODO: `if hasattr(view_func, 'view_class')` then this was generated from a `flask.views.View` + # should we do something special with these views? Change the name/resource? Add tags? + view_func = wrap_view(instance, view_func, name=endpoint, resource=rule) + + return wrapped( + rule, endpoint=endpoint, view_func=view_func, provide_automatic_options=provide_automatic_options, **kwargs + ) + + return _wrap(*args, **kwargs) + + +def patched_endpoint(wrapped, instance, args, kwargs): + """Wrapper for flask.app.Flask.endpoint to ensure all endpoints are wrapped""" + endpoint = kwargs.get("endpoint", args[0]) + + def _wrapper(func): + return wrapped(endpoint)(wrap_function(instance, func, resource=endpoint)) + + return _wrapper + + +def patched_flask_hook(wrapped, instance, args, kwargs): + func = get_argument_value(args, kwargs, 0, "f") + return wrapped(wrap_function(instance, func)) + + +def traced_render_template(wrapped, instance, args, kwargs): + return _build_render_template_wrapper("render_template")(wrapped, instance, args, kwargs) + + +def traced_render_template_string(wrapped, instance, args, kwargs): + return _build_render_template_wrapper("render_template_string")(wrapped, instance, args, kwargs) + + +def _build_render_template_wrapper(name): + name = "flask.%s" % name + + def traced_render(wrapped, instance, args, kwargs): + pin = Pin._find(wrapped, instance, get_current_app()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + with core.context_with_data( + "flask.render_template", + span_name=name, + pin=pin, + flask_config=config.flask, + tags={COMPONENT: config.flask.integration_name}, + span_type=SpanTypes.TEMPLATE, + call_key=[name + ".call", "current_span"], + ) as ctx, ctx.get_item(name + ".call"): + return wrapped(*args, **kwargs) + + return traced_render + + +def patched_render(wrapped, instance, args, kwargs): + pin = Pin._find(wrapped, instance, get_current_app()) + + if not pin.enabled: + return wrapped(*args, **kwargs) + + def _wrap(template, context, app): + core.dispatch("flask.render", (template, config.flask)) + return wrapped(*args, **kwargs) + + return _wrap(*args, **kwargs) + + +def patched__register_error_handler(wrapped, instance, args, kwargs): + def _wrap(key, code_or_exception, f): + return wrapped(key, code_or_exception, wrap_function(instance, f)) + + return _wrap(*args, **kwargs) + + +def patched_register_error_handler(wrapped, instance, args, kwargs): + def _wrap(code_or_exception, f): + return wrapped(code_or_exception, wrap_function(instance, f)) + + return _wrap(*args, **kwargs) + + +def _block_request_callable(call): + core.set_item(HTTP_REQUEST_BLOCKED, STATUS_403_TYPE_AUTO) + core.dispatch("flask.blocked_request_callable", (call,)) + ctype = _ctype_from_headers(STATUS_403_TYPE_AUTO, flask.request.headers) + abort(flask.Response(http_utils._get_blocked_template(ctype), content_type=ctype, status=403)) + + +def request_patcher(name): + @with_instance_pin + def _patched_request(pin, wrapped, instance, args, kwargs): + with core.context_with_data( + "flask._patched_request", + span_name=".".join(("flask", name)), + pin=pin, + service=trace_utils.int_service(pin, config.flask, pin), + flask_config=config.flask, + flask_request=flask.request, + block_request_callable=_block_request_callable, + ignored_exception_type=NotFound, + call_key="flask_request_call", + tags={COMPONENT: config.flask.integration_name}, + ) as ctx, ctx.get_item("flask_request_call"): + core.dispatch("flask._patched_request", (ctx,)) + return wrapped(*args, **kwargs) + + return _patched_request + + +def patched_signal_receivers_for(signal): + def outer(wrapped, instance, args, kwargs): + sender = get_argument_value(args, kwargs, 0, "sender") + # See if they gave us the flask.app.Flask as the sender + app = None + if isinstance(sender, flask.Flask): + app = sender + for receiver in wrapped(*args, **kwargs): + yield _wrap_call_with_pin_check(receiver, app, func_name(receiver), signal=signal) + + return outer + + +def patched_jsonify(wrapped, instance, args, kwargs): + pin = Pin._find(wrapped, instance, get_current_app()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + + with core.context_with_data( + "flask.jsonify", + span_name="flask.jsonify", + flask_config=config.flask, + tags={COMPONENT: config.flask.integration_name}, + pin=pin, + call_key="flask_jsonify_call", + ) as ctx, ctx.get_item("flask_jsonify_call"): + return wrapped(*args, **kwargs) diff --git a/ddtrace/contrib/internal/flask/wrappers.py b/ddtrace/contrib/internal/flask/wrappers.py new file mode 100644 index 00000000000..fc73f894fc1 --- /dev/null +++ b/ddtrace/contrib/internal/flask/wrappers.py @@ -0,0 +1,95 @@ +import flask + +from ddtrace import config +from ddtrace.contrib import trace_utils +from ddtrace.internal import core +from ddtrace.internal.constants import COMPONENT +from ddtrace.internal.logger import get_logger +from ddtrace.internal.utils.importlib import func_name +from ddtrace.pin import Pin +from ddtrace.vendor.wrapt import function_wrapper + + +log = get_logger(__name__) + + +def wrap_view(instance, func, name=None, resource=None): + return _wrap_call_with_pin_check(func, instance, name or func_name(func), resource=resource, do_dispatch=True) + + +def get_current_app(): + """Helper to get the flask.app.Flask from the current app context""" + try: + return flask.current_app + except RuntimeError: + # raised if current_app is None: https://github.com/pallets/flask/blob/2.1.3/src/flask/globals.py#L40 + pass + return None + + +def _wrap_call( + wrapped, pin, name, resource=None, signal=None, span_type=None, do_dispatch=False, args=None, kwargs=None +): + args = args or [] + kwargs = kwargs or {} + tags = {COMPONENT: config.flask.integration_name} + if signal: + tags["flask.signal"] = signal + with core.context_with_data( + "flask.call", + span_name=name, + pin=pin, + resource=resource, + service=trace_utils.int_service(pin, config.flask), + span_type=span_type, + tags=tags, + call_key="flask_call", + ) as ctx, ctx.get_item("flask_call"): + if do_dispatch: + result = core.dispatch_with_results("flask.wrapped_view", (kwargs,)).callback_and_args + if result: + callback_block, _kwargs = result.value + if callback_block: + return callback_block() + if _kwargs: + for k in kwargs: + kwargs[k] = _kwargs[k] + return wrapped(*args, **kwargs) + + +def _wrap_call_with_pin_check(func, instance, name, resource=None, signal=None, do_dispatch=False): + @function_wrapper + def patch_func(wrapped, _instance, args, kwargs): + pin = Pin._find(wrapped, _instance, instance, get_current_app()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + return _wrap_call( + wrapped, pin, name, resource=resource, signal=signal, do_dispatch=do_dispatch, args=args, kwargs=kwargs + ) + + return patch_func(func) + + +def wrap_function(instance, func, name=None, resource=None): + return _wrap_call_with_pin_check(func, instance, name or func_name(func), resource=resource) + + +def simple_call_wrapper(name, span_type=None): + @with_instance_pin + def wrapper(pin, wrapped, instance, args, kwargs): + return _wrap_call(wrapped, pin, name, span_type=span_type, args=args, kwargs=kwargs) + + return wrapper + + +def with_instance_pin(func): + """Helper to wrap a function wrapper and ensure an enabled pin is available for the `instance`""" + + def wrapper(wrapped, instance, args, kwargs): + pin = Pin._find(wrapped, instance, get_current_app()) + if not pin or not pin.enabled(): + return wrapped(*args, **kwargs) + + return func(pin, wrapped, instance, args, kwargs) + + return wrapper diff --git a/ddtrace/contrib/internal/flask_cache/tracers.py b/ddtrace/contrib/internal/flask_cache/tracers.py new file mode 100644 index 00000000000..055f73fc45c --- /dev/null +++ b/ddtrace/contrib/internal/flask_cache/tracers.py @@ -0,0 +1,188 @@ +""" +Datadog trace code for flask_cache +""" + +import logging +import typing + +from ddtrace import config +from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY +from ddtrace.constants import SPAN_MEASURED_KEY +from ddtrace.ext import SpanTypes +from ddtrace.ext import db +from ddtrace.internal.constants import COMPONENT +from ddtrace.internal.schema import schematize_cache_operation +from ddtrace.internal.schema import schematize_service_name + +from .utils import _extract_client +from .utils import _extract_conn_tags +from .utils import _resource_from_cache_prefix + + +if typing.TYPE_CHECKING: # pragma: no cover + from ddtrace import Span # noqa:F401 + + +log = logging.Logger(__name__) + +DEFAULT_SERVICE = config.service or schematize_service_name("flask-cache") + +# standard tags +COMMAND_KEY = "flask_cache.key" +CACHE_BACKEND = "flask_cache.backend" +CONTACT_POINTS = "flask_cache.contact_points" + + +def get_version(): + # type: () -> str + try: + import flask_caching + + return getattr(flask_caching, "__version__", "") + except ImportError: + return "" + + +def get_traced_cache(ddtracer, service=DEFAULT_SERVICE, meta=None, cache_cls=None): + """ + Return a traced Cache object that behaves exactly as ``cache_cls``. + + ``cache_cls`` defaults to ``flask.ext.cache.Cache`` if Flask-Cache is installed + or ``flask_caching.Cache`` if flask-caching is installed. + """ + + if cache_cls is None: + # for compatibility reason, first check if flask_cache is present + try: + from flask.ext.cache import Cache + + cache_cls = Cache + except ImportError: + # use flask_caching if flask_cache is not present + from flask_caching import Cache + + cache_cls = Cache + + class TracedCache(cache_cls): + """ + Traced cache backend that monitors any operations done by flask_cache. Observed actions are: + * get, set, add, delete, clear + * all ``many_`` operations + """ + + _datadog_tracer = ddtracer + _datadog_service = service + _datadog_meta = meta + + def __trace(self, cmd): + # type: (str, bool) -> Span + """ + Start a tracing with default attributes and tags + """ + # create a new span + s = self._datadog_tracer.trace( + schematize_cache_operation(cmd, cache_provider="flask_cache"), + span_type=SpanTypes.CACHE, + service=self._datadog_service, + ) + + s.set_tag_str(COMPONENT, config.flask_cache.integration_name) + + s.set_tag(SPAN_MEASURED_KEY) + # set span tags + s.set_tag_str(CACHE_BACKEND, self.config.get("CACHE_TYPE")) + s.set_tags(self._datadog_meta) + # set analytics sample rate + s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.flask_cache.get_analytics_sample_rate()) + # add connection meta if there is one + client = _extract_client(self.cache) + if client is not None: + try: + s.set_tags(_extract_conn_tags(client)) + except Exception: + log.debug("error parsing connection tags", exc_info=True) + + return s + + def get(self, *args, **kwargs): + """ + Track ``get`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("GET", self.config) + if len(args) > 0: + span.set_tag_str(COMMAND_KEY, args[0]) + result = super(TracedCache, self).get(*args, **kwargs) + span.set_metric(db.ROWCOUNT, 1 if result else 0) + return result + + def set(self, *args, **kwargs): + """ + Track ``set`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("SET", self.config) + if len(args) > 0: + span.set_tag_str(COMMAND_KEY, args[0]) + return super(TracedCache, self).set(*args, **kwargs) + + def add(self, *args, **kwargs): + """ + Track ``add`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("ADD", self.config) + if len(args) > 0: + span.set_tag_str(COMMAND_KEY, args[0]) + return super(TracedCache, self).add(*args, **kwargs) + + def delete(self, *args, **kwargs): + """ + Track ``delete`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("DELETE", self.config) + if len(args) > 0: + span.set_tag_str(COMMAND_KEY, args[0]) + return super(TracedCache, self).delete(*args, **kwargs) + + def delete_many(self, *args, **kwargs): + """ + Track ``delete_many`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("DELETE_MANY", self.config) + span.set_tag(COMMAND_KEY, list(args)) + return super(TracedCache, self).delete_many(*args, **kwargs) + + def clear(self, *args, **kwargs): + """ + Track ``clear`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("CLEAR", self.config) + return super(TracedCache, self).clear(*args, **kwargs) + + def get_many(self, *args, **kwargs): + """ + Track ``get_many`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("GET_MANY", self.config) + span.set_tag(COMMAND_KEY, list(args)) + result = super(TracedCache, self).get_many(*args, **kwargs) + # get many returns a list, with either the key value or None if it doesn't exist + span.set_metric(db.ROWCOUNT, sum(1 for val in result if val)) + return result + + def set_many(self, *args, **kwargs): + """ + Track ``set_many`` operation + """ + with self.__trace("flask_cache.cmd") as span: + span.resource = _resource_from_cache_prefix("SET_MANY", self.config) + if len(args) > 0: + span.set_tag(COMMAND_KEY, list(args[0].keys())) + return super(TracedCache, self).set_many(*args, **kwargs) + + return TracedCache diff --git a/ddtrace/contrib/internal/flask_cache/utils.py b/ddtrace/contrib/internal/flask_cache/utils.py new file mode 100644 index 00000000000..242af103db7 --- /dev/null +++ b/ddtrace/contrib/internal/flask_cache/utils.py @@ -0,0 +1,62 @@ +# project +from ddtrace._trace.utils_redis import _extract_conn_tags as extract_redis_tags +from ddtrace.contrib.pylibmc.addrs import parse_addresses +from ddtrace.ext import net + + +def _resource_from_cache_prefix(resource, cache): + """ + Combine the resource name with the cache prefix (if any) + """ + if getattr(cache, "key_prefix", None): + name = "{} {}".format(resource, cache.key_prefix) + else: + name = resource + + # enforce lowercase to make the output nicer to read + return name.lower() + + +def _extract_client(cache): + """ + Get the client from the cache instance according to the current operation + """ + client = getattr(cache, "_client", None) + if client is None: + # flask-caching has _read_clients & _write_client for the redis backend + # These use the same connection so just try to get a reference to one of them. + # flask-caching < 2.0.0 uses _read_clients so look for that one too. + for attr in ("_write_client", "_read_client", "_read_clients"): + client = getattr(cache, attr, None) + if client is not None: + break + return client + + +def _extract_conn_tags(client): + """ + For the given client extracts connection tags + """ + tags = {} + + if hasattr(client, "servers"): + # Memcached backend supports an address pool + if isinstance(client.servers, list) and len(client.servers) > 0: + # use the first address of the pool as a host because + # the code doesn't expose more information + contact_point = client.servers[0].address + tags[net.TARGET_HOST] = contact_point[0] + tags[net.TARGET_PORT] = contact_point[1] + elif hasattr(client, "connection_pool"): + # Redis main connection + redis_tags = extract_redis_tags(client.connection_pool.connection_kwargs) + tags.update(**redis_tags) + elif hasattr(client, "addresses"): + # pylibmc + # FIXME[matt] should we memoize this? + addrs = parse_addresses(client.addresses) + if addrs: + _, host, port, _ = addrs[0] + tags[net.TARGET_PORT] = port + tags[net.TARGET_HOST] = host + return tags diff --git a/ddtrace/contrib/internal/futures/patch.py b/ddtrace/contrib/internal/futures/patch.py new file mode 100644 index 00000000000..86687d7dead --- /dev/null +++ b/ddtrace/contrib/internal/futures/patch.py @@ -0,0 +1,43 @@ +import sys + +from ddtrace.internal.wrapping import unwrap as _u +from ddtrace.internal.wrapping import wrap as _w + +from .threading import _wrap_submit + + +def get_version(): + # type: () -> str + return "" + + +def patch(): + """Enables Context Propagation between threads""" + try: + # Ensure that we get hold of the reloaded module if module cleanup was + # performed. + thread = sys.modules["concurrent.futures.thread"] + except KeyError: + import concurrent.futures.thread as thread + + if getattr(thread, "__datadog_patch", False): + return + thread.__datadog_patch = True + + _w(thread.ThreadPoolExecutor.submit, _wrap_submit) + + +def unpatch(): + """Disables Context Propagation between threads""" + try: + # Ensure that we get hold of the reloaded module if module cleanup was + # performed. + thread = sys.modules["concurrent.futures.thread"] + except KeyError: + return + + if not getattr(thread, "__datadog_patch", False): + return + thread.__datadog_patch = False + + _u(thread.ThreadPoolExecutor.submit, _wrap_submit) diff --git a/ddtrace/contrib/internal/futures/threading.py b/ddtrace/contrib/internal/futures/threading.py new file mode 100644 index 00000000000..deea68e2c17 --- /dev/null +++ b/ddtrace/contrib/internal/futures/threading.py @@ -0,0 +1,36 @@ +from typing import Optional + +import ddtrace +from ddtrace._trace.context import Context + + +def _wrap_submit(func, args, kwargs): + """ + Wrap `Executor` method used to submit a work executed in another + thread. This wrapper ensures that a new `Context` is created and + properly propagated using an intermediate function. + """ + # DEV: Be sure to propagate a Context and not a Span since we are crossing thread boundaries + current_ctx: Optional[Context] = ddtrace.tracer.current_trace_context() + + # The target function can be provided as a kwarg argument "fn" or the first positional argument + self = args[0] + if "fn" in kwargs: + fn = kwargs.pop("fn") + fn_args = args[1:] + else: + fn, fn_args = args[1], args[2:] + return func(self, _wrap_execution, current_ctx, fn, fn_args, kwargs) + + +def _wrap_execution(ctx: Optional[Context], fn, args, kwargs): + """ + Intermediate target function that is executed in a new thread; + it receives the original function with arguments and keyword + arguments, including our tracing `Context`. The current context + provider sets the Active context in a thread local storage + variable because it's outside the asynchronous loop. + """ + if ctx is not None: + ddtrace.tracer.context_provider.activate(ctx) + return fn(*args, **kwargs) diff --git a/releasenotes/notes/move-integrations-to-internal-fastapi-348259e3fe88465e.yaml b/releasenotes/notes/move-integrations-to-internal-fastapi-348259e3fe88465e.yaml new file mode 100644 index 00000000000..a635d2b60cb --- /dev/null +++ b/releasenotes/notes/move-integrations-to-internal-fastapi-348259e3fe88465e.yaml @@ -0,0 +1,10 @@ +--- +deprecations: + - | + fastapi: Deprecates all modules in the ``ddtrace.contrib.fastapi`` package. Use attributes exposed in ``ddtrace.contrib.fastapi.__all__`` instead. + - | + flask: Deprecates all modules in the ``ddtrace.contrib.flask`` package. Use attributes exposed in ``ddtrace.contrib.flask.__all__`` instead. + - | + flask_cache: Deprecates all modules in the ``ddtrace.contrib.flask_cache`` package. Use attributes exposed in ``ddtrace.contrib.flask_cache.__all__`` instead. + - | + futures: Deprecates all modules in the ``ddtrace.contrib.futures`` package. Use attributes exposed in ``ddtrace.contrib.futures.__all__`` instead. diff --git a/tests/contrib/flask/test_idempotency.py b/tests/contrib/flask/test_idempotency.py index 866870f98a4..b161535026e 100644 --- a/tests/contrib/flask/test_idempotency.py +++ b/tests/contrib/flask/test_idempotency.py @@ -5,8 +5,8 @@ from ddtrace.contrib.flask import patch from ddtrace.contrib.flask import unpatch -from ddtrace.contrib.flask.patch import _u -from ddtrace.contrib.flask.patch import _w +from ddtrace.contrib.internal.flask.patch import _u +from ddtrace.contrib.internal.flask.patch import _w from ddtrace.vendor import wrapt @@ -38,7 +38,7 @@ def test_datadog_patch(self): self.assert_is_not_patched() # DEV: Use `side_effect` so the original function still gets called - @mock.patch("ddtrace.contrib.flask._patch._w", side_effect=_w) + @mock.patch("ddtrace.contrib.internal.flask.patch._w", side_effect=_w) def test_patch_idempotency(self, _w): # Ensure we didn't do any patching automatically _w.assert_not_called() @@ -59,8 +59,8 @@ def test_patch_idempotency(self, _w): self.assert_is_patched() # DEV: Use `side_effect` so the original function still gets called - @mock.patch("ddtrace.contrib.flask._patch._w", side_effect=_w) - @mock.patch("ddtrace.contrib.flask._patch._u", side_effect=_u) + @mock.patch("ddtrace.contrib.internal.flask.patch._w", side_effect=_w) + @mock.patch("ddtrace.contrib.internal.flask.patch._u", side_effect=_u) def test_unpatch_idempotency(self, _u, _w): # We need to patch in order to unpatch patch() diff --git a/tests/contrib/flask_cache/test_utils.py b/tests/contrib/flask_cache/test_utils.py index bf0a0556ce9..9d8322af3d8 100644 --- a/tests/contrib/flask_cache/test_utils.py +++ b/tests/contrib/flask_cache/test_utils.py @@ -4,9 +4,9 @@ from ddtrace._trace.tracer import Tracer from ddtrace.contrib.flask_cache import get_traced_cache -from ddtrace.contrib.flask_cache.utils import _extract_client -from ddtrace.contrib.flask_cache.utils import _extract_conn_tags -from ddtrace.contrib.flask_cache.utils import _resource_from_cache_prefix +from ddtrace.contrib.internal.flask_cache.utils import _extract_client +from ddtrace.contrib.internal.flask_cache.utils import _extract_conn_tags +from ddtrace.contrib.internal.flask_cache.utils import _resource_from_cache_prefix from ..config import MEMCACHED_CONFIG from ..config import REDIS_CONFIG