Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cascadingprocessor): Added status code filtering to cascading filter processor #1600

Merged
merged 13 commits into from
Jun 13, 2024
1 change: 1 addition & 0 deletions .changelog/1600.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(cascadingfilter): Added status code filtering to cascading filter processor
19 changes: 18 additions & 1 deletion pkg/processor/cascadingfilterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Each of the specified drop rules has several properties:

- `name` (required): identifies the rule
- `name_pattern: <regex>`: selects the span if its operation name matches the provided regular expression
- `status_code: <string>`: supported string values: "Ok", "Unset", or "Error"
- `attributes: <list of attributes>`: list of attribute-level filters (both span level and resource level is being evaluated).
When several elements are specified, conditions for each of them must be met. Each entry might contain a number of fields:
- `key: <name>`: name of the attribute key
Expand Down Expand Up @@ -119,7 +120,7 @@ processors:
use_regex: true
```

### Filtering out healhtchecks and traffic shaping
### Filtering out healthchecks and traffic shaping

In the following example few more conditions were added:

Expand Down Expand Up @@ -153,6 +154,22 @@ cascadingfilter:
spans_per_second: 500 # <- adjust the output traffic level
```

### Just filtering out status code

Following example will drop all traces that match the following criteria:

- there is a ROOT span with status code of "Error" (since status code is "Error", "Ok", or "Unset")

(status code doc: https://opentelemetry-python.readthedocs.io/en/latest/api/trace.status.html)

```yaml
processors:
cascading_filter:
trace_reject_filters:
- name: remove-all-traces-with-error-status-code
status_code: "Error"
```

### Advanced configuration

It is additionally possible to use adaptive sampling, which will split the total spans per second budget across all the rules evenly (for up to specified limit). Additionally, it can be set that if there's any budget left, it can be filled with random traces.
Expand Down
24 changes: 24 additions & 0 deletions pkg/processor/cascadingfilterprocessor/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
var testValue = 10 * time.Millisecond
var probabilisticFilteringRate = int32(10)
var healthCheckPattern = "health"
var statusCode = ptrace.StatusCodeError.String()

var cfg = cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 2 * time.Second,
Expand All @@ -56,6 +58,7 @@ var cfg = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -68,6 +71,7 @@ var cfgJustDropping = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -90,6 +94,7 @@ var cfgAutoRate = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -101,6 +106,7 @@ func fillSpan(span *ptrace.Span, durationMicros int64) {
span.Attributes().PutInt("foo", 55)
span.SetStartTimestamp(pcommon.Timestamp(startTime))
span.SetEndTimestamp(pcommon.Timestamp(nowTs))
span.Status().SetCode(ptrace.StatusCodeError)
}

func createTrace(c *cascade, numSpans int, durationMicros int64) *sampling.TraceData {
Expand Down Expand Up @@ -195,6 +201,24 @@ func TestDropTraces(t *testing.T) {
require.True(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace2))
}

func TestDropTracesWithDifferentStatusCode(t *testing.T) {
cascading := createCascade(t)

trace1 := createTrace(cascading, 1, 1000000)
trace2 := createTrace(cascading, 1, 1000000)
trace3 := createTrace(cascading, 1, 1000000)

trace1.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-1")
trace1.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().SetCode(ptrace.StatusCodeUnset)
trace2.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-2")
trace2.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().SetCode(ptrace.StatusCodeOk)
trace3.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).SetName("health-check-trace-3")

require.False(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace1))
require.False(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace2))
require.True(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace3))
}

func TestDropTracesAndNotLimitOthers(t *testing.T) {
cascading := createCascadeWithConfig(t, cfgJustDropping)

Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type TraceRejectCfg struct {
AttributeCfg []AttributeCfg `mapstructure:"attributes"`
// NamePattern (optional) describes a regular expression that must be met by any span operation name
NamePattern *string `mapstructure:"name_pattern"`
// StatusCode (optional) describes whether an operation succeeds or not (OK, ERROR, or UNSET)
StatusCode *string `mapstructure:"status_code"`
}

// Config holds the configuration for cascading-filter-based sampling.
Expand Down
4 changes: 4 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/collector/pdata/ptrace"

cfconfig "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config"
)
Expand All @@ -45,6 +46,7 @@ func TestLoadConfig(t *testing.T) {
probFilteringRate := int32(100)
namePatternValue := "foo.*"
healthCheckNamePatternValue := "health.*"
statusCode := ptrace.StatusCodeError.String()

id1 := component.NewIDWithName(Type, "1")
assert.Equal(t, cfg.Processors[id1],
Expand All @@ -58,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
{
Name: "healthcheck-rule",
NamePattern: &healthCheckNamePatternValue,
StatusCode: &statusCode,
},
},
TraceAcceptCfgs: []cfconfig.TraceAcceptCfg{
Expand Down Expand Up @@ -114,6 +117,7 @@ func TestLoadConfig(t *testing.T) {
{
Name: "healthcheck-rule",
NamePattern: &healthCheckNamePatternValue,
StatusCode: &statusCode,
},
{
Name: "remove-all-traces-with-healthcheck-service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package sampling

import (
"errors"
"regexp"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config"
Expand All @@ -28,10 +30,31 @@ type dropTraceEvaluator struct {
stringAttr *stringAttributeFilter
attrs []attributeFilter
operationRe *regexp.Regexp
statusCode *string

logger *zap.Logger
}

func validateStatusCode(statusCode *string) error {
if statusCode == nil {
return nil
}

validStatusCodes := []string{
ptrace.StatusCodeError.String(),
ptrace.StatusCodeOk.String(),
ptrace.StatusCodeUnset.String(),
}

for _, valid := range validStatusCodes {
if *statusCode == valid {
return nil
}
}

return errors.New("invalid status code: must be one of 'Error', 'Ok', or 'Unset' ")
chan-tim-sumo marked this conversation as resolved.
Show resolved Hide resolved
}

var _ DropTraceEvaluator = (*dropTraceEvaluator)(nil)

// NewDropTraceEvaluator creates a drop trace evaluator that checks if trace should be dropped
Expand All @@ -55,11 +78,16 @@ func NewDropTraceEvaluator(logger *zap.Logger, cfg config.TraceRejectCfg) (DropT
}
}

if err := validateStatusCode(cfg.StatusCode); err != nil {
return nil, err
}

return &dropTraceEvaluator{
stringAttr: stringAttrFilter,
numericAttr: numericAttrFilter,
attrs: attrsFilter,
operationRe: operationRe,
statusCode: cfg.StatusCode,
logger: logger,
}, nil
}
Expand All @@ -74,6 +102,7 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
matchingStringAttrFound := false
matchingNumericAttrFound := false
matchingAttrsFound := false
matchingStatusCodeFound := false

for _, batch := range batches {
rs := batch.ResourceSpans()
Expand Down Expand Up @@ -104,6 +133,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
matchingNumericAttrFound = checkIfNumericAttrFound(span.Attributes(), dte.numericAttr)
}

if !matchingStatusCodeFound && dte.statusCode != nil && span.ParentSpanID().IsEmpty() {
statusCode := span.Status().Code()
if statusCode.String() == *dte.statusCode {
matchingStatusCodeFound = true
}
}

if dte.operationRe != nil && !matchingOperationFound {
if dte.operationRe.MatchString(span.Name()) {
matchingOperationFound = true
Expand All @@ -115,12 +151,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
}

conditionMet := struct {
operationName, stringAttr, numericAttr, attrs bool
operationName, stringAttr, numericAttr, attrs, statusCode bool
}{
operationName: true,
stringAttr: true,
numericAttr: true,
attrs: true,
statusCode: true,
}

if dte.operationRe != nil {
Expand All @@ -136,5 +173,9 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
conditionMet.attrs = matchingAttrsFound
}

return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs
if dte.statusCode != nil {
conditionMet.statusCode = matchingStatusCodeFound
}

return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs && conditionMet.statusCode
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ processors:
trace_reject_filters:
- name: healthcheck-rule
name_pattern: "health.*"
status_code: "Error"
trace_accept_filters:
- name: include-errors
spans_per_second: 200
Expand Down Expand Up @@ -42,6 +43,7 @@ processors:
trace_reject_filters:
- name: healthcheck-rule
name_pattern: "health.*"
status_code: "Error"
- name: remove-all-traces-with-healthcheck-service
string_attribute: {key: service.name, values: [healthcheck.*], use_regex: true}
trace_accept_filters:
Expand Down
Loading