Skip to content

Commit

Permalink
otelcolconvert: support converting transform processor (#6521)
Browse files Browse the repository at this point in the history
Signed-off-by: Paschalis Tsilias <[email protected]>
  • Loading branch information
tpaschalis authored Mar 13, 2024
1 parent 677b687 commit 3b24a9d
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 7 deletions.
14 changes: 7 additions & 7 deletions internal/component/otelcol/processor/transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func (c *ContextID) UnmarshalText(text []byte) error {
}
}

type contextStatementsSlice []contextStatements
type ContextStatementsSlice []ContextStatements

type contextStatements struct {
type ContextStatements struct {
Context ContextID `river:"context,attr"`
Statements []string `river:"statements,attr"`
}
Expand All @@ -64,9 +64,9 @@ type contextStatements struct {
type Arguments struct {
// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
ErrorMode ottl.ErrorMode `river:"error_mode,attr,optional"`
TraceStatements contextStatementsSlice `river:"trace_statements,block,optional"`
MetricStatements contextStatementsSlice `river:"metric_statements,block,optional"`
LogStatements contextStatementsSlice `river:"log_statements,block,optional"`
TraceStatements ContextStatementsSlice `river:"trace_statements,block,optional"`
MetricStatements ContextStatementsSlice `river:"metric_statements,block,optional"`
LogStatements ContextStatementsSlice `river:"log_statements,block,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `river:"output,block"`
Expand Down Expand Up @@ -95,7 +95,7 @@ func (args *Arguments) Validate() error {
return otelArgs.Validate()
}

func (stmts *contextStatementsSlice) convert() []interface{} {
func (stmts *ContextStatementsSlice) convert() []interface{} {
if stmts == nil {
return nil
}
Expand All @@ -112,7 +112,7 @@ func (stmts *contextStatementsSlice) convert() []interface{} {
return res
}

func (args *contextStatements) convert() map[string]interface{} {
func (args *ContextStatements) convert() map[string]interface{} {
if args == nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/internal/component/otelcol"
"github.com/grafana/agent/internal/component/otelcol/processor/transform"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, transformProcessorConverter{})
}

type transformProcessorConverter struct{}

func (transformProcessorConverter) Factory() component.Factory {
return transformprocessor.NewFactory()
}

func (transformProcessorConverter) InputComponentName() string {
return "otelcol.processor.transform"
}

func (transformProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toTransformProcessor(state, id, cfg.(*transformprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "transform"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toTransformProcessor(state *state, id component.InstanceID, cfg *transformprocessor.Config) *transform.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &transform.Arguments{
ErrorMode: cfg.ErrorMode,
TraceStatements: toContextStatements(encodeMapslice(cfg.TraceStatements)),
MetricStatements: toContextStatements(encodeMapslice(cfg.MetricStatements)),
LogStatements: toContextStatements(encodeMapslice(cfg.LogStatements)),
Output: &otelcol.ConsumerArguments{
Metrics: toTokenizedConsumers(nextMetrics),
Logs: toTokenizedConsumers(nextLogs),
Traces: toTokenizedConsumers(nextTraces),
},
}
}

func toContextStatements(in []map[string]any) []transform.ContextStatements {
res := make([]transform.ContextStatements, 0, len(in))
for _, s := range in {
res = append(res, transform.ContextStatements{
Context: transform.ContextID(encodeString(s["context"])),
Statements: s["statements"].([]string),
})
}

return res
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = [otelcol.processor.transform.default.input]
logs = [otelcol.processor.transform.default.input]
traces = [otelcol.processor.transform.default.input]
}
}

otelcol.processor.transform "default" {
error_mode = "ignore"

trace_statements {
context = "resource"
statements = ["keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])", "replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")", "limit(attributes, 100, [])", "truncate_all(attributes, 4096)"]
}

trace_statements {
context = "span"
statements = ["set(status.code, 1) where attributes[\"http.path\"] == \"/health\"", "set(name, attributes[\"http.route\"])", "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", "limit(attributes, 100, [])", "truncate_all(attributes, 4096)"]
}

metric_statements {
context = "resource"
statements = ["keep_keys(attributes, [\"host.name\"])", "truncate_all(attributes, 4096)"]
}

metric_statements {
context = "metric"
statements = ["set(description, \"Sum\") where type == \"Sum\""]
}

metric_statements {
context = "datapoint"
statements = ["limit(attributes, 100, [\"host.name\"])", "truncate_all(attributes, 4096)", "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\""]
}

log_statements {
context = "resource"
statements = ["keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])"]
}

log_statements {
context = "log"
statements = ["set(severity_text, \"FAIL\") where body == \"request failed\"", "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", "set(body, attributes[\"http.route\"])"]
}

output {
metrics = [otelcol.exporter.otlp.default.input]
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
}
}

otelcol.exporter.otlp "default" {
client {
endpoint = "database:4317"
}
}
75 changes: 75 additions & 0 deletions internal/converter/internal/otelcolconvert/testdata/transform.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
receivers:
otlp:
protocols:
grpc:
http:

processors:
transform:
error_mode: ignore
trace_statements:
- context: resource
statements:
- keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])
- replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")
- limit(attributes, 100, [])
- truncate_all(attributes, 4096)
- context: span
statements:
- set(status.code, 1) where attributes["http.path"] == "/health"
- set(name, attributes["http.route"])
- replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
- limit(attributes, 100, [])
- truncate_all(attributes, 4096)

metric_statements:
- context: resource
statements:
- keep_keys(attributes, ["host.name"])
- truncate_all(attributes, 4096)
- context: metric
statements:
- set(description, "Sum") where type == "Sum"
- context: datapoint
statements:
- limit(attributes, 100, ["host.name"])
- truncate_all(attributes, 4096)
- convert_sum_to_gauge() where metric.name == "system.processes.count"
- convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"

log_statements:
- context: resource
statements:
- keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"])
- context: log
statements:
- set(severity_text, "FAIL") where body == "request failed"
- replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
- replace_all_patterns(attributes, "value", "/account/\\d{4}", "/account/{accountId}")
- set(body, attributes["http.route"])


exporters:
otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

service:
pipelines:
metrics:
receivers: [otlp]
processors: [transform]
exporters: [otlp]
logs:
receivers: [otlp]
processors: [transform]
exporters: [otlp]
traces:
receivers: [otlp]
processors: [transform]
exporters: [otlp]

0 comments on commit 3b24a9d

Please sign in to comment.