From 9f0a3db02d2fb135e4ddc6bc03d35e2b8d7f23a2 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 13 Jun 2024 09:13:34 -0700 Subject: [PATCH] [processor/probabilisticsampling] encoded sampling probability (support OTEP 235) (#31894) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit **Description:** Creates new sampler modes named "equalizing" and "proportional". Preserves existing functionality under the mode named "hash_seed". Fixes #31918 This is the final step in a sequence, the whole of this work was factored into 3+ PRs, including the new `pkg/sampling` and the previous step, #31946. The two new Sampler modes enable mixing OTel sampling SDKs with Collectors in a consistent way. The existing hash_seed mode is also a consistent sampling mode, which makes it possible to have a 1:1 mapping between its decisions and the OTEP 235 randomness and threshold values. Specifically, the 14-bit hash value and sampling probability are mapped into 56-bit R-value and T-value encodings, so that all sampling decisions in all modes include threshold information. This implements the semantic conventions of https://github.com/open-telemetry/semantic-conventions/pull/793, namely the `sampling.randomness` and `sampling.threshold` attributes used for logs where there is no tracestate. The default sampling mode remains HashSeed. We consider a future change of default to Proportional to be desirable, because: 1. Sampling probability is the same, only the hashing algorithm changes 2. Proportional respects and preserves information about earlier sampling decisions, which HashSeed can't do, so it has greater interoperability with OTel SDKs which may also adopt OTEP 235 samplers. **Link to tracking Issue:** Draft for https://github.com/open-telemetry/opentelemetry-specification/issues/3602. 
Previously https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/24811, see also https://github.com/open-telemetry/oteps/pull/235 Part of #29738 **Testing:** New testing has been added. **Documentation:** ✅ --------- Co-authored-by: Juraci Paixão Kröhling --- .chloggen/probabilisticsampler_modes.yaml | 27 + .../probabilisticsamplerprocessor/README.md | 156 +++- .../probabilisticsamplerprocessor/config.go | 65 +- .../config_test.go | 10 +- .../probabilisticsamplerprocessor/factory.go | 6 +- .../metadata/generated_telemetry_test.go | 3 +- .../logsprocessor.go | 130 +++- .../logsprocessor_test.go | 369 ++++++++-- .../sampler_mode.go | 304 ++++++-- .../sampler_mode_test.go | 31 + .../testdata/config.yaml | 2 + .../testdata/invalid_inf.yaml | 17 + .../testdata/invalid_prec.yaml | 18 + .../testdata/invalid_small.yaml | 18 + .../testdata/invalid_zero.yaml | 18 + .../tracesprocessor.go | 95 ++- .../tracesprocessor_test.go | 693 +++++++++++++++++- 17 files changed, 1807 insertions(+), 155 deletions(-) create mode 100644 .chloggen/probabilisticsampler_modes.yaml create mode 100644 processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml create mode 100644 processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml create mode 100644 processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml create mode 100644 processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml diff --git a/.chloggen/probabilisticsampler_modes.yaml b/.chloggen/probabilisticsampler_modes.yaml new file mode 100644 index 000000000000..e823b78e1c2b --- /dev/null +++ b/.chloggen/probabilisticsampler_modes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. 
filelogreceiver) +component: probabilisticsamplerprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add Proportional and Equalizing sampling modes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31918] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Both the existing hash_seed mode and the two new modes use OTEP 235 semantic conventions to encode sampling probability. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/probabilisticsamplerprocessor/README.md b/processor/probabilisticsamplerprocessor/README.md index 57728dc9d6f9..596ad23a38a6 100644 --- a/processor/probabilisticsamplerprocessor/README.md +++ b/processor/probabilisticsamplerprocessor/README.md @@ -1,3 +1,4 @@ + # Probabilistic Sampling Processor @@ -115,7 +116,9 @@ interpreted as a percentage, with values >= 100 equal to 100% sampling. The logs sampling priority attribute is configured via `sampling_priority`. -## Sampling algorithm +## Mode Selection + +There are three sampling modes available. All modes are consistent. ### Hash seed @@ -135,7 +138,154 @@ In order for hashing to be consistent, all collectors for a given tier at different collector tiers to support additional sampling requirements. 
-This mode uses 14 bits of sampling precision. +This mode uses 14 bits of information in its sampling decision; the +default `sampling_precision`, which is 4 hexadecimal digits, exactly +encodes this information. + +This mode is selected by default. + +#### Hash seed: Use-cases + +The hash seed mode is most useful in logs sampling, because it can be +applied to units of telemetry other than TraceID. For example, a +deployment consisting of 100 pods can be sampled according to the +`service.instance.id` resource attribute. In this case, 10% sampling +implies collecting log records from an expected value of 10 pods. + +### Proportional + +OpenTelemetry specifies a consistent sampling mechanism using 56 bits +of randomness, which may be obtained from the Trace ID according to +the W3C Trace Context Level 2 specification. Randomness can also be +explicitly encoded in the OpenTelemetry `tracestate` field, where it is +known as the R-value. + +This mode is named because it reduces the number of items transmitted +proportionally, according to the sampling probability. In this mode, +items are selected for sampling without considering how much they were +already sampled by preceding samplers. + +This mode uses 56 bits of information in its calculations. The +default `sampling_precision` (4) will cause thresholds to be rounded +in some cases when they contain more than 16 significant bits. + +#### Proportional: Use-cases + +The proportional mode is generally applicable in trace sampling, +because it is based on OpenTelemetry and W3C specifications. This +mode is selected by default, because it enforces a predictable +(probabilistic) ratio between incoming items and outgoing items of +telemetry. No matter how SDKs and other sources of telemetry have +been configured with respect to sampling, a collector configured with +25% proportional sampling will output (an expected value of) 1 item +for every 4 items input. 
+ +### Equalizing + +This mode uses the same randomness mechanism as the proportional +sampling mode, in this case considering how much each item was already +sampled by preceding samplers. This mode can be used to lower +sampling probability to a minimum value across a whole pipeline, +making it possible to conditionally adjust sampling probabilities. + +This mode compares a 56 bit threshold against the configured sampling +probability and updates when the threshold is larger. The default +`sampling_precision` (4) will cause updated thresholds to be rounded +in some cases when they contain more than 16 significant bits. + +#### Equalizing: Use-cases + +The equalizing mode is useful in collector deployments where client +SDKs have mixed sampling configuration and the user wants to apply a +uniform sampling probability across the system. For example, a user's +system consists of mostly components developed in-house, but also some +third-party software. Seeking to lower the overall cost of tracing, +they configure 10% sampling in the samplers for all of their in-house +components. This leaves third-party software components unsampled, +making the savings less than desired. In this case, the user could +configure a 10% equalizing probabilistic sampler. Already-sampled +items of telemetry from the in-house components will pass-through one +for one in this scenario, while items of telemetry from third-party +software will be sampled by the intended amount. + +## Sampling threshold information + +In all modes, information about the effective sampling probability is +added into the item of telemetry. The random variable that was used +may also be recorded, in case it was not derived from the TraceID +using a standard algorithm. + +For traces, threshold and optional randomness information are encoded +in the W3C Trace Context `tracestate` fields. 
The tracestate is +divided into sections according to a two-character vendor code; +OpenTelemetry uses "ot" as its section designator. Within the +OpenTelemetry section, the sampling threshold is encoded using "th" +and the optional random variable is encoded using "rv". + +For example, 25% sampling is encoded in a tracing Span as: + +``` +tracestate: ot=th:c +``` + +Users can encode randomness values in this way, independently, making it +possible to apply consistent sampling across traces for example. If +the Trace was initialized with pre-determined randomness value +`9b8233f7e3a151` and 100% sampling, it would read: + +``` +tracestate: ot=th:0;rv:9b8233f7e3a151 +``` + +This component, using either proportional or equalizing modes, could +apply 50% sampling to the Span. This span with randomness value +`9b8233f7e3a151` is consistently sampled at 50% because the threshold, +when zero padded (i.e., `80000000000000`), is less than the randomness +value. The resulting span will have the following tracestate: + +``` +tracestate: ot=th:8;rv:9b8233f7e3a151 +``` + +For log records, threshold and randomness information are encoded in +the log record itself, using attributes. For example, 25% sampling +with an explicit randomness value is encoded as: + +``` +sampling.threshold: c +sampling.randomness: e05a99c8df8d32 +``` + +### Sampling precision + +When encoding sampling probability in the form of a threshold, +variable precision is permitted making it possible for the user to +restrict sampling probabilities to rounded numbers of fixed width. + +Because the threshold is encoded using hexadecimal digits, each digit +contributes 4 bits of information. One digit of sampling precision +can express exact sampling probabilities 1/16, 2/16, ... through +16/16. Two digits of sampling precision can express exact sampling +probabilities 1/256, 2/256, ... through 256/256. With N digits of +sampling precision, there are exactly `16^N` exactly representable +probabilities. 
+ +Depending on the mode, there are different maximum reasonable settings +for this parameter. + +- The `hash_seed` mode uses a 14-bit hash function, therefore + precision 4 completely captures the available information. +- The `equalizing` mode configures a sampling probability after + parsing a `float32` value, which contains 20 bits of precision, + therefore precision 5 completely captures the available information. +- The `proportional` mode configures its ratio using a `float32` + value, however it carries out the arithmetic using 56-bits of + precision. In this mode, increasing precision has the effect + of preserving precision applied by preceding samplers. + +In cases where larger precision is configured than is actually +available, the added precision has no effect because trailing zeros +are eliminated by the encoding. ### Error handling @@ -153,9 +303,11 @@ false, in which case erroneous data will pass through the processor. The following configuration options can be modified: +- `mode` (string, optional): One of "proportional", "equalizing", or "hash_seed"; the default is "proportional" unless either `hash_seed` is configured or `attribute_source` is set to `record`. - `sampling_percentage` (32-bit floating point, required): Percentage at which items are sampled; >= 100 samples all items, 0 rejects all items. - `hash_seed` (32-bit unsigned integer, optional, default = 0): An integer used to compute the hash algorithm. Note that all collectors for a given tier (e.g. behind the same load balancer) should have the same hash_seed. - `fail_closed` (boolean, optional, default = true): Whether to reject items with sampling-related errors. +- `sampling_precision` (integer, optional, default = 4): Determines the number of hexadecimal digits used to encode the sampling threshold. Permitted values are 1..14. 
### Logs-specific configuration diff --git a/processor/probabilisticsamplerprocessor/config.go b/processor/probabilisticsamplerprocessor/config.go index c4bc83eb6b11..b79d3136b02d 100644 --- a/processor/probabilisticsamplerprocessor/config.go +++ b/processor/probabilisticsamplerprocessor/config.go @@ -5,8 +5,11 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "fmt" + "math" "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) type AttributeSource string @@ -35,6 +38,33 @@ type Config struct { // different sampling rates, configuring different seeds avoids that. HashSeed uint32 `mapstructure:"hash_seed"` + // Mode selects the sampling behavior. Supported values: + // + // - "hash_seed": the legacy behavior of this processor. + // Using an FNV hash combined with the HashSeed value, this + // sampler performs a non-consistent probabilistic + // downsampling. The number of spans output is expected to + // equal SamplingPercentage (as a ratio) times the number of + // spans inpout, assuming good behavior from FNV and good + // entropy in the hashed attributes or TraceID. + // + // - "equalizing": Using an OTel-specified consistent sampling + // mechanism, this sampler selectively reduces the effective + // sampling probability of arriving spans. This can be + // useful to select a small fraction of complete traces from + // a stream with mixed sampling rates. The rate of spans + // passing through depends on how much sampling has already + // been applied. If an arriving span was head sampled at + // the same probability it passes through. If the span + // arrives with lower probability, a warning is logged + // because it means this sampler is configured with too + // large a sampling probability to ensure complete traces. 
+ // + // - "proportional": Using an OTel-specified consistent sampling + // mechanism, this sampler reduces the effective sampling + // probability of each span by `SamplingProbability`. + Mode SamplerMode `mapstructure:"mode"` + // FailClosed indicates to not sample data (the processor will // fail "closed") in case of error, such as failure to parse // the tracestate field or missing the randomness attribute. @@ -45,6 +75,14 @@ type Config struct { // despite errors using priority. FailClosed bool `mapstructure:"fail_closed"` + // SamplingPrecision is how many hex digits of sampling + // threshold will be encoded, from 1 up to 14. Default is 4. + // 0 is treated as full precision. + SamplingPrecision int `mapstructure:"sampling_precision"` + + /////// + // Logs only fields below. + // AttributeSource (logs only) defines where to look for the attribute in from_attribute. The allowed values are // `traceID` or `record`. Default is `traceID`. AttributeSource `mapstructure:"attribute_source"` @@ -61,11 +99,34 @@ var _ component.Config = (*Config)(nil) // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { - if cfg.SamplingPercentage < 0 { - return fmt.Errorf("negative sampling rate: %.2f", cfg.SamplingPercentage) + pct := float64(cfg.SamplingPercentage) + + if math.IsInf(pct, 0) || math.IsNaN(pct) { + return fmt.Errorf("sampling rate is invalid: %f%%", cfg.SamplingPercentage) + } + ratio := pct / 100.0 + + switch { + case ratio < 0: + return fmt.Errorf("sampling rate is negative: %f%%", cfg.SamplingPercentage) + case ratio == 0: + // Special case + case ratio < sampling.MinSamplingProbability: + // Too-small case + return fmt.Errorf("sampling rate is too small: %g%%", cfg.SamplingPercentage) + default: + // Note that ratio > 1 is specifically allowed by the README, taken to mean 100% } + if cfg.AttributeSource != "" && !validAttributeSource[cfg.AttributeSource] { return fmt.Errorf("invalid attribute source: %v. 
Expected: %v or %v", cfg.AttributeSource, traceIDAttributeSource, recordAttributeSource) } + + if cfg.SamplingPrecision == 0 { + return fmt.Errorf("invalid sampling precision: 0") + } else if cfg.SamplingPrecision > sampling.NumHexDigits { + return fmt.Errorf("sampling precision is too great, should be <= 14: %d", cfg.SamplingPrecision) + } + return nil } diff --git a/processor/probabilisticsamplerprocessor/config_test.go b/processor/probabilisticsamplerprocessor/config_test.go index 3200f049569d..46477ca0c52e 100644 --- a/processor/probabilisticsamplerprocessor/config_test.go +++ b/processor/probabilisticsamplerprocessor/config_test.go @@ -26,6 +26,8 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ SamplingPercentage: 15.3, + SamplingPrecision: 4, + Mode: "proportional", AttributeSource: "traceID", FailClosed: true, }, @@ -34,7 +36,9 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "logs"), expected: &Config{ SamplingPercentage: 15.3, + SamplingPrecision: defaultPrecision, HashSeed: 22, + Mode: "", AttributeSource: "record", FromAttribute: "foo", SamplingPriority: "bar", @@ -68,7 +72,11 @@ func TestLoadInvalidConfig(t *testing.T) { file string contains string }{ - {"invalid_negative.yaml", "negative sampling rate"}, + {"invalid_negative.yaml", "sampling rate is negative"}, + {"invalid_small.yaml", "sampling rate is too small"}, + {"invalid_inf.yaml", "sampling rate is invalid: +Inf%"}, + {"invalid_prec.yaml", "sampling precision is too great"}, + {"invalid_zero.yaml", "invalid sampling precision"}, } { t.Run(test.file, func(t *testing.T) { factories, err := otelcoltest.NopFactories() diff --git a/processor/probabilisticsamplerprocessor/factory.go b/processor/probabilisticsamplerprocessor/factory.go index 35d594eb8443..ec8a96afb91d 100644 --- a/processor/probabilisticsamplerprocessor/factory.go +++ b/processor/probabilisticsamplerprocessor/factory.go @@ -40,8 +40,10 @@ func 
NewFactory() processor.Factory { func createDefaultConfig() component.Config { return &Config{ - AttributeSource: defaultAttributeSource, - FailClosed: true, + AttributeSource: defaultAttributeSource, + FailClosed: true, + Mode: modeUnset, + SamplingPrecision: defaultPrecision, } } diff --git a/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go b/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go index d1e2cff5b34e..ea85bfdad123 100644 --- a/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/probabilisticsamplerprocessor/internal/metadata/generated_telemetry_test.go @@ -6,14 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" ) type mockMeter struct { diff --git a/processor/probabilisticsamplerprocessor/logsprocessor.go b/processor/probabilisticsamplerprocessor/logsprocessor.go index 0a7d0b8b892b..1a8e81507e6e 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor.go @@ -5,6 +5,7 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" + "errors" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -20,33 +21,107 @@ type logsProcessor struct { sampler dataSampler samplingPriority string + precision int failClosed bool logger *zap.Logger } type recordCarrier struct { record plog.LogRecord + + parsed struct { + tvalue string + threshold sampling.Threshold + + rvalue string + randomness sampling.Randomness + } } var _ 
samplingCarrier = &recordCarrier{} -func newLogRecordCarrier(l plog.LogRecord) samplingCarrier { - return &recordCarrier{ +func (rc *recordCarrier) get(key string) string { + val, ok := rc.record.Attributes().Get(key) + if !ok || val.Type() != pcommon.ValueTypeStr { + return "" + } + return val.Str() +} + +func newLogRecordCarrier(l plog.LogRecord) (samplingCarrier, error) { + var ret error + carrier := &recordCarrier{ record: l, } + if tvalue := carrier.get("sampling.threshold"); len(tvalue) != 0 { + th, err := sampling.TValueToThreshold(tvalue) + if err != nil { + ret = errors.Join(err, ret) + } else { + carrier.parsed.tvalue = tvalue + carrier.parsed.threshold = th + } + } + if rvalue := carrier.get("sampling.randomness"); len(rvalue) != 0 { + rnd, err := sampling.RValueToRandomness(rvalue) + if err != nil { + ret = errors.Join(err, ret) + } else { + carrier.parsed.rvalue = rvalue + carrier.parsed.randomness = rnd + } + } + return carrier, ret +} + +func (rc *recordCarrier) threshold() (sampling.Threshold, bool) { + return rc.parsed.threshold, len(rc.parsed.tvalue) != 0 +} + +func (rc *recordCarrier) explicitRandomness() (randomnessNamer, bool) { + if len(rc.parsed.rvalue) == 0 { + return newMissingRandomnessMethod(), false + } + return newSamplingRandomnessMethod(rc.parsed.randomness), true +} + +func (rc *recordCarrier) updateThreshold(th sampling.Threshold) error { + exist, has := rc.threshold() + if has && sampling.ThresholdLessThan(th, exist) { + return sampling.ErrInconsistentSampling + } + rc.record.Attributes().PutStr("sampling.threshold", th.TValue()) + return nil +} + +func (rc *recordCarrier) setExplicitRandomness(rnd randomnessNamer) { + rc.parsed.randomness = rnd.randomness() + rc.parsed.rvalue = rnd.randomness().RValue() + rc.record.Attributes().PutStr("sampling.randomness", rnd.randomness().RValue()) +} + +func (rc *recordCarrier) clearThreshold() { + rc.parsed.threshold = sampling.NeverSampleThreshold + rc.parsed.tvalue = "" + 
rc.record.Attributes().Remove("sampling.threshold") +} + +func (rc *recordCarrier) reserialize() error { + return nil } -func (*neverSampler) randomnessFromLogRecord(_ plog.LogRecord) (randomnessNamer, samplingCarrier, error) { +func (*neverSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { // We return a fake randomness value, since it will not be used. // This avoids a consistency check error for missing randomness. - return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil + lrc, err := newLogRecordCarrier(logRec) + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), lrc, err } // randomnessFromLogRecord (hashingSampler) uses a hash function over -// the TraceID +// the TraceID or logs attribute source. func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { rnd := newMissingRandomnessMethod() - lrc := newLogRecordCarrier(logRec) + lrc, err := newLogRecordCarrier(logRec) if th.logsTraceIDEnabled { value := logRec.TraceID() @@ -67,15 +142,52 @@ func (th *hashingSampler) randomnessFromLogRecord(logRec plog.LogRecord) (random } } - return rnd, lrc, nil + if err != nil { + // The sampling.randomness or sampling.threshold attributes + // had a parse error, in this case. + lrc = nil + } else if _, hasRnd := lrc.explicitRandomness(); hasRnd { + // If the log record contains a randomness value, do not update. + err = ErrRandomnessInUse + lrc = nil + } else if _, hasTh := lrc.threshold(); hasTh { + // If the log record contains a threshold value, do not update. + err = ErrThresholdInUse + lrc = nil + } else if !isMissing(rnd) { + // When no sampling information is already present and we have + // calculated new randomness, add it to the record. 
+ lrc.setExplicitRandomness(rnd) + } + + return rnd, lrc, err +} + +// randomnessFromLogRecord (hashingSampler) uses OTEP 235 semantic +// conventions basing its deicsion only on the TraceID. +func (ctc *consistentTracestateCommon) randomnessFromLogRecord(logRec plog.LogRecord) (randomnessNamer, samplingCarrier, error) { + lrc, err := newLogRecordCarrier(logRec) + rnd := newMissingRandomnessMethod() + + if err != nil { + // Parse error in sampling.randomness or sampling.threshold + lrc = nil + } else if rv, hasRnd := lrc.explicitRandomness(); hasRnd { + rnd = rv + } else if tid := logRec.TraceID(); !tid.IsEmpty() { + rnd = newTraceIDW3CSpecMethod(sampling.TraceIDToRandomness(tid)) + } + + return rnd, lrc, err } // newLogsProcessor returns a processor.LogsProcessor that will perform head sampling according to the given // configuration. func newLogsProcessor(ctx context.Context, set processor.Settings, nextConsumer consumer.Logs, cfg *Config) (processor.Logs, error) { lsp := &logsProcessor{ - sampler: makeSampler(cfg), + sampler: makeSampler(cfg, true), samplingPriority: cfg.SamplingPriority, + precision: cfg.SamplingPrecision, failClosed: cfg.FailClosed, logger: set.Logger, } @@ -144,7 +256,7 @@ func (lsp *logsProcessor) logRecordToPriorityThreshold(logRec plog.LogRecord) sa minProb = float64(localPriority.Int()) / 100.0 } if minProb != 0 { - if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, defaultPrecision); err == nil { + if th, err := sampling.ProbabilityToThresholdWithPrecision(minProb, lsp.precision); err == nil { // The record has supplied a valid alternative sampling probability return th } diff --git a/processor/probabilisticsamplerprocessor/logsprocessor_test.go b/processor/probabilisticsamplerprocessor/logsprocessor_test.go index e0181ca6e853..7cfeb896a230 100644 --- a/processor/probabilisticsamplerprocessor/logsprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/logsprocessor_test.go @@ -18,6 +18,8 @@ import ( 
"go.opentelemetry.io/collector/processor/processortest" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) func TestNewLogsProcessor(t *testing.T) { @@ -78,6 +80,11 @@ func TestLogsSampling(t *testing.T) { name: "nothing", cfg: &Config{ SamplingPercentage: 0, + + // FailClosed because the test + // includes one empty TraceID which + // would otherwise fail open. + FailClosed: true, }, received: 0, }, @@ -86,6 +93,7 @@ func TestLogsSampling(t *testing.T) { cfg: &Config{ SamplingPercentage: 50, AttributeSource: traceIDAttributeSource, + Mode: HashSeed, FailClosed: true, }, // Note: This count excludes one empty TraceID @@ -119,7 +127,11 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "foo", - FailClosed: true, + + // FailClosed: true so that we do not + // sample when the attribute is + // missing. + FailClosed: true, }, received: 23, }, @@ -129,7 +141,11 @@ func TestLogsSampling(t *testing.T) { SamplingPercentage: 50, AttributeSource: recordAttributeSource, FromAttribute: "bar", - FailClosed: true, + + // FailClosed: true so that we do not + // sample when the attribute is + // missing. + FailClosed: true, }, received: 29, // probabilistic... doesn't yield the same results as foo }, @@ -191,76 +207,319 @@ func TestLogsSampling(t *testing.T) { } } -func TestLogsMissingRandomness(t *testing.T) { - type test struct { - pct float32 - source AttributeSource - failClosed bool - sampled bool - } +func TestLogsSamplingState(t *testing.T) { + // This hard-coded TraceID will sample at 50% and not at 49%. + // The equivalent randomness is 0x80000000000000. 
+ var defaultTID = mustParseTID("fefefefefefefefefe80000000000000") - for _, tt := range []test{ - {0, recordAttributeSource, true, false}, - {50, recordAttributeSource, true, false}, - {100, recordAttributeSource, true, false}, - - {0, recordAttributeSource, false, false}, - {50, recordAttributeSource, false, true}, - {100, recordAttributeSource, false, true}, - - {0, traceIDAttributeSource, true, false}, - {50, traceIDAttributeSource, true, false}, - {100, traceIDAttributeSource, true, false}, - - {0, traceIDAttributeSource, false, false}, - {50, traceIDAttributeSource, false, true}, - {100, traceIDAttributeSource, false, true}, - } { - t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed), func(t *testing.T) { + tests := []struct { + name string + cfg *Config + tid pcommon.TraceID + attrs map[string]any + log string + sampled bool + adjCount float64 + expect map[string]any + }{ + { + name: "100 percent traceID", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: Proportional, + }, + tid: defaultTID, + attrs: map[string]any{ + "ignored": "value", + }, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "sampling.threshold": "0", + "ignored": "value", + }, + }, + { + name: "100 percent traceID hash_seed", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: "hash_seed", + HashSeed: 22, + }, + attrs: map[string]any{ + "K": "V", + }, + tid: defaultTID, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "K": "V", + "sampling.threshold": "0", + "sampling.randomness": randomnessFromBytes(defaultTID[:], 22).RValue(), + }, + }, + { + name: "100 percent attribute", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: recordAttributeSource, + FromAttribute: "veryrandom", + HashSeed: 49, + }, + attrs: map[string]any{ + "veryrandom": "1234", + }, + sampled: true, + adjCount: 1, + expect: map[string]any{ + "sampling.threshold": "0", + "sampling.randomness": 
randomnessFromBytes([]byte("1234"), 49).RValue(), + "veryrandom": "1234", + }, + }, + { + name: "0 percent traceID", + cfg: &Config{ + SamplingPercentage: 0, + AttributeSource: traceIDAttributeSource, + }, + tid: defaultTID, + sampled: false, + }, + { + name: "10 percent priority sampled incoming randomness", + cfg: &Config{ + SamplingPercentage: 0, + AttributeSource: traceIDAttributeSource, + SamplingPriority: "veryrandom", + SamplingPrecision: 6, + }, + tid: defaultTID, + attrs: map[string]any{ + "sampling.randomness": "e6147c00000000", + "veryrandom": 10.125, + }, + sampled: true, + adjCount: 9.876654321, + expect: map[string]any{ + "sampling.randomness": "e6147c00000000", + "sampling.threshold": "e6147b", + "veryrandom": 10.125, + }, + }, + { + name: "25 percent incoming", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Proportional, + }, + tid: mustParseTID("fefefefefefefefefef0000000000000"), + attrs: map[string]any{ + "sampling.threshold": "c", + }, + sampled: true, + adjCount: 8, + expect: map[string]any{ + "sampling.threshold": "e", + }, + }, + { + name: "25 percent arriving inconsistent", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Equalizing, + FailClosed: true, + }, + tid: mustParseTID("fefefefefefefefefeb0000000000000"), + attrs: map[string]any{ + // "c" is an invalid threshold for the TraceID + // i.e., T <= R is false, should be rejected. 
+ "sampling.threshold": "c", // Corresponds with 25% + }, + log: "inconsistent arriving threshold", + sampled: false, + }, + { + name: "25 percent arriving equalizing", + cfg: &Config{ + SamplingPercentage: 50, + AttributeSource: traceIDAttributeSource, + Mode: Equalizing, + SamplingPriority: "prio", + }, + tid: mustParseTID("fefefefefefefefefefefefefefefefe"), + attrs: map[string]any{ + "sampling.threshold": "c", // Corresponds with 25% + "prio": 37, // Lower than 50, higher than 25 + }, + sampled: true, + adjCount: 4, + expect: map[string]any{ + "sampling.threshold": "c", + "prio": int64(37), + }, + log: "cannot raise existing sampling probability", + }, + { + name: "hash_seed with spec randomness", + cfg: &Config{ + SamplingPercentage: 100, + AttributeSource: traceIDAttributeSource, + Mode: HashSeed, + }, + tid: defaultTID, + attrs: map[string]any{ + "sampling.randomness": "f2341234123412", + }, + sampled: true, + adjCount: 0, // No threshold + log: "item has sampling randomness", + expect: map[string]any{ + "sampling.randomness": "f2341234123412", + }, + }, + } + for _, tt := range tests { + t.Run(fmt.Sprint(tt.name), func(t *testing.T) { - ctx := context.Background() - logs := plog.NewLogs() - record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() - record.SetTraceID(pcommon.TraceID{}) // invalid TraceID - - cfg := &Config{ - SamplingPercentage: tt.pct, - HashSeed: defaultHashSeed, - FailClosed: tt.failClosed, - AttributeSource: tt.source, - FromAttribute: "unused", + sink := new(consumertest.LogsSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg } - sink := new(consumertest.LogsSink) set := processortest.NewNopSettings() - // Note: there is a debug-level log we are expecting when FailClosed - // causes a drop. 
logger, observed := observer.New(zap.DebugLevel) set.Logger = zap.New(logger) - lp, err := newLogsProcessor(ctx, set, sink, cfg) + tsp, err := newLogsProcessor(context.Background(), set, sink, cfg) require.NoError(t, err) - err = lp.ConsumeLogs(ctx, logs) + logs := plog.NewLogs() + lr := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + record := lr.AppendEmpty() + record.SetTimestamp(pcommon.Timestamp(time.Unix(1649400860, 0).Unix())) + record.SetSeverityNumber(plog.SeverityNumberDebug) + record.SetTraceID(tt.tid) + require.NoError(t, record.Attributes().FromRaw(tt.attrs)) + + err = tsp.ConsumeLogs(context.Background(), logs) require.NoError(t, err) + if len(tt.log) == 0 { + require.Equal(t, 0, len(observed.All()), "should not have logs: %v", observed.All()) + require.Equal(t, "", tt.log) + } else { + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), tt.log) + } + sampledData := sink.AllLogs() + if tt.sampled { require.Equal(t, 1, len(sampledData)) assert.Equal(t, 1, sink.LogRecordCount()) - } else { - require.Equal(t, 0, len(sampledData)) - assert.Equal(t, 0, sink.LogRecordCount()) - } - - if tt.pct != 0 { - // pct==0 bypasses the randomness check - require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) - require.Contains(t, observed.All()[0].Message, "logs sampler") - require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") - } else { - require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + got := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + gotAttrs := got.Attributes() + require.Equal(t, tt.expect, gotAttrs.AsRaw()) + thVal, hasTh := gotAttrs.Get("sampling.threshold") + if tt.adjCount == 0 { + require.False(t, hasTh) + } else { + 
th, err := sampling.TValueToThreshold(thVal.Str()) + require.NoError(t, err) + if cfg.SamplingPrecision == 0 { + assert.InEpsilon(t, tt.adjCount, th.AdjustedCount(), 1e-9, + "compare %v %v", tt.adjCount, th.AdjustedCount()) + } else { + assert.InEpsilon(t, tt.adjCount, th.AdjustedCount(), 1e-3, + "compare %v %v", tt.adjCount, th.AdjustedCount()) + } + } } }) } } + +func TestLogsMissingRandomness(t *testing.T) { + type test struct { + pct float32 + source AttributeSource + failClosed bool + sampled bool + } + + for _, mode := range AllModes { + for _, tt := range []test{ + {0, recordAttributeSource, true, false}, + {50, recordAttributeSource, true, false}, + {100, recordAttributeSource, true, false}, + + {0, recordAttributeSource, false, false}, + {50, recordAttributeSource, false, true}, + {100, recordAttributeSource, false, true}, + + {0, traceIDAttributeSource, true, false}, + {50, traceIDAttributeSource, true, false}, + {100, traceIDAttributeSource, true, false}, + + {0, traceIDAttributeSource, false, false}, + {50, traceIDAttributeSource, false, true}, + {100, traceIDAttributeSource, false, true}, + } { + t.Run(fmt.Sprint(tt.pct, "_", tt.source, "_", tt.failClosed, "_", mode), func(t *testing.T) { + + ctx := context.Background() + logs := plog.NewLogs() + record := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + record.SetTraceID(pcommon.TraceID{}) // invalid TraceID + record.Attributes().PutStr("unused", "") + + cfg := &Config{ + SamplingPercentage: tt.pct, + Mode: mode, + HashSeed: defaultHashSeed, + FailClosed: tt.failClosed, + AttributeSource: tt.source, + FromAttribute: "unused", + } + + sink := new(consumertest.LogsSink) + set := processortest.NewNopSettings() + // Note: there is a debug-level log we are expecting when FailClosed + // causes a drop. 
+ logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + lp, err := newLogsProcessor(ctx, set, sink, cfg) + require.NoError(t, err) + + err = lp.ConsumeLogs(ctx, logs) + require.NoError(t, err) + + sampledData := sink.AllLogs() + if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.LogRecordCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.LogRecordCount()) + } + + if tt.pct != 0 { + // pct==0 bypasses the randomness check + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "logs sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), "missing randomness") + } else { + require.Equal(t, 0, len(observed.All()), "should have no logs: %v", observed.All()) + } + }) + } + } +} diff --git a/processor/probabilisticsamplerprocessor/sampler_mode.go b/processor/probabilisticsamplerprocessor/sampler_mode.go index 6bf09caa271f..377f717bed09 100644 --- a/processor/probabilisticsamplerprocessor/sampler_mode.go +++ b/processor/probabilisticsamplerprocessor/sampler_mode.go @@ -19,6 +19,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) +const ( + // These four can happen at runtime and be returned by + // randomnessFromXXX() + + ErrInconsistentArrivingTValue samplerError = "inconsistent arriving threshold: item should not have been sampled" + ErrMissingRandomness samplerError = "missing randomness" + ErrRandomnessInUse samplerError = "item has sampling randomness, equalizing or proportional mode recommended" + ErrThresholdInUse samplerError = "item has sampling threshold, equalizing or proportional mode recommended" +) + const ( // Hashing method: The constants below help translate user friendly percentages // to numbers direct used in sampling. 
@@ -28,22 +38,40 @@ const ( percentageScaleFactor = numHashBuckets / 100.0 ) -// SamplerMode controls the logic used in making a sampling decision. -// The HashSeed mode is the only mode, presently, and it is also the -// default mode. -// -// TODO: In the future, when OTEP 235 is introduced, there will be two -// new modes. +// samplerErrors are conditions reported by the sampler that are somewhat +// ordinary and should log as info-level. +type samplerError string + +var _ error = samplerError("") + +func (s samplerError) Error() string { + return string(s) +} + +// SamplerMode determines which of several modes is used for the +// sampling decision. type SamplerMode string const ( - HashSeed SamplerMode = "hash_seed" - DefaultMode SamplerMode = HashSeed - modeUnset SamplerMode = "" -) + // HashSeed applies the hash/fnv hash function originally used in this component. + HashSeed SamplerMode = "hash_seed" + + // Equalizing uses OpenTelemetry consistent probability + // sampling information (OTEP 235), applies an absolute + // threshold to equalize incoming sampling probabilities. + Equalizing SamplerMode = "equalizing" + + // Proportional uses OpenTelemetry consistent probability + // sampling information (OTEP 235), multiplies incoming + // sampling probaiblities. + Proportional SamplerMode = "proportional" -// ErrMissingRandomness indicates no randomness source was found. -var ErrMissingRandomness = errors.New("missing randomness") + // defaultHashSeed is applied when the mode is unset. + defaultMode SamplerMode = HashSeed + + // modeUnset indicates the user has not configured the mode. 
+ modeUnset SamplerMode = "" +) type randomnessNamer interface { randomness() sampling.Randomness @@ -57,6 +85,8 @@ func (rm randomnessMethod) randomness() sampling.Randomness { } type traceIDHashingMethod struct{ randomnessMethod } +type traceIDW3CSpecMethod struct{ randomnessMethod } +type samplingRandomnessMethod struct{ randomnessMethod } type samplingPriorityMethod struct{ randomnessMethod } type missingRandomnessMethod struct{} @@ -82,12 +112,22 @@ func (traceIDHashingMethod) policyName() string { return "trace_id_hash" } +func (samplingRandomnessMethod) policyName() string { + return "sampling_randomness" +} + +func (traceIDW3CSpecMethod) policyName() string { + return "trace_id_w3c" +} + func (samplingPriorityMethod) policyName() string { return "sampling_priority" } var _ randomnessNamer = missingRandomnessMethod{} var _ randomnessNamer = traceIDHashingMethod{} +var _ randomnessNamer = traceIDW3CSpecMethod{} +var _ randomnessNamer = samplingRandomnessMethod{} var _ randomnessNamer = samplingPriorityMethod{} func newMissingRandomnessMethod() randomnessNamer { @@ -99,6 +139,14 @@ func isMissing(rnd randomnessNamer) bool { return ok } +func newSamplingRandomnessMethod(rnd sampling.Randomness) randomnessNamer { + return samplingRandomnessMethod{randomnessMethod(rnd)} +} + +func newTraceIDW3CSpecMethod(rnd sampling.Randomness) randomnessNamer { + return traceIDW3CSpecMethod{randomnessMethod(rnd)} +} + func newTraceIDHashingMethod(rnd sampling.Randomness) randomnessNamer { return traceIDHashingMethod{randomnessMethod(rnd)} } @@ -114,10 +162,41 @@ func newAttributeHashingMethod(attribute string, rnd sampling.Randomness) random } } -// TODO: Placeholder interface, see #31894 for its future contents, -// will become a non-empty interface. (Linter forces us to write "any".) -type samplingCarrier any +// samplingCarrier conveys information about the underlying data item +// (whether span or log record) through the sampling decision. 
+type samplingCarrier interface { + // explicitRandomness returns a randomness value and a boolean + // indicating whether the item had sampling randomness + // explicitly set. + explicitRandomness() (randomnessNamer, bool) + + // setExplicitRandomness updates the item with the signal-specific + // encoding for an explicit randomness value. + setExplicitRandomness(randomnessNamer) + + // clearThreshold unsets a sampling threshold, which is used to + // clear information that breaks the expected sampling invariants + // described in OTEP 235. + clearThreshold() + + // threshold returns a sampling threshold and a boolean + // indicating whether the item had sampling threshold + // explicitly set. + threshold() (sampling.Threshold, bool) + + // updateThreshold modifies the sampling threshold. This + // returns an error if the updated sampling threshold has a + // lower adjusted account; the only permissible updates raise + // adjusted count (i.e., reduce sampling probability). + updateThreshold(sampling.Threshold) error + + // reserialize re-encodes the updated sampling information + // into the item, if necessary. For Spans, this re-encodes + // the tracestate. This is a no-op for logs records. + reserialize() error +} +// dataSampler implements the logic of a sampling mode. type dataSampler interface { // decide reports the result based on a probabilistic decision. 
decide(carrier samplingCarrier) sampling.Threshold @@ -129,11 +208,11 @@ type dataSampler interface { randomnessFromLogRecord(s plog.LogRecord) (randomness randomnessNamer, carrier samplingCarrier, err error) } -var AllModes = []SamplerMode{HashSeed} - func (sm *SamplerMode) UnmarshalText(in []byte) error { switch mode := SamplerMode(in); mode { case HashSeed, + Equalizing, + Proportional, modeUnset: *sm = mode return nil @@ -161,6 +240,12 @@ func (th *hashingSampler) decide(_ samplingCarrier) sampling.Threshold { return th.tvalueThreshold } +// consistentTracestateCommon contains the common aspects of the +// Proportional and Equalizing sampler modes. These samplers sample +// using the TraceID and do not support use of logs source attribute. +type consistentTracestateCommon struct { +} + // neverSampler always decides false. type neverSampler struct { } @@ -169,6 +254,52 @@ func (*neverSampler) decide(_ samplingCarrier) sampling.Threshold { return sampling.NeverSampleThreshold } +// equalizingSampler raises thresholds up to a fixed value. +type equalizingSampler struct { + // TraceID-randomness-based calculation + tvalueThreshold sampling.Threshold + + consistentTracestateCommon +} + +func (te *equalizingSampler) decide(carrier samplingCarrier) sampling.Threshold { + if tv, has := carrier.threshold(); has && sampling.ThresholdLessThan(te.tvalueThreshold, tv) { + return tv + } + return te.tvalueThreshold +} + +// proportionalSampler raises thresholds relative to incoming value. +type proportionalSampler struct { + // ratio in the range [2**-56, 1] + ratio float64 + + // precision is the precision in number of hex digits + precision int + + consistentTracestateCommon +} + +func (tp *proportionalSampler) decide(carrier samplingCarrier) sampling.Threshold { + incoming := 1.0 + if tv, has := carrier.threshold(); has { + incoming = tv.Probability() + } + + // There is a potential here for the product probability to + // underflow, which is checked here. 
+ threshold, err := sampling.ProbabilityToThresholdWithPrecision(incoming*tp.ratio, tp.precision) + + // Check the only known error condition. + if errors.Is(err, sampling.ErrProbabilityRange) { + // Considered valid, a case where the sampling probability + // has fallen below the minimum supported value and simply + // becomes unsampled. + return sampling.NeverSampleThreshold + } + return threshold +} + func getBytesFromValue(value pcommon.Value) []byte { if value.Type() == pcommon.ValueTypeBytes { return value.Bytes().AsRaw() @@ -214,13 +345,28 @@ func randomnessFromBytes(b []byte, hashSeed uint32) sampling.Randomness { return rnd } -// consistencyCheck checks for certain inconsistent inputs. -// -// if the randomness is missing, returns ErrMissingRandomness. -func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { +func consistencyCheck(rnd randomnessNamer, carrier samplingCarrier) error { + // Without randomness, do not check the threshold. if isMissing(rnd) { return ErrMissingRandomness } + // When the carrier is nil, it means there was trouble parsing the + // tracestate or trace-related attributes. In this case, skip the + // consistency check. + if carrier == nil { + return nil + } + // Consistency check: if the TraceID is out of range, the + // TValue is a lie. If inconsistent, clear it and return an error. + if tv, has := carrier.threshold(); has { + if !tv.ShouldSample(rnd.randomness()) { + // In case we fail open, the threshold is cleared as + // recommended in the OTel spec. + carrier.clearThreshold() + return ErrInconsistentArrivingTValue + } + } + return nil } @@ -230,46 +376,82 @@ func consistencyCheck(rnd randomnessNamer, _ samplingCarrier) error { // // Extending this logic, we round very small probabilities up to the // minimum supported value(s) which varies according to sampler mode. -func makeSampler(cfg *Config) dataSampler { +func makeSampler(cfg *Config, isLogs bool) dataSampler { // README allows percents >100 to equal 100%. 
pct := cfg.SamplingPercentage if pct > 100 { pct = 100 } - - never := &neverSampler{} + mode := cfg.Mode + if mode == modeUnset { + // Reasons to choose the legacy behavior include: + // (a) having set the hash seed + // (b) logs signal w/o trace ID source + if cfg.HashSeed != 0 || (isLogs && cfg.AttributeSource != traceIDAttributeSource) { + mode = HashSeed + } else { + mode = defaultMode + } + } if pct == 0 { - return never + return &neverSampler{} + } + // Note: Convert to float64 before dividing by 100, otherwise loss of precision. + // If the probability is too small, round it up to the minimum. + ratio := float64(pct) / 100 + // Like the pct > 100 test above, but for values too small to + // express in 14 bits of precision. + if ratio < sampling.MinSamplingProbability { + ratio = sampling.MinSamplingProbability } - // Note: the original hash function used in this code - // is preserved to ensure consistency across updates. - // - // uint32(pct * percentageScaleFactor) - // - // (a) carried out the multiplication in 32-bit precision - // (b) rounded to zero instead of nearest. - scaledSampleRate := uint32(pct * percentageScaleFactor) + switch mode { + case Equalizing: + // The error case below is ignored, we have rounded the probability so + // that it is in-range + threshold, _ := sampling.ProbabilityToThresholdWithPrecision(ratio, cfg.SamplingPrecision) - if scaledSampleRate == 0 { - return never - } + return &equalizingSampler{ + tvalueThreshold: threshold, + } + + case Proportional: + return &proportionalSampler{ + ratio: ratio, + precision: cfg.SamplingPrecision, + } + + default: // i.e., HashSeed + + // Note: the original hash function used in this code + // is preserved to ensure consistency across updates. + // + // uint32(pct * percentageScaleFactor) + // + // (a) carried out the multiplication in 32-bit precision + // (b) rounded to zero instead of nearest. 
+ scaledSamplerate := uint32(pct * percentageScaleFactor) + + if scaledSamplerate == 0 { + return &neverSampler{} + } - // Convert the accept threshold to a reject threshold, - // then shift it into 56-bit value. - reject := numHashBuckets - scaledSampleRate - reject56 := uint64(reject) << 42 + // Convert the accept threshold to a reject threshold, + // then shift it into 56-bit value. + reject := numHashBuckets - scaledSamplerate + reject56 := uint64(reject) << 42 - threshold, _ := sampling.UnsignedToThreshold(reject56) + threshold, _ := sampling.UnsignedToThreshold(reject56) - return &hashingSampler{ - tvalueThreshold: threshold, - hashSeed: cfg.HashSeed, + return &hashingSampler{ + tvalueThreshold: threshold, + hashSeed: cfg.HashSeed, - // Logs specific: - logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, - logsRandomnessSourceAttribute: cfg.FromAttribute, + // Logs specific: + logsTraceIDEnabled: cfg.AttributeSource == traceIDAttributeSource, + logsRandomnessSourceAttribute: cfg.FromAttribute, + } } } @@ -279,7 +461,7 @@ type randFunc[T any] func(T) (randomnessNamer, samplingCarrier, error) // priorityFunc makes changes resulting from sampling priority. 
type priorityFunc[T any] func(T, randomnessNamer, sampling.Threshold) (randomnessNamer, sampling.Threshold) -// commonSamplingLogic implements sampling on a per-item basis +// commonShouldSampleLogic implements sampling on a per-item basis // independent of the signal type, as embodied in the functional // parameters: func commonShouldSampleLogic[T any]( @@ -293,12 +475,18 @@ func commonShouldSampleLogic[T any]( logger *zap.Logger, ) bool { rnd, carrier, err := randFunc(item) + if err == nil { err = consistencyCheck(rnd, carrier) } var threshold sampling.Threshold if err != nil { - logger.Debug(description, zap.Error(err)) + var se samplerError + if errors.As(err, &se) { + logger.Debug(description, zap.Error(err)) + } else { + logger.Info(description, zap.Error(err)) + } if failClosed { threshold = sampling.NeverSampleThreshold } else { @@ -312,6 +500,24 @@ func commonShouldSampleLogic[T any]( sampled := threshold.ShouldSample(rnd.randomness()) + if sampled && carrier != nil { + // Note: updateThreshold limits loss of adjusted count, by + // preventing the threshold from being lowered, only allowing + // probability to fall and never to rise. + if err := carrier.updateThreshold(threshold); err != nil { + if errors.Is(err, sampling.ErrInconsistentSampling) { + // This is working-as-intended. You can't lower + // the threshold, it's illogical. 
+ logger.Debug(description, zap.Error(err)) + } else { + logger.Info(description, zap.Error(err)) + } + } + if err := carrier.reserialize(); err != nil { + logger.Info(description, zap.Error(err)) + } + } + _ = stats.RecordWithTags( ctx, []tag.Mutator{tag.Upsert(tagPolicyKey, rnd.policyName()), tag.Upsert(tagSampledKey, strconv.FormatBool(sampled))}, diff --git a/processor/probabilisticsamplerprocessor/sampler_mode_test.go b/processor/probabilisticsamplerprocessor/sampler_mode_test.go index 170da3ed6d44..d0a2aef2a472 100644 --- a/processor/probabilisticsamplerprocessor/sampler_mode_test.go +++ b/processor/probabilisticsamplerprocessor/sampler_mode_test.go @@ -4,12 +4,15 @@ package probabilisticsamplerprocessor import ( + "math" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +var AllModes = []SamplerMode{HashSeed, Equalizing, Proportional} + func TestUnmarshalText(t *testing.T) { tests := []struct { samplerMode string @@ -18,6 +21,12 @@ func TestUnmarshalText(t *testing.T) { { samplerMode: "hash_seed", }, + { + samplerMode: "equalizing", + }, + { + samplerMode: "proportional", + }, { samplerMode: "", }, @@ -39,3 +48,25 @@ func TestUnmarshalText(t *testing.T) { }) } } + +func TestHashSeedRoundingDown(t *testing.T) { + // The original hash function rounded thresholds down, in the + // direction of zero. + + // pct is approximately 75% of the minimum 14-bit probability, so it + // would round up, but it does not. 
+ const pct = 0x3p-16 * 100 + + require.Equal(t, 1.0, math.Round((pct/100)*numHashBuckets)) + + for _, isLogs := range []bool{false, true} { + cfg := Config{ + Mode: HashSeed, + SamplingPercentage: pct, + HashSeed: defaultHashSeed, + } + + _, ok := makeSampler(&cfg, isLogs).(*neverSampler) + require.True(t, ok, "is neverSampler") + } +} diff --git a/processor/probabilisticsamplerprocessor/testdata/config.yaml b/processor/probabilisticsamplerprocessor/testdata/config.yaml index 0e853f77cbe3..6adc453015a3 100644 --- a/processor/probabilisticsamplerprocessor/testdata/config.yaml +++ b/processor/probabilisticsamplerprocessor/testdata/config.yaml @@ -11,6 +11,8 @@ processors: # zero, i.e.: no sample. Values greater or equal 100 are treated as # "sample all traces". sampling_percentage: 15.3 + # mode determines the type of sampling logic applied, see the README for details. + mode: "proportional" probabilistic_sampler/logs: # the percentage rate at which logs are going to be sampled. Defaults to diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml new file mode 100644 index 000000000000..4ff2ab115142 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_inf.yaml @@ -0,0 +1,17 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: +Inf + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml new file mode 100644 index 000000000000..96d93b6eddc1 --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_prec.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: 50 + sampling_precision: 15 + +exporters: + nop: + 
+service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml new file mode 100644 index 000000000000..1f8bdc271f6c --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_small.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + # This is smaller than 2**-56 + sampling_percentage: .000000000000001 + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml b/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml new file mode 100644 index 000000000000..2b80e340b64b --- /dev/null +++ b/processor/probabilisticsamplerprocessor/testdata/invalid_zero.yaml @@ -0,0 +1,18 @@ +receivers: + nop: + +processors: + + probabilistic_sampler/traces: + sampling_percentage: 15.3 + sampling_precision: 0 + +exporters: + nop: + +service: + pipelines: + traces: + receivers: [ nop ] + processors: [ probabilistic_sampler/traces ] + exporters: [ nop ] diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor.go b/processor/probabilisticsamplerprocessor/tracesprocessor.go index 197e289e9e5d..5a81215ac500 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor.go @@ -6,6 +6,7 @@ package probabilisticsamplerprocessor // import "github.com/open-telemetry/opent import ( "context" "strconv" + "strings" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -47,14 +48,51 @@ type traceProcessor struct { // decide. 
type tracestateCarrier struct { span ptrace.Span + sampling.W3CTraceState } var _ samplingCarrier = &tracestateCarrier{} -func newTracestateCarrier(s ptrace.Span) samplingCarrier { - return &tracestateCarrier{ +func newTracestateCarrier(s ptrace.Span) (samplingCarrier, error) { + var err error + tsc := &tracestateCarrier{ span: s, } + tsc.W3CTraceState, err = sampling.NewW3CTraceState(s.TraceState().AsRaw()) + return tsc, err +} + +func (tc *tracestateCarrier) threshold() (sampling.Threshold, bool) { + return tc.W3CTraceState.OTelValue().TValueThreshold() +} + +func (tc *tracestateCarrier) explicitRandomness() (randomnessNamer, bool) { + rnd, ok := tc.W3CTraceState.OTelValue().RValueRandomness() + if !ok { + return newMissingRandomnessMethod(), false + } + return newSamplingRandomnessMethod(rnd), true +} + +func (tc *tracestateCarrier) updateThreshold(th sampling.Threshold) error { + return tc.W3CTraceState.OTelValue().UpdateTValueWithSampling(th) +} + +func (tc *tracestateCarrier) setExplicitRandomness(rnd randomnessNamer) { + tc.W3CTraceState.OTelValue().SetRValue(rnd.randomness()) +} + +func (tc *tracestateCarrier) clearThreshold() { + tc.W3CTraceState.OTelValue().ClearTValue() +} + +func (tc *tracestateCarrier) reserialize() error { + var w strings.Builder + err := tc.W3CTraceState.Serialize(&w) + if err == nil { + tc.span.TraceState().FromRaw(w.String()) + } + return err } // newTracesProcessor returns a processor.TracesProcessor that will @@ -62,7 +100,7 @@ func newTracestateCarrier(s ptrace.Span) samplingCarrier { // configuration. 
func newTracesProcessor(ctx context.Context, set processor.Settings, cfg *Config, nextConsumer consumer.Traces) (processor.Traces, error) { tp := &traceProcessor{ - sampler: makeSampler(cfg), + sampler: makeSampler(cfg, false), failClosed: cfg.FailClosed, logger: set.Logger, } @@ -75,21 +113,56 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, cfg *Config processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true})) } -func (th *neverSampler) randomnessFromSpan(_ ptrace.Span) (randomnessNamer, samplingCarrier, error) { - // We return a fake randomness value, since it will not be used. - // This avoids a consistency check error for missing randomness. - return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), nil, nil -} - func (th *hashingSampler) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) { tid := s.TraceID() - tsc := newTracestateCarrier(s) + tsc, err := newTracestateCarrier(s) rnd := newMissingRandomnessMethod() if !tid.IsEmpty() { rnd = newTraceIDHashingMethod(randomnessFromBytes(tid[:], th.hashSeed)) } - return rnd, tsc, nil + + // If the tracestate contains a proper R-value or T-value, we + // have to leave it alone. The user should not be using this + // sampler mode if they are using specified forms of consistent + // sampling in OTel. + if err != nil { + return rnd, nil, err + } else if _, has := tsc.explicitRandomness(); has { + err = ErrRandomnessInUse + tsc = nil + } else if _, has := tsc.threshold(); has { + err = ErrThresholdInUse + tsc = nil + } else { + // When no sampling information is present, add a + // Randomness value. 
+ tsc.setExplicitRandomness(rnd) + } + return rnd, tsc, err } + +func (ctc *consistentTracestateCommon) randomnessFromSpan(s ptrace.Span) (randomnessNamer, samplingCarrier, error) { + rnd := newMissingRandomnessMethod() + tsc, err := newTracestateCarrier(s) + if err != nil { + tsc = nil + } else if rv, has := tsc.explicitRandomness(); has { + // When the tracestate is OK and has r-value, use it. + rnd = rv + } else if !s.TraceID().IsEmpty() { + rnd = newTraceIDW3CSpecMethod(sampling.TraceIDToRandomness(s.TraceID())) + } + + return rnd, tsc, err +} + +func (th *neverSampler) randomnessFromSpan(span ptrace.Span) (randomnessNamer, samplingCarrier, error) { + // We return a fake randomness value, since it will not be used. + // This avoids a consistency check error for missing randomness. + tsc, err := newTracestateCarrier(span) + return newSamplingPriorityMethod(sampling.AllProbabilitiesRandomness), tsc, err +} + func (tp *traceProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { td.ResourceSpans().RemoveIf(func(rs ptrace.ResourceSpans) bool { rs.ScopeSpans().RemoveIf(func(ils ptrace.ScopeSpans) bool { diff --git a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go index d46b13035ae5..608296e94e4c 100644 --- a/processor/probabilisticsamplerprocessor/tracesprocessor_test.go +++ b/processor/probabilisticsamplerprocessor/tracesprocessor_test.go @@ -23,6 +23,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/idutils" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling" ) // defaultHashSeed is used throughout to ensure that the HashSeed is real @@ -105,16 +106,16 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { }, numBatches: 1e5, numTracesPerBatch: 2, - acceptableDelta: 0.01, + acceptableDelta: 0.02, }, { name: 
"random_sampling_small", cfg: &Config{ SamplingPercentage: 5, }, - numBatches: 1e5, + numBatches: 1e6, numTracesPerBatch: 2, - acceptableDelta: 0.01, + acceptableDelta: 0.1, }, { name: "random_sampling_medium", @@ -123,7 +124,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { }, numBatches: 1e5, numTracesPerBatch: 4, - acceptableDelta: 0.1, + acceptableDelta: 0.2, }, { name: "random_sampling_high", @@ -148,6 +149,7 @@ func Test_tracesamplerprocessor_SamplingPercentageRange(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sink := newAssertTraces(t, testSvcName) + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), tt.cfg, sink) if err != nil { t.Errorf("error when creating traceSamplerProcessor: %v", err) @@ -383,28 +385,35 @@ func Test_tracesamplerprocessor_SpanSamplingPriority(t *testing.T) { sampled: true, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, mode := range AllModes { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { - sink := new(consumertest.TracesSink) - cfg := *tt.cfg + sink := new(consumertest.TracesSink) - cfg.HashSeed = defaultHashSeed - tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), &cfg, sink) - require.NoError(t, err) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.HashSeed = defaultHashSeed - err = tsp.ConsumeTraces(context.Background(), tt.td) - require.NoError(t, err) + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, sink) + require.NoError(t, err) - sampledData := sink.AllTraces() - if tt.sampled { - require.Equal(t, 1, len(sampledData)) - assert.Equal(t, 1, sink.SpanCount()) - } else { - require.Equal(t, 0, len(sampledData)) - assert.Equal(t, 0, sink.SpanCount()) - } - }) + err = tsp.ConsumeTraces(context.Background(), tt.td) + require.NoError(t, err) + + sampledData := sink.AllTraces() 
+ if tt.sampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + } + }) + } } } @@ -489,6 +498,632 @@ func Test_parseSpanSamplingPriority(t *testing.T) { } } +// Test_tracesamplerprocessor_TraceState checks if handling of the context +// tracestate is correct with a number o cases that exercise the two +// consistent sampling modes. +func Test_tracesamplerprocessor_TraceState(t *testing.T) { + // This hard-coded TraceID will sample at 50% and not at 49%. + // The equivalent randomness is 0x80000000000000. + var defaultTID = mustParseTID("fefefefefefefefefe80000000000000") + + // improbableTraceID will sample at all supported probabilities. In + // hex, the leading 18 digits do not matter, the trailing 14 are all `f`. + var improbableTraceID = mustParseTID("111111111111111111ffffffffffffff") + + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + name string + tid pcommon.TraceID + cfg *Config + ts string + key string + value pcommon.Value + log string + sf func(SamplerMode) (sampled bool, adjCount float64, tracestate string) + }{ + { + name: "100 percent", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=th:0" + }, + }, + { + name: "50 percent sampled", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return true, 2, "ot=th:8" }, + }, + { + name: "25 percent sampled", + tid: mustParseTID("ddddddddddddddddddc0000000000000"), + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return true, 4, "ot=th:c" }, + }, + { + name: "25 percent unsampled", + tid: mustParseTID("ddddddddddddddddddb0000000000000"), + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { return false, 0, "" }, + }, + { + 
name: "1 percent sampled", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 0, + }, + // 99/100 = .fd70a3d70a3d70a3d + ts: "ot=rv:FD70A3D70A3D71", // note upper case passes through, is not generated + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd70a3d70a3d71" + }, + }, + { + // with precision 4, the 1% probability rounds down and the + // exact R-value here will sample. see below, where the + // opposite is true. + name: "1 percent sampled with rvalue and precision 4", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 4, + }, + ts: "ot=rv:FD70A3D70A3D71", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:FD70A3D70A3D71;th:fd70a" + }, + }, + { + // at precision 3, the 1% probability rounds + // up to fd71 and so this does not sample. + // see above, where the opposite is true. + name: "1 percent sampled with rvalue and precision 3", + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 3, + }, + ts: "ot=rv:FD70A3D70A3D71", + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "1 percent not sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + // this r-value is slightly below the t-value threshold, + // off-by-one compared with the case above in the least- + // significant digit. 
+ ts: "ot=rv:FD70A3D70A3D70", + }, + { + name: "49 percent not sampled with default tid", + cfg: &Config{ + SamplingPercentage: 49, + }, + }, + { + name: "1 percent sampled with rvalue", + cfg: &Config{ + SamplingPercentage: 1, + }, + // 99/100 = .FD70A3D70A3D70A3D + ts: "ot=rv:fd70B000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=rv:fd70B000000000;th:fd70a3d70a3d71" + }, + }, + { + name: "1 percent sampled with tid", + tid: mustParseTID("a0a0a0a0a0a0a0a0a0fe000000000000"), + cfg: &Config{ + SamplingPercentage: 1, + SamplingPrecision: 4, + }, + // 99/100 = .FD70A3D70A3D70A3D + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.01, "ot=th:fd70a" + }, + }, + { + name: "sampled by priority", + cfg: &Config{ + SamplingPercentage: 1, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(2), + sf: func(SamplerMode) (bool, float64, string) { return true, 1, "ot=th:0" }, + }, + { + name: "not sampled by priority", + cfg: &Config{ + SamplingPercentage: 99, + }, + ts: "", + key: "sampling.priority", + value: pcommon.NewValueInt(0), + }, + { + name: "incoming 50 percent with rvalue", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:90000000000000;th:80000000000000", // note extra zeros in th are erased + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:90000000000000;th:8" + } + // Proportionally, 50% less is 25% absolute sampling + return false, 0, "" + }, + }, + { + name: "incoming 50 percent at 25 percent not sampled", + cfg: &Config{ + SamplingPercentage: 25, + }, + ts: "ot=th:8", // 50% + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "incoming 50 percent at 25 percent sampled", + cfg: &Config{ + SamplingPercentage: 25, + }, + tid: mustParseTID("ffffffffffffffffffffffffffffffff"), // always sampled + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { 
+ if mode == Equalizing { + return true, 4, "ot=th:c" + } + return true, 8, "ot=th:e" + }, + }, + { + name: "equalizing vs proportional", + cfg: &Config{ + SamplingPercentage: 50, + }, + ts: "ot=rv:c0000000000000;th:8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 2, "ot=rv:c0000000000000;th:8" + } + return true, 4, "ot=rv:c0000000000000;th:c" + }, + }, + { + name: "inconsistent arriving threshold", + log: "inconsistent arriving threshold", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=rv:40000000000000;th:0" + }, + }, + { + name: "inconsistent arriving threshold not sampled", + log: "inconsistent arriving threshold", + cfg: &Config{ + SamplingPercentage: 1, + FailClosed: true, + }, + ts: "ot=rv:40000000000000;th:8", + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "40 percent precision 3 with rvalue", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + ts: "ot=rv:a0000000000000", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1 / 0.4, "ot=rv:a0000000000000;th:99a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent precision 6 with tid", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 6, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a0d0000000000000"), + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Proportional { + // 5 == 1 / (0.4 * 0.5) + return true, 5, "ot=th:cccccd" + } + // 2.5 == 1 / 0.4 + return true, 2.5, "ot=th:99999a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent partly sampled", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a0b0000000000000"), + ts: "ot=th:8", // 50% + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Proportional { + return false, 
0, "" + } + // 2.5 == 1 / 0.4 + return true, 2.5, "ot=th:99a" + }, + }, + { + name: "arriving 50 percent sampled at 40 percent not sampled", + cfg: &Config{ + SamplingPercentage: 40, + SamplingPrecision: 3, + }, + tid: mustParseTID("a0a0a0a0a0a0a0a0a080000000000000"), + ts: "ot=th:8", // 50% + sf: func(SamplerMode) (bool, float64, string) { + return false, 0, "" + }, + }, + { + name: "200 percent equals 100 percent", + cfg: &Config{ + SamplingPercentage: 200, + }, + ts: "", + sf: func(SamplerMode) (bool, float64, string) { + return true, 1, "ot=th:0" + }, + }, + { + name: "tiny probability rounding", + cfg: &Config{ + SamplingPercentage: 100 * 0x1p-14, + }, + tid: improbableTraceID, + ts: "ot=th:fffc", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 1 << 14, "ot=th:fffc" + } + return true, 1 << 28, "ot=th:fffffff" + }, + }, + { + // Note this test tests a probability value very close + // to the limit near 100.0% expressible in a float32, + // which is how the SamplingPercentage field is declared. + // it's impossible to have 10 significant figures at + // at this extreme. + name: "almost 100pct sampling", + cfg: &Config{ + SamplingPercentage: (1 - 8e-7) * 100, // very close to 100% + SamplingPrecision: 10, // 10 sig figs is impossible + }, + tid: improbableTraceID, + sf: func(SamplerMode) (bool, float64, string) { + // The adjusted count is very close to 1.0. + // The threshold has 8 significant figures. 
+ return true, 1 / (1 - 8e-7), "ot=th:00000cccccccd" + }, + }, + { + name: "probability underflow", + cfg: &Config{ + SamplingPercentage: 0x1p-4, + }, + tid: improbableTraceID, + ts: "ot=th:fffffffffffff8", + sf: func(mode SamplerMode) (bool, float64, string) { + if mode == Equalizing { + return true, 1 << 53, "ot=th:fffffffffffff8" + } + return false, 0, "" + }, + }, + } + for _, tt := range tests { + for _, mode := range []SamplerMode{Equalizing, Proportional} { + t.Run(fmt.Sprint(mode, "_", tt.name), func(t *testing.T) { + + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.HashSeed = defaultHashSeed + + set := processortest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + tsp, err := newTracesProcessor(context.Background(), set, cfg, sink) + require.NoError(t, err) + + tid := defaultTID + + if !tt.tid.IsEmpty() { + tid = tt.tid + } + + td := makeSingleSpanWithAttrib(tid, sid, tt.ts, tt.key, tt.value) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + var expectSampled bool + var expectCount float64 + var expectTS string + if tt.sf != nil { + expectSampled, expectCount, expectTS = tt.sf(mode) + } + if expectSampled { + require.Equal(t, 1, len(sampledData)) + assert.Equal(t, 1, sink.SpanCount()) + got := sink.AllTraces()[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + gotTs, err := sampling.NewW3CTraceState(got.TraceState().AsRaw()) + require.NoError(t, err) + switch { + case expectCount == 0: + assert.Equal(t, 0.0, gotTs.OTelValue().AdjustedCount()) + case cfg.SamplingPrecision == 0: + assert.InEpsilon(t, expectCount, gotTs.OTelValue().AdjustedCount(), 1e-9, + "compare %v %v", expectCount, gotTs.OTelValue().AdjustedCount()) + default: + assert.InEpsilon(t, expectCount, gotTs.OTelValue().AdjustedCount(), 1e-3, + "compare %v %v", expectCount, 
gotTs.OTelValue().AdjustedCount()) + } + require.Equal(t, expectTS, got.TraceState().AsRaw()) + } else { + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + require.Equal(t, "", expectTS) + } + + if len(tt.log) == 0 { + require.Equal(t, 0, len(observed.All()), "should not have logs: %v", observed.All()) + } else { + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + require.Contains(t, observed.All()[0].Message, "traces sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), tt.log) + } + }) + } + } +} + +// Test_tracesamplerprocessor_TraceStateErrors checks that when +// FailClosed is true, certain spans do not pass, with errors. +func Test_tracesamplerprocessor_TraceStateErrors(t *testing.T) { + defaultTID := mustParseTID("fefefefefefefefefe80000000000000") + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + name string + tid pcommon.TraceID + cfg *Config + ts string + sf func(SamplerMode) string + }{ + { + name: "missing randomness", + cfg: &Config{ + SamplingPercentage: 100, + }, + ts: "", + tid: pcommon.TraceID{}, + sf: func(SamplerMode) string { + return "missing randomness" + }, + }, + { + name: "invalid r-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=rv:abababababababab", // 16 digits is too many + sf: func(SamplerMode) string { + return "r-value must have 14 hex digits" + }, + }, + { + name: "invalid t-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:abababababababab", // 16 digits is too many + sf: func(SamplerMode) string { + return "t-value exceeds 14 hex digits" + }, + }, + { + name: "t-value syntax", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:-1", + sf: func(SamplerMode) string { + return "invalid syntax" + }, + }, + { + name: "inconsistent t-value trace ID", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: 
mustParseTID("ffffffffffffffffff70000000000000"), + ts: "ot=th:8", + sf: func(SamplerMode) string { + return "inconsistent arriving threshold" + }, + }, + { + name: "inconsistent t-value r-value", + cfg: &Config{ + SamplingPercentage: 100, + }, + tid: defaultTID, + ts: "ot=th:8;rv:70000000000000", + sf: func(SamplerMode) string { + return "inconsistent arriving threshold" + }, + }, + } + for _, tt := range tests { + for _, mode := range []SamplerMode{Equalizing, Proportional} { + t.Run(fmt.Sprint(mode, "_", tt.name), func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + if tt.cfg != nil { + *cfg = *tt.cfg + } + cfg.Mode = mode + cfg.FailClosed = true + + set := processortest.NewNopSettings() + logger, observed := observer.New(zap.DebugLevel) + set.Logger = zap.New(logger) + + expectMessage := "" + if tt.sf != nil { + expectMessage = tt.sf(mode) + + } + + tsp, err := newTracesProcessor(context.Background(), set, cfg, sink) + require.NoError(t, err) + + td := makeSingleSpanWithAttrib(tt.tid, sid, tt.ts, "", pcommon.Value{}) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + require.Equal(t, 0, len(sampledData)) + assert.Equal(t, 0, sink.SpanCount()) + + require.Equal(t, 1, len(observed.All()), "should have one log: %v", observed.All()) + if observed.All()[0].Message == "trace sampler" { + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), expectMessage) + } else { + require.Contains(t, observed.All()[0].Message, "traces sampler") + require.Contains(t, observed.All()[0].Context[0].Interface.(error).Error(), expectMessage) + } + }) + } + } +} + +// Test_tracesamplerprocessor_HashSeedTraceState tests that non-strict +// HashSeed modes generate trace state to indicate sampling. 
+func Test_tracesamplerprocessor_HashSeedTraceState(t *testing.T) { + sid := idutils.UInt64ToSpanID(0xfefefefe) + tests := []struct { + pct float32 + tvout string + }{ + { + pct: 100, + tvout: "0", + }, + { + pct: 75, + tvout: "4", + }, + { + pct: 50, + tvout: "8", + }, + { + pct: 25, + tvout: "c", + }, + { + pct: 10, + tvout: "e668", // 14-bit rounding means e668 vs e666. + }, + { + pct: 100.0 / 3, + tvout: "aaac", // 14-bit rounding means aaac, vs aaab. + }, + } + for _, tt := range tests { + t.Run(fmt.Sprint(tt.pct, "pct"), func(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := &Config{} + cfg.SamplingPercentage = tt.pct + cfg.Mode = HashSeed + cfg.HashSeed = defaultHashSeed + cfg.SamplingPrecision = 4 + + tsp, err := newTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, sink) + require.NoError(t, err) + + // Repeat until we find 10 sampled cases; each sample will have + // an independent R-value. + const find = 10 + found := 0 + for { + sink.Reset() + tid := idutils.UInt64ToTraceID(rand.Uint64(), rand.Uint64()) + td := makeSingleSpanWithAttrib(tid, sid, "", "", pcommon.Value{}) + + err = tsp.ConsumeTraces(context.Background(), td) + require.NoError(t, err) + + sampledData := sink.AllTraces() + + if len(sampledData) == 0 { + continue + } + assert.Equal(t, 1, sink.SpanCount()) + + span := sampledData[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) + spanTs, err := sampling.NewW3CTraceState(span.TraceState().AsRaw()) + require.NoError(t, err) + + threshold, hasT := spanTs.OTelValue().TValueThreshold() + require.True(t, hasT) + require.Equal(t, tt.tvout, spanTs.OTelValue().TValue()) + rnd, hasR := spanTs.OTelValue().RValueRandomness() + require.True(t, hasR) + require.True(t, threshold.ShouldSample(rnd)) + + if found++; find == found { + break + } + } + }) + } +} + func getSpanWithAttributes(key string, value pcommon.Value) ptrace.Span { span := ptrace.NewSpan() initSpanWithAttribute(key, value, span) @@ -747,3 
+1382,17 @@ func TestHashingFunction(t *testing.T) { require.Equal(t, tc.sampled, wasSampled) } } + +// makeSingleSpanWithAttrib is used to construct test data with +// a specific TraceID and a single attribute. +func makeSingleSpanWithAttrib(tid pcommon.TraceID, sid pcommon.SpanID, ts string, key string, attribValue pcommon.Value) ptrace.Traces { + traces := ptrace.NewTraces() + span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.TraceState().FromRaw(ts) + span.SetTraceID(tid) + span.SetSpanID(sid) + if key != "" { + attribValue.CopyTo(span.Attributes().PutEmpty(key)) + } + return traces +}