Skip to content

Commit

Permalink
feat(comp/otelcol): component to filter OTEL telemetry (#5562)
Browse files Browse the repository at this point in the history
* feat(comp/otelcol): component to filter OTEL telemetry

---------

Signed-off-by: hainenber <[email protected]>
Co-authored-by: Paulin Todev <[email protected]>
Co-authored-by: Clayton Cornell <[email protected]>
  • Loading branch information
3 people authored Oct 31, 2023
1 parent 7fc7653 commit 481674a
Show file tree
Hide file tree
Showing 8 changed files with 666 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ Main (unreleased)

- Add queueing logs remote write client for `loki.write` when WAL is enabled. (@thepalbi)

- New Grafana Agent Flow components:

- `otelcol.processor.filter` - filters OTLP telemetry data using OpenTelemetry
Transformation Language (OTTL). (@hainenber)


### Bugfixes

- Fixed an issue where `loki.process` validation for stage `metric.counter` was
Expand Down
1 change: 1 addition & 0 deletions component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
_ "github.com/grafana/agent/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/agent/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/agent/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/agent/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/agent/component/otelcol/processor/k8sattributes" // Import otelcol.processor.k8sattributes
_ "github.com/grafana/agent/component/otelcol/processor/memorylimiter" // Import otelcol.processor.memory_limiter
_ "github.com/grafana/agent/component/otelcol/processor/probabilistic_sampler" // Import otelcol.processor.probabilistic_sampler
Expand Down
110 changes: 110 additions & 0 deletions component/otelcol/processor/filter/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package filter

import (
"github.com/grafana/agent/component"
"github.com/grafana/agent/component/otelcol"
"github.com/grafana/agent/component/otelcol/processor"
otel_service "github.com/grafana/agent/service/otel"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.filter",
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
NeedsServices: []string{otel_service.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
fact := filterprocessor.NewFactory()
return processor.New(opts, fact, args.(Arguments))
},
})
}

type Arguments struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"`
Traces traceConfig `river:"traces,block,optional"`
Metrics metricConfig `river:"metrics,block,optional"`
Logs logConfig `river:"logs,block,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
ErrorMode: ottl.PropagateError,
}

// SetToDefault implements river.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements river.Validator.
func (args *Arguments) Validate() error {
otelArgs, err := args.convertImpl()
if err != nil {
return err
}
return otelArgs.Validate()
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return args.convertImpl()
}

// convertImpl is a helper function which returns the real type of the config,
// instead of the otelcomponent.Config interface.
func (args Arguments) convertImpl() (*filterprocessor.Config, error) {
input := make(map[string]interface{})

input["error_mode"] = args.ErrorMode

if len(args.Traces.Span) > 0 || len(args.Traces.SpanEvent) > 0 {
input["traces"] = args.Traces.convert()
}

if len(args.Metrics.Metric) > 0 || len(args.Metrics.Datapoint) > 0 {
input["metrics"] = args.Metrics.convert()
}

if len(args.Logs.LogRecord) > 0 {
input["logs"] = args.Logs.convert()
}

var result filterprocessor.Config
err := mapstructure.Decode(input, &result)

if err != nil {
return nil, err
}

return &result, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}
188 changes: 188 additions & 0 deletions component/otelcol/processor/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package filter_test

import (
"testing"

"github.com/grafana/agent/component/otelcol/processor/filter"
"github.com/grafana/river"
"github.com/mitchellh/mapstructure"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor"
"github.com/stretchr/testify/require"
)

// Source: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/processor/filterprocessor/README.md#filter-spans-from-traces
func TestArguments_UnmarshalRiver(t *testing.T) {
tests := []struct {
testName string
cfg string
expected map[string]interface{}
errMsg string
}{
{
testName: "Defaults",
cfg: `
output {}
`,
expected: map[string]interface{}{
"error_mode": "propagate",
},
},
{
testName: "IgnoreErrors",
cfg: `
error_mode = "ignore"
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
},
},
{
testName: "DropNonHttpSpans",
cfg: `
error_mode = "ignore"
traces {
span = [
"attributes[\"http.request.method\"] == nil",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"traces": map[string]interface{}{
"span": []interface{}{
`attributes["http.request.method"] == nil`,
},
},
},
},
{
testName: "FilterForMultipleObs",
cfg: `
error_mode = "ignore"
traces {
span = [
"attributes[\"container.name\"] == \"app_container_1\"",
"resource.attributes[\"host.name\"] == \"localhost\"",
"name == \"app_1\"",
]
spanevent = [
"attributes[\"grpc\"] == true",
"IsMatch(name, \".*grpc.*\")",
]
}
metrics {
metric = [
"name == \"my.metric\" and resource.attributes[\"my_label\"] == \"abc123\"",
"type == METRIC_DATA_TYPE_HISTOGRAM",
]
datapoint = [
"metric.type == METRIC_DATA_TYPE_SUMMARY",
"resource.attributes[\"service.name\"] == \"my_service_name\"",
]
}
logs {
log_record = [
"IsMatch(body, \".*password.*\")",
"severity_number < SEVERITY_NUMBER_WARN",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"traces": map[string]interface{}{
"span": []interface{}{
`attributes["container.name"] == "app_container_1"`,
`resource.attributes["host.name"] == "localhost"`,
`name == "app_1"`,
},
"spanevent": []interface{}{
`attributes["grpc"] == true`,
`IsMatch(name, ".*grpc.*")`,
},
},
"metrics": map[string]interface{}{
"metric": []interface{}{
`name == "my.metric" and resource.attributes["my_label"] == "abc123"`,
`type == METRIC_DATA_TYPE_HISTOGRAM`,
},
"datapoint": []interface{}{
`metric.type == METRIC_DATA_TYPE_SUMMARY`,
`resource.attributes["service.name"] == "my_service_name"`,
},
},
"logs": map[string]interface{}{
"log_record": []interface{}{
`IsMatch(body, ".*password.*")`,
`severity_number < SEVERITY_NUMBER_WARN`,
},
},
},
},
{
testName: "ValidOtelFilterFunctionUsage",
cfg: `
error_mode = "ignore"
metrics {
metric = [
"HasAttrKeyOnDatapoint(\"http.method\")",
"HasAttrOnDatapoint(\"http.method\", \"GET\")",
]
}
output {}
`,
expected: map[string]interface{}{
"error_mode": "ignore",
"metrics": map[string]interface{}{
"metric": []interface{}{
`HasAttrKeyOnDatapoint("http.method")`,
`HasAttrOnDatapoint("http.method", "GET")`,
},
},
},
},
{
testName: "invalidOtelFilterFunctionUsage",
cfg: `
error_mode = "ignore"
metrics {
metric = [
"UnknowFunction(\"http.method\")",
]
}
output {}
`,
errMsg: `unable to parse OTTL statement "match() where UnknowFunction(\"http.method\")": undefined function "UnknowFunction"`,
},
}

for _, tc := range tests {
t.Run(tc.testName, func(t *testing.T) {
var args filter.Arguments
err := river.Unmarshal([]byte(tc.cfg), &args)
if tc.errMsg != "" {
require.ErrorContains(t, err, tc.errMsg)
return
}
require.NoError(t, err)

actualPtr, err := args.Convert()
require.NoError(t, err)

actual := actualPtr.(*filterprocessor.Config)

var expectedCfg filterprocessor.Config
err = mapstructure.Decode(tc.expected, &expectedCfg)
require.NoError(t, err)

// Validate
require.NoError(t, actual.Validate())
require.NoError(t, expectedCfg.Validate())

// Compare
require.Equal(t, expectedCfg, *actual)
})
}
}
56 changes: 56 additions & 0 deletions component/otelcol/processor/filter/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package filter

type traceConfig struct {
Span []string `river:"span,attr,optional"`
SpanEvent []string `river:"spanevent,attr,optional"`
}

type metricConfig struct {
Metric []string `river:"metric,attr,optional"`
Datapoint []string `river:"datapoint,attr,optional"`
}
type logConfig struct {
LogRecord []string `river:"log_record,attr,optional"`
}

func (args *traceConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

result := make(map[string]interface{})
if len(args.Span) > 0 {
result["span"] = append([]string{}, args.Span...)
}
if len(args.SpanEvent) > 0 {
result["spanevent"] = append([]string{}, args.SpanEvent...)
}

return result
}

func (args *metricConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

result := make(map[string]interface{})
if len(args.Metric) > 0 {
result["metric"] = append([]string{}, args.Metric...)
}
if len(args.Datapoint) > 0 {
result["datapoint"] = append([]string{}, args.Datapoint...)
}

return result
}

func (args *logConfig) convert() map[string]interface{} {
if args == nil {
return nil
}

return map[string]interface{}{
"log_record": append([]string{}, args.LogRecord...),
}
}
Loading

0 comments on commit 481674a

Please sign in to comment.