diff --git a/sdk/metric/exemplar/fixed_size_reservoir_test.go b/sdk/metric/exemplar/fixed_size_reservoir_test.go index 1840abbd58d..0a901fd9cae 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir_test.go +++ b/sdk/metric/exemplar/fixed_size_reservoir_test.go @@ -15,12 +15,12 @@ import ( ) func TestNewFixedSizeReservoir(t *testing.T) { - t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) { - return NewFixedSizeReservoir(n), n + t.Run("Int64", ReservoirTest[int64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n })) - t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) { - return NewFixedSizeReservoir(n), n + t.Run("Float64", ReservoirTest[float64](func(n int) (ReservoirProvider, int) { + return FixedSizeReservoirProvider(n), n })) } diff --git a/sdk/metric/exemplar/histogram_reservoir_test.go b/sdk/metric/exemplar/histogram_reservoir_test.go index 64c101cb057..3f43e801e8f 100644 --- a/sdk/metric/exemplar/histogram_reservoir_test.go +++ b/sdk/metric/exemplar/histogram_reservoir_test.go @@ -7,11 +7,11 @@ import "testing" func TestHist(t *testing.T) { bounds := []float64{0, 100} - t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) { - return NewHistogramReservoir(bounds), len(bounds) + t.Run("Int64", ReservoirTest[int64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) })) - t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) { - return NewHistogramReservoir(bounds), len(bounds) + t.Run("Float64", ReservoirTest[float64](func(int) (ReservoirProvider, int) { + return HistogramReservoirProvider(bounds), len(bounds) })) } diff --git a/sdk/metric/exemplar/reservoir_test.go b/sdk/metric/exemplar/reservoir_test.go index 88bb53757e9..6d9336b44a1 100644 --- a/sdk/metric/exemplar/reservoir_test.go +++ b/sdk/metric/exemplar/reservoir_test.go @@ -18,7 +18,7 @@ import ( // Sat Jan 01 2000 00:00:00 GMT+0000. var staticTime = time.Unix(946684800, 0) -type factory func(requestedCap int) (r Reservoir, actualCap int) +type factory func(requestedCap int) (r ReservoirProvider, actualCap int) func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { return func(t *testing.T) { @@ -29,10 +29,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("CaptureSpanContext", func(t *testing.T) { t.Helper() - r, n := f(1) + rp, n := f(1) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01} sc := trace.NewSpanContext(trace.SpanContextConfig{ @@ -60,10 +61,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("FilterAttributes", func(t *testing.T) { t.Helper() - r, n := f(1) + rp, n := f(1) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) adminTrue := attribute.Bool("admin", true) r.Offer(ctx, staticTime, NewValue(N(10)), []attribute.KeyValue{adminTrue}) @@ -83,10 +85,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("CollectLessThanN", func(t *testing.T) { t.Helper() - r, n := f(2) + rp, n := f(2) if n < 2 { t.Skip("skipping, reservoir capacity less than 2:", n) } + r := rp(*attribute.EmptySet()) r.Offer(ctx, staticTime, NewValue(N(10)), nil) @@ -99,10 +102,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("MultipleOffers", func(t *testing.T) { t.Helper() - r, n := f(3) + rp, n := f(3) if n < 1 { t.Skip("skipping, reservoir capacity less than 1:", n) } + r := rp(*attribute.EmptySet()) for i := 0; i < n+1; i++ { v := NewValue(N(i)) @@ -127,10 +131,11 @@ func ReservoirTest[N int64 | float64](f factory) func(*testing.T) { t.Run("DropAll", func(t *testing.T) { t.Helper() - r, n := f(0) + rp, n := f(0) if n > 0 { t.Skip("skipping, reservoir capacity greater than 0:", n) } + r := rp(*attribute.EmptySet()) r.Offer(context.Background(), staticTime, NewValue(N(10)), nil) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 3df43827e11..75ccf30c5d7 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -491,4 +491,30 @@ func TestExemplars(t *testing.T) { measure(sampled, m) check(t, r, nCPU, 1, 20) }) + + t.Run("Custom reservoir", func(t *testing.T) { + r := NewManualReader() + reservoirProviderSelector := func(agg Aggregation) exemplar.ReservoirProvider { + return exemplar.FixedSizeReservoirProvider(2) + } + v1 := NewView(Instrument{Name: "int64-expo-histogram"}, Stream{ + Aggregation: AggregationBase2ExponentialHistogram{ + MaxSize: 160, // > 20, reservoir size should default to 20. + MaxScale: 20, + }, + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + v2 := NewView(Instrument{Name: "int64-counter"}, Stream{ + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + v3 := NewView(Instrument{Name: "int64-histogram"}, Stream{ + ExemplarReservoirProviderSelector: reservoirProviderSelector, + }) + m := NewMeterProvider(WithReader(r), WithView(v1, v2, v3)).Meter("custom-reservoir") + measure(ctx, m) + check(t, r, 0, 0, 0) + + measure(sampled, m) + check(t, r, 2, 2, 2) + }) } diff --git a/sdk/metric/view.go b/sdk/metric/view.go index cd08c673248..630890f4263 100644 --- a/sdk/metric/view.go +++ b/sdk/metric/view.go @@ -96,11 +96,12 @@ func NewView(criteria Instrument, mask Stream) View { return func(i Instrument) (Stream, bool) { if matchFunc(i) { return Stream{ - Name: nonZero(mask.Name, i.Name), - Description: nonZero(mask.Description, i.Description), - Unit: nonZero(mask.Unit, i.Unit), - Aggregation: agg, - AttributeFilter: mask.AttributeFilter, + Name: nonZero(mask.Name, i.Name), + Description: nonZero(mask.Description, i.Description), + Unit: nonZero(mask.Unit, i.Unit), + Aggregation: agg, + AttributeFilter: mask.AttributeFilter, + ExemplarReservoirProviderSelector: mask.ExemplarReservoirProviderSelector, }, true } return Stream{}, false