From 76db8729e30118ba42f1b33ffce37651b0363c90 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 13 Jun 2024 08:55:31 -0700 Subject: [PATCH] feat(cascadingprocessor): Added status code filtering to cascading filter processor (#1600) * Added status code filtering to Cascading filter processor --------- Co-authored-by: Tim Chan --- .changelog/1600.added.txt | 1 + .../cascadingfilterprocessor/README.md | 19 +++++++- .../cascadingfilterprocessor/cascade_test.go | 24 ++++++++++ .../cascadingfilterprocessor/config/config.go | 2 + .../cascadingfilterprocessor/config_test.go | 4 ++ .../sampling/drop_trace_factory.go | 45 ++++++++++++++++++- .../testdata/cascading_filter_config.yaml | 2 + 7 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 .changelog/1600.added.txt diff --git a/.changelog/1600.added.txt b/.changelog/1600.added.txt new file mode 100644 index 0000000000..48e72e6caa --- /dev/null +++ b/.changelog/1600.added.txt @@ -0,0 +1 @@ +feat(cascadingfilter): Added status code filtering to cascading filter processor \ No newline at end of file diff --git a/pkg/processor/cascadingfilterprocessor/README.md b/pkg/processor/cascadingfilterprocessor/README.md index b908d3054f..8634caa9dd 100644 --- a/pkg/processor/cascadingfilterprocessor/README.md +++ b/pkg/processor/cascadingfilterprocessor/README.md @@ -47,6 +47,7 @@ Each of the specified drop rules has several properties: - `name` (required): identifies the rule - `name_pattern: `: selects the span if its operation name matches the provided regular expression +- `status_code: `: supported string values: "Ok", "Unset", or "Error" - `attributes: `: list of attribute-level filters (both span level and resource level is being evaluated). When several elements are specified, conditions for each of them must be met. Each entry might contain a number of fields: - `key: `: name of the attribute key @@ -119,7 +120,7 @@ processors: use_regex: true ``` -### Filtering out healhtchecks and traffic shaping +### Filtering out healthchecks and traffic shaping In the following example few more conditions were added: @@ -153,6 +154,22 @@ cascadingfilter: spans_per_second: 500 # <- adjust the output traffic level ``` +### Just filtering out status code + +Following example will drop all traces that match the following criteria: + +- there is a ROOT span with status code of "Error" (since status code is "Error", "Ok", or "Unset") + +(status code doc: https://opentelemetry-python.readthedocs.io/en/latest/api/trace.status.html) + +```yaml +processors: + cascading_filter: + trace_reject_filters: + - name: remove-all-traces-with-error-status-code + status_code: "Error" + ``` + ### Advanced configuration It is additionally possible to use adaptive sampling, which will split the total spans per second budget across all the rules evenly (for up to specified limit). Additionally, it can be set that if there's any budget left, it can be filled with random traces. diff --git a/pkg/processor/cascadingfilterprocessor/cascade_test.go b/pkg/processor/cascadingfilterprocessor/cascade_test.go index 148662d60b..0ba82ec9e3 100644 --- a/pkg/processor/cascadingfilterprocessor/cascade_test.go +++ b/pkg/processor/cascadingfilterprocessor/cascade_test.go @@ -33,6 +33,8 @@ import ( var testValue = 10 * time.Millisecond var probabilisticFilteringRate = int32(10) var healthCheckPattern = "health" +var statusCode = ptrace.StatusCodeError.String() + var cfg = cfconfig.Config{ CollectorInstances: 1, DecisionWait: 2 * time.Second, @@ -56,6 +58,7 @@ var cfg = cfconfig.Config{ { Name: "health-check", NamePattern: &healthCheckPattern, + StatusCode: &statusCode, }, }, } @@ -68,6 +71,7 @@ var cfgJustDropping = cfconfig.Config{ { Name: "health-check", NamePattern: &healthCheckPattern, + StatusCode: &statusCode, }, }, } @@ -90,6 +94,7 @@ var cfgAutoRate = cfconfig.Config{ { Name: "health-check", NamePattern: &healthCheckPattern, + StatusCode: &statusCode, }, }, } @@ -101,6 +106,7 @@ func fillSpan(span *ptrace.Span, durationMicros int64) { span.Attributes().PutInt("foo", 55) span.SetStartTimestamp(pcommon.Timestamp(startTime)) span.SetEndTimestamp(pcommon.Timestamp(nowTs)) + span.Status().SetCode(ptrace.StatusCodeError) } func createTrace(c *cascade, numSpans int, durationMicros int64) *sampling.TraceData { @@ -195,6 +201,24 @@ func TestDropTraces(t *testing.T) { require.True(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace2)) } +func TestDropTracesWithDifferentStatusCode(t *testing.T) { + cascading := createCascade(t) + + trace1 := createTrace(cascading, 1, 1000000) + trace2 := createTrace(cascading, 1, 1000000) + trace3 := createTrace(cascading, 1, 1000000) + + trace1.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-1") + trace1.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().SetCode(ptrace.StatusCodeUnset) + trace2.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-2") + trace2.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().SetCode(ptrace.StatusCodeOk) + trace3.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-3") + + require.False(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace1)) + require.False(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace2)) + require.True(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace3)) +} + func TestDropTracesAndNotLimitOthers(t *testing.T) { cascading := createCascadeWithConfig(t, cfgJustDropping) diff --git a/pkg/processor/cascadingfilterprocessor/config/config.go b/pkg/processor/cascadingfilterprocessor/config/config.go index c512204abe..f05f7fcb99 100644 --- a/pkg/processor/cascadingfilterprocessor/config/config.go +++ b/pkg/processor/cascadingfilterprocessor/config/config.go @@ -101,6 +101,8 @@ type TraceRejectCfg struct { AttributeCfg []AttributeCfg `mapstructure:"attributes"` // NamePattern (optional) describes a regular expression that must be met by any span operation name NamePattern *string `mapstructure:"name_pattern"` + // StatusCode (optional) describes whether an operation succeeds or not (OK, ERROR, or UNSET) + StatusCode *string `mapstructure:"status_code"` } // Config holds the configuration for cascading-filter-based sampling. diff --git a/pkg/processor/cascadingfilterprocessor/config_test.go b/pkg/processor/cascadingfilterprocessor/config_test.go index f68994de25..10b3eeeb9e 100644 --- a/pkg/processor/cascadingfilterprocessor/config_test.go +++ b/pkg/processor/cascadingfilterprocessor/config_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/otelcol/otelcoltest" + "go.opentelemetry.io/collector/pdata/ptrace" cfconfig "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config" ) @@ -45,6 +46,7 @@ func TestLoadConfig(t *testing.T) { probFilteringRate := int32(100) namePatternValue := "foo.*" healthCheckNamePatternValue := "health.*" + statusCode := ptrace.StatusCodeError.String() id1 := component.NewIDWithName(Type, "1") assert.Equal(t, cfg.Processors[id1], @@ -58,6 +60,7 @@ func TestLoadConfig(t *testing.T) { { Name: "healthcheck-rule", NamePattern: &healthCheckNamePatternValue, + StatusCode: &statusCode, }, }, TraceAcceptCfgs: []cfconfig.TraceAcceptCfg{ @@ -114,6 +117,7 @@ func TestLoadConfig(t *testing.T) { { Name: "healthcheck-rule", NamePattern: &healthCheckNamePatternValue, + StatusCode: &statusCode, }, { Name: "remove-all-traces-with-healthcheck-service", diff --git a/pkg/processor/cascadingfilterprocessor/sampling/drop_trace_factory.go b/pkg/processor/cascadingfilterprocessor/sampling/drop_trace_factory.go index 537b9a7a7c..6b695a07cc 100644 --- a/pkg/processor/cascadingfilterprocessor/sampling/drop_trace_factory.go +++ b/pkg/processor/cascadingfilterprocessor/sampling/drop_trace_factory.go @@ -15,9 +15,11 @@ package sampling import ( + "errors" "regexp" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config" @@ -28,10 +30,31 @@ type dropTraceEvaluator struct { stringAttr *stringAttributeFilter attrs []attributeFilter operationRe *regexp.Regexp + statusCode *string logger *zap.Logger } +func validateStatusCode(statusCode *string) error { + if statusCode == nil { + return nil + } + + validStatusCodes := []string{ + ptrace.StatusCodeError.String(), + ptrace.StatusCodeOk.String(), + ptrace.StatusCodeUnset.String(), + } + + for _, valid := range validStatusCodes { + if *statusCode == valid { + return nil + } + } + + return errors.New("invalid status code: must be one of 'Error', 'Ok', or 'Unset' ") +} + var _ DropTraceEvaluator = (*dropTraceEvaluator)(nil) // NewDropTraceEvaluator creates a drop trace evaluator that checks if trace should be dropped @@ -55,11 +78,16 @@ func NewDropTraceEvaluator(logger *zap.Logger, cfg config.TraceRejectCfg) (DropT } } + if err := validateStatusCode(cfg.StatusCode); err != nil { + return nil, err + } + return &dropTraceEvaluator{ stringAttr: stringAttrFilter, numericAttr: numericAttrFilter, attrs: attrsFilter, operationRe: operationRe, + statusCode: cfg.StatusCode, logger: logger, }, nil } @@ -74,6 +102,7 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b matchingStringAttrFound := false matchingNumericAttrFound := false matchingAttrsFound := false + matchingStatusCodeFound := false for _, batch := range batches { rs := batch.ResourceSpans() @@ -104,6 +133,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b matchingNumericAttrFound = checkIfNumericAttrFound(span.Attributes(), dte.numericAttr) } + if !matchingStatusCodeFound && dte.statusCode != nil && span.ParentSpanID().IsEmpty() { + statusCode := span.Status().Code() + if statusCode.String() == *dte.statusCode { + matchingStatusCodeFound = true + } + } + if dte.operationRe != nil && !matchingOperationFound { if dte.operationRe.MatchString(span.Name()) { matchingOperationFound = true @@ -115,12 +151,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b } conditionMet := struct { - operationName, stringAttr, numericAttr, attrs bool + operationName, stringAttr, numericAttr, attrs, statusCode bool }{ operationName: true, stringAttr: true, numericAttr: true, attrs: true, + statusCode: true, } if dte.operationRe != nil { @@ -136,5 +173,9 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b conditionMet.attrs = matchingAttrsFound } - return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs + if dte.statusCode != nil { + conditionMet.statusCode = matchingStatusCodeFound + } + + return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs && conditionMet.statusCode } diff --git a/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml b/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml index ac3f638776..5774b67c96 100644 --- a/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml +++ b/pkg/processor/cascadingfilterprocessor/testdata/cascading_filter_config.yaml @@ -11,6 +11,7 @@ processors: trace_reject_filters: - name: healthcheck-rule name_pattern: "health.*" + status_code: "Error" trace_accept_filters: - name: include-errors spans_per_second: 200 @@ -42,6 +43,7 @@ processors: trace_reject_filters: - name: healthcheck-rule name_pattern: "health.*" + status_code: "Error" - name: remove-all-traces-with-healthcheck-service string_attribute: {key: service.name, values: [healthcheck.*], use_regex: true} trace_accept_filters: