Skip to content

Commit

Permalink
wire in otelcol.exporter.prometheus and fix problem with multiple tra…
Browse files Browse the repository at this point in the history
…ce pipelines from load balancing

Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski committed Apr 2, 2024
1 parent 50c7edd commit 22b33a4
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
"github.com/grafana/agent/internal/static/traces"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
otel_component "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/otelcol"
Expand Down Expand Up @@ -76,18 +77,12 @@ func (b *ConfigBuilder) translateSpanMetrics(otelCfg *otelcol.Config, cfg traces
return
}

// Initialize otel component IDs for readability further down
metricsID := otel_component.NewID("metrics")
spanmetricsID := otel_component.NewID("spanmetrics")
metricsSpanmetricsID := otel_component.NewIDWithName("metrics", "spanmetrics")
otlpId := otel_component.NewIDWithName("otlp", "0")

// Remove the custom otel components and delete the custom pipeline
removeProcessor(otelCfg, "traces", "spanmetrics")
removeReceiver(otelCfg, "metrics", "noop")
removeExporter(otelCfg, "metrics", "remote_write")
removeExporter(otelCfg, "metrics", "prometheus")
delete(otelCfg.Service.Pipelines, metricsSpanmetricsID)
delete(otelCfg.Service.Pipelines, otel_component.NewIDWithName("metrics", "spanmetrics"))

// If the spanmetrics configuration includes a handler_endpoint, we cannot convert it.
// This is intentionally after the section above which removes the custom spanmetrics processor
Expand All @@ -104,19 +99,28 @@ func (b *ConfigBuilder) translateSpanMetrics(otelCfg *otelcol.Config, cfg traces
}
otelCfg.Connectors[otel_component.NewID("spanmetrics")] = toSpanmetricsConnector(cfg.SpanMetrics)

// Add the spanmetrics connector to traces pipelines as an exporter
// Add the prometheus exporter to the otel config
prometheusID := otel_component.NewID("prometheus")
pe := prometheusexporter.NewFactory().CreateDefaultConfig().(*prometheusexporter.Config)
if cfg.SpanMetrics.ConstLabels != nil {
pe.ConstLabels = *cfg.SpanMetrics.ConstLabels
}
pe.Namespace = cfg.SpanMetrics.Namespace
pe.MetricExpiration = cfg.SpanMetrics.MetricsFlushInterval
otelCfg.Exporters[prometheusID] = pe

// Add the spanmetrics connector to each traces pipelines as an exporter and create metrics pipelines
spanmetricsID := otel_component.NewID("spanmetrics")
for ix, pipeline := range otelCfg.Service.Pipelines {
if ix.Type() != "traces" {
continue
if ix.Type() == "traces" {
pipeline.Exporters = append(pipeline.Exporters, spanmetricsID)

metricsId := otel_component.NewIDWithName("metrics", ix.Name())
otelCfg.Service.Pipelines[metricsId] = &pipelines.PipelineConfig{}
otelCfg.Service.Pipelines[metricsId].Receivers = append(otelCfg.Service.Pipelines[metricsId].Receivers, spanmetricsID)
otelCfg.Service.Pipelines[metricsId].Exporters = append(otelCfg.Service.Pipelines[metricsId].Exporters, prometheusID)
}
pipeline.Exporters = append(pipeline.Exporters, spanmetricsID)
}

// Build a new metrics pipeline with the spanmetrics connector as the receiver
otelCfg.Service.Pipelines[metricsID] = &pipelines.PipelineConfig{}
otelCfg.Service.Pipelines[metricsID].Receivers = append(otelCfg.Service.Pipelines[metricsID].Receivers, spanmetricsID)
// TODO: replace this with the actual prometheus or remote_write exporter translation
otelCfg.Service.Pipelines[metricsID].Exporters = append(otelCfg.Service.Pipelines[metricsID].Exporters, otlpId)
}

func toSpanmetricsConnector(cfg *traces.SpanMetricsConfig) *spanmetricsconnector.Config {
Expand All @@ -142,7 +146,6 @@ func toSpanmetricsConnector(cfg *traces.SpanMetricsConfig) *spanmetricsconnector

// TODO: decide how to handle these fields
// cfg.SpanMetrics.ConstLabels
// cfg.SpanMetrics.MetricsInstance

return smc
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package build

import (
"fmt"

"github.com/grafana/agent/internal/component/otelcol/exporter/prometheus"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusexporter"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, prometheusExporterConverter{})
}

type prometheusExporterConverter struct{}

func (prometheusExporterConverter) Factory() component.Factory {
return prometheusexporter.NewFactory()
}

func (prometheusExporterConverter) InputComponentName() string {
return "otelcol.exporter.prometheus"
}

func (prometheusExporterConverter) ConvertAndAppend(state *otelcolconvert.State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
label := state.FlowComponentLabel()

args := toPrometheusExporterConfig(state, id, cfg.(*prometheusexporter.Config), label)
block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "prometheus"}, label, args)

var diags diag.Diagnostics
diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", otelcolconvert.StringifyInstanceID(id), otelcolconvert.StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toPrometheusExporterConfig(state *otelcolconvert.State, id component.InstanceID, cfg *prometheusexporter.Config, label string) *prometheus.Arguments {
args := &prometheus.Arguments{}
args.SetToDefault()
args.GCFrequency = cfg.MetricExpiration
args.AddMetricSuffixes = cfg.AddMetricSuffixes
args.ResourceToTelemetryConversion = cfg.ResourceToTelemetrySettings.Enabled
// TODO args.ForwardTo
// IncludeTargetInfo: false,
// IncludeScopeInfo: false,
// IncludeScopeLabels: false,

return args
}
38 changes: 12 additions & 26 deletions internal/converter/internal/staticconvert/testdata/traces.river
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,6 @@ otelcol.extension.jaeger_remote_sampling "default_0" {
}
}

otelcol.exporter.otlp "default_0" {
retry_on_failure {
max_elapsed_time = "1m0s"
}

client {
endpoint = "tempo.example.com:14250"

tls {
insecure = true
}
}
}

otelcol.connector.spanmetrics "default" {
histogram {
explicit { }
}

output {
metrics = [otelcol.exporter.otlp.default_0.input]
}
}

otelcol.receiver.otlp "_0_default" {
grpc {
include_metadata = true
Expand Down Expand Up @@ -116,6 +92,11 @@ otelcol.processor.attributes "_0_default" {
}
}

otelcol.exporter.prometheus "_0_default" {
gc_frequency = "0s"
forward_to = []
}

otelcol.exporter.loadbalancing "_0_default" {
protocol {
otlp {
Expand Down Expand Up @@ -144,7 +125,7 @@ otelcol.connector.spanmetrics "_0_default" {
}

output {
metrics = []
metrics = [otelcol.exporter.prometheus._0_default.input]
}
}

Expand Down Expand Up @@ -184,6 +165,11 @@ otelcol.processor.batch "_1_default" {
}
}

otelcol.exporter.prometheus "_1_default" {
gc_frequency = "0s"
forward_to = []
}

otelcol.exporter.otlp "_1_0" {
retry_on_failure {
max_elapsed_time = "1m0s"
Expand All @@ -206,6 +192,6 @@ otelcol.connector.spanmetrics "_1_default" {
}

output {
metrics = []
metrics = [otelcol.exporter.prometheus._1_default.input]
}
}

0 comments on commit 22b33a4

Please sign in to comment.