From c3f09f4e9a2521a861ae2708f30b5425e535428e Mon Sep 17 00:00:00 2001 From: Daniel Jaglowski Date: Fri, 4 Oct 2024 17:35:22 -0400 Subject: [PATCH] [processorhelper] Fix bug where record in/out metrics were skipped (#11360) Resolves #11351 --- .chloggen/processorhelper-in-out-fix.yaml | 25 ++++++++++ processor/processorhelper/logs.go | 1 + processor/processorhelper/logs_test.go | 56 ++++++++++++++++++++++ processor/processorhelper/metrics.go | 1 + processor/processorhelper/metrics_test.go | 55 ++++++++++++++++++++++ processor/processorhelper/traces.go | 1 + processor/processorhelper/traces_test.go | 57 +++++++++++++++++++++++ 7 files changed, 196 insertions(+) create mode 100644 .chloggen/processorhelper-in-out-fix.yaml diff --git a/.chloggen/processorhelper-in-out-fix.yaml b/.chloggen/processorhelper-in-out-fix.yaml new file mode 100644 index 00000000000..5bff90ef80b --- /dev/null +++ b/.chloggen/processorhelper-in-out-fix.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processorhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix issue where in/out parameters were not recorded when error was returned from consumer. + +# One or more tracking issues or pull requests related to the change +issues: [11351] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index a434fe8fb93..5fedd5dfaf0 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -55,6 +55,7 @@ func NewLogs( ld, errFunc = logsFunc(ctx, ld) span.AddEvent("End processing.", eventOptions) if errFunc != nil { + obs.recordInOut(ctx, recordsIn, 0) if errors.Is(errFunc, ErrSkipProcessingData) { return nil } diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 0e94feb2f29..9a6c5592745 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -159,3 +159,59 @@ func TestLogs_RecordInOut(t *testing.T) { }, }) } + +func TestLogs_RecordIn_ErrorOut(t *testing.T) { + // Regardless of input, return error + mockErr := func(_ context.Context, _ plog.Logs) (plog.Logs, error) { + return plog.NewLogs(), errors.New("fake") + } + + incomingLogs := plog.NewLogs() + incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords() + + // Add 3 records to the incoming + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + incomingLogRecords.AppendEmpty() + + testTelemetry := setupTestTelemetry() + lp, err := NewLogs(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockErr) + require.NoError(t, err) + + require.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) + require.Error(t, lp.ConsumeLogs(context.Background(), incomingLogs)) + require.NoError(t, lp.Shutdown(context.Background())) + + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_items", + Description: "Number of items passed to the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 3, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + }, + }, + }, + }, + { + Name: "otelcol_processor_outgoing_items", + Description: "Number of items emitted from the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")), + }, + }, + }, + }, + }) +} diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index c8381fd9589..bf69e1b1693 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -55,6 +55,7 @@ func NewMetrics( md, errFunc = metricsFunc(ctx, md) span.AddEvent("End processing.", eventOptions) if errFunc != nil { + obs.recordInOut(ctx, pointsIn, 0) if errors.Is(errFunc, ErrSkipProcessingData) { return nil } diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 0bea27c2c22..cb841abfa74 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -159,3 +159,58 @@ func TestMetrics_RecordInOut(t *testing.T) { }, }) } + +func TestMetrics_RecordIn_ErrorOut(t *testing.T) { + /// Regardless of input, return error + mockErr := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) { + return pmetric.NewMetrics(), errors.New("fake") + } + + incomingMetrics := pmetric.NewMetrics() + dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints() + + // Add 2 data points to the incoming + dps.AppendEmpty() + dps.AppendEmpty() + + testTelemetry := setupTestTelemetry() + mp, err := NewMetrics(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockErr) + require.NoError(t, err) + + require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) + require.Error(t, mp.ConsumeMetrics(context.Background(), incomingMetrics)) + require.NoError(t, mp.Shutdown(context.Background())) + + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_items", + Description: "Number of items passed to the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 2, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + }, + }, + }, + }, + { + Name: "otelcol_processor_outgoing_items", + Description: "Number of items emitted from the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")), + }, + }, + }, + }, + }) +} diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 492634541cc..3cebc3ab9b9 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -55,6 +55,7 @@ func NewTraces( td, errFunc = tracesFunc(ctx, td) span.AddEvent("End processing.", eventOptions) if errFunc != nil { + obs.recordInOut(ctx, spansIn, 0) if errors.Is(errFunc, ErrSkipProcessingData) { return nil } diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index ecc4f3c9184..a55e99aa42b 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -161,3 +161,60 @@ func TestTraces_RecordInOut(t *testing.T) { }, }) } + +func TestTraces_RecordIn_ErrorOut(t *testing.T) { + // Regardless of input, return error + mockErr := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) { + return ptrace.NewTraces(), errors.New("fake") + } + + incomingTraces := ptrace.NewTraces() + incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans() + + // Add 4 records to the incoming + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + incomingSpans.AppendEmpty() + + testTelemetry := setupTestTelemetry() + tp, err := NewTraces(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockErr) + require.NoError(t, err) + + require.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) + require.Error(t, tp.ConsumeTraces(context.Background(), incomingTraces)) + require.NoError(t, tp.Shutdown(context.Background())) + + testTelemetry.assertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_processor_incoming_items", + Description: "Number of items passed to the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 4, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + }, + }, + }, + }, + { + Name: "otelcol_processor_outgoing_items", + Description: "Number of items emitted from the processor. [alpha]", + Unit: "{items}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Value: 0, + Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")), + }, + }, + }, + }, + }) +}