From b1bf422290378ddb842309bf5dab0816e2459aab Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Fri, 13 Sep 2024 15:23:57 -0400 Subject: [PATCH] Add 'pipeline' attribute to processor metrics --- .../processor-pipeline-id-attribute-2.yaml | 25 ++++ .../processor-pipeline-id-attribute.yaml | 25 ++++ .../component_telemetry_test.go.tmpl | 6 +- cmd/mdatagen/templates/component_test.go.tmpl | 6 +- component/componenttest/obsreporttest.go | 16 ++- .../componenttest/otelprometheuschecker.go | 25 ++-- .../otelprometheuschecker_test.go | 18 ++- .../testdata/prometheus_response | 24 ++-- .../batchprocessor/batch_processor_test.go | 91 +++++++------- processor/batchprocessor/factory_test.go | 9 +- .../generated_component_telemetry_test.go | 4 +- .../generated_component_test.go | 4 +- processor/batchprocessor/go.mod | 2 + processor/batchprocessor/metrics.go | 2 +- processor/go.mod | 3 + processor/internal/obsmetrics.go | 3 +- processor/internal/processor.go | 3 + .../memorylimiterprocessor/factory_test.go | 7 +- .../generated_component_test.go | 2 +- processor/memorylimiterprocessor/go.mod | 2 + .../memorylimiter_test.go | 14 +-- .../generated_component_telemetry_test.go | 4 +- processor/processorhelper/logs.go | 2 - processor/processorhelper/logs_test.go | 28 +++-- processor/processorhelper/metrics.go | 2 - processor/processorhelper/metrics_test.go | 28 +++-- processor/processorhelper/obsreport.go | 2 + processor/processorhelper/obsreport_test.go | 116 +++++++++++++----- processor/processorhelper/traces.go | 2 - processor/processorhelper/traces_test.go | 28 +++-- processor/processorprofiles/go.mod | 2 + processor/processortest/nop_processor.go | 3 +- processor/processortest/nop_processor_test.go | 9 +- processor/processortest/shutdown_verifier.go | 6 +- service/internal/builders/processor_test.go | 27 ++-- service/internal/graph/nodes.go | 2 +- 36 files changed, 356 insertions(+), 196 deletions(-) create mode 100644 .chloggen/processor-pipeline-id-attribute-2.yaml create mode 100644 .chloggen/processor-pipeline-id-attribute.yaml diff --git a/.chloggen/processor-pipeline-id-attribute-2.yaml b/.chloggen/processor-pipeline-id-attribute-2.yaml new file mode 100644 index 00000000000..140ebbff232 --- /dev/null +++ b/.chloggen/processor-pipeline-id-attribute-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Processor metrics now include the `pipeline` attribute. + +# One or more tracking issues or pull requests related to the change +issues: [11171] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/processor-pipeline-id-attribute.yaml b/.chloggen/processor-pipeline-id-attribute.yaml new file mode 100644 index 00000000000..736049127a0 --- /dev/null +++ b/.chloggen/processor-pipeline-id-attribute.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processortest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: NewNopSettings now requires a component.DataType parameter. + +# One or more tracking issues or pull requests related to the change +issues: [11171] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/mdatagen/templates/component_telemetry_test.go.tmpl b/cmd/mdatagen/templates/component_telemetry_test.go.tmpl index 58c0ab7d6f6..335cc88889a 100644 --- a/cmd/mdatagen/templates/component_telemetry_test.go.tmpl +++ b/cmd/mdatagen/templates/component_telemetry_test.go.tmpl @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - + "go.opentelemetry.io/collector/component" {{- if or isConnector isExporter isExtension isProcessor isReceiver }} "go.opentelemetry.io/collector/config/configtelemetry" @@ -28,8 +28,8 @@ type componentTestTelemetry struct { } {{- if or isConnector isExporter isExtension isProcessor isReceiver }} -func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings { - settings := {{ .Status.Class }}test.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt component.DataType{{- end -}}) {{ .Status.Class }}.Settings { + settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}}) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/cmd/mdatagen/templates/component_test.go.tmpl b/cmd/mdatagen/templates/component_test.go.tmpl index be75cb1bae9..bcec504a2cf 100644 --- a/cmd/mdatagen/templates/component_test.go.tmpl +++ b/cmd/mdatagen/templates/component_test.go.tmpl @@ -125,7 +125,7 @@ func TestComponentLifecycle(t *testing.T) { switch test.name { case "logs": e, ok := c.(exporter.Logs) - require.True(t, ok) + require.True(t, ok) logs := generateLifecycleTestLogs() if !e.Capabilities().MutatesData { logs.MarkReadOnly() @@ -204,7 +204,7 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { {{- if not .Tests.SkipShutdown }} t.Run(test.name + "-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) @@ -213,7 +213,7 @@ func TestComponentLifecycle(t *testing.T) { {{- if not .Tests.SkipLifecycle }} t.Run(test.name + "-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := {{ .Tests.Host }} err = c.Start(context.Background(), host) diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index 3662c640671..f8dd5594d02 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -33,11 +33,13 @@ const ( transportTag = "transport" exporterTag = "exporter" processorTag = "processor" + pipelineTag = "pipeline" ) type TestTelemetry struct { ts component.TelemetrySettings id component.ID + extraAttrs []attribute.KeyValue SpanRecorder *tracetest.SpanRecorder prometheusChecker *prometheusChecker @@ -83,19 +85,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error { - return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorTraces(attrs, acceptedSpans, refusedSpans, droppedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { - return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorMetrics(attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { - return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs) + return tts.prometheusChecker.checkProcessorLogs(attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords) } // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. @@ -137,16 +142,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings { return tts.ts } -// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters. +// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters. // The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods. // The caller must defer a call to `Shutdown` on the returned TestTelemetry. -func SetupTelemetry(id component.ID) (TestTelemetry, error) { +func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) { sr := new(tracetest.SpanRecorder) tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) settings := TestTelemetry{ ts: NewNopTelemetrySettings(), id: id, + extraAttrs: extraAttrs, SpanRecorder: sr, } settings.ts.TracerProvider = tp diff --git a/component/componenttest/otelprometheuschecker.go b/component/componenttest/otelprometheuschecker.go index 14b357b9704..252e5e6ddb9 100644 --- a/component/componenttest/otelprometheuschecker.go +++ b/component/componenttest/otelprometheuschecker.go @@ -48,24 +48,23 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs)) } -func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "spans", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorTraces(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "spans", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorMetrics(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "metric_points", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error { - return pc.checkProcessor(processor, "log_records", accepted, refused, dropped) +func (pc *prometheusChecker) checkProcessorLogs(attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return pc.checkProcessor(attrs, "log_records", accepted, refused, dropped) } -func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error { - processorAttrs := attributesForProcessorMetrics(processor) +func (pc *prometheusChecker) checkProcessor(attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error { return multierr.Combine( - pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs), - pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs), - pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs), + pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, attrs), + pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, attrs), + pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, attrs), ) } @@ -190,8 +189,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att } } -func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String(processorTag, processor.String())} +func attributesForProcessorMetrics(processor component.ID, extraAttrs []attribute.KeyValue) []attribute.KeyValue { + return append(extraAttrs, attribute.String(processorTag, processor.String())) } // attributesForExporterMetrics returns the attributes that are needed for the receiver metrics. diff --git a/component/componenttest/otelprometheuschecker_test.go b/component/componenttest/otelprometheuschecker_test.go index d7176134f54..898f3d43dd1 100644 --- a/component/componenttest/otelprometheuschecker_test.go +++ b/component/componenttest/otelprometheuschecker_test.go @@ -88,17 +88,29 @@ func TestPromChecker(t *testing.T) { ) assert.NoError(t, - pc.checkProcessorTraces(processor, 42, 13, 7), + pc.checkProcessorTraces([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("otel_signal", "traces"), + attribute.String("pipeline", "traces/fakePipeline"), + }, 42, 13, 7), "metrics from Receiver Traces should be valid", ) assert.NoError(t, - pc.checkProcessorMetrics(processor, 7, 41, 13), + pc.checkProcessorMetrics([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("otel_signal", "metrics"), + attribute.String("pipeline", "metrics/fakePipeline"), + }, 7, 41, 13), "metrics from Receiver Metrics should be valid", ) assert.NoError(t, - pc.checkProcessorLogs(processor, 102, 35, 14), + pc.checkProcessorLogs([]attribute.KeyValue{ + attribute.String("processor", processor.String()), + attribute.String("otel_signal", "logs"), + attribute.String("pipeline", "logs/fakePipeline"), + }, 102, 35, 14), "metrics from Receiver Logs should be valid", ) diff --git a/component/componenttest/testdata/prometheus_response b/component/componenttest/testdata/prometheus_response index 9d0eb69ee7f..ec0397d0c27 100644 --- a/component/componenttest/testdata/prometheus_response +++ b/component/componenttest/testdata/prometheus_response @@ -18,40 +18,40 @@ otelcol_exporter_send_failed_log_records{exporter="fakeExporter"} 36 otelcol_exporter_sent_log_records{exporter="fakeExporter"} 103 # HELP otelcol_processor_accepted_spans Number of spans successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_spans counter -otelcol_processor_accepted_spans{processor="fakeProcessor"} 42 +otelcol_processor_accepted_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 42 # HELP otelcol_processor_refused_spans Number of spans that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_spans counter -otelcol_processor_refused_spans{processor="fakeProcessor"} 13 +otelcol_processor_refused_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 13 # HELP otelcol_processor_dropped_spans Number of spans that were dropped. # TYPE otelcol_processor_dropped_spans counter -otelcol_processor_dropped_spans{processor="fakeProcessor"} 7 +otelcol_processor_dropped_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 7 # HELP otelcol_processor_inserted_spans Number of spans that were inserted. # TYPE otelcol_processor_inserted_spans counter -otelcol_processor_inserted_spans{processor="fakeProcessor"} 5 +otelcol_processor_inserted_spans{processor="fakeProcessor",otel_signal="traces",pipeline="traces/fakePipeline"} 5 # HELP otelcol_processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_metric_points counter -otelcol_processor_accepted_metric_points{processor="fakeProcessor"} 7 +otelcol_processor_accepted_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 7 # HELP otelcol_processor_refused_metric_points Number of metric points that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_metric_points counter -otelcol_processor_refused_metric_points{processor="fakeProcessor"} 41 +otelcol_processor_refused_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 41 # HELP otelcol_processor_dropped_metric_points Number of metric points that were dropped. # TYPE otelcol_processor_dropped_metric_points counter -otelcol_processor_dropped_metric_points{processor="fakeProcessor"} 13 +otelcol_processor_dropped_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 13 # HELP otelcol_processor_inserted_metric_points Number of metric points that were inserted. # TYPE otelcol_processor_inserted_metric_points counter -otelcol_processor_inserted_metric_points{processor="fakeProcessor"} 4 +otelcol_processor_inserted_metric_points{processor="fakeProcessor",otel_signal="metrics",pipeline="metrics/fakePipeline"} 4 # HELP otelcol_processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline. # TYPE otelcol_processor_accepted_log_records counter -otelcol_processor_accepted_log_records{processor="fakeProcessor"} 102 +otelcol_processor_accepted_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 102 # HELP otelcol_processor_refused_log_records Number of log records that were rejected by the next component in the pipeline. # TYPE otelcol_processor_refused_log_records counter -otelcol_processor_refused_log_records{processor="fakeProcessor"} 35 +otelcol_processor_refused_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 35 # HELP otelcol_processor_dropped_log_records Number of log records that were dropped. # TYPE otelcol_processor_dropped_log_records counter -otelcol_processor_dropped_log_records{processor="fakeProcessor"} 14 +otelcol_processor_dropped_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 14 # HELP otelcol_processor_inserted_log_records Number of log records that were inserted. # TYPE otelcol_processor_inserted_log_records counter -otelcol_processor_inserted_log_records{processor="fakeProcessor"} 3 +otelcol_processor_inserted_log_records{processor="fakeProcessor",otel_signal="logs",pipeline="logs/fakePipeline"} 3 # HELP otelcol_receiver_accepted_log_records Number of log records successfully pushed into the pipeline. # TYPE otelcol_receiver_accepted_log_records counter otelcol_receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102 diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 8647670a3c8..bca60718edb 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/client" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" @@ -31,25 +32,22 @@ import ( func TestProcessorShutdown(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { require.NotPanics(t, func() { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(component.DataTypeTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = tProc.Shutdown(ctx) }) require.NotPanics(t, func() { - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(component.DataTypeMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = mProc.Shutdown(ctx) }) require.NotPanics(t, func() { - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(component.DataTypeLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = lProc.Shutdown(ctx) }) @@ -58,22 +56,19 @@ func TestProcessorShutdown(t *testing.T) { func TestProcessorLifecycle(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(component.DataTypeTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, tProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, tProc.Shutdown(ctx)) - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(component.DataTypeMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, mProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, mProc.Shutdown(ctx)) - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(component.DataTypeLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, lProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, lProc.Shutdown(ctx)) @@ -84,7 +79,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -127,7 +122,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -174,7 +169,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -220,7 +215,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -241,7 +236,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -262,7 +257,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -277,7 +272,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -295,7 +290,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -346,7 +341,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -367,7 +362,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -388,7 +383,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum - 1), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -403,7 +398,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -418,7 +413,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -437,7 +432,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -484,7 +479,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { } sink := new(consumertest.TracesSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -515,7 +510,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metricsPerRequest := 5 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -569,7 +564,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -609,7 +604,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -630,7 +625,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -651,7 +646,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -666,7 +661,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -702,7 +697,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -751,7 +746,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -849,7 +844,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { ctx := context.Background() sink := new(metricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed metricsPerRequest := 1000 batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) @@ -897,7 +892,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -949,7 +944,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -989,7 +984,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -1010,7 +1005,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -1031,7 +1026,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1046,7 +1041,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1063,7 +1058,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1112,7 +1107,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1191,7 +1186,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.SendBatchSize = 1000 cfg.Timeout = 10 * time.Minute cfg.MetadataKeys = []string{"token1", "token2"} - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -1285,7 +1280,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeTraces) batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1327,7 +1322,7 @@ func TestBatchZeroConfig(t *testing.T) { const requestCount = 5 const logsPerRequest = 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1368,7 +1363,7 @@ func TestBatchSplitOnly(t *testing.T) { require.NoError(t, cfg.Validate()) sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(component.DataTypeLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) diff --git a/processor/batchprocessor/factory_test.go b/processor/batchprocessor/factory_test.go index 6dbc3af13da..8e61c90c9dc 100644 --- a/processor/batchprocessor/factory_test.go +++ b/processor/batchprocessor/factory_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/processor/processortest" ) @@ -23,20 +24,18 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig() - creationSet := processortest.NewNopSettings() - tp, err := factory.CreateTracesProcessor(context.Background(), creationSet, cfg, nil) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), cfg, nil) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") assert.NoError(t, tp.Shutdown(context.Background())) - mp, err := factory.CreateMetricsProcessor(context.Background(), creationSet, cfg, nil) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), cfg, nil) assert.NotNil(t, mp) assert.NoError(t, err, "cannot create metric processor") assert.NoError(t, mp.Shutdown(context.Background())) - lp, err := factory.CreateLogsProcessor(context.Background(), creationSet, cfg, nil) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), cfg, nil) assert.NotNil(t, lp) assert.NoError(t, err, "cannot create logs processor") assert.NoError(t, lp.Shutdown(context.Background())) diff --git a/processor/batchprocessor/generated_component_telemetry_test.go b/processor/batchprocessor/generated_component_telemetry_test.go index 4747507bcb3..a670bcb63ef 100644 --- a/processor/batchprocessor/generated_component_telemetry_test.go +++ b/processor/batchprocessor/generated_component_telemetry_test.go @@ -23,8 +23,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt component.DataType) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/batchprocessor/generated_component_test.go b/processor/batchprocessor/generated_component_test.go index af68a5bf9a8..4d85d8e5975 100644 --- a/processor/batchprocessor/generated_component_test.go +++ b/processor/batchprocessor/generated_component_test.go @@ -68,13 +68,13 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { t.Run(test.name+"-shutdown", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) t.Run(test.name+"-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 557defcc6c3..7d560763fb5 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -91,3 +91,5 @@ replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/co replace go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus replace go.opentelemetry.io/collector/processor/processorprofiles => ../processorprofiles + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 0c98063ceb2..eca517fd33c 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -32,7 +32,7 @@ type batchProcessorTelemetry struct { } func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) { - attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String())) + attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String()), attribute.String("pipeline", set.PipelineID.String())) telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings, metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { diff --git a/processor/go.mod b/processor/go.mod index 726c6aff708..b8d113083d7 100644 --- a/processor/go.mod +++ b/processor/go.mod @@ -6,6 +6,7 @@ require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 + go.opentelemetry.io/collector/component/componentprofiles v0.109.0 go.opentelemetry.io/collector/component/componentstatus v0.109.0 go.opentelemetry.io/collector/config/configtelemetry v0.109.0 go.opentelemetry.io/collector/consumer v0.109.0 @@ -71,3 +72,5 @@ replace go.opentelemetry.io/collector/consumer/consumertest => ../consumer/consu replace go.opentelemetry.io/collector/component/componentstatus => ../component/componentstatus replace go.opentelemetry.io/collector/processor/processorprofiles => ./processorprofiles + +replace go.opentelemetry.io/collector/component/componentprofiles => ../component/componentprofiles diff --git a/processor/internal/obsmetrics.go b/processor/internal/obsmetrics.go index c96fbe5e9e0..055b79547b1 100644 --- a/processor/internal/obsmetrics.go +++ b/processor/internal/obsmetrics.go @@ -6,8 +6,9 @@ package internal // import "go.opentelemetry.io/collector/processor/internal" const ( MetricNameSep = "_" - // ProcessorKey is the key used to identify processors in metrics and traces. ProcessorKey = "processor" + SignalKey = "otel.signal" + PipelineKey = "pipeline" ProcessorMetricPrefix = ProcessorKey + MetricNameSep ) diff --git a/processor/internal/processor.go b/processor/internal/processor.go index 4e7a07a18b8..7750183490e 100644 --- a/processor/internal/processor.go +++ b/processor/internal/processor.go @@ -14,4 +14,7 @@ type Settings struct { // BuildInfo can be used by components for informational purposes BuildInfo component.BuildInfo + + // PipelineID indicates which pipeline contains this processor. + PipelineID component.ID } diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index f63d27a381e..df9eaa9695c 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/memorylimiter" @@ -38,19 +39,19 @@ func TestCreateProcessor(t *testing.T) { pCfg.MemorySpikeLimitMiB = 1907 pCfg.CheckInterval = 100 * time.Millisecond - tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, tp) // test if we can shutdown a monitoring routine that has not started assert.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) - mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, mp) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) - lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/memorylimiterprocessor/generated_component_test.go b/processor/memorylimiterprocessor/generated_component_test.go index 1820e3e0a8c..46f43ebaa58 100644 --- a/processor/memorylimiterprocessor/generated_component_test.go +++ b/processor/memorylimiterprocessor/generated_component_test.go @@ -68,7 +68,7 @@ func TestComponentLifecycle(t *testing.T) { for _, test := range tests { t.Run(test.name+"-lifecycle", func(t *testing.T) { - c, err := test.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := test.createFn(context.Background(), processortest.NewNopSettings(component.MustNewType(test.name)), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index 32e125c5222..87f4daeee31 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -98,3 +98,5 @@ replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/co replace go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus replace go.opentelemetry.io/collector/processor/processorprofiles => ../processorprofiles + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index a09c31e1ff0..8ea274d36c7 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -51,7 +51,7 @@ func TestNoDataLoss(t *testing.T) { cfg.MemoryLimitMiB = uint32(ms.Alloc/(1024*1024) + expectedMemoryIncreaseMiB) cfg.MemorySpikeLimitMiB = 1 - set := processortest.NewNopSettings() + set := processortest.NewNopSettings(component.DataTypeLogs) limiter, err := newMemoryLimiterProcessor(set, cfg) require.NoError(t, err) @@ -174,11 +174,11 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeMetrics), tt.mlCfg) require.NoError(t, err) mp, err := processorhelper.NewMetricsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeMetrics), tt.mlCfg, consumertest.NewNop(), ml.processMetrics, @@ -264,11 +264,11 @@ func TestTraceMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeTraces), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewTracesProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeTraces), tt.mlCfg, consumertest.NewNop(), ml.processTraces, @@ -354,11 +354,11 @@ func TestLogMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(component.DataTypeLogs), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewLogsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(component.DataTypeLogs), tt.mlCfg, consumertest.NewNop(), ml.processLogs, diff --git a/processor/processorhelper/generated_component_telemetry_test.go b/processor/processorhelper/generated_component_telemetry_test.go index f03fb0ad564..12d31a2b1cd 100644 --- a/processor/processorhelper/generated_component_telemetry_test.go +++ b/processor/processorhelper/generated_component_telemetry_test.go @@ -23,8 +23,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt component.DataType) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index f8b76b512a3..f43a4e8eb67 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewLogsProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "logs")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 676155595e5..8e9e5645b5c 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -24,7 +24,7 @@ import ( var testLogsCfg = struct{}{} func TestNewLogsProcessor(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) require.NoError(t, err) assert.True(t, lp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewLogsProcessor(t *testing.T) { func TestNewLogsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewLogsProcessor_WithOptions(t *testing.T) { } func TestNewLogsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), nil) + _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewLogsProcessor_ProcessLogError(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) require.NoError(t, err) assert.Equal(t, want, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } @@ -87,7 +87,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { incomingLogRecords.AppendEmpty() testTelemetry := setupTestTelemetry() - lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeLogs), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) @@ -104,8 +104,12 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + Value: 3, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "logs"), + attribute.String("pipeline", "logs"), + ), }, }, }, @@ -119,8 +123,12 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + Value: 1, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "logs"), + attribute.String("pipeline", "logs"), + ), }, }, }, diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index f98db1e240a..f87f6580638 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewMetricsProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "metrics")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index db59e4be030..fb01790b17a 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -24,7 +24,7 @@ import ( var testMetricsCfg = struct{}{} func TestNewMetricsProcessor(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) require.NoError(t, err) assert.True(t, mp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewMetricsProcessor(t *testing.T) { func TestNewMetricsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewMetricsProcessor_WithOptions(t *testing.T) { } func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), nil) + _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) require.NoError(t, err) assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } @@ -88,7 +88,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { dps.AppendEmpty() testTelemetry := setupTestTelemetry() - mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) + mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeMetrics), &testMetricsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) @@ -105,8 +105,12 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 2, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + Value: 2, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "metrics"), + attribute.String("pipeline", "metrics"), + ), }, }, }, @@ -120,8 +124,12 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + Value: 3, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "metrics"), + attribute.String("pipeline", "metrics"), + ), }, }, }, diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index 791822e0aa7..fdf2f56f6f4 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -55,6 +55,8 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) { return &ObsReport{ otelAttrs: []attribute.KeyValue{ attribute.String(internal.ProcessorKey, cfg.ProcessorID.String()), + attribute.String(internal.SignalKey, cfg.ProcessorCreateSettings.PipelineID.Type().String()), + attribute.String(internal.PipelineKey, cfg.ProcessorCreateSettings.PipelineID.String()), }, telemetryBuilder: telemetryBuilder, }, nil diff --git a/processor/processorhelper/obsreport_test.go b/processor/processorhelper/obsreport_test.go index 6b8aa846e8c..6f1417290b4 100644 --- a/processor/processorhelper/obsreport_test.go +++ b/processor/processorhelper/obsreport_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" @@ -19,18 +20,26 @@ import ( ) var ( - processorID = component.MustNewID("fakeProcessor") + processorID = component.MustNewID("fakeProcessor") + tracesPipeline = component.MustNewIDWithName("traces", "fakePipeline") + metricsPipeline = component.MustNewIDWithName("metrics", "fakePipeline") + logsPipeline = component.MustNewIDWithName("logs", "fakePipeline") ) func TestProcessorTraceData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, tracesPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedSpans = 27 const refusedSpans = 19 const droppedSpans = 13 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) require.NoError(t, err) obsrep.TracesAccepted(context.Background(), acceptedSpans) @@ -42,14 +51,19 @@ func TestProcessorTraceData(t *testing.T) { } func TestProcessorMetricsData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, metricsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedPoints = 29 const refusedPoints = 11 const droppedPoints = 17 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) require.NoError(t, err) obsrep.MetricsAccepted(context.Background(), acceptedPoints) @@ -83,14 +97,19 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, logsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedRecords = 29 const refusedRecords = 11 const droppedRecords = 17 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) require.NoError(t, err) obsrep.LogsAccepted(context.Background(), acceptedRecords) @@ -102,13 +121,18 @@ func TestProcessorLogRecords(t *testing.T) { } func TestCheckProcessorTracesViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(tracesPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) assert.NoError(t, err) @@ -131,13 +155,18 @@ func TestCheckProcessorTracesViews(t *testing.T) { } func TestCheckProcessorMetricsViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(metricsPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) assert.NoError(t, err) @@ -160,13 +189,18 @@ func TestCheckProcessorMetricsViews(t *testing.T) { } func TestCheckProcessorLogViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(logsPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) assert.NoError(t, err) @@ -190,7 +224,7 @@ func TestCheckProcessorLogViews(t *testing.T) { func TestNoMetrics(t *testing.T) { // ensure if LevelNone is configured, no metrics are emitted by the component - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, tracesPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -201,8 +235,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) assert.NoError(t, err) @@ -212,7 +251,7 @@ func TestNoMetrics(t *testing.T) { require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped)) }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, metricsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -223,8 +262,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) assert.NoError(t, err) @@ -234,7 +278,7 @@ func TestNoMetrics(t *testing.T) { require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped)) }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, logsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -245,8 +289,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) assert.NoError(t, err) @@ -258,10 +307,17 @@ func TestNoMetrics(t *testing.T) { }) } -func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { - tt, err := componenttest.SetupTelemetry(id) +func testTelemetry(t *testing.T, id component.ID, pipeline component.ID, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { + tt, err := componenttest.SetupTelemetry(id, extraProcessorAttrs(pipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) testFunc(t, tt) } + +func extraProcessorAttrs(pipeline component.ID) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("otel_signal", pipeline.Type().String()), + attribute.String("pipeline", pipeline.String()), + } +} diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index b2b52c58c76..8c5ffa299d2 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewTracesProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "traces")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 1e11564cdfc..bce18b38e71 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -24,7 +24,7 @@ import ( var testTracesCfg = struct{}{} func TestNewTracesProcessor(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) require.NoError(t, err) assert.True(t, tp.Capabilities().MutatesData) @@ -35,7 +35,7 @@ func TestNewTracesProcessor(t *testing.T) { func TestNewTracesProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +47,19 @@ func TestNewTracesProcessor_WithOptions(t *testing.T) { } func TestNewTracesProcessor_NilRequiredFields(t *testing.T) { - _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), nil) + _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewTracesProcessor_ProcessTraceError(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) require.NoError(t, err) assert.Equal(t, want, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(component.DataTypeTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } @@ -88,7 +88,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { incomingSpans.AppendEmpty() testTelemetry := setupTestTelemetry() - tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(component.DataTypeTraces), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) @@ -105,8 +105,12 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 4, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + Value: 4, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "traces"), + attribute.String("pipeline", "traces"), + ), }, }, }, @@ -120,8 +124,12 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + Value: 1, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "traces"), + attribute.String("pipeline", "traces"), + ), }, }, }, diff --git a/processor/processorprofiles/go.mod b/processor/processorprofiles/go.mod index 558c291fc7e..2f439d01b95 100644 --- a/processor/processorprofiles/go.mod +++ b/processor/processorprofiles/go.mod @@ -54,3 +54,5 @@ replace go.opentelemetry.io/collector/consumer/consumerprofiles => ../../consume replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/consumertest replace go.opentelemetry.io/collector/component/componentstatus => ../../component/componentstatus + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index f1dd64bfd52..9a6d195dca6 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -20,11 +20,12 @@ import ( var nopType = component.MustNewType("nop") // NewNopSettings returns a new nop settings for Create*Processor functions. -func NewNopSettings() processor.Settings { +func NewNopSettings(dt component.DataType) processor.Settings { return processor.Settings{ ID: component.NewIDWithName(nopType, uuid.NewString()), TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: component.NewID(dt), } } diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index fcf902e552d..4bf2d1fe6be 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -27,28 +28,28 @@ func TestNewNopFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &nopConfig{}, cfg) - traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(component.DataTypeTraces), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(component.DataTypeMetrics), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(component.DataTypeLogs), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) - profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(componentprofiles.DataTypeProfiles), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, profiles.Capabilities()) assert.NoError(t, profiles.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processortest/shutdown_verifier.go b/processor/processortest/shutdown_verifier.go index d020f6e4f8a..11c01ad3ba6 100644 --- a/processor/processortest/shutdown_verifier.go +++ b/processor/processortest/shutdown_verifier.go @@ -21,7 +21,7 @@ import ( func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.TracesSink) - proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(component.DataTypeTraces), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } @@ -45,7 +45,7 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.LogsSink) - proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(component.DataTypeLogs), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } @@ -69,7 +69,7 @@ func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Facto func verifyMetricsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.MetricsSink) - proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(component.DataTypeMetrics), cfg, nextSink) if errors.Is(err, component.ErrDataTypeIsNotSupported) { return } diff --git a/service/internal/builders/processor_test.go b/service/internal/builders/processor_test.go index cc1ce0d5db2..01e6eadcbf8 100644 --- a/service/internal/builders/processor_test.go +++ b/service/internal/builders/processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" @@ -186,30 +187,36 @@ func TestNewNopProcessorBuilder(t *testing.T) { factory := processortest.NewNopFactory() cfg := factory.CreateDefaultConfig() - set := processortest.NewNopSettings() - set.ID = component.NewID(nopType) - traces, err := factory.CreateTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + tracesSet := processortest.NewNopSettings(component.DataTypeTraces) + tracesSet.ID = component.NewID(nopType) + traces, err := factory.CreateTracesProcessor(context.Background(), tracesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) + bTraces, err := builder.CreateTraces(context.Background(), tracesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, traces, bTraces) - metrics, err := factory.CreateMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + metricsSet := processortest.NewNopSettings(component.DataTypeMetrics) + metricsSet.ID = component.NewID(nopType) + metrics, err := factory.CreateMetricsProcessor(context.Background(), metricsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) + bMetrics, err := builder.CreateMetrics(context.Background(), metricsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, metrics, bMetrics) - logs, err := factory.CreateLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + logsSet := processortest.NewNopSettings(component.DataTypeLogs) + logsSet.ID = component.NewID(nopType) + logs, err := factory.CreateLogsProcessor(context.Background(), logsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) + bLogs, err := builder.CreateLogs(context.Background(), logsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, logs, bLogs) - profiles, err := factory.CreateProfilesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + profilesSet := processortest.NewNopSettings(componentprofiles.DataTypeProfiles) + profilesSet.ID = component.NewID(nopType) + profiles, err := factory.CreateProfilesProcessor(context.Background(), profilesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bProfiles, err := builder.CreateProfiles(context.Background(), set, consumertest.NewNop()) + bProfiles, err := builder.CreateProfiles(context.Background(), profilesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profiles, bProfiles) } diff --git a/service/internal/graph/nodes.go b/service/internal/graph/nodes.go index f9cc3e6fa7a..f65246b7734 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/nodes.go @@ -144,7 +144,7 @@ func (n *processorNode) buildComponent(ctx context.Context, next baseConsumer, ) error { tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) - set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, PipelineID: n.pipelineID} var err error switch n.pipelineID.Type() { case component.DataTypeTraces: