Skip to content

Commit

Permalink
chore(sampling): add rc for trace sampling rules (#8900)
Browse files Browse the repository at this point in the history
This PR implements remote config for `DD_TRACE_SAMPLING_RULES`:
1. Add the rc
2. Add `provenance`, which we parse from rc and use to set the decision
maker tag `_dd.p.dm` to either `-11` for `customer` configuration
or `-12` for `dynamic` configuration.
3. The most confusing part of this implementation takes place in
[tracer._on_global_config_update](https://github.com/DataDog/dd-trace-py/pull/8900/files#diff-e2ff2c401c4b927861c9fc104deb21aee510e2b273cc4569315bd611a64ff3baL1123-R1164).
It essentially implements the logic for choosing the correct sample_rate
and sampling_rules depending on rc input.

In addition to the sample_rate we already had I added one for
sampling_rules and for the interaction between them. This PR also passes
the newly added
[system-tests](https://github.com/DataDog/system-tests/blob/main/tests/parametric/test_dynamic_configuration.py#L559-L683)
for this behavior.

Since this feature is in internal beta, there's no release note.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [ ] Title is accurate
- [ ] All changes are related to the pull request's stated goal
- [ ] Description motivates each change
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [ ] Testing strategy adequately addresses listed risks
- [ ] Change is maintainable (easy to change, telemetry, documentation)
- [ ] Release note makes sense to a user of the library
- [ ] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
ZStriker19 authored Apr 30, 2024
1 parent d59d0f9 commit 97af079
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 40 deletions.
59 changes: 45 additions & 14 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ def __init__(

self.enabled = config._tracing_enabled
self.context_provider = context_provider or DefaultContextProvider()
self._user_sampler: Optional[BaseSampler] = None
# _user_sampler is the backup in case we need to revert from remote config to local
self._user_sampler: Optional[BaseSampler] = DatadogSampler()
self._sampler: BaseSampler = DatadogSampler()
self._dogstatsd_url = agent.get_stats_url() if dogstatsd_url is None else dogstatsd_url
self._compute_stats = config._trace_compute_stats
Expand Down Expand Up @@ -286,7 +287,7 @@ def __init__(
self._shutdown_lock = RLock()

self._new_process = False
config._subscribe(["_trace_sample_rate"], self._on_global_config_update)
config._subscribe(["_trace_sample_rate", "_trace_sampling_rules"], self._on_global_config_update)
config._subscribe(["logs_injection"], self._on_global_config_update)
config._subscribe(["tags"], self._on_global_config_update)
config._subscribe(["_tracing_enabled"], self._on_global_config_update)
Expand Down Expand Up @@ -1125,19 +1126,10 @@ def _is_span_internal(span):

def _on_global_config_update(self, cfg, items):
# type: (Config, List) -> None
if "_trace_sample_rate" in items:
# Reset the user sampler if one exists
if cfg._get_source("_trace_sample_rate") != "remote_config" and self._user_sampler:
self._sampler = self._user_sampler
return

if cfg._get_source("_trace_sample_rate") != "default":
sample_rate = cfg._trace_sample_rate
else:
sample_rate = None

sampler = DatadogSampler(default_sample_rate=sample_rate)
self._sampler = sampler
# sampling configs always come as a pair
if "_trace_sample_rate" in items and "_trace_sampling_rules" in items:
self._handle_sampler_update(cfg)

if "tags" in items:
self._tags = cfg.tags.copy()
Expand All @@ -1160,3 +1152,42 @@ def _on_global_config_update(self, cfg, items):
from ddtrace.contrib.logging import unpatch

unpatch()

def _handle_sampler_update(self, cfg):
# type: (Config) -> None
if (
cfg._get_source("_trace_sample_rate") != "remote_config"
and cfg._get_source("_trace_sampling_rules") != "remote_config"
and self._user_sampler
):
# if we get empty configs from rc for both sample rate and rules, we should revert to the user sampler
self.sampler = self._user_sampler
return

if cfg._get_source("_trace_sample_rate") != "remote_config" and self._user_sampler:
try:
sample_rate = self._user_sampler.default_sample_rate # type: ignore[attr-defined]
except AttributeError:
log.debug("Custom non-DatadogSampler is being used, cannot pull default sample rate")
sample_rate = None
elif cfg._get_source("_trace_sample_rate") != "default":
sample_rate = cfg._trace_sample_rate
else:
sample_rate = None

if cfg._get_source("_trace_sampling_rules") != "remote_config" and self._user_sampler:
try:
sampling_rules = self._user_sampler.rules # type: ignore[attr-defined]
# we need to chop off the default_sample_rate rule so the new sample_rate can be applied
sampling_rules = sampling_rules[:-1]
except AttributeError:
log.debug("Custom non-DatadogSampler is being used, cannot pull sampling rules")
sampling_rules = None
elif cfg._get_source("_trace_sampling_rules") != "default":
sampling_rules = DatadogSampler._parse_rules_from_str(cfg._trace_sampling_rules)
else:
sampling_rules = None

sampler = DatadogSampler(rules=sampling_rules, default_sample_rate=sample_rate)

self._sampler = sampler
8 changes: 6 additions & 2 deletions ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@

class _PRIORITY_CATEGORY:
USER = "user"
RULE = "rule"
RULE_DEF = "rule_default"
RULE_CUSTOMER = "rule_customer"
RULE_DYNAMIC = "rule_dynamic"
AUTO = "auto"
DEFAULT = "default"

Expand All @@ -99,7 +101,9 @@ class _PRIORITY_CATEGORY:
# used to simplify code that selects sampling priority based on many factors
_CATEGORY_TO_PRIORITIES = {
_PRIORITY_CATEGORY.USER: (USER_KEEP, USER_REJECT),
_PRIORITY_CATEGORY.RULE: (USER_KEEP, USER_REJECT),
_PRIORITY_CATEGORY.RULE_DEF: (USER_KEEP, USER_REJECT),
_PRIORITY_CATEGORY.RULE_CUSTOMER: (USER_KEEP, USER_REJECT),
_PRIORITY_CATEGORY.RULE_DYNAMIC: (USER_KEEP, USER_REJECT),
_PRIORITY_CATEGORY.AUTO: (AUTO_KEEP, AUTO_REJECT),
_PRIORITY_CATEGORY.DEFAULT: (AUTO_KEEP, AUTO_REJECT),
}
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/internal/remoteconfig/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class Capabilities(enum.IntFlag):
APM_TRACING_HTTP_HEADER_TAGS = 1 << 14
APM_TRACING_CUSTOM_TAGS = 1 << 15
APM_TRACING_ENABLED = 1 << 19
APM_TRACING_SAMPLE_RULES = 1 << 29


class RemoteConfigError(Exception):
Expand Down Expand Up @@ -382,6 +383,7 @@ def _build_payload(self, state):
| Capabilities.APM_TRACING_HTTP_HEADER_TAGS
| Capabilities.APM_TRACING_CUSTOM_TAGS
| Capabilities.APM_TRACING_ENABLED
| Capabilities.APM_TRACING_SAMPLE_RULES
)
return dict(
client=dict(
Expand Down
22 changes: 19 additions & 3 deletions ddtrace/internal/sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ class SamplingMechanism(object):
REMOTE_RATE_USER = 6
REMOTE_RATE_DATADOG = 7
SPAN_SAMPLING_RULE = 8
REMOTE_USER_RULE = 11
REMOTE_DYNAMIC_RULE = 12


class PriorityCategory(object):
    """String labels naming how a span's sampling decision was reached.

    ``_set_sampling_tags`` compares its ``priority_category`` argument
    against these values to pick the sampling mechanism it records.
    """

    # Rule-based categories: for each of these ``_set_sampling_tags`` records
    # the sample rate under SAMPLING_RULE_DECISION.
    RULE_DEFAULT = "rule_default"  # mechanism stays TRACE_SAMPLING_RULE
    RULE_CUSTOMER = "rule_customer"  # mechanism becomes REMOTE_USER_RULE
    RULE_DYNAMIC = "rule_dynamic"  # mechanism becomes REMOTE_DYNAMIC_RULE

    # Non-rule categories.
    AUTO = "auto"  # mechanism becomes AGENT_RATE
    DEFAULT = "default"  # mechanism becomes SamplingMechanism.DEFAULT


# Use regex to validate trace tag value
Expand Down Expand Up @@ -278,11 +288,17 @@ def is_single_span_sampled(span):
def _set_sampling_tags(span, sampled, sample_rate, priority_category):
# type: (Span, bool, float, str) -> None
mechanism = SamplingMechanism.TRACE_SAMPLING_RULE
if priority_category == "rule":
if priority_category == PriorityCategory.RULE_DEFAULT:
span.set_metric(SAMPLING_RULE_DECISION, sample_rate)
if priority_category == PriorityCategory.RULE_CUSTOMER:
span.set_metric(SAMPLING_RULE_DECISION, sample_rate)
mechanism = SamplingMechanism.REMOTE_USER_RULE
if priority_category == PriorityCategory.RULE_DYNAMIC:
span.set_metric(SAMPLING_RULE_DECISION, sample_rate)
elif priority_category == "default":
mechanism = SamplingMechanism.REMOTE_DYNAMIC_RULE
elif priority_category == PriorityCategory.DEFAULT:
mechanism = SamplingMechanism.DEFAULT
elif priority_category == "auto":
elif priority_category == PriorityCategory.AUTO:
mechanism = SamplingMechanism.AGENT_RATE
span.set_metric(SAMPLING_AGENT_DECISION, sample_rate)
priorities = _CATEGORY_TO_PRIORITIES[priority_category]
Expand Down
6 changes: 4 additions & 2 deletions ddtrace/internal/telemetry/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
from .constants import TELEMETRY_TRACE_PEER_SERVICE_MAPPING
from .constants import TELEMETRY_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED
from .constants import TELEMETRY_TRACE_SAMPLING_LIMIT
from .constants import TELEMETRY_TRACE_SAMPLING_RULES
from .constants import TELEMETRY_TRACE_SPAN_ATTRIBUTE_SCHEMA
from .constants import TELEMETRY_TRACE_WRITER_BUFFER_SIZE_BYTES
from .constants import TELEMETRY_TRACE_WRITER_INTERVAL_SECONDS
Expand Down Expand Up @@ -386,6 +385,9 @@ def _telemetry_entry(self, cfg_name: str) -> Tuple[str, str, _ConfigSource]:
elif cfg_name == "_trace_sample_rate":
name = "trace_sample_rate"
value = str(item.value())
elif cfg_name == "_trace_sampling_rules":
name = "trace_sampling_rules"
value = str(item.value())
elif cfg_name == "logs_injection":
name = "logs_injection_enabled"
value = "true" if item.value() else "false"
Expand Down Expand Up @@ -428,6 +430,7 @@ def _app_started_event(self, register_app_shutdown=True):
self._telemetry_entry("_sca_enabled"),
self._telemetry_entry("_dsm_enabled"),
self._telemetry_entry("_trace_sample_rate"),
self._telemetry_entry("_trace_sampling_rules"),
self._telemetry_entry("logs_injection"),
self._telemetry_entry("trace_http_header_tags"),
self._telemetry_entry("tags"),
Expand Down Expand Up @@ -462,7 +465,6 @@ def _app_started_event(self, register_app_shutdown=True):
(TELEMETRY_TRACE_SAMPLING_LIMIT, config._trace_rate_limit, "unknown"),
(TELEMETRY_SPAN_SAMPLING_RULES, config._sampling_rules, "unknown"),
(TELEMETRY_SPAN_SAMPLING_RULES_FILE, config._sampling_rules_file, "unknown"),
(TELEMETRY_TRACE_SAMPLING_RULES, config._trace_sampling_rules, "unknown"),
(TELEMETRY_PRIORITY_SAMPLING, config._priority_sampling, "unknown"),
(TELEMETRY_PARTIAL_FLUSH_ENABLED, config._partial_flush_enabled, "unknown"),
(TELEMETRY_PARTIAL_FLUSH_MIN_SPANS, config._partial_flush_min_spans, "unknown"),
Expand Down
32 changes: 25 additions & 7 deletions ddtrace/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from .settings import _config as ddconfig


PROVENANCE_ORDER = ["customer", "dynamic", "default"]

try:
from json.decoder import JSONDecodeError
except ImportError:
Expand Down Expand Up @@ -158,7 +160,7 @@ def _choose_priority_category(self, sampler):
elif isinstance(sampler, _AgentRateSampler):
return _PRIORITY_CATEGORY.AUTO
else:
return _PRIORITY_CATEGORY.RULE
return _PRIORITY_CATEGORY.RULE_DEF

def _make_sampling_decision(self, span):
# type: (Span) -> Tuple[bool, BaseSampler]
Expand Down Expand Up @@ -204,7 +206,7 @@ class DatadogSampler(RateByServiceSampler):
per second.
"""

__slots__ = ("limiter", "rules")
__slots__ = ("limiter", "rules", "default_sample_rate")

NO_RATE_LIMIT = -1
# deprecate and remove the DEFAULT_RATE_LIMIT field from DatadogSampler
Expand All @@ -228,7 +230,7 @@ def __init__(
"""
# Use default sample rate of 1.0
super(DatadogSampler, self).__init__()

self.default_sample_rate = default_sample_rate
if default_sample_rate is None:
if ddconfig._get_source("_trace_sample_rate") != "default":
default_sample_rate = float(ddconfig._trace_sample_rate)
Expand All @@ -239,7 +241,7 @@ def __init__(
if rules is None:
env_sampling_rules = ddconfig._trace_sampling_rules
if env_sampling_rules:
rules = self._parse_rules_from_env_variable(env_sampling_rules)
rules = self._parse_rules_from_str(env_sampling_rules)
else:
rules = []
self.rules = rules
Expand Down Expand Up @@ -268,7 +270,8 @@ def __str__(self):

__repr__ = __str__

def _parse_rules_from_env_variable(self, rules):
@staticmethod
def _parse_rules_from_str(rules):
# type: (str) -> List[SamplingRule]
sampling_rules = []
try:
Expand All @@ -283,13 +286,22 @@ def _parse_rules_from_env_variable(self, rules):
name = rule.get("name", SamplingRule.NO_RULE)
resource = rule.get("resource", SamplingRule.NO_RULE)
tags = rule.get("tags", SamplingRule.NO_RULE)
provenance = rule.get("provenance", "default")
try:
sampling_rule = SamplingRule(
sample_rate=sample_rate, service=service, name=name, resource=resource, tags=tags
sample_rate=sample_rate,
service=service,
name=name,
resource=resource,
tags=tags,
provenance=provenance,
)
except ValueError as e:
raise ValueError("Error creating sampling rule {}: {}".format(json.dumps(rule), e))
sampling_rules.append(sampling_rule)

# Sort the sampling_rules list using a lambda function as the key
sampling_rules = sorted(sampling_rules, key=lambda rule: PROVENANCE_ORDER.index(rule.provenance))
return sampling_rules

def sample(self, span):
Expand Down Expand Up @@ -320,7 +332,13 @@ def sample(self, span):
def _choose_priority_category_with_rule(self, rule, sampler):
# type: (Optional[SamplingRule], BaseSampler) -> str
if rule:
return _PRIORITY_CATEGORY.RULE
provenance = rule.provenance
if provenance == "customer":
return _PRIORITY_CATEGORY.RULE_CUSTOMER
if provenance == "dynamic":
return _PRIORITY_CATEGORY.RULE_DYNAMIC
return _PRIORITY_CATEGORY.RULE_DEF

if self.limiter._has_been_configured:
return _PRIORITY_CATEGORY.USER
return super(DatadogSampler, self)._choose_priority_category(sampler)
5 changes: 4 additions & 1 deletion ddtrace/sampling_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def __init__(
name=NO_RULE, # type: Any
resource=NO_RULE, # type: Any
tags=NO_RULE, # type: Any
provenance="default", # type: str
):
# type: (...) -> None
"""
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(
self.service = self.choose_matcher(service)
self.name = self.choose_matcher(name)
self.resource = self.choose_matcher(resource)
self.provenance = provenance

@property
def sample_rate(self):
Expand Down Expand Up @@ -236,13 +238,14 @@ def choose_matcher(self, prop):
return GlobMatcher(prop) if prop != SamplingRule.NO_RULE else SamplingRule.NO_RULE

def __repr__(self):
return "{}(sample_rate={!r}, service={!r}, name={!r}, resource={!r}, tags={!r})".format(
return "{}(sample_rate={!r}, service={!r}, name={!r}, resource={!r}, tags={!r}, provenance={!r})".format(
self.__class__.__name__,
self.sample_rate,
self._no_rule_or_self(self.service),
self._no_rule_or_self(self.name),
self._no_rule_or_self(self.resource),
self._no_rule_or_self(self.tags),
self.provenance,
)

__str__ = __repr__
Expand Down
Loading

0 comments on commit 97af079

Please sign in to comment.