From d47db797f547f999bcdb3b16e46c41d76c10a969 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Fri, 13 Sep 2024 15:23:57 -0400 Subject: [PATCH] Add 'pipeline' attribute to processor metrics --- .../processor-pipeline-id-attribute-2.yaml | 25 ++++ .../processor-pipeline-id-attribute-3.yaml | 25 ++++ .../processor-pipeline-id-attribute.yaml | 25 ++++ .../component_telemetry_test.go.tmpl | 6 +- .../internal/templates/component_test.go.tmpl | 16 ++- component/componenttest/obsreporttest.go | 16 ++- component/componenttest/otelchecker.go | 25 ++-- .../batchprocessor/batch_processor_test.go | 91 +++++++------- processor/batchprocessor/factory_test.go | 9 +- .../generated_component_telemetry_test.go | 5 +- .../generated_component_test.go | 14 ++- processor/batchprocessor/go.mod | 4 +- processor/batchprocessor/metrics.go | 2 +- processor/go.mod | 3 + processor/internal/obsmetrics.go | 3 +- processor/internal/processor.go | 8 +- .../memorylimiterprocessor/factory_test.go | 7 +- .../generated_component_test.go | 12 +- processor/memorylimiterprocessor/go.mod | 4 +- .../memorylimiter_test.go | 15 +-- .../generated_component_telemetry_test.go | 5 +- processor/processorhelper/logs.go | 2 - processor/processorhelper/logs_test.go | 29 +++-- processor/processorhelper/metrics.go | 2 - processor/processorhelper/metrics_test.go | 29 +++-- processor/processorhelper/obsreport.go | 2 + processor/processorhelper/obsreport_test.go | 117 +++++++++++++----- processor/processorhelper/traces.go | 2 - processor/processorhelper/traces_test.go | 29 +++-- processor/processorprofiles/go.mod | 2 + processor/processortest/nop_processor.go | 4 +- processor/processortest/nop_processor_test.go | 10 +- processor/processortest/shutdown_verifier.go | 6 +- service/internal/builders/processor_test.go | 28 +++-- service/internal/graph/nodes.go | 2 +- service/pipelines/config.go | 8 +- service/service.go | 7 +- service/service_test.go | 20 +-- 38 files changed, 415 insertions(+), 204 deletions(-) create mode 100644 .chloggen/processor-pipeline-id-attribute-2.yaml create mode 100644 .chloggen/processor-pipeline-id-attribute-3.yaml create mode 100644 .chloggen/processor-pipeline-id-attribute.yaml diff --git a/.chloggen/processor-pipeline-id-attribute-2.yaml b/.chloggen/processor-pipeline-id-attribute-2.yaml new file mode 100644 index 00000000000..140ebbff232 --- /dev/null +++ b/.chloggen/processor-pipeline-id-attribute-2.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Processor metrics now include the `pipeline` attribute. + +# One or more tracking issues or pull requests related to the change +issues: [11171] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/.chloggen/processor-pipeline-id-attribute-3.yaml b/.chloggen/processor-pipeline-id-attribute-3.yaml new file mode 100644 index 00000000000..eb905ba4da9 --- /dev/null +++ b/.chloggen/processor-pipeline-id-attribute-3.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: componenttest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: SetupTelemetry now accepts optional `extraAttrs ...attribute.KeyValue` parameter. + +# One or more tracking issues or pull requests related to the change +issues: [11171] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/processor-pipeline-id-attribute.yaml b/.chloggen/processor-pipeline-id-attribute.yaml new file mode 100644 index 00000000000..736049127a0 --- /dev/null +++ b/.chloggen/processor-pipeline-id-attribute.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processortest + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: NewNopSettings now requires a component.DataType parameter. + +# One or more tracking issues or pull requests related to the change +issues: [11171] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl b/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl index 58c0ab7d6f6..561c6105c6d 100644 --- a/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl +++ b/cmd/mdatagen/internal/templates/component_telemetry_test.go.tmpl @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - + "go.opentelemetry.io/collector/component" {{- if or isConnector isExporter isExtension isProcessor isReceiver }} "go.opentelemetry.io/collector/config/configtelemetry" @@ -28,8 +28,8 @@ type componentTestTelemetry struct { } {{- if or isConnector isExporter isExtension isProcessor isReceiver }} -func (tt *componentTestTelemetry) NewSettings() {{ .Status.Class }}.Settings { - settings := {{ .Status.Class }}test.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings({{- if isProcessor -}}dt pipeline.Signal{{- end -}}) {{ .Status.Class }}.Settings { + settings := {{ .Status.Class }}test.NewNopSettings({{- if isProcessor -}}dt{{- end -}}) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/cmd/mdatagen/internal/templates/component_test.go.tmpl b/cmd/mdatagen/internal/templates/component_test.go.tmpl index c40f3f39d07..81cefbab47d 100644 --- a/cmd/mdatagen/internal/templates/component_test.go.tmpl +++ b/cmd/mdatagen/internal/templates/component_test.go.tmpl @@ -126,7 +126,7 @@ func TestComponentLifecycle(t *testing.T) { switch tt.name { case "logs": e, ok := c.(exporter.Logs) - require.True(t, ok) + require.True(t, ok) logs := generateLifecycleTestLogs() if !e.Capabilities().MutatesData { logs.MarkReadOnly() @@ -203,9 +203,19 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, sub.Unmarshal(&cfg)) for _, tt := range tests { + var signal pipeline.Signal + switch tt.name { + case "logs": + signal = pipeline.SignalLogs + case "metrics": + signal = pipeline.SignalMetrics + case "traces": + signal = pipeline.SignalTraces + } + {{- if not .Tests.SkipShutdown }} t.Run(tt.name + "-shutdown", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) @@ -214,7 +224,7 @@ func TestComponentLifecycle(t *testing.T) { {{- if not .Tests.SkipLifecycle }} t.Run(tt.name + "-lifecycle", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg) require.NoError(t, err) host := {{ .Tests.Host }} err = c.Start(context.Background(), host) diff --git a/component/componenttest/obsreporttest.go b/component/componenttest/obsreporttest.go index d26ecf2ab8e..1903dd514bb 100644 --- a/component/componenttest/obsreporttest.go +++ b/component/componenttest/obsreporttest.go @@ -30,11 +30,13 @@ const ( transportTag = "transport" exporterTag = "exporter" processorTag = "processor" + pipelineTag = "pipeline" ) type TestTelemetry struct { ts component.TelemetrySettings id component.ID + extraAttrs []attribute.KeyValue SpanRecorder *tracetest.SpanRecorder reader *sdkmetric.ManualReader @@ -80,19 +82,22 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64, ext // CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error { - return checkProcessorTraces(tts.reader, tts.id, acceptedSpans, refusedSpans, droppedSpans) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...) + return checkProcessorTraces(tts.reader, attrs, acceptedSpans, refusedSpans, droppedSpans) } // CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error { - return checkProcessorMetrics(tts.reader, tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...) + return checkProcessorMetrics(tts.reader, attrs, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints) } // CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values. // Note: SetupTelemetry must be called before this function. func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error { - return checkProcessorLogs(tts.reader, tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords) + attrs := attributesForProcessorMetrics(tts.id, tts.extraAttrs...) + return checkProcessorLogs(tts.reader, attrs, acceptedLogRecords, refusedLogRecords, droppedLogRecords) } // CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values. @@ -134,16 +139,17 @@ func (tts *TestTelemetry) TelemetrySettings() component.TelemetrySettings { return tts.ts } -// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, producers, or exporters. +// SetupTelemetry sets up the testing environment to check the metrics recorded by receivers, or exporters. // The caller must pass the ID of the component being tested. The ID will be used by the CreateSettings and Check methods. // The caller must defer a call to `Shutdown` on the returned TestTelemetry. -func SetupTelemetry(id component.ID) (TestTelemetry, error) { +func SetupTelemetry(id component.ID, extraAttrs ...attribute.KeyValue) (TestTelemetry, error) { sr := new(tracetest.SpanRecorder) tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) settings := TestTelemetry{ ts: NewNopTelemetrySettings(), id: id, + extraAttrs: extraAttrs, SpanRecorder: sr, } settings.ts.TracerProvider = tp diff --git a/component/componenttest/otelchecker.go b/component/componenttest/otelchecker.go index 275ad35f45c..064be50f179 100644 --- a/component/componenttest/otelchecker.go +++ b/component/componenttest/otelchecker.go @@ -41,24 +41,23 @@ func checkReceiver(reader *sdkmetric.ManualReader, receiver component.ID, dataty checkIntSum(reader, fmt.Sprintf("otelcol_receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs)) } -func checkProcessorTraces(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error { - return checkProcessor(reader, processor, "spans", accepted, refused, dropped) +func checkProcessorTraces(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return checkProcessor(reader, attrs, "spans", accepted, refused, dropped) } -func checkProcessorMetrics(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error { - return checkProcessor(reader, processor, "metric_points", accepted, refused, dropped) +func checkProcessorMetrics(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return checkProcessor(reader, attrs, "metric_points", accepted, refused, dropped) } -func checkProcessorLogs(reader *sdkmetric.ManualReader, processor component.ID, accepted, refused, dropped int64) error { - return checkProcessor(reader, processor, "log_records", accepted, refused, dropped) +func checkProcessorLogs(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, accepted, refused, dropped int64) error { + return checkProcessor(reader, attrs, "log_records", accepted, refused, dropped) } -func checkProcessor(reader *sdkmetric.ManualReader, processor component.ID, datatype string, accepted, refused, dropped int64) error { - processorAttrs := attributesForProcessorMetrics(processor) +func checkProcessor(reader *sdkmetric.ManualReader, attrs []attribute.KeyValue, datatype string, accepted, refused, dropped int64) error { return multierr.Combine( - checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, processorAttrs), - checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, processorAttrs), - checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, processorAttrs), + checkIntSum(reader, fmt.Sprintf("otelcol_processor_accepted_%s", datatype), accepted, attrs), + checkIntSum(reader, fmt.Sprintf("otelcol_processor_refused_%s", datatype), refused, attrs), + checkIntSum(reader, fmt.Sprintf("otelcol_processor_dropped_%s", datatype), dropped, attrs), ) } @@ -187,8 +186,8 @@ func attributesForReceiverMetrics(receiver component.ID, transport string) []att } } -func attributesForProcessorMetrics(processor component.ID) []attribute.KeyValue { - return []attribute.KeyValue{attribute.String(processorTag, processor.String())} +func attributesForProcessorMetrics(processor component.ID, extraAttrs ...attribute.KeyValue) []attribute.KeyValue { + return append(extraAttrs, attribute.String(processorTag, processor.String())) } // attributesForExporterMetrics returns the attributes that are needed for the receiver metrics. diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 7f74df04332..7255151bb82 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -26,30 +26,28 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) func TestProcessorShutdown(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { require.NotPanics(t, func() { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(pipeline.SignalTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = tProc.Shutdown(ctx) }) require.NotPanics(t, func() { - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(pipeline.SignalMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = mProc.Shutdown(ctx) }) require.NotPanics(t, func() { - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(pipeline.SignalLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) _ = lProc.Shutdown(ctx) }) @@ -58,22 +56,19 @@ func TestProcessorShutdown(t *testing.T) { func TestProcessorLifecycle(t *testing.T) { factory := NewFactory() - ctx := context.Background() - processorCreationSet := processortest.NewNopSettings() - for i := 0; i < 5; i++ { - tProc, err := factory.CreateTracesProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + tProc, err := factory.CreateTracesProcessor(ctx, processortest.NewNopSettings(pipeline.SignalTraces), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, tProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, tProc.Shutdown(ctx)) - mProc, err := factory.CreateMetricsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + mProc, err := factory.CreateMetricsProcessor(ctx, processortest.NewNopSettings(pipeline.SignalMetrics), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, mProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, mProc.Shutdown(ctx)) - lProc, err := factory.CreateLogsProcessor(ctx, processorCreationSet, factory.CreateDefaultConfig(), consumertest.NewNop()) + lProc, err := factory.CreateLogsProcessor(ctx, processortest.NewNopSettings(pipeline.SignalLogs), factory.CreateDefaultConfig(), consumertest.NewNop()) require.NoError(t, err) require.NoError(t, lProc.Start(ctx, componenttest.NewNopHost())) require.NoError(t, lProc.Shutdown(ctx)) @@ -84,7 +79,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -127,7 +122,7 @@ func TestBatchProcessorSpansDeliveredEnforceBatchSize(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.SendBatchMaxSize = 130 - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -174,7 +169,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { sendBatchSize := 20 cfg.SendBatchSize = uint32(sendBatchSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -220,7 +215,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -241,7 +236,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -262,7 +257,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -277,7 +272,7 @@ func TestBatchProcessorSentBySize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -295,7 +290,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { cfg.SendBatchSize = uint32(sendBatchSize) cfg.SendBatchMaxSize = uint32(sendBatchMaxSize) cfg.Timeout = 500 * time.Millisecond - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -346,7 +341,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -367,7 +362,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 1, uint64(expectedBatchesNum - 1), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -388,7 +383,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum - 1), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -403,7 +398,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -418,7 +413,7 @@ func TestBatchProcessorSentBySizeWithMaxSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "traces")), }, }, }, @@ -437,7 +432,7 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { spansPerRequest := 10 start := time.Now() - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -484,7 +479,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { } sink := new(consumertest.TracesSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -515,7 +510,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { metricsPerRequest := 5 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -569,7 +564,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { dataPointsPerRequest := metricsPerRequest * dataPointsPerMetric sink := new(consumertest.MetricsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(pipeline.SignalMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -609,7 +604,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -630,7 +625,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -651,7 +646,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -666,7 +661,7 @@ func TestBatchMetricProcessorBatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "metrics")), }, }, }, @@ -702,7 +697,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -751,7 +746,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { metricsPerRequest := 10 sink := new(consumertest.MetricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -849,7 +844,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { func runMetricsProcessorBenchmark(b *testing.B, cfg Config) { ctx := context.Background() sink := new(metricsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalMetrics) creationSet.MetricsLevel = configtelemetry.LevelDetailed metricsPerRequest := 1000 batcher, err := newBatchMetricsProcessor(creationSet, sink, &cfg) @@ -897,7 +892,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -949,7 +944,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { logsPerRequest := 5 sink := new(consumertest.LogsSink) - creationSet := tel.NewSettings() + creationSet := tel.NewSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -989,7 +984,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100_000, 200_000, 300_000, 400_000, 500_000, 600_000, 700_000, 800_000, 900_000, @@ -1010,7 +1005,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.HistogramDataPoint[int64]{ { - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), Count: uint64(expectedBatchesNum), Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, BucketCounts: []uint64{0, 0, uint64(expectedBatchesNum), 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, @@ -1031,7 +1026,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: int64(expectedBatchesNum), - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1046,7 +1041,7 @@ func TestBatchLogProcessor_BatchSize(t *testing.T) { DataPoints: []metricdata.DataPoint[int64]{ { Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "batch")), + Attributes: attribute.NewSet(attribute.String("processor", "batch"), attribute.String("pipeline", "logs")), }, }, }, @@ -1063,7 +1058,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1112,7 +1107,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { logsPerRequest := 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1191,7 +1186,7 @@ func TestBatchProcessorSpansBatchedByMetadata(t *testing.T) { cfg.SendBatchSize = 1000 cfg.Timeout = 10 * time.Minute cfg.MetadataKeys = []string{"token1", "token2"} - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) @@ -1285,7 +1280,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.MetadataKeys = []string{"token"} cfg.MetadataCardinalityLimit = cardLimit - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalTraces) batcher, err := newBatchTracesProcessor(creationSet, sink, cfg) require.NoError(t, err) require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) @@ -1327,7 +1322,7 @@ func TestBatchZeroConfig(t *testing.T) { const requestCount = 5 const logsPerRequest = 10 sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) @@ -1368,7 +1363,7 @@ func TestBatchSplitOnly(t *testing.T) { require.NoError(t, cfg.Validate()) sink := new(consumertest.LogsSink) - creationSet := processortest.NewNopSettings() + creationSet := processortest.NewNopSettings(pipeline.SignalLogs) creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchLogsProcessor(creationSet, sink, &cfg) require.NoError(t, err) diff --git a/processor/batchprocessor/factory_test.go b/processor/batchprocessor/factory_test.go index 6dbc3af13da..d3360ec0545 100644 --- a/processor/batchprocessor/factory_test.go +++ b/processor/batchprocessor/factory_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) @@ -23,20 +24,18 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateProcessor(t *testing.T) { factory := NewFactory() - cfg := factory.CreateDefaultConfig() - creationSet := processortest.NewNopSettings() - tp, err := factory.CreateTracesProcessor(context.Background(), creationSet, cfg, nil) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), cfg, nil) assert.NotNil(t, tp) assert.NoError(t, err, "cannot create trace processor") assert.NoError(t, tp.Shutdown(context.Background())) - mp, err := factory.CreateMetricsProcessor(context.Background(), creationSet, cfg, nil) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), cfg, nil) assert.NotNil(t, mp) assert.NoError(t, err, "cannot create metric processor") assert.NoError(t, mp.Shutdown(context.Background())) - lp, err := factory.CreateLogsProcessor(context.Background(), creationSet, cfg, nil) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), cfg, nil) assert.NotNil(t, lp) assert.NoError(t, err, "cannot create logs processor") assert.NoError(t, lp.Shutdown(context.Background())) diff --git a/processor/batchprocessor/generated_component_telemetry_test.go b/processor/batchprocessor/generated_component_telemetry_test.go index 4747507bcb3..31462ecd6e0 100644 --- a/processor/batchprocessor/generated_component_telemetry_test.go +++ b/processor/batchprocessor/generated_component_telemetry_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" ) @@ -23,8 +24,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt pipeline.Signal) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/batchprocessor/generated_component_test.go b/processor/batchprocessor/generated_component_test.go index 0de5ebd38a2..b2284b7bdae 100644 --- a/processor/batchprocessor/generated_component_test.go +++ b/processor/batchprocessor/generated_component_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" ) @@ -67,14 +68,23 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, sub.Unmarshal(&cfg)) for _, tt := range tests { + var signal pipeline.Signal + switch tt.name { + case "logs": + signal = pipeline.SignalLogs + case "metrics": + signal = pipeline.SignalMetrics + case "traces": + signal = pipeline.SignalTraces + } t.Run(tt.name+"-shutdown", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg) require.NoError(t, err) err = c.Shutdown(context.Background()) require.NoError(t, err) }) t.Run(tt.name+"-lifecycle", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/batchprocessor/go.mod b/processor/batchprocessor/go.mod index 4fc64f81f2c..e48c9d59472 100644 --- a/processor/batchprocessor/go.mod +++ b/processor/batchprocessor/go.mod @@ -12,6 +12,7 @@ require ( go.opentelemetry.io/collector/consumer/consumertest v0.110.0 go.opentelemetry.io/collector/pdata v1.16.0 go.opentelemetry.io/collector/pdata/testdata v0.110.0 + go.opentelemetry.io/collector/pipeline v0.110.0 go.opentelemetry.io/collector/processor v0.110.0 go.opentelemetry.io/otel v1.30.0 go.opentelemetry.io/otel/metric v1.30.0 @@ -41,7 +42,6 @@ require ( go.opentelemetry.io/collector/consumer/consumerprofiles v0.110.0 // indirect go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect - go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect go.opentelemetry.io/otel/sdk v1.30.0 // indirect go.uber.org/multierr v1.11.0 // indirect @@ -88,3 +88,5 @@ replace go.opentelemetry.io/collector/processor/processorprofiles => ../processo replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/globalsignal => ../../internal/globalsignal + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/batchprocessor/metrics.go b/processor/batchprocessor/metrics.go index 0c98063ceb2..eca517fd33c 100644 --- a/processor/batchprocessor/metrics.go +++ b/processor/batchprocessor/metrics.go @@ -32,7 +32,7 @@ type batchProcessorTelemetry struct { } func newBatchProcessorTelemetry(set processor.Settings, currentMetadataCardinality func() int) (*batchProcessorTelemetry, error) { - attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String())) + attrs := attribute.NewSet(attribute.String(internal.ProcessorKey, set.ID.String()), attribute.String("pipeline", set.PipelineID.String())) telemetryBuilder, err := metadata.NewTelemetryBuilder(set.TelemetrySettings, metadata.WithProcessorBatchMetadataCardinalityCallback(func() int64 { diff --git a/processor/go.mod b/processor/go.mod index 0d369f573de..602f11d2cd8 100644 --- a/processor/go.mod +++ b/processor/go.mod @@ -6,6 +6,7 @@ require ( github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.110.0 + go.opentelemetry.io/collector/component/componentprofiles v0.110.0 go.opentelemetry.io/collector/component/componentstatus v0.110.0 go.opentelemetry.io/collector/config/configtelemetry v0.110.0 go.opentelemetry.io/collector/consumer v0.110.0 @@ -68,3 +69,5 @@ replace go.opentelemetry.io/collector/processor/processorprofiles => ./processor replace go.opentelemetry.io/collector/pipeline => ../pipeline replace go.opentelemetry.io/collector/internal/globalsignal => ../internal/globalsignal + +replace go.opentelemetry.io/collector/component/componentprofiles => ../component/componentprofiles diff --git a/processor/internal/obsmetrics.go b/processor/internal/obsmetrics.go index c96fbe5e9e0..055b79547b1 100644 --- a/processor/internal/obsmetrics.go +++ b/processor/internal/obsmetrics.go @@ -6,8 +6,9 @@ package internal // import "go.opentelemetry.io/collector/processor/internal" const ( MetricNameSep = "_" - // ProcessorKey is the key used to identify processors in metrics and traces. ProcessorKey = "processor" + SignalKey = "otel.signal" + PipelineKey = "pipeline" ProcessorMetricPrefix = ProcessorKey + MetricNameSep ) diff --git a/processor/internal/processor.go b/processor/internal/processor.go index 4e7a07a18b8..c2d2787166a 100644 --- a/processor/internal/processor.go +++ b/processor/internal/processor.go @@ -3,7 +3,10 @@ package internal // import "go.opentelemetry.io/collector/processor/internal" -import "go.opentelemetry.io/collector/component" +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pipeline" +) // Settings is passed to Create* functions in Factory. type Settings struct { @@ -14,4 +17,7 @@ type Settings struct { // BuildInfo can be used by components for informational purposes BuildInfo component.BuildInfo + + // PipelineID indicates which pipeline contains this processor. + PipelineID pipeline.ID } diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index 2502fa5b1ef..aead78ecc15 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/memorylimiter" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) @@ -38,19 +39,19 @@ func TestCreateProcessor(t *testing.T) { pCfg.MemorySpikeLimitMiB = 1907 pCfg.CheckInterval = 100 * time.Millisecond - tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, tp) // test if we can shutdown a monitoring routine that has not started require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted) require.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) - mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, mp) require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) - lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), cfg, consumertest.NewNop()) require.NoError(t, err) assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/memorylimiterprocessor/generated_component_test.go b/processor/memorylimiterprocessor/generated_component_test.go index 598a82be167..0c2b58e6de9 100644 --- a/processor/memorylimiterprocessor/generated_component_test.go +++ b/processor/memorylimiterprocessor/generated_component_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" ) @@ -67,8 +68,17 @@ func TestComponentLifecycle(t *testing.T) { require.NoError(t, sub.Unmarshal(&cfg)) for _, tt := range tests { + var signal pipeline.Signal + switch tt.name { + case "logs": + signal = pipeline.SignalLogs + case "metrics": + signal = pipeline.SignalMetrics + case "traces": + signal = pipeline.SignalTraces + } t.Run(tt.name+"-lifecycle", func(t *testing.T) { - c, err := tt.createFn(context.Background(), processortest.NewNopSettings(), cfg) + c, err := tt.createFn(context.Background(), processortest.NewNopSettings(signal), cfg) require.NoError(t, err) host := componenttest.NewNopHost() err = c.Start(context.Background(), host) diff --git a/processor/memorylimiterprocessor/go.mod b/processor/memorylimiterprocessor/go.mod index ec8fdefef83..440679d9ebd 100644 --- a/processor/memorylimiterprocessor/go.mod +++ b/processor/memorylimiterprocessor/go.mod @@ -10,6 +10,7 @@ require ( go.opentelemetry.io/collector/consumer v0.110.0 go.opentelemetry.io/collector/consumer/consumertest v0.110.0 go.opentelemetry.io/collector/pdata v1.16.0 + go.opentelemetry.io/collector/pipeline v0.110.0 go.opentelemetry.io/collector/processor v0.110.0 go.uber.org/goleak v1.3.0 ) @@ -43,7 +44,6 @@ require ( go.opentelemetry.io/collector/internal/globalsignal v0.110.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.110.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.110.0 // indirect - go.opentelemetry.io/collector/pipeline v0.110.0 // indirect go.opentelemetry.io/collector/processor/processorprofiles v0.110.0 // indirect go.opentelemetry.io/otel v1.30.0 // indirect go.opentelemetry.io/otel/metric v1.30.0 // indirect @@ -95,3 +95,5 @@ replace go.opentelemetry.io/collector/processor/processorprofiles => ../processo replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/globalsignal => ../../internal/globalsignal + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index 9794b7dcd4b..749d5c1e6af 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -20,6 +20,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal" "go.opentelemetry.io/collector/processor/processorhelper" @@ -51,7 +52,7 @@ func TestNoDataLoss(t *testing.T) { cfg.MemoryLimitMiB = uint32(ms.Alloc/(1024*1024) + expectedMemoryIncreaseMiB) cfg.MemorySpikeLimitMiB = 1 - set := processortest.NewNopSettings() + set := processortest.NewNopSettings(pipeline.SignalLogs) limiter, err := newMemoryLimiterProcessor(set, cfg) require.NoError(t, err) @@ -174,11 +175,11 @@ func TestMetricsMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(pipeline.SignalMetrics), tt.mlCfg) require.NoError(t, err) mp, err := processorhelper.NewMetricsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(pipeline.SignalMetrics), tt.mlCfg, consumertest.NewNop(), ml.processMetrics, @@ -264,11 +265,11 @@ func TestTraceMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(pipeline.SignalTraces), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewTracesProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(pipeline.SignalTraces), tt.mlCfg, consumertest.NewNop(), ml.processTraces, @@ -354,11 +355,11 @@ func TestLogMemoryPressureResponse(t *testing.T) { ms.Alloc = tt.memAlloc } - ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(), tt.mlCfg) + ml, err := newMemoryLimiterProcessor(processortest.NewNopSettings(pipeline.SignalLogs), tt.mlCfg) require.NoError(t, err) tp, err := processorhelper.NewLogsProcessor( context.Background(), - processortest.NewNopSettings(), + processortest.NewNopSettings(pipeline.SignalLogs), tt.mlCfg, consumertest.NewNop(), ml.processLogs, diff --git a/processor/processorhelper/generated_component_telemetry_test.go b/processor/processorhelper/generated_component_telemetry_test.go index f03fb0ad564..feddacdffe5 100644 --- a/processor/processorhelper/generated_component_telemetry_test.go +++ b/processor/processorhelper/generated_component_telemetry_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" ) @@ -23,8 +24,8 @@ type componentTestTelemetry struct { meterProvider *sdkmetric.MeterProvider } -func (tt *componentTestTelemetry) NewSettings() processor.Settings { - settings := processortest.NewNopSettings() +func (tt *componentTestTelemetry) NewSettings(dt pipeline.Signal) processor.Settings { + settings := processortest.NewNopSettings(dt) settings.MeterProvider = tt.meterProvider settings.LeveledMeterProvider = func(_ configtelemetry.Level) metric.MeterProvider { return tt.meterProvider diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index f8b76b512a3..f43a4e8eb67 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewLogsProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "logs")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 1fe5bb7098d..1b1120be60b 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -18,13 +18,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) var testLogsCfg = struct{}{} func TestNewLogsProcessor(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil)) require.NoError(t, err) assert.True(t, lp.Capabilities().MutatesData) @@ -35,7 +36,7 @@ func TestNewLogsProcessor(t *testing.T) { func TestNewLogsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +48,19 @@ func TestNewLogsProcessor_WithOptions(t *testing.T) { } func TestNewLogsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), nil) + _, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewLogsProcessor_ProcessLogError(t *testing.T) { want := errors.New("my_error") - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(want)) require.NoError(t, err) assert.Equal(t, want, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } func TestNewLogsProcessor_ProcessLogsErrSkipProcessingData(t *testing.T) { - lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) + lp, err := NewLogsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), newTestLProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, lp.ConsumeLogs(context.Background(), plog.NewLogs())) } @@ -87,7 +88,7 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { incomingLogRecords.AppendEmpty() testTelemetry := setupTestTelemetry() - lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + lp, err := NewLogsProcessor(context.Background(), testTelemetry.NewSettings(pipeline.SignalLogs), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) @@ -104,8 +105,12 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + Value: 3, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "logs"), + attribute.String("pipeline", "logs"), + ), }, }, }, @@ -119,8 +124,12 @@ func TestLogsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + Value: 1, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "logs"), + attribute.String("pipeline", "logs"), + ), }, }, }, diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index f98db1e240a..f87f6580638 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewMetricsProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "metrics")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 90293dd0354..8c0e1d10583 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -18,13 +18,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) var testMetricsCfg = struct{}{} func TestNewMetricsProcessor(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil)) require.NoError(t, err) assert.True(t, mp.Capabilities().MutatesData) @@ -35,7 +36,7 @@ func TestNewMetricsProcessor(t *testing.T) { func TestNewMetricsProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +48,19 @@ func TestNewMetricsProcessor_WithOptions(t *testing.T) { } func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) { - _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), nil) + _, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) { want := errors.New("my_error") - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(want)) require.NoError(t, err) assert.Equal(t, want, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } func TestNewMetricsProcessor_ProcessMetricsErrSkipProcessingData(t *testing.T) { - mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) + mp, err := NewMetricsProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), newTestMProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, mp.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) } @@ -88,7 +89,7 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { dps.AppendEmpty() testTelemetry := setupTestTelemetry() - mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockAggregate) + mp, err := NewMetricsProcessor(context.Background(), testTelemetry.NewSettings(pipeline.SignalMetrics), &testMetricsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) @@ -105,8 +106,12 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 2, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + Value: 2, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "metrics"), + attribute.String("pipeline", "metrics"), + ), }, }, }, @@ -120,8 +125,12 @@ func TestMetricsProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 3, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + Value: 3, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "metrics"), + attribute.String("pipeline", "metrics"), + ), }, }, }, diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index d1445a3ee05..b43983d11ad 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -56,6 +56,8 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) { return &ObsReport{ otelAttrs: []attribute.KeyValue{ attribute.String(internal.ProcessorKey, cfg.ProcessorID.String()), + attribute.String(internal.SignalKey, cfg.ProcessorCreateSettings.PipelineID.Signal().String()), + attribute.String(internal.PipelineKey, cfg.ProcessorCreateSettings.PipelineID.String()), }, telemetryBuilder: telemetryBuilder, }, nil diff --git a/processor/processorhelper/obsreport_test.go b/processor/processorhelper/obsreport_test.go index 4a3cb25507f..8f48e6be015 100644 --- a/processor/processorhelper/obsreport_test.go +++ b/processor/processorhelper/obsreport_test.go @@ -9,28 +9,38 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" ) var ( - processorID = component.MustNewID("fakeProcessor") + processorID = component.MustNewID("fakeProcessor") + tracesPipeline = pipeline.MustNewIDWithName("traces", "fakePipeline") + metricsPipeline = pipeline.MustNewIDWithName("metrics", "fakePipeline") + logsPipeline = pipeline.MustNewIDWithName("logs", "fakePipeline") ) func TestProcessorTraceData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, tracesPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedSpans = 27 const refusedSpans = 19 const droppedSpans = 13 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) require.NoError(t, err) obsrep.TracesAccepted(context.Background(), acceptedSpans) @@ -42,14 +52,19 @@ func TestProcessorTraceData(t *testing.T) { } func TestProcessorMetricsData(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, metricsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedPoints = 29 const refusedPoints = 11 const droppedPoints = 17 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) require.NoError(t, err) obsrep.MetricsAccepted(context.Background(), acceptedPoints) @@ -83,14 +98,19 @@ func TestBuildProcessorCustomMetricName(t *testing.T) { } func TestProcessorLogRecords(t *testing.T) { - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, logsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const acceptedRecords = 29 const refusedRecords = 11 const droppedRecords = 17 obsrep, err := newObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) require.NoError(t, err) obsrep.LogsAccepted(context.Background(), acceptedRecords) @@ -102,13 +122,18 @@ func TestProcessorLogRecords(t *testing.T) { } func TestCheckProcessorTracesViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(tracesPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) require.NoError(t, err) @@ -131,13 +156,18 @@ func TestCheckProcessorTracesViews(t *testing.T) { } func TestCheckProcessorMetricsViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(metricsPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) require.NoError(t, err) @@ -160,13 +190,18 @@ func TestCheckProcessorMetricsViews(t *testing.T) { } func TestCheckProcessorLogViews(t *testing.T) { - tt, err := componenttest.SetupTelemetry(processorID) + tt, err := componenttest.SetupTelemetry(processorID, extraProcessorAttrs(logsPipeline)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: tt.TelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) require.NoError(t, err) @@ -190,7 +225,7 @@ func TestCheckProcessorLogViews(t *testing.T) { func TestNoMetrics(t *testing.T) { // ensure if LevelNone is configured, no metrics are emitted by the component - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, tracesPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -201,8 +236,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: tracesPipeline, + }, }) require.NoError(t, err) @@ -212,7 +252,7 @@ func TestNoMetrics(t *testing.T) { require.Error(t, tt.CheckProcessorTraces(accepted, refused, dropped)) }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, metricsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -223,8 +263,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: metricsPipeline, + }, }) require.NoError(t, err) @@ -234,7 +279,7 @@ func TestNoMetrics(t *testing.T) { require.Error(t, tt.CheckProcessorMetrics(accepted, refused, dropped)) }) - testTelemetry(t, processorID, func(t *testing.T, tt componenttest.TestTelemetry) { + testTelemetry(t, processorID, logsPipeline, func(t *testing.T, tt componenttest.TestTelemetry) { const accepted = 29 const refused = 11 const dropped = 17 @@ -245,8 +290,13 @@ func TestNoMetrics(t *testing.T) { } por, err := NewObsReport(ObsReportSettings{ - ProcessorID: processorID, - ProcessorCreateSettings: processor.Settings{ID: processorID, TelemetrySettings: set, BuildInfo: component.NewDefaultBuildInfo()}, + ProcessorID: processorID, + ProcessorCreateSettings: processor.Settings{ + ID: processorID, + TelemetrySettings: set, + BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: logsPipeline, + }, }) require.NoError(t, err) @@ -258,10 +308,17 @@ func TestNoMetrics(t *testing.T) { }) } -func testTelemetry(t *testing.T, id component.ID, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { - tt, err := componenttest.SetupTelemetry(id) +func testTelemetry(t *testing.T, id component.ID, pid pipeline.ID, testFunc func(t *testing.T, tt componenttest.TestTelemetry)) { + tt, err := componenttest.SetupTelemetry(id, extraProcessorAttrs(pid)...) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) testFunc(t, tt) } + +func extraProcessorAttrs(pid pipeline.ID) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String("otel.signal", pid.Signal().String()), + attribute.String("pipeline", pid.String()), + } +} diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index b2b52c58c76..8c5ffa299d2 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -7,7 +7,6 @@ import ( "context" "errors" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" @@ -47,7 +46,6 @@ func NewTracesProcessor( if err != nil { return nil, err } - obs.otelAttrs = append(obs.otelAttrs, attribute.String("otel.signal", "traces")) eventOptions := spanAttributes(set.ID) bs := fromOptions(options) diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 74fbea2ed88..e3f3d5e8da7 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -18,13 +18,14 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor/processortest" ) var testTracesCfg = struct{}{} func TestNewTracesProcessor(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil)) require.NoError(t, err) assert.True(t, tp.Capabilities().MutatesData) @@ -35,7 +36,7 @@ func TestNewTracesProcessor(t *testing.T) { func TestNewTracesProcessor_WithOptions(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(nil), WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithCapabilities(consumer.Capabilities{MutatesData: false})) @@ -47,19 +48,19 @@ func TestNewTracesProcessor_WithOptions(t *testing.T) { } func TestNewTracesProcessor_NilRequiredFields(t *testing.T) { - _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), nil) + _, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), &testTracesCfg, consumertest.NewNop(), nil) assert.Error(t, err) } func TestNewTracesProcessor_ProcessTraceError(t *testing.T) { want := errors.New("my_error") - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(want)) require.NoError(t, err) assert.Equal(t, want, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } func TestNewTracesProcessor_ProcessTracesErrSkipProcessingData(t *testing.T) { - tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) + tp, err := NewTracesProcessor(context.Background(), processortest.NewNopSettings(pipeline.SignalTraces), &testTracesCfg, consumertest.NewNop(), newTestTProcessor(ErrSkipProcessingData)) require.NoError(t, err) assert.NoError(t, tp.ConsumeTraces(context.Background(), ptrace.NewTraces())) } @@ -88,7 +89,7 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { incomingSpans.AppendEmpty() testTelemetry := setupTestTelemetry() - tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockAggregate) + tp, err := NewTracesProcessor(context.Background(), testTelemetry.NewSettings(pipeline.SignalTraces), &testLogsCfg, consumertest.NewNop(), mockAggregate) require.NoError(t, err) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) @@ -105,8 +106,12 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 4, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + Value: 4, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "traces"), + attribute.String("pipeline", "traces"), + ), }, }, }, @@ -120,8 +125,12 @@ func TestTracesProcessor_RecordInOut(t *testing.T) { IsMonotonic: true, DataPoints: []metricdata.DataPoint[int64]{ { - Value: 1, - Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + Value: 1, + Attributes: attribute.NewSet( + attribute.String("processor", "processorhelper"), + attribute.String("otel.signal", "traces"), + attribute.String("pipeline", "traces"), + ), }, }, }, diff --git a/processor/processorprofiles/go.mod b/processor/processorprofiles/go.mod index 0be1e7d8e7d..ca8410e5430 100644 --- a/processor/processorprofiles/go.mod +++ b/processor/processorprofiles/go.mod @@ -60,3 +60,5 @@ replace go.opentelemetry.io/collector/component/componentstatus => ../../compone replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/globalsignal => ../../internal/globalsignal + +replace go.opentelemetry.io/collector/component/componentprofiles => ../../component/componentprofiles diff --git a/processor/processortest/nop_processor.go b/processor/processortest/nop_processor.go index f1dd64bfd52..1b7e83c252b 100644 --- a/processor/processortest/nop_processor.go +++ b/processor/processortest/nop_processor.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorprofiles" ) @@ -20,11 +21,12 @@ import ( var nopType = component.MustNewType("nop") // NewNopSettings returns a new nop settings for Create*Processor functions. -func NewNopSettings() processor.Settings { +func NewNopSettings(dt pipeline.Signal) processor.Settings { return processor.Settings{ ID: component.NewIDWithName(nopType, uuid.NewString()), TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), + PipelineID: pipeline.NewID(dt), } } diff --git a/processor/processortest/nop_processor_test.go b/processor/processortest/nop_processor_test.go index fcf902e552d..10ee8ea1e9e 100644 --- a/processor/processortest/nop_processor_test.go +++ b/processor/processortest/nop_processor_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -18,6 +19,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" ) func TestNewNopFactory(t *testing.T) { @@ -27,28 +29,28 @@ func TestNewNopFactory(t *testing.T) { cfg := factory.CreateDefaultConfig() assert.Equal(t, &nopConfig{}, cfg) - traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + traces, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(pipeline.SignalTraces), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) assert.NoError(t, traces.Shutdown(context.Background())) - metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + metrics, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(pipeline.SignalMetrics), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) assert.NoError(t, metrics.Shutdown(context.Background())) - logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + logs, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(pipeline.SignalLogs), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) assert.NoError(t, logs.Shutdown(context.Background())) - profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(), cfg, consumertest.NewNop()) + profiles, err := factory.CreateProfilesProcessor(context.Background(), NewNopSettings(componentprofiles.SignalProfiles), cfg, consumertest.NewNop()) require.NoError(t, err) assert.Equal(t, consumer.Capabilities{MutatesData: false}, profiles.Capabilities()) assert.NoError(t, profiles.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/processortest/shutdown_verifier.go b/processor/processortest/shutdown_verifier.go index 5561991f576..4a29df26a34 100644 --- a/processor/processortest/shutdown_verifier.go +++ b/processor/processortest/shutdown_verifier.go @@ -22,7 +22,7 @@ import ( func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.TracesSink) - proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateTracesProcessor(context.Background(), NewNopSettings(pipeline.SignalTraces), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -46,7 +46,7 @@ func verifyTracesDoesNotProduceAfterShutdown(t *testing.T, factory processor.Fac func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.LogsSink) - proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateLogsProcessor(context.Background(), NewNopSettings(pipeline.SignalLogs), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } @@ -70,7 +70,7 @@ func verifyLogsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Facto func verifyMetricsDoesNotProduceAfterShutdown(t *testing.T, factory processor.Factory, cfg component.Config) { // Create a proc and output its produce to a sink. nextSink := new(consumertest.MetricsSink) - proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(), cfg, nextSink) + proc, err := factory.CreateMetricsProcessor(context.Background(), NewNopSettings(pipeline.SignalMetrics), cfg, nextSink) if errors.Is(err, pipeline.ErrSignalNotSupported) { return } diff --git a/service/internal/builders/processor_test.go b/service/internal/builders/processor_test.go index 09443387817..bf3bd1f8a85 100644 --- a/service/internal/builders/processor_test.go +++ b/service/internal/builders/processor_test.go @@ -11,10 +11,12 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentprofiles" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" @@ -186,30 +188,36 @@ func TestNewNopProcessorBuilder(t *testing.T) { factory := processortest.NewNopFactory() cfg := factory.CreateDefaultConfig() - set := processortest.NewNopSettings() - set.ID = component.NewID(nopType) - traces, err := factory.CreateTracesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + tracesSet := processortest.NewNopSettings(pipeline.SignalTraces) + tracesSet.ID = component.NewID(nopType) + traces, err := factory.CreateTracesProcessor(context.Background(), tracesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bTraces, err := builder.CreateTraces(context.Background(), set, consumertest.NewNop()) + bTraces, err := builder.CreateTraces(context.Background(), tracesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, traces, bTraces) - metrics, err := factory.CreateMetricsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + metricsSet := processortest.NewNopSettings(pipeline.SignalMetrics) + metricsSet.ID = component.NewID(nopType) + metrics, err := factory.CreateMetricsProcessor(context.Background(), metricsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bMetrics, err := builder.CreateMetrics(context.Background(), set, consumertest.NewNop()) + bMetrics, err := builder.CreateMetrics(context.Background(), metricsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, metrics, bMetrics) - logs, err := factory.CreateLogsProcessor(context.Background(), set, cfg, consumertest.NewNop()) + logsSet := processortest.NewNopSettings(pipeline.SignalLogs) + logsSet.ID = component.NewID(nopType) + logs, err := factory.CreateLogsProcessor(context.Background(), logsSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bLogs, err := builder.CreateLogs(context.Background(), set, consumertest.NewNop()) + bLogs, err := builder.CreateLogs(context.Background(), logsSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, logs, bLogs) - profiles, err := factory.CreateProfilesProcessor(context.Background(), set, cfg, consumertest.NewNop()) + profilesSet := processortest.NewNopSettings(componentprofiles.SignalProfiles) + profilesSet.ID = component.NewID(nopType) + profiles, err := factory.CreateProfilesProcessor(context.Background(), profilesSet, cfg, consumertest.NewNop()) require.NoError(t, err) - bProfiles, err := builder.CreateProfiles(context.Background(), set, consumertest.NewNop()) + bProfiles, err := builder.CreateProfiles(context.Background(), profilesSet, consumertest.NewNop()) require.NoError(t, err) assert.IsType(t, profiles, bProfiles) } diff --git a/service/internal/graph/nodes.go b/service/internal/graph/nodes.go index 89580dab487..29fd0c69eca 100644 --- a/service/internal/graph/nodes.go +++ b/service/internal/graph/nodes.go @@ -145,7 +145,7 @@ func (n *processorNode) buildComponent(ctx context.Context, next baseConsumer, ) error { tel.Logger = components.ProcessorLogger(tel.Logger, n.componentID, n.pipelineID) - set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info} + set := processor.Settings{ID: n.componentID, TelemetrySettings: tel, BuildInfo: info, PipelineID: n.pipelineID} var err error switch n.pipelineID.Signal() { case pipeline.SignalTraces: diff --git a/service/pipelines/config.go b/service/pipelines/config.go index dd37e696039..bfb9de4311f 100644 --- a/service/pipelines/config.go +++ b/service/pipelines/config.go @@ -21,7 +21,7 @@ var ( // Config defines the configurable settings for service telemetry. // // Deprecated: [v0.110.0] Use ConfigWithPipelineID instead -type Config map[component.ID]*PipelineConfig +type Config map[pipeline.ID]*PipelineConfig func (cfg Config) Validate() error { // Must have at least one pipeline. @@ -32,12 +32,12 @@ func (cfg Config) Validate() error { // Check that all pipelines have at least one receiver and one exporter, and they reference // only configured components. for pipelineID, p := range cfg { - switch pipelineID.Type() { + switch pipelineID.Signal() { // nolint - case component.DataTypeTraces, component.DataTypeMetrics, component.DataTypeLogs, componentprofiles.DataTypeProfiles: + case pipeline.SignalTraces, pipeline.SignalMetrics, pipeline.SignalLogs, componentprofiles.SignalProfiles: // Continue default: - return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Type()) + return fmt.Errorf("pipeline %q: unknown datatype %q", pipelineID, pipelineID.Signal()) } // Validate pipeline has at least one receiver. diff --git a/service/service.go b/service/service.go index 83a8a12c848..bee7eca58a2 100644 --- a/service/service.go +++ b/service/service.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/extensions" @@ -303,16 +302,12 @@ func (srv *Service) initExtensions(ctx context.Context, cfg extensions.Config) e return nil } -func convertFromComponentIDToPipelineID(id component.ID) pipeline.ID { - return pipeline.MustNewIDWithName(id.Type().String(), id.Name()) -} - // Creates the pipeline graph. func (srv *Service) initGraph(ctx context.Context, cfg Config) error { if len(cfg.Pipelines) > 0 { cfg.PipelinesWithPipelineID = make(pipelines.ConfigWithPipelineID, len(cfg.Pipelines)) for k, v := range cfg.Pipelines { - cfg.PipelinesWithPipelineID[convertFromComponentIDToPipelineID(k)] = v + cfg.PipelinesWithPipelineID[k] = v } } diff --git a/service/service_test.go b/service/service_test.go index 8420e52a62c..dd060018ba7 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -229,21 +229,21 @@ func TestServiceGetExporters(t *testing.T) { assert.NoError(t, srv.Shutdown(context.Background())) }) - expMap := srv.host.GetExporters() + expMap := srv.host.GetExportersWithSignal() - v, ok := expMap[component.DataTypeTraces] + v, ok := expMap[pipeline.SignalTraces] assert.True(t, ok) assert.NotNil(t, v) assert.Len(t, expMap, 4) - assert.Len(t, expMap[component.DataTypeTraces], 1) - assert.Contains(t, expMap[component.DataTypeTraces], component.NewID(nopType)) - assert.Len(t, expMap[component.DataTypeMetrics], 1) - assert.Contains(t, expMap[component.DataTypeMetrics], component.NewID(nopType)) - assert.Len(t, expMap[component.DataTypeLogs], 1) - assert.Contains(t, expMap[component.DataTypeLogs], component.NewID(nopType)) - assert.Len(t, expMap[componentprofiles.DataTypeProfiles], 1) - assert.Contains(t, expMap[componentprofiles.DataTypeProfiles], component.NewID(nopType)) + assert.Len(t, expMap[pipeline.SignalTraces], 1) + assert.Contains(t, expMap[pipeline.SignalTraces], component.NewID(nopType)) + assert.Len(t, expMap[pipeline.SignalMetrics], 1) + assert.Contains(t, expMap[pipeline.SignalMetrics], component.NewID(nopType)) + assert.Len(t, expMap[pipeline.SignalLogs], 1) + assert.Contains(t, expMap[pipeline.SignalLogs], component.NewID(nopType)) + assert.Len(t, expMap[componentprofiles.SignalProfiles], 1) + assert.Contains(t, expMap[componentprofiles.SignalProfiles], component.NewID(nopType)) } // nolint