Skip to content

Commit

Permalink
[processorhelper] Fix bug where record in/out metrics were skipped (#…
Browse files Browse the repository at this point in the history
…11360)

Resolves #11351
  • Loading branch information
djaglowski authored Oct 4, 2024
1 parent 8001c93 commit c3f09f4
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 0 deletions.
25 changes: 25 additions & 0 deletions .chloggen/processorhelper-in-out-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix issue where in/out parameters were not recorded when error was returned from consumer.

# One or more tracking issues or pull requests related to the change
issues: [11351]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewLogs(
ld, errFunc = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
obs.recordInOut(ctx, recordsIn, 0)
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions processor/processorhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,59 @@ func TestLogs_RecordInOut(t *testing.T) {
},
})
}

func TestLogs_RecordIn_ErrorOut(t *testing.T) {
// Regardless of input, return error
mockErr := func(_ context.Context, _ plog.Logs) (plog.Logs, error) {
return plog.NewLogs(), errors.New("fake")
}

incomingLogs := plog.NewLogs()
incomingLogRecords := incomingLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords()

// Add 3 records to the incoming
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()
incomingLogRecords.AppendEmpty()

testTelemetry := setupTestTelemetry()
lp, err := NewLogs(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockErr)
require.NoError(t, err)

require.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, lp.ConsumeLogs(context.Background(), incomingLogs))
require.NoError(t, lp.Shutdown(context.Background()))

testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 3,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 0,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "logs")),
},
},
},
},
})
}
1 change: 1 addition & 0 deletions processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewMetrics(
md, errFunc = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
obs.recordInOut(ctx, pointsIn, 0)
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
Expand Down
55 changes: 55 additions & 0 deletions processor/processorhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,58 @@ func TestMetrics_RecordInOut(t *testing.T) {
},
})
}

func TestMetrics_RecordIn_ErrorOut(t *testing.T) {
/// Regardless of input, return error
mockErr := func(_ context.Context, _ pmetric.Metrics) (pmetric.Metrics, error) {
return pmetric.NewMetrics(), errors.New("fake")
}

incomingMetrics := pmetric.NewMetrics()
dps := incomingMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints()

// Add 2 data points to the incoming
dps.AppendEmpty()
dps.AppendEmpty()

testTelemetry := setupTestTelemetry()
mp, err := NewMetrics(context.Background(), testTelemetry.NewSettings(), &testMetricsCfg, consumertest.NewNop(), mockErr)
require.NoError(t, err)

require.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, mp.ConsumeMetrics(context.Background(), incomingMetrics))
require.NoError(t, mp.Shutdown(context.Background()))

testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 2,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 0,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "metrics")),
},
},
},
},
})
}
1 change: 1 addition & 0 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewTraces(
td, errFunc = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
obs.recordInOut(ctx, spansIn, 0)
if errors.Is(errFunc, ErrSkipProcessingData) {
return nil
}
Expand Down
57 changes: 57 additions & 0 deletions processor/processorhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,60 @@ func TestTraces_RecordInOut(t *testing.T) {
},
})
}

func TestTraces_RecordIn_ErrorOut(t *testing.T) {
// Regardless of input, return error
mockErr := func(_ context.Context, _ ptrace.Traces) (ptrace.Traces, error) {
return ptrace.NewTraces(), errors.New("fake")
}

incomingTraces := ptrace.NewTraces()
incomingSpans := incomingTraces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans()

// Add 4 records to the incoming
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()
incomingSpans.AppendEmpty()

testTelemetry := setupTestTelemetry()
tp, err := NewTraces(context.Background(), testTelemetry.NewSettings(), &testLogsCfg, consumertest.NewNop(), mockErr)
require.NoError(t, err)

require.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))
require.Error(t, tp.ConsumeTraces(context.Background(), incomingTraces))
require.NoError(t, tp.Shutdown(context.Background()))

testTelemetry.assertMetrics(t, []metricdata.Metrics{
{
Name: "otelcol_processor_incoming_items",
Description: "Number of items passed to the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 4,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")),
},
},
},
},
{
Name: "otelcol_processor_outgoing_items",
Description: "Number of items emitted from the processor. [alpha]",
Unit: "{items}",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Value: 0,
Attributes: attribute.NewSet(attribute.String("processor", "processorhelper"), attribute.String("otel.signal", "traces")),
},
},
},
},
})
}

0 comments on commit c3f09f4

Please sign in to comment.