Skip to content

Commit

Permalink
Add 'pipeline' attribute to processor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 16, 2024
1 parent 3b50b38 commit 77300d1
Show file tree
Hide file tree
Showing 36 changed files with 356 additions and 196 deletions.
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Processor metrics now include the `pipeline` attribute.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type:

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component:

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note:

# One or more tracking issues or pull requests related to the change
issues: []

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 3 additions & 3 deletions cmd/mdatagen/templates/component_telemetry_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"go.opentelemetry.io/collector/component"
{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -28,8 +28,8 @@ type componentTestTelemetry struct {
}

{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings()
func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt component.DataType{{- end -}}) {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}})
settings.MeterProvider = tt.meterProvider
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return tt.meterProvider
Expand Down
6 changes: 3 additions & 3 deletions cmd/mdatagen/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestComponentLifecycle(t *testing.T) {
switch test.name {
case "logs":
e, ok := c.(exporter.Logs)
require.True(t, ok)
require.True(t, ok)
logs := generateLifecycleTestLogs()
if !e.Capabilities().MutatesData {
logs.MarkReadOnly()
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestComponentLifecycle(t *testing.T) {
for _, test := range tests {
{{- if not .Tests.SkipShutdown }}
t.Run(test.name + "-shutdown", func(t *testing.T) {
c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg)
require.NoError(t, err)
err = c.Shutdown(context.Background())
require.NoError(t, err)
Expand All @@ -213,7 +213,7 @@ func TestComponentLifecycle(t *testing.T) {

{{- if not .Tests.SkipLifecycle }}
t.Run(test.name + "-lifecycle", func(t *testing.T) {
c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg)
require.NoError(t, err)
host := {{ .Tests.Host }}
err = c.Start(context.Background(), host)
Expand Down
16 changes: 11 additions & 5 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ const (
transportTag = "transport"
exporterTag = "exporter"
processorTag = "processor"
pipelineTag = "pipeline"
)

type TestTelemetry struct {
ts component.TelemetrySettings
id component.ID
extraAttrs []attribute.KeyValue
SpanRecorder *tracetest.SpanRecorder

prometheusChecker *prometheusChecker
Expand Down Expand Up @@ -83,19 +85,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext
// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorTraces(attrs, acceptedSpans, refusedSpans, droppedSpans)
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorMetrics(attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs)
return tts.prometheusChecker.checkProcessorLogs(attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down Expand Up @@ -137,16 +142,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings {
return tts.ts
}

// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters.
// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters.
// The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods.
// The caller must defer a call to `Shutdown` on the returned TestTelemetry.
func SetupTelemetry(id component.ID) (TestTelemetry, error) {
func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) {
sr := new(tracetest.SpanRecorder)
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

settings := TestTelemetry{
ts: NewNopTelemetrySettings(),
id: id,
extraAttrs: extraAttrs,
SpanRecorder: sr,
}
settings.ts.TracerProvider = tp
Expand Down
25 changes: 12 additions & 13 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,23 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot
pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorTraces(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "spans", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorMetrics(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "metric_points", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorLogs(attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return pc.checkProcessor(attrs, "log_records", accepted, refused, dropped)
}

func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
func (pc *prometheusChecker) checkProcessor(attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error {
return multierr.Combine(
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, attrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, attrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, attrs),
)
}

Expand Down Expand Up @@ -190,8 +189,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att
}
}

func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue {
return []attribute.KeyValue{attribute.String(processorTag, processor.String())}
func attributesForProcessorMetrics(processor component.ID, extraAttrs []attribute.KeyValue) []attribute.KeyValue {
return append(extraAttrs, attribute.String(processorTag, processor.String()))
}

// attributesForExporterMetrics returns the attributes that are needed for the receiver metrics.
Expand Down
18 changes: 15 additions & 3 deletions component/componenttest/otelprometheuschecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,29 @@ func TestPromChecker(t *testing.T) {
)

assert.NoError(t,
pc.checkProcessorTraces(processor, 42, 13, 7),
pc.checkProcessorTraces([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("otel_signal", "traces"),
attribute.String("pipeline", "traces/fakePipeline"),
}, 42, 13, 7),
"metrics from Receiver Traces should be valid",
)

assert.NoError(t,
pc.checkProcessorMetrics(processor, 7, 41, 13),
pc.checkProcessorMetrics([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("otel_signal", "metrics"),
attribute.String("pipeline", "metrics/fakePipeline"),
}, 7, 41, 13),
"metrics from Receiver Metrics should be valid",
)

assert.NoError(t,
pc.checkProcessorLogs(processor, 102, 35, 14),
pc.checkProcessorLogs([]attribute.KeyValue{
attribute.String("processor", processor.String()),
attribute.String("otel_signal", "logs"),
attribute.String("pipeline", "logs/fakePipeline"),
}, 102, 35, 14),
"metrics from Receiver Logs should be valid",
)

Expand Down
24 changes: 12 additions & 12 deletions component/componenttest/testdata/prometheus_response
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,40 @@ otelcol_exporter_send_failed_log_records{exporter="fakeExporter"} 36
otelcol_exporter_sent_log_records{exporter="fakeExporter"} 103
# HELP otelcol_processor_accepted_spans Number of spans successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_spans counter
otelcol_processor_accepted_spans{processor="fakeProcessor"} 42
otelcol_processor_accepted_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 42
# HELP otelcol_processor_refused_spans Number of spans that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_spans counter
otelcol_processor_refused_spans{processor="fakeProcessor"} 13
otelcol_processor_refused_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 13
# HELP otelcol_processor_dropped_spans Number of spans that were dropped.
# TYPE otelcol_processor_dropped_spans counter
otelcol_processor_dropped_spans{processor="fakeProcessor"} 7
otelcol_processor_dropped_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 7
# HELP otelcol_processor_inserted_spans Number of spans that were inserted.
# TYPE otelcol_processor_inserted_spans counter
otelcol_processor_inserted_spans{processor="fakeProcessor"} 5
otelcol_processor_inserted_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 5
# HELP otelcol_processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_metric_points counter
otelcol_processor_accepted_metric_points{processor="fakeProcessor"} 7
otelcol_processor_accepted_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 7
# HELP otelcol_processor_refused_metric_points Number of metric points that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_metric_points counter
otelcol_processor_refused_metric_points{processor="fakeProcessor"} 41
otelcol_processor_refused_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 41
# HELP otelcol_processor_dropped_metric_points Number of metric points that were dropped.
# TYPE otelcol_processor_dropped_metric_points counter
otelcol_processor_dropped_metric_points{processor="fakeProcessor"} 13
otelcol_processor_dropped_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 13
# HELP otelcol_processor_inserted_metric_points Number of metric points that were inserted.
# TYPE otelcol_processor_inserted_metric_points counter
otelcol_processor_inserted_metric_points{processor="fakeProcessor"} 4
otelcol_processor_inserted_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 4
# HELP otelcol_processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline.
# TYPE otelcol_processor_accepted_log_records counter
otelcol_processor_accepted_log_records{processor="fakeProcessor"} 102
otelcol_processor_accepted_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 102
# HELP otelcol_processor_refused_log_records Number of log records that were rejected by the next component in the pipeline.
# TYPE otelcol_processor_refused_log_records counter
otelcol_processor_refused_log_records{processor="fakeProcessor"} 35
otelcol_processor_refused_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 35
# HELP otelcol_processor_dropped_log_records Number of log records that were dropped.
# TYPE otelcol_processor_dropped_log_records counter
otelcol_processor_dropped_log_records{processor="fakeProcessor"} 14
otelcol_processor_dropped_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 14
# HELP otelcol_processor_inserted_log_records Number of log records that were inserted.
# TYPE otelcol_processor_inserted_log_records counter
otelcol_processor_inserted_log_records{processor="fakeProcessor"} 3
otelcol_processor_inserted_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 3
# HELP otelcol_receiver_accepted_log_records Number of log records successfully pushed into the pipeline.
# TYPE otelcol_receiver_accepted_log_records counter
otelcol_receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102
Expand Down
Loading

0 comments on commit 77300d1

Please sign in to comment.