diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index 76ea3636a27..a6cceb2c253 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -16,12 +16,17 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
 
 import (
     "context"
+    "time"
 
     "go.opentelemetry.io/otel/attribute"
     "go.opentelemetry.io/otel/sdk/metric/aggregation"
     "go.opentelemetry.io/otel/sdk/metric/metricdata"
 )
 
+// now is used to return the current local time while allowing tests to
+// override the default time.Now function.
+var now = time.Now
+
 // Measure receives measurements to be aggregated.
 type Measure[N int64 | float64] func(context.Context, N, attribute.Set)
 
@@ -53,19 +58,6 @@ func (b Builder[N]) filter(f Measure[N]) Measure[N] {
     return f
 }
 
-func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
-    if b.Filter != nil {
-        fltr := b.Filter // Copy to make it immutable after assignment.
-        return func(_ context.Context, n N, a attribute.Set) {
-            fAttr, _ := a.Filter(fltr)
-            agg.Aggregate(n, fAttr)
-        }
-    }
-    return func(_ context.Context, n N, a attribute.Set) {
-        agg.Aggregate(n, a)
-    }
-}
-
 // LastValue returns a last-value aggregate function input and output.
 //
 // The Builder.Temporality is ignored and delta is always used.
@@ -111,19 +103,12 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
 // ExplicitBucketHistogram returns a histogram aggregate function input and
 // output.
 func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) {
-    var h aggregator[N]
+    h := newHistogram[N](cfg, noSum)
     switch b.Temporality {
     case metricdata.DeltaTemporality:
-        h = newDeltaHistogram[N](cfg, noSum)
+        return b.filter(h.measure), h.delta
     default:
-        h = newCumulativeHistogram[N](cfg, noSum)
-    }
-    return b.input(h), func(dest *metricdata.Aggregation) int {
-        // TODO (#4220): optimize memory reuse here.
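
The output functions that replace this closure resolve the memory-reuse TODO above: h.delta and h.cumulative type-assert the caller-supplied *dest and rebuild its DataPoints slice in place instead of allocating a fresh aggregation every cycle. Both rely on a reset helper whose definition is outside this diff; a plausible sketch, assuming simple grow-or-reslice semantics consistent with the reset(h.DataPoints, n, n) call sites in histogram.go below:

// reset returns a slice with the requested length and capacity, reusing
// s's backing array when it is already large enough. Sketch only; the
// actual helper is defined elsewhere in the aggregate package and may
// differ.
func reset[T any](s []T, length, capacity int) []T {
    if cap(s) < capacity {
        return make([]T, length, capacity)
    }
    return s[:length]
}

Writing into *dest this way keeps steady-state collection allocation-free once the set of attribute sets stabilizes.
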
-        *dest = h.Aggregation()
-
-        hData, _ := (*dest).(metricdata.Histogram[N])
-        return len(hData.DataPoints)
+        return b.filter(h.measure), h.cumulative
     }
 }
diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go
index 76ba3a26eaf..79031b40218 100644
--- a/sdk/metric/internal/aggregate/aggregate_test.go
+++ b/sdk/metric/internal/aggregate/aggregate_test.go
@@ -55,21 +55,12 @@ var (
     }
 )
 
-type inputTester[N int64 | float64] struct {
-    aggregator[N]
-
-    value N
-    attr  attribute.Set
+func TestBuilderFilter(t *testing.T) {
+    t.Run("Int64", testBuilderFilter[int64]())
+    t.Run("Float64", testBuilderFilter[float64]())
 }
 
-func (it *inputTester[N]) Aggregate(v N, a attribute.Set) { it.value, it.attr = v, a }
-
-func TestBuilderInput(t *testing.T) {
-    t.Run("Int64", testBuilderInput[int64]())
-    t.Run("Float64", testBuilderInput[float64]())
-}
-
-func testBuilderInput[N int64 | float64]() func(t *testing.T) {
+func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
     return func(t *testing.T) {
         t.Helper()
 
@@ -78,12 +69,11 @@ func testBuilderInput[N int64 | float64]() func(t *testing.T) {
             return func(t *testing.T) {
                 t.Helper()
 
-                it := &inputTester[N]{}
-                meas := b.input(it)
+                meas := b.filter(func(_ context.Context, v N, a attribute.Set) {
+                    assert.Equal(t, value, v, "measured incorrect value")
+                    assert.Equal(t, wantA, a, "measured incorrect attributes")
+                })
                 meas(context.Background(), value, attr)
-
-                assert.Equal(t, value, it.value, "measured incorrect value")
-                assert.Equal(t, wantA, it.attr, "measured incorrect attributes")
             }
         }
diff --git a/sdk/metric/internal/aggregate/aggregator.go b/sdk/metric/internal/aggregate/aggregator.go
deleted file mode 100644
index fac0dfd901a..00000000000
--- a/sdk/metric/internal/aggregate/aggregator.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
-    "time"
-
-    "go.opentelemetry.io/otel/attribute"
-    "go.opentelemetry.io/otel/sdk/metric/metricdata"
-)
-
-// now is used to return the current local time while allowing tests to
-// override the default time.Now function.
-var now = time.Now
-
-// aggregator forms an aggregation from a collection of recorded measurements.
-//
-// Aggregators need to be comparable so they can be de-duplicated by the SDK
-// when it creates them for multiple views.
-type aggregator[N int64 | float64] interface {
-    // Aggregate records the measurement, scoped by attr, and aggregates it
-    // into an aggregation.
-    Aggregate(measurement N, attr attribute.Set)
-
-    // Aggregation returns an Aggregation, for all the aggregated
-    // measurements made and ends an aggregation cycle.
-    Aggregation() metricdata.Aggregation
-}
diff --git a/sdk/metric/internal/aggregate/aggregator_test.go b/sdk/metric/internal/aggregate/aggregator_test.go
deleted file mode 100644
index 4e16175159b..00000000000
--- a/sdk/metric/internal/aggregate/aggregator_test.go
+++ /dev/null
@@ -1,148 +0,0 @@
-// Copyright The OpenTelemetry Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
-    "strconv"
-    "sync"
-    "testing"
-
-    "github.com/stretchr/testify/assert"
-
-    "go.opentelemetry.io/otel/attribute"
-    "go.opentelemetry.io/otel/sdk/metric/metricdata"
-    "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
-)
-
-const (
-    defaultGoroutines   = 5
-    defaultMeasurements = 30
-    defaultCycles       = 3
-)
-
-var carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))
-
-func monoIncr[N int64 | float64]() setMap[N] {
-    return setMap[N]{alice: 1, bob: 10, carol: 2}
-}
-
-// setMap maps attribute sets to a number.
-type setMap[N int64 | float64] map[attribute.Set]N
-
-// expectFunc is a function that returns an Aggregation of expected values for
-// a cycle that contains m measurements (total across all goroutines). Each
-// call advances the cycle.
-type expectFunc func(m int) metricdata.Aggregation
-
-// aggregatorTester runs an acceptance test on an Aggregator. It will ask an
-// Aggregator to aggregate a set of values as if they were real measurements
-// made MeasurementN number of times. This will be done in GoroutineN number
-// of different goroutines. After the Aggregator has been asked to aggregate
-// all these measurements, it is validated using a passed expecterFunc. This
-// set of operation is a single cycle, and the the aggregatorTester will run
-// CycleN number of cycles.
-type aggregatorTester[N int64 | float64] struct {
-    // GoroutineN is the number of goroutines aggregatorTester will use to run
-    // the test with.
-    GoroutineN int
-    // MeasurementN is the number of measurements that are made each cycle a
-    // goroutine runs the test.
-    MeasurementN int
-    // CycleN is the number of times a goroutine will make a set of
-    // measurements.
-    CycleN int
-}
-
-func (at *aggregatorTester[N]) Run(a aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
-    m := at.MeasurementN * at.GoroutineN
-    return func(t *testing.T) {
-        t.Run("Comparable", func(t *testing.T) {
-            assert.NotPanics(t, func() {
-                _ = map[aggregator[N]]struct{}{a: {}}
-            })
-        })
-
-        t.Run("CorrectnessConcurrentSafe", func(t *testing.T) {
-            for i := 0; i < at.CycleN; i++ {
-                var wg sync.WaitGroup
-                wg.Add(at.GoroutineN)
-                for j := 0; j < at.GoroutineN; j++ {
-                    go func() {
-                        defer wg.Done()
-                        for k := 0; k < at.MeasurementN; k++ {
-                            for attrs, n := range incr {
-                                a.Aggregate(N(n), attrs)
-                            }
-                        }
-                    }()
-                }
-                wg.Wait()
-
-                metricdatatest.AssertAggregationsEqual(t, eFunc(m), a.Aggregation())
-            }
-        })
-    }
-}
-
-var bmarkResults metricdata.Aggregation
-
-func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() aggregator[N], count int) {
-    attrs := make([]attribute.Set, count)
-    for i := range attrs {
-        attrs[i] = attribute.NewSet(attribute.Int("value", i))
-    }
-
-    b.Run("Aggregate", func(b *testing.B) {
-        agg := factory()
-        b.ReportAllocs()
-        b.ResetTimer()
-
-        for n := 0; n < b.N; n++ {
-            for _, attr := range attrs {
-                agg.Aggregate(1, attr)
-            }
-        }
-        bmarkResults = agg.Aggregation()
-    })
-
-    b.Run("Aggregations", func(b *testing.B) {
-        aggs := make([]aggregator[N], b.N)
-        for n := range aggs {
-            a := factory()
-            for _, attr := range attrs {
-                a.Aggregate(1, attr)
-            }
-            aggs[n] = a
-        }
-
-        b.ReportAllocs()
-        b.ResetTimer()
-
-        for n := 0; n < b.N; n++ {
-            bmarkResults = aggs[n].Aggregation()
-        }
-    })
-}
-
-func benchmarkAggregator[N int64 | float64](factory func() aggregator[N]) func(*testing.B) {
-    counts := []int{1, 10, 100}
-    return func(b *testing.B) {
-        for _, n := range counts {
-            b.Run(strconv.Itoa(n), func(b *testing.B) {
-                benchmarkAggregatorN(b, factory, n)
-            })
-        }
-    }
-}
diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go
index 0683ff2eb23..cd030076fdc 100644
--- a/sdk/metric/internal/aggregate/histogram.go
+++ b/sdk/metric/internal/aggregate/histogram.go
@@ -15,6 +15,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
 import (
+    "context"
     "sort"
     "sync"
     "time"
@@ -75,7 +76,7 @@ func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[
 
 // Aggregate records the measurement value, scoped by attr, and aggregates it
 // into a histogram.
-func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
+func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
     // This search will return an index in the range [0, len(s.bounds)], where
     // it will return len(s.bounds) if value is greater than the last element
     // of s.bounds. This aligns with the buckets in that the length of buckets
@@ -106,111 +107,93 @@
     }
 }
 
-// newDeltaHistogram returns an Aggregator that summarizes a set of
-// measurements as an histogram. Each histogram is scoped by attributes and
-// the aggregation cycle the measurements were made in.
-//
-// Each aggregation cycle is treated independently. When the returned
-// Aggregator's Aggregations method is called it will reset all histogram
-// counts to zero.
-func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
-    return &deltaHistogram[N]{
+// newHistogram returns an Aggregator that summarizes a set of measurements as
+// a histogram.
+func newHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) *histogram[N] {
+    return &histogram[N]{
         histValues: newHistValues[N](cfg.Boundaries, noSum),
         noMinMax:   cfg.NoMinMax,
         start:      now(),
     }
 }
 
-// deltaHistogram summarizes a set of measurements made in a single
-// aggregation cycle as an histogram with explicitly defined buckets.
-type deltaHistogram[N int64 | float64] struct {
+// histogram summarizes a set of measurements as a histogram with explicitly
+// defined buckets.
+type histogram[N int64 | float64] struct {
     *histValues[N]
 
     noMinMax bool
     start    time.Time
 }
 
-func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
+func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
+    t := now()
+
+    // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+    // case, use the zero-value h and hope for better alignment next cycle.
+    h, _ := (*dest).(metricdata.Histogram[N])
+    h.Temporality = metricdata.DeltaTemporality
+
     s.valuesMu.Lock()
     defer s.valuesMu.Unlock()
 
-    if len(s.values) == 0 {
-        return nil
-    }
-
-    t := now()
     // Do not allow modification of our copy of bounds.
     bounds := make([]float64, len(s.bounds))
     copy(bounds, s.bounds)
-    h := metricdata.Histogram[N]{
-        Temporality: metricdata.DeltaTemporality,
-        DataPoints:  make([]metricdata.HistogramDataPoint[N], 0, len(s.values)),
-    }
+
+    n := len(s.values)
+    hDPts := reset(h.DataPoints, n, n)
+
+    var i int
     for a, b := range s.values {
-        hdp := metricdata.HistogramDataPoint[N]{
-            Attributes:   a,
-            StartTime:    s.start,
-            Time:         t,
-            Count:        b.count,
-            Bounds:       bounds,
-            BucketCounts: b.counts,
-        }
+        hDPts[i].Attributes = a
+        hDPts[i].StartTime = s.start
+        hDPts[i].Time = t
+        hDPts[i].Count = b.count
+        hDPts[i].Bounds = bounds
+        hDPts[i].BucketCounts = b.counts
+
         if !s.noSum {
-            hdp.Sum = b.total
+            hDPts[i].Sum = b.total
         }
+
         if !s.noMinMax {
-            hdp.Min = metricdata.NewExtrema(b.min)
-            hdp.Max = metricdata.NewExtrema(b.max)
+            hDPts[i].Min = metricdata.NewExtrema(b.min)
+            hDPts[i].Max = metricdata.NewExtrema(b.max)
         }
-        h.DataPoints = append(h.DataPoints, hdp)
 
         // Unused attribute sets do not report.
         delete(s.values, a)
+        i++
     }
     // The delta collection cycle resets.
     s.start = t
-    return h
-}
 
-// newCumulativeHistogram returns an Aggregator that summarizes a set of
-// measurements as an histogram. Each histogram is scoped by attributes.
-//
-// Each aggregation cycle builds from the previous, the histogram counts are
-// the bucketed counts of all values aggregated since the returned Aggregator
-// was created.
-func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
-    return &cumulativeHistogram[N]{
-        histValues: newHistValues[N](cfg.Boundaries, noSum),
-        noMinMax:   cfg.NoMinMax,
-        start:      now(),
-    }
+    h.DataPoints = hDPts
+    *dest = h
+
+    return n
 }
 
-// cumulativeHistogram summarizes a set of measurements made over all
-// aggregation cycles as an histogram with explicitly defined buckets.
-type cumulativeHistogram[N int64 | float64] struct {
-    *histValues[N]
+func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
+    t := now()
 
-    noMinMax bool
-    start    time.Time
-}
+    // If *dest is not a metricdata.Histogram, memory reuse is missed. In that
+    // case, use the zero-value h and hope for better alignment next cycle.
+    h, _ := (*dest).(metricdata.Histogram[N])
+    h.Temporality = metricdata.CumulativeTemporality
 
-func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
     s.valuesMu.Lock()
     defer s.valuesMu.Unlock()
 
-    if len(s.values) == 0 {
-        return nil
-    }
-
-    t := now()
     // Do not allow modification of our copy of bounds.
     bounds := make([]float64, len(s.bounds))
     copy(bounds, s.bounds)
-    h := metricdata.Histogram[N]{
-        Temporality: metricdata.CumulativeTemporality,
-        DataPoints:  make([]metricdata.HistogramDataPoint[N], 0, len(s.values)),
-    }
+
+    n := len(s.values)
+    hDPts := reset(h.DataPoints, n, n)
+
+    var i int
     for a, b := range s.values {
         // The HistogramDataPoint field values returned need to be copies of
         // the buckets value as we will keep updating them.
@@ -220,26 +203,30 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
         counts := make([]uint64, len(b.counts))
         copy(counts, b.counts)
 
-        hdp := metricdata.HistogramDataPoint[N]{
-            Attributes:   a,
-            StartTime:    s.start,
-            Time:         t,
-            Count:        b.count,
-            Bounds:       bounds,
-            BucketCounts: counts,
-        }
+        hDPts[i].Attributes = a
+        hDPts[i].StartTime = s.start
+        hDPts[i].Time = t
+        hDPts[i].Count = b.count
+        hDPts[i].Bounds = bounds
+        hDPts[i].BucketCounts = counts
+
         if !s.noSum {
-            hdp.Sum = b.total
+            hDPts[i].Sum = b.total
         }
+
         if !s.noMinMax {
-            hdp.Min = metricdata.NewExtrema(b.min)
-            hdp.Max = metricdata.NewExtrema(b.max)
+            hDPts[i].Min = metricdata.NewExtrema(b.min)
+            hDPts[i].Max = metricdata.NewExtrema(b.max)
         }
-        h.DataPoints = append(h.DataPoints, hdp)
+        i++
 
         // TODO (#3006): This will use an unbounded amount of memory if there
         // are unbounded number of attribute sets being aggregated. Attribute
         // sets that become "stale" need to be forgotten so this will not
         // overload the system.
     }
-    return h
+
+    h.DataPoints = hDPts
+    *dest = h
+
+    return n
 }
diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go
index 8c75562198d..68a00f2a90f 100644
--- a/sdk/metric/internal/aggregate/histogram_test.go
+++ b/sdk/metric/internal/aggregate/histogram_test.go
@@ -15,6 +15,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
 
 import (
+    "context"
     "sort"
     "testing"
 
@@ -37,74 +38,155 @@ var (
 
 func TestHistogram(t *testing.T) {
     t.Cleanup(mockTime(now))
-    t.Run("Int64", testHistogram[int64])
-    t.Run("Float64", testHistogram[float64])
-}
 
-func testHistogram[N int64 | float64](t *testing.T) {
-    tester := &aggregatorTester[N]{
-        GoroutineN:   defaultGoroutines,
-        MeasurementN: defaultMeasurements,
-        CycleN:       defaultCycles,
-    }
+    t.Run("Int64/Delta/Sum", testDeltaHist[int64](conf[int64]{hPt: hPointSummed[int64]}))
+    t.Run("Int64/Delta/NoSum", testDeltaHist[int64](conf[int64]{noSum: true, hPt: hPoint[int64]}))
+    t.Run("Float64/Delta/Sum", testDeltaHist[float64](conf[float64]{hPt: hPointSummed[float64]}))
+    t.Run("Float64/Delta/NoSum", testDeltaHist[float64](conf[float64]{noSum: true, hPt: hPoint[float64]}))
 
-    incr := monoIncr[N]()
-    eFunc := deltaHistSummedExpecter[N](incr)
-    t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc))
-    eFunc = deltaHistExpecter[N](incr)
-    t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc))
-    eFunc = cumuHistSummedExpecter[N](incr)
-    t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc))
-    eFunc = cumuHistExpecter[N](incr)
-    t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc))
+    t.Run("Int64/Cumulative/Sum", testCumulativeHist[int64](conf[int64]{hPt: hPointSummed[int64]}))
+    t.Run("Int64/Cumulative/NoSum", testCumulativeHist[int64](conf[int64]{noSum: true, hPt: hPoint[int64]}))
+    t.Run("Float64/Cumulative/Sum", testCumulativeHist[float64](conf[float64]{hPt: hPointSummed[float64]}))
+    t.Run("Float64/Cumulative/NoSum", testCumulativeHist[float64](conf[float64]{noSum: true, hPt: hPoint[float64]}))
 }
 
-func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc {
-    h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality}
-    return func(m int) metricdata.Aggregation {
-        h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
-        for a, v := range incr {
-            h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m)))
-        }
-        return h
-    }
+type conf[N int64 | float64] struct {
+    noSum bool
+    hPt   func(attribute.Set, N, uint64) metricdata.HistogramDataPoint[N]
 }
 
-func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
-    h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality}
-    return func(m int) metricdata.Aggregation {
-        h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
-        for a, v := range incr {
-            h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(m)))
-        }
-        return h
-    }
-}
-
-func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc {
-    var cycle int
-    h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
-    return func(m int) metricdata.Aggregation {
-        cycle++
-        h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
-        for a, v := range incr {
-            h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m)))
-        }
-        return h
-    }
+func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) {
+    in, out := Builder[N]{
+        Temporality: metricdata.DeltaTemporality,
+        Filter:      attrFltr,
+    }.ExplicitBucketHistogram(histConf, c.noSum)
+    ctx := context.Background()
+    return test[N](in, out, []teststep[N]{
+        {
+            input: []arg[N]{},
+            expect: output{
+                n: 0,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.DeltaTemporality,
+                    DataPoints:  []metricdata.HistogramDataPoint[N]{},
+                },
+            },
+        },
+        {
+            input: []arg[N]{
+                {ctx, 2, alice},
+                {ctx, 10, bob},
+                {ctx, 2, alice},
+                {ctx, 2, alice},
+                {ctx, 10, bob},
+            },
+            expect: output{
+                n: 2,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.DeltaTemporality,
+                    DataPoints: []metricdata.HistogramDataPoint[N]{
+                        c.hPt(fltrAlice, 2, 3),
+                        c.hPt(fltrBob, 10, 2),
+                    },
+                },
+            },
+        },
+        {
+            input: []arg[N]{
+                {ctx, 10, alice},
+                {ctx, 3, bob},
+            },
+            expect: output{
+                n: 2,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.DeltaTemporality,
+                    DataPoints: []metricdata.HistogramDataPoint[N]{
+                        c.hPt(fltrAlice, 10, 1),
+                        c.hPt(fltrBob, 3, 1),
+                    },
+                },
+            },
+        },
+        {
+            input: []arg[N]{},
+            // Delta histograms are expected to reset.
+            expect: output{
+                n: 0,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.DeltaTemporality,
+                    DataPoints:  []metricdata.HistogramDataPoint[N]{},
+                },
+            },
+        },
+    })
 }
 
-func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
-    var cycle int
-    h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
-    return func(m int) metricdata.Aggregation {
-        cycle++
-        h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
-        for a, v := range incr {
-            h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(cycle*m)))
-        }
-        return h
-    }
+func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) {
+    in, out := Builder[N]{
+        Temporality: metricdata.CumulativeTemporality,
+        Filter:      attrFltr,
+    }.ExplicitBucketHistogram(histConf, c.noSum)
+    ctx := context.Background()
+    return test[N](in, out, []teststep[N]{
+        {
+            input: []arg[N]{},
+            expect: output{
+                n: 0,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.CumulativeTemporality,
+                    DataPoints:  []metricdata.HistogramDataPoint[N]{},
+                },
+            },
+        },
+        {
+            input: []arg[N]{
+                {ctx, 2, alice},
+                {ctx, 10, bob},
+                {ctx, 2, alice},
+                {ctx, 2, alice},
+                {ctx, 10, bob},
+            },
+            expect: output{
+                n: 2,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.CumulativeTemporality,
+                    DataPoints: []metricdata.HistogramDataPoint[N]{
+                        c.hPt(fltrAlice, 2, 3),
+                        c.hPt(fltrBob, 10, 2),
+                    },
+                },
+            },
+        },
+        {
+            input: []arg[N]{
+                {ctx, 2, alice},
+                {ctx, 10, bob},
+            },
+            expect: output{
+                n: 2,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.CumulativeTemporality,
+                    DataPoints: []metricdata.HistogramDataPoint[N]{
+                        c.hPt(fltrAlice, 2, 4),
+                        c.hPt(fltrBob, 10, 3),
+                    },
+                },
+            },
+        },
+        {
+            input: []arg[N]{},
+            expect: output{
+                n: 2,
+                agg: metricdata.Histogram[N]{
+                    Temporality: metricdata.CumulativeTemporality,
+                    DataPoints: []metricdata.HistogramDataPoint[N]{
+                        c.hPt(fltrAlice, 2, 4),
+                        c.hPt(fltrBob, 10, 3),
+                    },
+                },
+            },
+        },
+    })
 }
 
 // hPointSummed returns an HistogramDataPoint that started and ended now with
@@ -190,93 +272,89 @@ func testBucketsSum[N int64 | float64]() func(t *testing.T) {
     }
 }
 
-func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) {
+func TestHistogramImmutableBounds(t *testing.T) {
     b := []float64{0, 1, 2}
     cpB := make([]float64, len(b))
     copy(cpB, b)
-    a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false)
-    return func(t *testing.T) {
-        require.Equal(t, cpB, getBounds(a))
+    h := newHistogram[int64](aggregation.ExplicitBucketHistogram{Boundaries: b}, false)
+    require.Equal(t, cpB, h.bounds)
 
-        b[0] = 10
-        assert.Equal(t, cpB, getBounds(a), "modifying the bounds argument should not change the bounds")
+    b[0] = 10
+    assert.Equal(t, cpB, h.bounds, "modifying the bounds argument should not change the bounds")
 
-        a.Aggregate(5, alice)
-        hdp := a.Aggregation().(metricdata.Histogram[N]).DataPoints[0]
-        hdp.Bounds[1] = 10
-        assert.Equal(t, cpB, getBounds(a), "modifying the Aggregation bounds should not change the bounds")
-    }
-}
-
-func TestHistogramImmutableBounds(t *testing.T) {
-    t.Run("Delta", testHistImmutableBounds(
-        newDeltaHistogram[int64],
-        func(a aggregator[int64]) []float64 {
-            deltaH := a.(*deltaHistogram[int64])
-            return deltaH.bounds
-        },
-    ))
+    h.measure(context.Background(), 5, alice)
 
-    t.Run("Cumulative", testHistImmutableBounds(
-        newCumulativeHistogram[int64],
-        func(a aggregator[int64]) []float64 {
-            cumuH := a.(*cumulativeHistogram[int64])
-            return cumuH.bounds
-        },
-    ))
+    var data metricdata.Aggregation = metricdata.Histogram[int64]{}
+    h.cumulative(&data)
+    hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
+    hdp.Bounds[1] = 10
+    assert.Equal(t, cpB, h.bounds, "modifying the Aggregation bounds should not change the bounds")
 }
 
 func TestCumulativeHistogramImutableCounts(t *testing.T) {
-    a := newCumulativeHistogram[int64](histConf, false)
-    a.Aggregate(5, alice)
-    hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0]
+    h := newHistogram[int64](histConf, false)
+    h.measure(context.Background(), 5, alice)
+
+    var data metricdata.Aggregation = metricdata.Histogram[int64]{}
+    h.cumulative(&data)
+    hdp := data.(metricdata.Histogram[int64]).DataPoints[0]
 
-    cumuH := a.(*cumulativeHistogram[int64])
-    require.Equal(t, hdp.BucketCounts, cumuH.values[alice].counts)
+    require.Equal(t, hdp.BucketCounts, h.values[alice].counts)
 
     cpCounts := make([]uint64, len(hdp.BucketCounts))
     copy(cpCounts, hdp.BucketCounts)
     hdp.BucketCounts[0] = 10
-    assert.Equal(t, cpCounts, cumuH.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator")
+    assert.Equal(t, cpCounts, h.values[alice].counts, "modifying the Aggregator bucket counts should not change the Aggregator")
 }
 
 func TestDeltaHistogramReset(t *testing.T) {
     t.Cleanup(mockTime(now))
 
-    a := newDeltaHistogram[int64](histConf, false)
-    assert.Nil(t, a.Aggregation())
+    h := newHistogram[int64](histConf, false)
+
+    var data metricdata.Aggregation = metricdata.Histogram[int64]{}
+    require.Equal(t, 0, h.delta(&data))
+    require.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)
+
+    h.measure(context.Background(), 1, alice)
 
-    a.Aggregate(1, alice)
     expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
    expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)}
-    metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
+    h.delta(&data)
+    metricdatatest.AssertAggregationsEqual(t, expect, data)
 
     // The attr set should be forgotten once Aggregations is called.
     expect.DataPoints = nil
-    assert.Nil(t, a.Aggregation())
+    assert.Equal(t, 0, h.delta(&data))
+    assert.Len(t, data.(metricdata.Histogram[int64]).DataPoints, 0)
 
     // Aggregating another set should not affect the original (alice).
-    a.Aggregate(1, bob)
+    h.measure(context.Background(), 1, bob)
     expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)}
-    metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
-}
-
-func TestEmptyHistogramNilAggregation(t *testing.T) {
-    assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation())
-    assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation())
-    assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation())
-    assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation())
+    h.delta(&data)
+    metricdatatest.AssertAggregationsEqual(t, expect, data)
 }
 
 func BenchmarkHistogram(b *testing.B) {
-    b.Run("Int64", benchmarkHistogram[int64])
-    b.Run("Float64", benchmarkHistogram[float64])
-}
-
-func benchmarkHistogram[N int64 | float64](b *testing.B) {
-    factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) }
-    b.Run("Delta", benchmarkAggregator(factory))
-    factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) }
-    b.Run("Cumulative", benchmarkAggregator(factory))
+    b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
+        return Builder[int64]{
+            Temporality: metricdata.CumulativeTemporality,
+        }.ExplicitBucketHistogram(histConf, false)
+    }))
+    b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
+        return Builder[int64]{
+            Temporality: metricdata.DeltaTemporality,
+        }.ExplicitBucketHistogram(histConf, false)
+    }))
+    b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
+        return Builder[float64]{
+            Temporality: metricdata.CumulativeTemporality,
+        }.ExplicitBucketHistogram(histConf, false)
+    }))
+    b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
+        return Builder[float64]{
+            Temporality: metricdata.DeltaTemporality,
+        }.ExplicitBucketHistogram(histConf, false)
+    }))
 }
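
With the aggregator[N] interface removed, Builder composes the two halves of an aggregate directly: filter wraps the histogram's measure method on the input side, and the Temporality switch selects delta or cumulative as the ComputeAggregation output. A minimal in-package sketch of the resulting flow (illustrative only: exampleHistogramFlow and its local names are not part of this change, and the code must sit in the aggregate package because that package is internal):

package aggregate

import (
    "context"

    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/sdk/metric/aggregation"
    "go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// exampleHistogramFlow records one measurement and collects it as a
// delta histogram.
func exampleHistogramFlow() (int, metricdata.Aggregation) {
    // Input (Measure) and output (ComputeAggregation) sides of the
    // histogram; delta temporality selects h.delta as the output.
    meas, comp := Builder[float64]{
        Temporality: metricdata.DeltaTemporality,
    }.ExplicitBucketHistogram(aggregation.ExplicitBucketHistogram{
        Boundaries: []float64{0, 5, 10},
    }, false)

    // Record a measurement; a configured Filter would be applied here.
    meas(context.Background(), 7, attribute.NewSet(attribute.String("user", "alice")))

    // Collect into a reusable destination. The returned count is the
    // number of data points written; delta also resets the aggregate
    // state for the next cycle.
    var dest metricdata.Aggregation
    n := comp(&dest) // n == 1 for the single attribute set above.
    return n, dest
}

The new benchmarks exercise exactly this pairing through benchmarkAggregate, so the destination-reuse behavior of the ComputeAggregation side is what BenchmarkHistogram now measures.
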