Skip to content

Commit

Permalink
Add 'pipeline' attribute to processor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 19, 2024
1 parent df3c9e3 commit 496326f
Show file tree
Hide file tree
Showing 34 changed files with 329 additions and 181 deletions.
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Processor metrics now include the `pipeline` attribute.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processortest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: NewNopSettings now requires a component.DataType parameter.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"go.opentelemetry.io/collector/component"
{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -28,8 +28,8 @@ type componentTestTelemetry struct {
}

{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings()
func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt component.DataType{{- end -}}) {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}})
settings.MeterProvider = tt.meterProvider
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return tt.meterProvider
Expand Down
6 changes: 3 additions & 3 deletions cmd/mdatagen/internal/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestComponentLifecycle(t *testing.T) {
switch tt.name {
case "logs":
e, ok := c.(exporter.Logs)
require.True(t, ok)
require.True(t, ok)
logs := generateLifecycleTestLogs()
if !e.Capabilities().MutatesData {
logs.MarkReadOnly()
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestComponentLifecycle(t *testing.T) {
for _, tt := range tests {
{{- if not .Tests.SkipShutdown }}
t.Run(tt.name + "-shutdown", func(t *testing.T) {
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(tt.name)), cfg)
require.NoError(t, err)
err = c.Shutdown(context.Background())
require.NoError(t, err)
Expand All @@ -213,7 +213,7 @@ func TestComponentLifecycle(t *testing.T) {

{{- if not .Tests.SkipLifecycle }}
t.Run(tt.name + "-lifecycle", func(t *testing.T) {
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(tt.name)), cfg)
require.NoError(t, err)
host := {{ .Tests.Host }}
err = c.Start(context.Background(), host)
Expand Down
16 changes: 11 additions & 5 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ const (
transportTag = "transport"
exporterTag = "exporter"
processorTag = "processor"
pipelineTag = "pipeline"
)

type TestTelemetry struct {
ts component.TelemetrySettings
id component.ID
extraAttrs []attribute.KeyValue
SpanRecorder *tracetest.SpanRecorder

reader *sdkmetric.ManualReader
Expand Down Expand Up @@ -80,19 +82,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext
// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return checkProcessorTraces(tts.reader, tts.id, acceptedSpans, refusedSpans, droppedSpans)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorTraces(tts.reader, attrs, acceptedSpans, refusedSpans, droppedSpans)
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return checkProcessorMetrics(tts.reader, tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorMetrics(tts.reader, attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return checkProcessorLogs(tts.reader, tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorLogs(tts.reader, attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down Expand Up @@ -134,16 +139,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings {
return tts.ts
}

// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters.
// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters.
// The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods.
// The caller must defer a call to `Shutdown` on the returned TestTelemetry.
func SetupTelemetry(id component.ID) (TestTelemetry, error) {
func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) {
sr := new(tracetest.SpanRecorder)
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

settings := TestTelemetry{
ts: NewNopTelemetrySettings(),
id: id,
extraAttrs: extraAttrs,
SpanRecorder: sr,
}
settings.ts.TracerProvider = tp
Expand Down
25 changes: 12 additions & 13 deletions component/componenttest/otelchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,23 @@ func checkReceiver(reader *sdkmetric.ManualReader, receiver component.ID, dataty
checkIntSum(reader, fmt.Sprintf("otelcol_receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func checkProcessorTraces(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "spans", accepted, refused, dropped)
func checkProcessorTraces(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "spans", accepted, refused, dropped)
}

func checkProcessorMetrics(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "metric_points", accepted, refused, dropped)
func checkProcessorMetrics(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "metric_points", accepted, refused, dropped)
}

func checkProcessorLogs(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "log_records", accepted, refused, dropped)
func checkProcessorLogs(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "log_records", accepted, refused, dropped)
}

func checkProcessor(reader *sdkmetric.ManualReader, processor component.ID, datatype string, accepted, refused, dropped int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
func checkProcessor(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error {
return multierr.Combine(
checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, attrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, attrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, attrs),
)
}

Expand Down Expand Up @@ -187,8 +186,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att
}
}

func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue {
return []attribute.KeyValue{attribute.String(processorTag, processor.String())}
func attributesForProcessorMetrics(processor component.ID, extraAttrs ...attribute.KeyValue) []attribute.KeyValue {
return append(extraAttrs, attribute.String(processorTag, processor.String()))
}

// attributesForExporterMetrics returns the attributes that are needed for the receiver metrics.
Expand Down
Loading

0 comments on commit 496326f

Please sign in to comment.