Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
emmettbutler committed Apr 30, 2024
1 parent a99a5a9 commit 3a2ec42
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 44 deletions.
4 changes: 2 additions & 2 deletions ddtrace/_trace/utils_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _instrument_redis_cmd(pin, config_integration, instance, args):


@contextmanager
def _trace_redis_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
def _instrument_redis_execute_pipeline(pin, config_integration, cmds, instance, is_cluster=False):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])
Expand All @@ -81,7 +81,7 @@ def _trace_redis_execute_pipeline(pin, config_integration, cmds, instance, is_cl


@contextmanager
def _trace_redis_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
def _instrument_redis_execute_async_cluster_pipeline(pin, config_integration, cmds, instance):
cmd_string = resource = "\n".join(cmds)
if config_integration.resource_only_command:
resource = "\n".join([cmd.split(" ")[0] for cmd in cmds])
Expand Down
17 changes: 7 additions & 10 deletions ddtrace/contrib/redis/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
from ddtrace import config
from ddtrace._trace.utils_redis import _instrument_redis_cmd
from ddtrace._trace.utils_redis import _trace_redis_execute_async_cluster_pipeline
from ddtrace._trace.utils_redis import _trace_redis_execute_pipeline
from ddtrace._trace.utils_redis import _instrument_redis_execute_async_cluster_pipeline
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
from ddtrace.contrib.redis_utils import _run_redis_command_async

from ...internal.utils.formats import stringify_cache_args
from ...pin import Pin


#
# tracing async functions
#
async def traced_async_execute_command(func, instance, args, kwargs):
async def instrumented_async_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)
Expand All @@ -20,21 +17,21 @@ async def traced_async_execute_command(func, instance, args, kwargs):
return await _run_redis_command_async(ctx=ctx, func=func, args=args, kwargs=kwargs)


async def traced_async_execute_pipeline(func, instance, args, kwargs):
async def instrumented_async_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c, cmd_max_len=config.redis.cmd_max_length) for c, _ in instance.command_stack]
with _trace_redis_execute_pipeline(pin, config.redis, cmds, instance):
with _instrument_redis_execute_pipeline(pin, config.redis, cmds, instance):
return await func(*args, **kwargs)


async def traced_async_execute_cluster_pipeline(func, instance, args, kwargs):
async def instrumented_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return await func(*args, **kwargs)

cmds = [stringify_cache_args(c.args, cmd_max_len=config.redis.cmd_max_length) for c in instance._command_stack]
with _trace_redis_execute_async_cluster_pipeline(pin, config.redis, cmds, instance):
with _instrument_redis_execute_async_cluster_pipeline(pin, config.redis, cmds, instance):
return await func(*args, **kwargs)
64 changes: 32 additions & 32 deletions ddtrace/contrib/redis/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from ddtrace import config
from ddtrace._trace.utils_redis import _instrument_redis_cmd
from ddtrace._trace.utils_redis import _trace_redis_execute_pipeline
from ddtrace._trace.utils_redis import _instrument_redis_execute_pipeline
from ddtrace.contrib.redis_utils import ROW_RETURNING_COMMANDS
from ddtrace.contrib.redis_utils import determine_row_count
from ddtrace.internal import core
Expand Down Expand Up @@ -46,44 +46,44 @@ def patch():
_w = wrapt.wrap_function_wrapper

if redis.VERSION < (3, 0, 0):
_w("redis", "StrictRedis.execute_command", traced_execute_command(config.redis))
_w("redis", "StrictRedis.pipeline", traced_pipeline)
_w("redis", "Redis.pipeline", traced_pipeline)
_w("redis.client", "BasePipeline.execute", traced_execute_pipeline(config.redis, False))
_w("redis.client", "BasePipeline.immediate_execute_command", traced_execute_command(config.redis))
_w("redis", "StrictRedis.execute_command", instrumented_execute_command(config.redis))
_w("redis", "StrictRedis.pipeline", instrumented_pipeline)
_w("redis", "Redis.pipeline", instrumented_pipeline)
_w("redis.client", "BasePipeline.execute", instrumented_execute_pipeline(config.redis, False))
_w("redis.client", "BasePipeline.immediate_execute_command", instrumented_execute_command(config.redis))
else:
_w("redis", "Redis.execute_command", traced_execute_command(config.redis))
_w("redis", "Redis.pipeline", traced_pipeline)
_w("redis.client", "Pipeline.execute", traced_execute_pipeline(config.redis, False))
_w("redis.client", "Pipeline.immediate_execute_command", traced_execute_command(config.redis))
_w("redis", "Redis.execute_command", instrumented_execute_command(config.redis))
_w("redis", "Redis.pipeline", instrumented_pipeline)
_w("redis.client", "Pipeline.execute", instrumented_execute_pipeline(config.redis, False))
_w("redis.client", "Pipeline.immediate_execute_command", instrumented_execute_command(config.redis))
if redis.VERSION >= (4, 1):
# Redis v4.1 introduced support for redis clusters and rediscluster package was deprecated.
# https://github.com/redis/redis-py/commit/9db1eec71b443b8e7e74ff503bae651dc6edf411
_w("redis.cluster", "RedisCluster.execute_command", traced_execute_command(config.redis))
_w("redis.cluster", "RedisCluster.pipeline", traced_pipeline)
_w("redis.cluster", "ClusterPipeline.execute", traced_execute_pipeline(config.redis, True))
_w("redis.cluster", "RedisCluster.execute_command", instrumented_execute_command(config.redis))
_w("redis.cluster", "RedisCluster.pipeline", instrumented_pipeline)
_w("redis.cluster", "ClusterPipeline.execute", instrumented_execute_pipeline(config.redis, True))
Pin(service=None).onto(redis.cluster.RedisCluster)
# Avoid mypy invalid syntax errors when parsing Python 2 files
if redis.VERSION >= (4, 2, 0):
from .asyncio_patch import traced_async_execute_command
from .asyncio_patch import traced_async_execute_pipeline
from .asyncio_patch import instrumented_async_execute_command
from .asyncio_patch import instrumented_async_execute_pipeline

_w("redis.asyncio.client", "Redis.execute_command", traced_async_execute_command)
_w("redis.asyncio.client", "Redis.pipeline", traced_pipeline)
_w("redis.asyncio.client", "Pipeline.execute", traced_async_execute_pipeline)
_w("redis.asyncio.client", "Pipeline.immediate_execute_command", traced_async_execute_command)
_w("redis.asyncio.client", "Redis.execute_command", instrumented_async_execute_command)
_w("redis.asyncio.client", "Redis.pipeline", instrumented_pipeline)
_w("redis.asyncio.client", "Pipeline.execute", instrumented_async_execute_pipeline)
_w("redis.asyncio.client", "Pipeline.immediate_execute_command", instrumented_async_execute_command)
Pin(service=None).onto(redis.asyncio.Redis)

if redis.VERSION >= (4, 3, 0):
from .asyncio_patch import traced_async_execute_command
from .asyncio_patch import instrumented_async_execute_command

_w("redis.asyncio.cluster", "RedisCluster.execute_command", traced_async_execute_command)
_w("redis.asyncio.cluster", "RedisCluster.execute_command", instrumented_async_execute_command)

if redis.VERSION >= (4, 3, 2):
from .asyncio_patch import traced_async_execute_cluster_pipeline
from .asyncio_patch import instrumented_async_execute_cluster_pipeline

_w("redis.asyncio.cluster", "RedisCluster.pipeline", traced_pipeline)
_w("redis.asyncio.cluster", "ClusterPipeline.execute", traced_async_execute_cluster_pipeline)
_w("redis.asyncio.cluster", "RedisCluster.pipeline", instrumented_pipeline)
_w("redis.asyncio.cluster", "ClusterPipeline.execute", instrumented_async_execute_cluster_pipeline)

Pin(service=None).onto(redis.asyncio.RedisCluster)

Expand Down Expand Up @@ -142,28 +142,28 @@ def _run_redis_command(ctx: core.ExecutionContext, func, args, kwargs):
#
# tracing functions
#
def traced_execute_command(integration_config):
def _traced_execute_command(func, instance, args, kwargs):
def instrumented_execute_command(integration_config):
def _instrumented_execute_command(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return func(*args, **kwargs)

with _instrument_redis_cmd(pin, integration_config, instance, args) as ctx:
return _run_redis_command(ctx=ctx, func=func, args=args, kwargs=kwargs)

return _traced_execute_command
return _instrumented_execute_command


def traced_pipeline(func, instance, args, kwargs):
def instrumented_pipeline(func, instance, args, kwargs):
pipeline = func(*args, **kwargs)
pin = Pin.get_from(instance)
if pin:
pin.onto(pipeline)
return pipeline


def traced_execute_pipeline(integration_config, is_cluster=False):
def _traced_execute_pipeline(func, instance, args, kwargs):
def instrumented_execute_pipeline(integration_config, is_cluster=False):
def _instrumented_execute_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
return func(*args, **kwargs)
Expand All @@ -178,7 +178,7 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
stringify_cache_args(c, cmd_max_len=integration_config.cmd_max_length)
for c, _ in instance.command_stack
]
with _trace_redis_execute_pipeline(pin, integration_config, cmds, instance, is_cluster):
with _instrument_redis_execute_pipeline(pin, integration_config, cmds, instance, is_cluster):
return func(*args, **kwargs)

return _traced_execute_pipeline
return _instrumented_execute_pipeline

0 comments on commit 3a2ec42

Please sign in to comment.