diff --git a/internal/component/otelcol/processor/transform/transform.go b/internal/component/otelcol/processor/transform/transform.go
index 708ce7cdc4..aabae21e4c 100644
--- a/internal/component/otelcol/processor/transform/transform.go
+++ b/internal/component/otelcol/processor/transform/transform.go
@@ -53,9 +53,9 @@ func (c *ContextID) UnmarshalText(text []byte) error {
 	}
 }
 
-type contextStatementsSlice []contextStatements
+type ContextStatementsSlice []ContextStatements
 
-type contextStatements struct {
+type ContextStatements struct {
 	Context    ContextID `river:"context,attr"`
 	Statements []string  `river:"statements,attr"`
 }
@@ -64,9 +64,9 @@ type contextStatements struct {
 type Arguments struct {
 	// ErrorMode determines how the processor reacts to errors that occur while processing a statement.
 	ErrorMode        ottl.ErrorMode         `river:"error_mode,attr,optional"`
-	TraceStatements  contextStatementsSlice `river:"trace_statements,block,optional"`
-	MetricStatements contextStatementsSlice `river:"metric_statements,block,optional"`
-	LogStatements    contextStatementsSlice `river:"log_statements,block,optional"`
+	TraceStatements  ContextStatementsSlice `river:"trace_statements,block,optional"`
+	MetricStatements ContextStatementsSlice `river:"metric_statements,block,optional"`
+	LogStatements    ContextStatementsSlice `river:"log_statements,block,optional"`
 
 	// Output configures where to send processed data. Required.
 	Output *otelcol.ConsumerArguments `river:"output,block"`
@@ -95,7 +95,7 @@ func (args *Arguments) Validate() error {
 	return otelArgs.Validate()
 }
 
-func (stmts *contextStatementsSlice) convert() []interface{} {
+func (stmts *ContextStatementsSlice) convert() []interface{} {
 	if stmts == nil {
 		return nil
 	}
@@ -112,7 +112,7 @@ func (stmts *contextStatementsSlice) convert() []interface{} {
 	return res
 }
 
-func (args *contextStatements) convert() map[string]interface{} {
+func (args *ContextStatements) convert() map[string]interface{} {
 	if args == nil {
 		return nil
 	}
diff --git a/internal/converter/internal/otelcolconvert/converter_transformprocessor.go b/internal/converter/internal/otelcolconvert/converter_transformprocessor.go
new file mode 100644
index 0000000000..694046bb21
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/converter_transformprocessor.go
@@ -0,0 +1,75 @@
+package otelcolconvert
+
+import (
+	"fmt"
+
+	"github.com/grafana/agent/internal/component/otelcol"
+	"github.com/grafana/agent/internal/component/otelcol/processor/transform"
+	"github.com/grafana/agent/internal/converter/diag"
+	"github.com/grafana/agent/internal/converter/internal/common"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor"
+	"go.opentelemetry.io/collector/component"
+)
+
+func init() {
+	converters = append(converters, transformProcessorConverter{})
+}
+
+type transformProcessorConverter struct{}
+
+func (transformProcessorConverter) Factory() component.Factory {
+	return transformprocessor.NewFactory()
+}
+
+func (transformProcessorConverter) InputComponentName() string {
+	return "otelcol.processor.transform"
+}
+
+func (transformProcessorConverter) ConvertAndAppend(state *state, id component.InstanceID, cfg component.Config) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	label := state.FlowComponentLabel()
+
+	args := toTransformProcessor(state, id, cfg.(*transformprocessor.Config))
+	block := common.NewBlockWithOverride([]string{"otelcol", "processor", "transform"}, label, args)
+
+	diags.Add(
+		diag.SeverityLevelInfo,
+		fmt.Sprintf("Converted %s into %s", stringifyInstanceID(id), stringifyBlock(block)),
+	)
+
+	state.Body().AppendBlock(block)
+	return diags
+}
+
+func toTransformProcessor(state *state, id component.InstanceID, cfg *transformprocessor.Config) *transform.Arguments {
+	var (
+		nextMetrics = state.Next(id, component.DataTypeMetrics)
+		nextLogs    = state.Next(id, component.DataTypeLogs)
+		nextTraces  = state.Next(id, component.DataTypeTraces)
+	)
+
+	return &transform.Arguments{
+		ErrorMode:        cfg.ErrorMode,
+		TraceStatements:  toContextStatements(encodeMapslice(cfg.TraceStatements)),
+		MetricStatements: toContextStatements(encodeMapslice(cfg.MetricStatements)),
+		LogStatements:    toContextStatements(encodeMapslice(cfg.LogStatements)),
+		Output: &otelcol.ConsumerArguments{
+			Metrics: toTokenizedConsumers(nextMetrics),
+			Logs:    toTokenizedConsumers(nextLogs),
+			Traces:  toTokenizedConsumers(nextTraces),
+		},
+	}
+}
+
+func toContextStatements(in []map[string]any) []transform.ContextStatements {
+	res := make([]transform.ContextStatements, 0, len(in))
+	for _, s := range in {
+		res = append(res, transform.ContextStatements{
+			Context:    transform.ContextID(encodeString(s["context"])),
+			Statements: s["statements"].([]string),
+		})
+	}
+
+	return res
+}
diff --git a/internal/converter/internal/otelcolconvert/testdata/transform.river b/internal/converter/internal/otelcolconvert/testdata/transform.river
new file mode 100644
index 0000000000..5b7902f04f
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/testdata/transform.river
@@ -0,0 +1,62 @@
+otelcol.receiver.otlp "default" {
+	grpc { }
+
+	http { }
+
+	output {
+		metrics = [otelcol.processor.transform.default.input]
+		logs    = [otelcol.processor.transform.default.input]
+		traces  = [otelcol.processor.transform.default.input]
+	}
+}
+
+otelcol.processor.transform "default" {
+	error_mode = "ignore"
+
+	trace_statements {
+		context    = "resource"
+		statements = ["keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\", \"process.command_line\"])", "replace_pattern(attributes[\"process.command_line\"], \"password\\\\=[^\\\\s]*(\\\\s?)\", \"password=***\")", "limit(attributes, 100, [])", "truncate_all(attributes, 4096)"]
+	}
+
+	trace_statements {
+		context    = "span"
+		statements = ["set(status.code, 1) where attributes[\"http.path\"] == \"/health\"", "set(name, attributes[\"http.route\"])", "replace_match(attributes[\"http.target\"], \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", "limit(attributes, 100, [])", "truncate_all(attributes, 4096)"]
+	}
+
+	metric_statements {
+		context    = "resource"
+		statements = ["keep_keys(attributes, [\"host.name\"])", "truncate_all(attributes, 4096)"]
+	}
+
+	metric_statements {
+		context    = "metric"
+		statements = ["set(description, \"Sum\") where type == \"Sum\""]
+	}
+
+	metric_statements {
+		context    = "datapoint"
+		statements = ["limit(attributes, 100, [\"host.name\"])", "truncate_all(attributes, 4096)", "convert_sum_to_gauge() where metric.name == \"system.processes.count\"", "convert_gauge_to_sum(\"cumulative\", false) where metric.name == \"prometheus_metric\""]
+	}
+
+	log_statements {
+		context    = "resource"
+		statements = ["keep_keys(attributes, [\"service.name\", \"service.namespace\", \"cloud.region\"])"]
+	}
+
+	log_statements {
+		context    = "log"
+		statements = ["set(severity_text, \"FAIL\") where body == \"request failed\"", "replace_all_matches(attributes, \"/user/*/list/*\", \"/user/{userId}/list/{listId}\")", "replace_all_patterns(attributes, \"value\", \"/account/\\\\d{4}\", \"/account/{accountId}\")", "set(body, attributes[\"http.route\"])"]
+	}
+
+	output {
+		metrics = [otelcol.exporter.otlp.default.input]
+		logs    = [otelcol.exporter.otlp.default.input]
+		traces  = [otelcol.exporter.otlp.default.input]
+	}
+}
+
+otelcol.exporter.otlp "default" {
+	client {
+		endpoint = "database:4317"
+	}
+}
diff --git a/internal/converter/internal/otelcolconvert/testdata/transform.yaml b/internal/converter/internal/otelcolconvert/testdata/transform.yaml
new file mode 100644
index 0000000000..4bd271d264
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/testdata/transform.yaml
@@ -0,0 +1,75 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+      http:
+
+processors:
+  transform:
+    error_mode: ignore
+    trace_statements:
+      - context: resource
+        statements:
+          - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region", "process.command_line"])
+          - replace_pattern(attributes["process.command_line"], "password\\=[^\\s]*(\\s?)", "password=***")
+          - limit(attributes, 100, [])
+          - truncate_all(attributes, 4096)
+      - context: span
+        statements:
+          - set(status.code, 1) where attributes["http.path"] == "/health"
+          - set(name, attributes["http.route"])
+          - replace_match(attributes["http.target"], "/user/*/list/*", "/user/{userId}/list/{listId}")
+          - limit(attributes, 100, [])
+          - truncate_all(attributes, 4096)
+
+    metric_statements:
+      - context: resource
+        statements:
+          - keep_keys(attributes, ["host.name"])
+          - truncate_all(attributes, 4096)
+      - context: metric
+        statements:
+          - set(description, "Sum") where type == "Sum"
+      - context: datapoint
+        statements:
+          - limit(attributes, 100, ["host.name"])
+          - truncate_all(attributes, 4096)
+          - convert_sum_to_gauge() where metric.name == "system.processes.count"
+          - convert_gauge_to_sum("cumulative", false) where metric.name == "prometheus_metric"
+
+    log_statements:
+      - context: resource
+        statements:
+          - keep_keys(attributes, ["service.name", "service.namespace", "cloud.region"])
+      - context: log
+        statements:
+          - set(severity_text, "FAIL") where body == "request failed"
+          - replace_all_matches(attributes, "/user/*/list/*", "/user/{userId}/list/{listId}")
+          - replace_all_patterns(attributes, "value", "/account/\\d{4}", "/account/{accountId}")
+          - set(body, attributes["http.route"])
+
+
+exporters:
+  otlp:
+    # Our defaults have drifted from upstream, so we explicitly set our
+    # defaults below (balancer_name and queue_size).
+    endpoint: database:4317
+    balancer_name: pick_first
+    sending_queue:
+      queue_size: 5000
+
+service:
+  pipelines:
+    metrics:
+      receivers: [otlp]
+      processors: [transform]
+      exporters: [otlp]
+    logs:
+      receivers: [otlp]
+      processors: [transform]
+      exporters: [otlp]
+    traces:
+      receivers: [otlp]
+      processors: [transform]
+      exporters: [otlp]