Skip to content

Commit

Permalink
Add 'pipeline' attribute to processor metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 24, 2024
1 parent d6f568d commit d47db79
Show file tree
Hide file tree
Showing 38 changed files with 415 additions and 204 deletions.
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Processor metrics now include the `pipeline` attribute.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute-3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: componenttest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: SetupTelemetry now accepts optional `extraAttrs ...attribute.KeyValue` parameter.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
25 changes: 25 additions & 0 deletions .chloggen/processor-pipeline-id-attribute.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processortest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: NewNopSettings now requires a component.DataType parameter.

# One or more tracking issues or pull requests related to the change
issues: [11171]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"

"go.opentelemetry.io/collector/component"
{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
"go.opentelemetry.io/collector/config/configtelemetry"
Expand All @@ -28,8 +28,8 @@ type componentTestTelemetry struct {
}

{{- if or isConnector isExporter isExtension isProcessor isReceiver }}
func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings()
func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt pipeline.Signal{{- end -}}) {{ .Status.Class }}.Settings {
settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}})
settings.MeterProvider = tt.meterProvider
settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider {
return tt.meterProvider
Expand Down
16 changes: 13 additions & 3 deletions cmd/mdatagen/internal/templates/component_test.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestComponentLifecycle(t *testing.T) {
switch tt.name {
case "logs":
e, ok := c.(exporter.Logs)
require.True(t, ok)
require.True(t, ok)
logs := generateLifecycleTestLogs()
if !e.Capabilities().MutatesData {
logs.MarkReadOnly()
Expand Down Expand Up @@ -203,9 +203,19 @@ func TestComponentLifecycle(t *testing.T) {
require.NoError(t, sub.Unmarshal(&cfg))

for _, tt := range tests {
var signal pipeline.Signal
switch tt.name {
case "logs":
signal = pipeline.SignalLogs
case "metrics":
signal = pipeline.SignalMetrics
case "traces":
signal = pipeline.SignalTraces
}

{{- if not .Tests.SkipShutdown }}
t.Run(tt.name + "-shutdown", func(t *testing.T) {
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg)
require.NoError(t, err)
err = c.Shutdown(context.Background())
require.NoError(t, err)
Expand All @@ -214,7 +224,7 @@ func TestComponentLifecycle(t *testing.T) {

{{- if not .Tests.SkipLifecycle }}
t.Run(tt.name + "-lifecycle", func(t *testing.T) {
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg)
c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg)
require.NoError(t, err)
host := {{ .Tests.Host }}
err = c.Start(context.Background(), host)
Expand Down
16 changes: 11 additions & 5 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ const (
transportTag = "transport"
exporterTag = "exporter"
processorTag = "processor"
pipelineTag = "pipeline"
)

type TestTelemetry struct {
ts component.TelemetrySettings
id component.ID
extraAttrs []attribute.KeyValue
SpanRecorder *tracetest.SpanRecorder

reader *sdkmetric.ManualReader
Expand Down Expand Up @@ -80,19 +82,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext
// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return checkProcessorTraces(tts.reader, tts.id, acceptedSpans, refusedSpans, droppedSpans)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorTraces(tts.reader, attrs, acceptedSpans, refusedSpans, droppedSpans)

Check warning on line 86 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return checkProcessorMetrics(tts.reader, tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorMetrics(tts.reader, attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)

Check warning on line 93 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L92-L93

Added lines #L92 - L93 were not covered by tests
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// Note: SetupTelemetry must be called before this function.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return checkProcessorLogs(tts.reader, tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...)
return checkProcessorLogs(tts.reader, attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords)

Check warning on line 100 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down Expand Up @@ -134,16 +139,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings {
return tts.ts
}

// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters.
// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters.
// The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods.
// The caller must defer a call to `Shutdown` on the returned TestTelemetry.
func SetupTelemetry(id component.ID) (TestTelemetry, error) {
func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) {

Check warning on line 145 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L145

Added line #L145 was not covered by tests
sr := new(tracetest.SpanRecorder)
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))

settings := TestTelemetry{
ts: NewNopTelemetrySettings(),
id: id,
extraAttrs: extraAttrs,

Check warning on line 152 in component/componenttest/obsreporttest.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/obsreporttest.go#L152

Added line #L152 was not covered by tests
SpanRecorder: sr,
}
settings.ts.TracerProvider = tp
Expand Down
25 changes: 12 additions & 13 deletions component/componenttest/otelchecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,23 @@ func checkReceiver(reader *sdkmetric.ManualReader, receiver component.ID, dataty
checkIntSum(reader, fmt.Sprintf("otelcol_receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func checkProcessorTraces(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "spans", accepted, refused, dropped)
func checkProcessorTraces(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "spans", accepted, refused, dropped)

Check warning on line 45 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}

func checkProcessorMetrics(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "metric_points", accepted, refused, dropped)
func checkProcessorMetrics(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "metric_points", accepted, refused, dropped)

Check warning on line 49 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L48-L49

Added lines #L48 - L49 were not covered by tests
}

func checkProcessorLogs(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error {
return checkProcessor(reader, processor, "log_records", accepted, refused, dropped)
func checkProcessorLogs(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error {
return checkProcessor(reader, attrs, "log_records", accepted, refused, dropped)

Check warning on line 53 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L52-L53

Added lines #L52 - L53 were not covered by tests
}

func checkProcessor(reader *sdkmetric.ManualReader, processor component.ID, datatype string, accepted, refused, dropped int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
func checkProcessor(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error {

Check warning on line 56 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L56

Added line #L56 was not covered by tests
return multierr.Combine(
checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, processorAttrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, attrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, attrs),
checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, attrs),

Check warning on line 60 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L58-L60

Added lines #L58 - L60 were not covered by tests
)
}

Expand Down Expand Up @@ -187,8 +186,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att
}
}

func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue {
return []attribute.KeyValue{attribute.String(processorTag, processor.String())}
func attributesForProcessorMetrics(processor component.ID, extraAttrs ...attribute.KeyValue) []attribute.KeyValue {
return append(extraAttrs, attribute.String(processorTag, processor.String()))

Check warning on line 190 in component/componenttest/otelchecker.go

View check run for this annotation

Codecov / codecov/patch

component/componenttest/otelchecker.go#L189-L190

Added lines #L189 - L190 were not covered by tests
}

// attributesForExporterMetrics returns the attributes that are needed for the receiver metrics.
Expand Down
Loading

0 comments on commit d47db79

Please sign in to comment.