Skip to content

Commit

Permalink
feat(redis): add span metric db.row_count to redis integrations (#5054)
Browse files Browse the repository at this point in the history
Adds `db.row_count` for redis and redis like integrations. Only adds row
count for get commands that return row data.

# Design Choices Made

For any integrations previously missing a row count tag (ex. Redis,
yaaredis), the tag is only added for `GET` style commands, not any
`SET`, `DELETE` or other operations. `GET` style commands are anything
returning data, such as basic `GET`s, `GET_MANY`, `GET_RANGE` for list
values, etc. For commands corresponding to single rows, such as `GET`,
`GET_RANGE`, any index retrievals, etc, row count is set to 1. For
`GET_MANY` operations operations, the rowcount tag is set to the number
of valid, returned values. If an error or no data is returned, the row
count tag is set to 0.

NOTE: Redis pipelines do not set `db.row_count`. Does not make sense to
set the tag for a pipeline of commands.

## Testing

Added the tag to snapshots and added some additional test cases for each
integration.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [ ] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines)
are followed.
- [ ] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).


## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
  • Loading branch information
wconti27 authored Sep 1, 2023
1 parent cb9e78e commit 1e3bd6d
Show file tree
Hide file tree
Showing 60 changed files with 293 additions and 12 deletions.
12 changes: 10 additions & 2 deletions ddtrace/contrib/aioredis/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
from ...internal.schema import schematize_service_name
from ...internal.utils.formats import CMD_MAX_LEN
from ...internal.utils.formats import stringify_cache_args
from ..redis.asyncio_patch import _run_redis_command_async
from ..trace_utils_redis import ROW_RETURNING_COMMANDS
from ..trace_utils_redis import _trace_redis_cmd
from ..trace_utils_redis import _trace_redis_execute_pipeline
from ..trace_utils_redis import determine_row_count


try:
Expand Down Expand Up @@ -86,8 +89,8 @@ async def traced_execute_command(func, instance, args, kwargs):
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _trace_redis_cmd(pin, config.aioredis, instance, args):
return await func(*args, **kwargs)
with _trace_redis_cmd(pin, config.aioredis, instance, args) as span:
return await _run_redis_command_async(span=span, func=func, args=args, kwargs=kwargs)


def traced_pipeline(func, instance, args, kwargs):
Expand Down Expand Up @@ -166,10 +169,15 @@ def _finish_span(future):
# - The future was cancelled (CancelledError)
# - There was an error executing the future (`future.exception()`)
# - The future is in an invalid state
redis_command = span.resource.split(" ")[0]
future.result()
if redis_command in ROW_RETURNING_COMMANDS:
determine_row_count(redis_command=redis_command, span=span, result=future.result())
# CancelledError exceptions extend from BaseException as of Python 3.8, instead of usual Exception
except BaseException:
span.set_exc_info(*sys.exc_info())
if redis_command in ROW_RETURNING_COMMANDS:
span.set_metric(db.ROWCOUNT, 0)
finally:
span.finish()

Expand Down
6 changes: 3 additions & 3 deletions ddtrace/contrib/aredis/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ...internal.utils.formats import stringify_cache_args
from ...internal.utils.wrappers import unwrap
from ...pin import Pin
from ..redis.asyncio_patch import _run_redis_command_async
from ..trace_utils_redis import _trace_redis_cmd
from ..trace_utils_redis import _trace_redis_execute_pipeline

Expand Down Expand Up @@ -61,9 +62,8 @@ async def traced_execute_command(func, instance, args, kwargs):
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _trace_redis_cmd(pin, config.aredis, instance, args):
# run the command
return await func(*args, **kwargs)
with _trace_redis_cmd(pin, config.aredis, instance, args) as span:
return await _run_redis_command_async(span=span, func=func, args=args, kwargs=kwargs)


async def traced_pipeline(func, instance, args, kwargs):
Expand Down
23 changes: 21 additions & 2 deletions ddtrace/contrib/redis/asyncio_patch.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from ddtrace import config

from ...ext import db
from ...internal.utils.formats import stringify_cache_args
from ...pin import Pin
from ..trace_utils_redis import ROW_RETURNING_COMMANDS
from ..trace_utils_redis import _trace_redis_cmd
from ..trace_utils_redis import _trace_redis_execute_async_cluster_pipeline
from ..trace_utils_redis import _trace_redis_execute_pipeline
from ..trace_utils_redis import determine_row_count


#
Expand All @@ -15,8 +18,8 @@ async def traced_async_execute_command(func, instance, args, kwargs):
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _trace_redis_cmd(pin, config.redis, instance, args):
return await func(*args, **kwargs)
with _trace_redis_cmd(pin, config.redis, instance, args) as span:
return await _run_redis_command_async(span=span, func=func, args=args, kwargs=kwargs)


async def traced_async_execute_pipeline(func, instance, args, kwargs):
Expand All @@ -30,6 +33,22 @@ async def traced_async_execute_pipeline(func, instance, args, kwargs):
return await func(*args, **kwargs)


async def _run_redis_command_async(span, func, args, kwargs):
try:
parsed_command = stringify_cache_args(args)
redis_command = parsed_command.split(" ")[0]

result = await func(*args, **kwargs)
return result
except Exception:
if redis_command in ROW_RETURNING_COMMANDS:
span.set_metric(db.ROWCOUNT, 0)
raise
finally:
if redis_command in ROW_RETURNING_COMMANDS:
determine_row_count(redis_command=redis_command, span=span, result=result)


async def traced_async_execute_cluster_pipeline(func, instance, args, kwargs):
pin = Pin.get_from(instance)
if not pin or not pin.enabled():
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/contrib/redis/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from ...internal.utils.formats import stringify_cache_args
from ...pin import Pin
from ..trace_utils import unwrap
from ..trace_utils_redis import _run_redis_command
from ..trace_utils_redis import _trace_redis_cmd
from ..trace_utils_redis import _trace_redis_execute_pipeline

Expand Down Expand Up @@ -126,8 +127,8 @@ def _traced_execute_command(func, instance, args, kwargs):
if not pin or not pin.enabled():
return func(*args, **kwargs)

with _trace_redis_cmd(pin, integration_config, instance, args):
return func(*args, **kwargs)
with _trace_redis_cmd(pin, integration_config, instance, args) as span:
return _run_redis_command(span=span, func=func, args=args, kwargs=kwargs)

return _traced_execute_command

Expand Down
55 changes: 55 additions & 0 deletions ddtrace/contrib/trace_utils_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@

format_command_args = stringify_cache_args

SINGLE_KEY_COMMANDS = [
"GET",
"GETDEL",
"GETEX",
"GETRANGE",
"GETSET",
"LINDEX",
"LRANGE",
"RPOP",
"LPOP",
"HGET",
"HGETALL",
"HKEYS",
"HMGET",
"HRANDFIELD",
"HVALS",
]
MULTI_KEY_COMMANDS = ["MGET"]
ROW_RETURNING_COMMANDS = SINGLE_KEY_COMMANDS + MULTI_KEY_COMMANDS


def _extract_conn_tags(conn_kwargs):
"""Transform redis conn info into dogtrace metas"""
Expand All @@ -36,6 +56,41 @@ def _extract_conn_tags(conn_kwargs):
return {}


def determine_row_count(redis_command, span, result):
empty_results = [b"", [], {}, None]
# result can be an empty list / dict / string
if result not in empty_results:
if redis_command == "MGET":
# only include valid key results within count
result = [x for x in result if x not in empty_results]
span.set_metric(db.ROWCOUNT, len(result))
elif redis_command == "HMGET":
# only include valid key results within count
result = [x for x in result if x not in empty_results]
span.set_metric(db.ROWCOUNT, 1 if len(result) > 0 else 0)
else:
span.set_metric(db.ROWCOUNT, 1)
else:
# set count equal to 0 if an empty result
span.set_metric(db.ROWCOUNT, 0)


def _run_redis_command(span, func, args, kwargs):
try:
parsed_command = stringify_cache_args(args)
redis_command = parsed_command.split(" ")[0]

result = func(*args, **kwargs)
return result
except Exception:
if redis_command in ROW_RETURNING_COMMANDS:
span.set_metric(db.ROWCOUNT, 0)
raise
finally:
if redis_command in ROW_RETURNING_COMMANDS:
determine_row_count(redis_command=redis_command, span=span, result=result)


@contextmanager
def _trace_redis_cmd(pin, config_integration, instance, args):
"""Create a span for the execute command method and tag it"""
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/contrib/yaaredis/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ...internal.utils.formats import stringify_cache_args
from ...internal.utils.wrappers import unwrap
from ...pin import Pin
from ..redis.asyncio_patch import _run_redis_command_async
from ..trace_utils_redis import _trace_redis_cmd
from ..trace_utils_redis import _trace_redis_execute_pipeline

Expand Down Expand Up @@ -58,8 +59,8 @@ async def traced_execute_command(func, instance, args, kwargs):
if not pin or not pin.enabled():
return await func(*args, **kwargs)

with _trace_redis_cmd(pin, config.yaaredis, instance, args):
return await func(*args, **kwargs)
with _trace_redis_cmd(pin, config.yaaredis, instance, args) as span:
return await _run_redis_command_async(span=span, func=func, args=args, kwargs=kwargs)


async def traced_pipeline(func, instance, args, kwargs):
Expand Down
1 change: 0 additions & 1 deletion ddtrace/ext/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

# tags
QUERY = "sql.query" # the query text
ROWS = "sql.rows" # number of rows returned by a query
DB = "sql.db" # the name of the database


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Adds the `db.row_count` tag to redis and other redis-like integrations. The tag represents the number of returned results.
140 changes: 140 additions & 0 deletions tests/contrib/redis/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,37 @@ def tearDown(self):
unpatch()
super(TestRedisPatch, self).tearDown()

def command_test_rowcount(self, raw_command, row_count, expect_result=True, **kwargs):
command_args_as_list = raw_command.split(" ")

command_name = command_args_as_list[0].lower()

if hasattr(self.r, command_name):
func = getattr(self.r, command_name)

try:
# try to run function with kwargs, may fail due to redis version
result = yield func(*command_args_as_list[1:], **kwargs)
for k in kwargs.keys():
raw_command += " " + str(kwargs[k])
except Exception:
# try without keyword arguments
result = func(*command_args_as_list[1:])

if expect_result:
assert result is not None
else:
empty_result = [None, [], {}, b""]
if isinstance(result, list):
result = [x for x in result if x]
assert result in empty_result

command_span = self.get_spans()[-1]

assert command_span.name == "redis.command"
assert command_span.get_tag("redis.raw_command") == raw_command
assert command_span.get_metric("db.row_count") == row_count

def test_long_command(self):
self.r.mget(*range(1000))

Expand Down Expand Up @@ -250,6 +281,115 @@ def test_opentracing(self):
assert dd_span.get_metric("redis.args_length") == 2
assert dd_span.resource == "GET cheese"

def test_redis_rowcount_all_keys_valid(self):
self.r.set("key1", "value1")

get1 = self.r.get("key1")

assert get1 == b"value1"

spans = self.get_spans()
get_valid_key_span = spans[1]

assert get_valid_key_span.name == "redis.command"
assert get_valid_key_span.get_tag("redis.raw_command") == u"GET key1"
assert get_valid_key_span.get_metric("db.row_count") == 1

get_commands = ["GET key", "GETEX key", "GETRANGE key 0 2"]
list_get_commands = ["LINDEX lkey 0", "LRANGE lkey 0 3", "RPOP lkey", "LPOP lkey"]
hashing_get_commands = [
"HGET hkey field1",
"HGETALL hkey",
"HKEYS hkey",
"HMGET hkey field1 field2",
"HRANDFIELD hkey",
"HVALS hkey",
]
multi_key_get_commands = ["MGET key key2", "MGET key key2 key3", "MGET key key2 key3 key4"]

for command in get_commands:
self.r.set("key", "value")
self.command_test_rowcount(command, 1)
for command in list_get_commands:
self.r.lpush("lkey", "1", "2", "3", "4", "5")
self.command_test_rowcount(command, 1)
if command == "RPOP lkey": # lets get multiple values from the set and ensure rowcount is still 1
self.command_test_rowcount(command, 1, count=2)
for command in hashing_get_commands:
self.r.hset("hkey", "field1", "value1")
self.r.hset("hkey", "field2", "value2")
self.command_test_rowcount(command, 1)
for command in multi_key_get_commands:
self.r.mset({"key": "value", "key2": "value2", "key3": "value3", "key4": "value4"})
self.command_test_rowcount(command, len(command.split(" ")) - 1)

def test_redis_rowcount_some_keys_valid(self):
self.r.mset({"key": "value", "key2": "value2"})

get_both_valid = self.r.mget("key", "key2")
get_one_missing = self.r.mget("key", "missing_key")

assert get_both_valid == [b"value", b"value2"]
assert get_one_missing == [b"value", None]

spans = self.get_spans()
get_both_valid_span = spans[1]
get_one_missing_span = spans[2]

assert get_both_valid_span.name == "redis.command"
assert get_both_valid_span.get_tag("redis.raw_command") == u"MGET key key2"
assert get_both_valid_span.get_metric("db.row_count") == 2

assert get_one_missing_span.name == "redis.command"
assert get_one_missing_span.get_tag("redis.raw_command") == u"MGET key missing_key"
assert get_one_missing_span.get_metric("db.row_count") == 1

multi_key_get_commands = [
"MGET key key2",
"MGET key missing_key",
"MGET key key2 missing_key",
"MGET key missing_key missing_key2 key2",
]

for command in multi_key_get_commands:
command_keys = command.split(" ")[1:]
self.command_test_rowcount(command, len([key for key in command_keys if "missing_key" not in key]))

def test_redis_rowcount_no_keys_valid(self):
get_missing = self.r.get("missing_key")

assert get_missing is None

spans = self.get_spans()
get_missing_key_span = spans[0]

assert get_missing_key_span.name == "redis.command"
assert get_missing_key_span.get_tag("redis.raw_command") == u"GET missing_key"
assert get_missing_key_span.get_metric("db.row_count") == 0

get_commands = ["GET key", "GETDEL key", "GETEX key", "GETRANGE key 0 2"]
list_get_commands = ["LINDEX lkey 0", "LRANGE lkey 0 3", "RPOP lkey", "LPOP lkey"]
hashing_get_commands = [
"HGET hkey field1",
"HGETALL hkey",
"HKEYS hkey",
"HMGET hkey field1 field2",
"HRANDFIELD hkey",
"HVALS hkey",
]
multi_key_get_commands = ["MGET key key2", "MGET key key2 key3", "MGET key key2 key3 key4"]

for command in get_commands:
self.command_test_rowcount(command, 0, expect_result=False)
for command in list_get_commands:
self.command_test_rowcount(command, 0, expect_result=False)
if command == "RPOP lkey": # lets get multiple values from the set and ensure rowcount is still 1
self.command_test_rowcount(command, 0, expect_result=False, count=2)
for command in hashing_get_commands:
self.command_test_rowcount(command, 0, expect_result=False)
for command in multi_key_get_commands:
self.command_test_rowcount(command, 0, expect_result=False)

@TracerTestCase.run_in_subprocess(env_overrides=dict(DD_SERVICE="mysvc"))
def test_user_specified_service_default(self):
from ddtrace import config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"db.row_count": 0,
"network.destination.port": 6379,
"out.redis_db": 0,
"process_id": 20868,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"_dd.top_level": 1,
"_dd.tracer_kr": 1.0,
"_sampling_priority_v1": 1,
"db.row_count": 0,
"network.destination.port": 6379,
"out.redis_db": 0,
"process_id": 20868,
Expand Down
Loading

0 comments on commit 1e3bd6d

Please sign in to comment.