Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector/spanmetricsv2] Scale spans based on adjusted count #95

Merged
merged 9 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion connector/spanmetricsconnectorv2/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/aggregator"
"github.com/elastic/opentelemetry-collector-components/connector/spanmetricsconnectorv2/internal/model"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -64,8 +65,9 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
duration = time.Duration(endTime - startTime)
}
spanAttrs := span.Attributes()
adjustedCount := calculateAdjustedCount(span.TraceState().AsRaw())
for _, md := range sm.metricDefs {
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration))
multiError = errors.Join(multiError, aggregator.Add(md, spanAttrs, duration, adjustedCount))
}
}
}
Expand All @@ -88,3 +90,26 @@ func (sm *spanMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces) erro
}
return sm.next.ConsumeMetrics(ctx, processedMetrics)
}

// calculateAdjustedCount derives, from a raw W3C tracestate string, how many
// spans of the original population a single sampled span stands for.
//
// The result falls back to 1 whenever no probability-sampling information
// can be read from the tracestate: the string cannot be parsed, there is no
// OpenTelemetry value in it, or that value carries an empty threshold
// (T-value). The latter covers non-probabilistic samplers as well as the
// always-sample threshold.
// https://github.com/open-telemetry/oteps/blob/main/text/trace/0235-sampling-threshold-in-trace-state.md
func calculateAdjustedCount(tracestate string) uint64 {
	// Count reported when the span cannot be scaled.
	const unscaled uint64 = 1

	parsed, parseErr := sampling.NewW3CTraceState(tracestate)
	if parseErr != nil {
		return unscaled
	}

	otel := parsed.OTelValue()
	if otel == nil || len(otel.TValue()) == 0 {
		// Nothing to scale by: either there is no OpenTelemetry tracestate
		// value, or a non-probabilistic sampler / always-sample threshold
		// was used.
		return unscaled
	}

	// TODO (lahsivjar): Handle fractional adjusted count. One way to do this
	// would be to scale the values in the histograms for some precision.
	// Until then the conversion to uint64 discards any fractional part.
	return uint64(otel.AdjustedCount())
}
18 changes: 18 additions & 0 deletions connector/spanmetricsconnectorv2/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,24 @@ func TestConnector(t *testing.T) {
}
}

func TestCalculateAdjustedCount(t *testing.T) {
for _, tc := range []struct {
tracestate string
expected uint64
}{
{"", 1},
{"invalid=p:8;th:8", 1},
{"ot=404:8", 1},
{"ot=th:0", 1}, // 100% sampling
{"ot=th:8", 2}, // 50% sampling
{"ot=th:c", 4}, // 25% sampling
} {
t.Run("tracestate/"+tc.tracestate, func(t *testing.T) {
assert.Equal(t, tc.expected, calculateAdjustedCount(tc.tracestate))
})
}
}

func BenchmarkConnector(b *testing.B) {
factory := NewFactory()
settings := connectortest.NewNopSettings()
Expand Down
1 change: 1 addition & 0 deletions connector/spanmetricsconnectorv2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.109.0
go.opentelemetry.io/collector/confmap v1.15.0
Expand Down
2 changes: 2 additions & 0 deletions connector/spanmetricsconnectorv2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.109.0/go.mod h1:KvJWxR0bDk9Qh0ktw4gOFsd/ZrJ7p5KTAQueEJsaK9Q=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 h1:3kXFgdEEKw37ftdRC7SmXAiZuLahVavqOYRhlJVMLc8=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0/go.mod h1:HtaWI5WJKJkBhHz2R7Xb2n7R3fdBPhfKieYcQajNCTo=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0 h1:YQB8+grNfmaLqiavbv4VKhBw1NF8O6pSmbLC+FjMrKM=
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/sampling v0.109.0/go.mod h1:XOuilD83ZQWc0Te2B7+X0dRm9Z19M4h480UXTPO41Xc=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,21 @@ func NewAggregator() *Aggregator {
}
}

// Add adds a span duration into the configured metrics.
// Add adds a span duration into the configured metrics. It also takes
// `adjustedCount` parameter to denote the total number of spans in the
// population that are represented by an individually sampled span.
// The adjusted count is calculated as per:
// https://opentelemetry.io/docs/specs/otel/trace/tracestate-probability-sampling/#adjusted-count
func (a *Aggregator) Add(
md model.MetricDef,
srcAttrs pcommon.Map,
spanDuration time.Duration,
adjustedCount uint64,
) error {
if adjustedCount == 0 {
// Nothing to do as the span represents `0` spans
return nil
}
filteredAttrs := pcommon.NewMap()
for _, definedAttr := range md.Attributes {
if srcAttr, ok := srcAttrs.Get(definedAttr.Key); ok {
Expand Down Expand Up @@ -88,7 +97,7 @@ func (a *Aggregator) Add(
a.datapoints[md.Key][attrKey] = newAggregatorDP(md, filteredAttrs)
}
value := float64(spanDuration.Nanoseconds()) / metricUnitToDivider[md.Unit]
a.datapoints[md.Key][attrKey].Add(value)
a.datapoints[md.Key][attrKey].Add(value, adjustedCount)
return nil
}

Expand Down Expand Up @@ -179,15 +188,15 @@ func newAggregatorDP(
return &dp
}

func (dp *aggregatorDP) Add(value float64) {
func (dp *aggregatorDP) Add(value float64, count uint64) {
if dp.expHistogramDP != nil {
dp.expHistogramDP.Add(value)
dp.expHistogramDP.Add(value, count)
}
if dp.explicitHistogramDP != nil {
dp.explicitHistogramDP.Add(value)
dp.explicitHistogramDP.Add(value, count)
}
if dp.summaryDP != nil {
dp.summaryDP.Add(value)
dp.summaryDP.Add(value, count)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestExplicitBounds(t *testing.T) {
for _, span := range tc.input {
duration := time.Duration(span.EndTimestamp() - span.StartTimestamp())
for _, md := range tc.metricDefs {
require.NoError(t, agg.Add(md, span.Attributes(), duration))
require.NoError(t, agg.Add(md, span.Attributes(), duration, 1))
}
}
for _, md := range tc.metricDefs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func newExponentialHistogramDP(attrs pcommon.Map, maxSize int32) *exponentialHis
}
}

func (dp *exponentialHistogramDP) Add(value float64) {
dp.data.Update(value)
func (dp *exponentialHistogramDP) Add(value float64, count uint64) {
dp.data.UpdateByIncr(value, count)
}

func (dp *exponentialHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ func newExplicitHistogramDP(attrs pcommon.Map, bounds []float64) *explicitHistog
}
}

func (dp *explicitHistogramDP) Add(value float64) {
dp.sum += value
dp.count++
dp.counts[sort.SearchFloat64s(dp.bounds, value)]++
func (dp *explicitHistogramDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
dp.counts[sort.SearchFloat64s(dp.bounds, value)] += count
}

func (dp *explicitHistogramDP) Copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func newSummaryDP(attrs pcommon.Map) *summaryDP {
}
}

func (dp *summaryDP) Add(value float64) {
dp.sum += value
dp.count++
func (dp *summaryDP) Add(value float64, count uint64) {
dp.sum += value * float64(count)
dp.count += count
}

func (dp *summaryDP) Copy(
Expand Down
12 changes: 12 additions & 0 deletions connector/spanmetricsconnectorv2/testdata/traces.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,15 @@ resourceSpans:
name: msg-span-2
parentSpanId: "bcff497b5a47310f"
startTimeUnixNano: "1581452772000000321"
- attributes:
- key: db.name
value:
stringValue: main
- key: db.system
value:
stringValue: mysql
endTimeUnixNano: "1581452772500000804"
name: th-value-8 # represents 2 sampled spans
parentSpanId: ""
startTimeUnixNano: "1581452772000000381"
traceState: "ot=th:8"
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@ resourceMetrics:
- key: db.system
value:
stringValue: mysql
count: "2"
count: "4"
max: 1000.000468
min: 500.000468
min: 500.000423
negative: {}
positive:
bucketCounts:
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -336,7 +336,7 @@ resourceMetrics:
- "1"
offset: 1147
scale: 7
sum: 1500.000936
sum: 2500.001782
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for DB spans
Expand All @@ -357,15 +357,15 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "1"
- "0"
- "0"
- "0"
- "0"
- "0"
count: "2"
count: "4"
explicitBounds:
- 2
- 4
Expand All @@ -383,7 +383,7 @@ resourceMetrics:
- 5000
- 10000
- 15000
sum: 1500.000936
sum: 2500.001782
timeUnixNano: "1000000"
name: db.trace.span.duration
- description: Span duration for messaging spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ resourceMetrics:
exponentialHistogram:
aggregationTemporality: 1
dataPoints:
- count: "6"
- count: "8"
max: 17.000000468
min: 0.002000468
negative: {}
positive:
bucketCounts:
- "3"
- "5"
- "3"
offset: -1
scale: -4
sum: 30.402002808
sum: 31.402003653999998
timeUnixNano: "1000000"
name: trace.span.duration
- description: Span duration with custom histogram buckets
Expand All @@ -33,16 +33,16 @@ resourceMetrics:
- bucketCounts:
- "0"
- "1"
- "2"
- "4"
- "1"
- "2"
count: "6"
count: "8"
explicitBounds:
- 0.001
- 0.1
- 1
- 10
sum: 30.402002808
sum: 31.402003653999998
timeUnixNano: "1000000"
name: trace.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ resourceMetrics:
exponentialHistogram:
aggregationTemporality: 1
dataPoints:
- count: "6"
- count: "8"
max: 17000.000468
min: 2.000468
negative: {}
Expand Down Expand Up @@ -82,7 +82,7 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -126,7 +126,7 @@ resourceMetrics:
- "1"
offset: 8
scale: 3
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: trace.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ resourceMetrics:
- key: 404.attribute
value:
stringValue: undefined
count: "6"
count: "8"
max: 17000.000468
min: 2.000468
negative: {}
Expand Down Expand Up @@ -86,7 +86,7 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "0"
- "0"
- "0"
Expand Down Expand Up @@ -130,7 +130,7 @@ resourceMetrics:
- "1"
offset: 8
scale: 3
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: 404.span.duration
- description: Span duration with missing attribute but default value
Expand All @@ -151,15 +151,15 @@ resourceMetrics:
- "0"
- "0"
- "0"
- "1"
- "3"
- "1"
- "1"
- "0"
- "0"
- "0"
- "1"
- "1"
count: "6"
count: "8"
explicitBounds:
- 2
- 4
Expand All @@ -177,7 +177,7 @@ resourceMetrics:
- 5000
- 10000
- 15000
sum: 30402.002807999997
sum: 31402.003653999996
timeUnixNano: "1000000"
name: 404.span.duration
scope:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ resourceMetrics:
- key: db.system
value:
stringValue: mysql
count: "2"
sum: 1500.000936
count: "4"
sum: 2500.001782
timeUnixNano: "1000000"
- description: Summary for messaging spans
name: msg.trace.span.summary
Expand Down