Skip to content

Commit

Permalink
Add "inserted" metrics to processors (#10372)
Browse files Browse the repository at this point in the history
#### Link to tracking issue

Resolves #10353

#### Testing

Added equivalent testing to other processor metrics (accepted, refused,
dropped).

#### Documentation

Metric documentation is autogenerated.

#### Open Question

My initial implementation includes a breaking change to
`componenttest.TestTelemetry` which is public facing API. If we want to
avoid an immediate breaking change in this test package, I would propose
the following, which I can submit in a prerequisite PR:
1. Deprecate all `TestTelemetry.Check*` methods.
2. Replace with more granular `TestTelemetry.CheckOneSpecificMetric`
methods.
  • Loading branch information
djaglowski authored Jun 19, 2024
1 parent a138356 commit ea7270c
Show file tree
Hide file tree
Showing 11 changed files with 258 additions and 62 deletions.
25 changes: 25 additions & 0 deletions .chloggen/processorhelper-inserted-api.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: component/componenttest

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Added additional "inserted" count to `TestTelemetry.CheckProcessor*` methods.

# One or more tracking issues or pull requests related to the change
issues: [10353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
29 changes: 29 additions & 0 deletions .chloggen/processorhelper-inserted.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processorhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add "inserted" metrics for processors.

# One or more tracking issues or pull requests related to the change
issues: [10353]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This includes the following metrics for processors:
- `processor_inserted_spans`
- `processor_inserted_metric_points`
- `processor_inserted_log_records`
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
12 changes: 6 additions & 6 deletions component/componenttest/obsreporttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,20 @@ func (tts *TestTelemetry) CheckExporterMetricGauge(metric string, val int64) err

// CheckProcessorTraces checks that for the current exported values for trace exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans)
func (tts *TestTelemetry) CheckProcessorTraces(acceptedSpans, refusedSpans, droppedSpans, insertedSpans int64) error {
return tts.prometheusChecker.checkProcessorTraces(tts.id, acceptedSpans, refusedSpans, droppedSpans, insertedSpans)
}

// CheckProcessorMetrics checks that for the current exported values for metrics exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints)
func (tts *TestTelemetry) CheckProcessorMetrics(acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints int64) error {
return tts.prometheusChecker.checkProcessorMetrics(tts.id, acceptedMetricPoints, refusedMetricPoints, droppedMetricPoints, insertedMetricPoints)
}

// CheckProcessorLogs checks that for the current exported values for logs exporter metrics match given values.
// When this function is called it is required to also call SetupTelemetry as first thing.
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords)
func (tts *TestTelemetry) CheckProcessorLogs(acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords int64) error {
return tts.prometheusChecker.checkProcessorLogs(tts.id, acceptedLogRecords, refusedLogRecords, droppedLogRecords, insertedLogRecords)
}

// CheckReceiverTraces checks that for the current exported values for trace receiver metrics match given values.
Expand Down
18 changes: 10 additions & 8 deletions component/componenttest/otelprometheuschecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,26 @@ func (pc *prometheusChecker) checkReceiver(receiver component.ID, datatype, prot
pc.checkCounter(fmt.Sprintf("receiver_refused_%s", datatype), droppedMetricPoints, receiverAttrs))
}

func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorTraces(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "spans", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorMetrics(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "metric_points", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped)
func (pc *prometheusChecker) checkProcessorLogs(processor component.ID, accepted, refused, dropped, inserted int64) error {
return pc.checkProcessor(processor, "log_records", accepted, refused, dropped, inserted)
}

func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped int64) error {
func (pc *prometheusChecker) checkProcessor(processor component.ID, datatype string, accepted, refused, dropped, inserted int64) error {
processorAttrs := attributesForProcessorMetrics(processor)
return multierr.Combine(
pc.checkCounter(fmt.Sprintf("processor_accepted_%s", datatype), accepted, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_refused_%s", datatype), refused, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs))
pc.checkCounter(fmt.Sprintf("processor_dropped_%s", datatype), dropped, processorAttrs),
pc.checkCounter(fmt.Sprintf("processor_inserted_%s", datatype), inserted, processorAttrs),
)
}

func (pc *prometheusChecker) checkExporterTraces(exporter component.ID, sent, sendFailed int64) error {
Expand Down
6 changes: 3 additions & 3 deletions component/componenttest/otelprometheuschecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,17 @@ func TestPromChecker(t *testing.T) {
)

assert.NoError(t,
pc.checkProcessorTraces(processor, 42, 13, 7),
pc.checkProcessorTraces(processor, 42, 13, 7, 5),
"metrics from Receiver Traces should be valid",
)

assert.NoError(t,
pc.checkProcessorMetrics(processor, 7, 41, 13),
pc.checkProcessorMetrics(processor, 7, 41, 13, 4),
"metrics from Receiver Metrics should be valid",
)

assert.NoError(t,
pc.checkProcessorLogs(processor, 102, 35, 14),
pc.checkProcessorLogs(processor, 102, 35, 14, 3),
"metrics from Receiver Logs should be valid",
)

Expand Down
9 changes: 9 additions & 0 deletions component/componenttest/testdata/prometheus_response
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ processor_refused_spans{processor="fakeProcessor"} 13
# HELP processor_dropped_spans Number of spans that were dropped.
# TYPE processor_dropped_spans counter
processor_dropped_spans{processor="fakeProcessor"} 7
# HELP processor_inserted_spans Number of spans that were inserted.
# TYPE processor_inserted_spans counter
processor_inserted_spans{processor="fakeProcessor"} 5
# HELP processor_accepted_metric_points Number of metric points successfully pushed into the next component in the pipeline.
# TYPE processor_accepted_metric_points counter
processor_accepted_metric_points{processor="fakeProcessor"} 7
Expand All @@ -34,6 +37,9 @@ processor_refused_metric_points{processor="fakeProcessor"} 41
# HELP processor_dropped_metric_points Number of metric points that were dropped.
# TYPE processor_dropped_metric_points counter
processor_dropped_metric_points{processor="fakeProcessor"} 13
# HELP processor_inserted_metric_points Number of metric points that were inserted.
# TYPE processor_inserted_metric_points counter
processor_inserted_metric_points{processor="fakeProcessor"} 4
# HELP processor_accepted_log_records Number of log records successfully pushed into the next component in the pipeline.
# TYPE processor_accepted_log_records counter
processor_accepted_log_records{processor="fakeProcessor"} 102
Expand All @@ -43,6 +49,9 @@ processor_refused_log_records{processor="fakeProcessor"} 35
# HELP processor_dropped_log_records Number of log records that were dropped.
# TYPE processor_dropped_log_records counter
processor_dropped_log_records{processor="fakeProcessor"} 14
# HELP processor_inserted_log_records Number of log records that were inserted.
# TYPE processor_inserted_log_records counter
processor_inserted_log_records{processor="fakeProcessor"} 3
# HELP receiver_accepted_log_records Number of log records successfully pushed into the pipeline.
# TYPE receiver_accepted_log_records counter
receiver_accepted_log_records{receiver="fakeReceiver",transport="fakeTransport"} 102
Expand Down
24 changes: 24 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,30 @@ Number of spans that were dropped.
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_log_records

Number of log records that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_metric_points

Number of metric points that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_inserted_spans

Number of spans that were inserted.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| 1 | Sum | Int | true |

### processor_refused_log_records

Number of log records that were rejected by the next component in the pipeline.
Expand Down
21 changes: 21 additions & 0 deletions processor/processorhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 25 additions & 1 deletion processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ telemetry:
value_type: int
monotonic: true

processor_inserted_spans:
enabled: true
description: Number of spans that were inserted.
unit: 1
sum:
value_type: int
monotonic: true

processor_accepted_metric_points:
enabled: true
description: Number of metric points successfully pushed into the next component in the pipeline.
Expand All @@ -57,6 +65,14 @@ telemetry:
value_type: int
monotonic: true

processor_inserted_metric_points:
enabled: true
description: Number of metric points that were inserted.
unit: 1
sum:
value_type: int
monotonic: true

processor_accepted_log_records:
enabled: true
description: Number of log records successfully pushed into the next component in the pipeline.
Expand All @@ -79,4 +95,12 @@ telemetry:
unit: 1
sum:
value_type: int
monotonic: true
monotonic: true

processor_inserted_log_records:
enabled: true
description: Number of log records that were inserted.
unit: 1
sum:
value_type: int
monotonic: true
41 changes: 30 additions & 11 deletions processor/processorhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,69 +60,88 @@ func newObsReport(cfg ObsReportSettings) (*ObsReport, error) {
}, nil
}

func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped int64) {
var acceptedCount, refusedCount, droppedCount metric.Int64Counter
func (or *ObsReport) recordData(ctx context.Context, dataType component.DataType, accepted, refused, dropped, inserted int64) {
var acceptedCount, refusedCount, droppedCount, insertedCount metric.Int64Counter
switch dataType {
case component.DataTypeTraces:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedSpans
refusedCount = or.telemetryBuilder.ProcessorRefusedSpans
droppedCount = or.telemetryBuilder.ProcessorDroppedSpans
insertedCount = or.telemetryBuilder.ProcessorInsertedSpans
case component.DataTypeMetrics:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedMetricPoints
refusedCount = or.telemetryBuilder.ProcessorRefusedMetricPoints
droppedCount = or.telemetryBuilder.ProcessorDroppedMetricPoints
insertedCount = or.telemetryBuilder.ProcessorInsertedMetricPoints
case component.DataTypeLogs:
acceptedCount = or.telemetryBuilder.ProcessorAcceptedLogRecords
refusedCount = or.telemetryBuilder.ProcessorRefusedLogRecords
droppedCount = or.telemetryBuilder.ProcessorDroppedLogRecords
insertedCount = or.telemetryBuilder.ProcessorInsertedLogRecords
}

acceptedCount.Add(ctx, accepted, metric.WithAttributes(or.otelAttrs...))
refusedCount.Add(ctx, refused, metric.WithAttributes(or.otelAttrs...))
droppedCount.Add(ctx, dropped, metric.WithAttributes(or.otelAttrs...))
insertedCount.Add(ctx, inserted, metric.WithAttributes(or.otelAttrs...))
}

// TracesAccepted reports that the trace data was accepted.
func (or *ObsReport) TracesAccepted(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0))
or.recordData(ctx, component.DataTypeTraces, int64(numSpans), int64(0), int64(0), int64(0))
}

// TracesRefused reports that the trace data was refused.
func (or *ObsReport) TracesRefused(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0))
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(numSpans), int64(0), int64(0))
}

// TracesDropped reports that the trace data was dropped.
func (or *ObsReport) TracesDropped(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans))
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(numSpans), int64(0))
}

// TracesInserted reports that the trace data was inserted.
func (or *ObsReport) TracesInserted(ctx context.Context, numSpans int) {
or.recordData(ctx, component.DataTypeTraces, int64(0), int64(0), int64(0), int64(numSpans))
}

// MetricsAccepted reports that the metrics were accepted.
func (or *ObsReport) MetricsAccepted(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0))
or.recordData(ctx, component.DataTypeMetrics, int64(numPoints), int64(0), int64(0), int64(0))
}

// MetricsRefused reports that the metrics were refused.
func (or *ObsReport) MetricsRefused(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0))
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(numPoints), int64(0), int64(0))
}

// MetricsDropped reports that the metrics were dropped.
func (or *ObsReport) MetricsDropped(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints))
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(numPoints), int64(0))
}

// MetricsInserted reports that the metrics were inserted.
func (or *ObsReport) MetricsInserted(ctx context.Context, numPoints int) {
or.recordData(ctx, component.DataTypeMetrics, int64(0), int64(0), int64(0), int64(numPoints))
}

// LogsAccepted reports that the logs were accepted.
func (or *ObsReport) LogsAccepted(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0))
or.recordData(ctx, component.DataTypeLogs, int64(numRecords), int64(0), int64(0), int64(0))
}

// LogsRefused reports that the logs were refused.
func (or *ObsReport) LogsRefused(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0))
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(numRecords), int64(0), int64(0))
}

// LogsDropped reports that the logs were dropped.
func (or *ObsReport) LogsDropped(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords))
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(numRecords), int64(0))
}

// LogsInserted reports that the logs were inserted.
func (or *ObsReport) LogsInserted(ctx context.Context, numRecords int) {
or.recordData(ctx, component.DataTypeLogs, int64(0), int64(0), int64(0), int64(numRecords))
}
Loading

0 comments on commit ea7270c

Please sign in to comment.