diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 2c501f45f13..82a8571e88d 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -44,7 +44,11 @@ type Builder[N int64 | float64] struct { func (b Builder[N]) input(agg aggregator[N]) Measure[N] { if b.Filter != nil { - agg = newFilter[N](agg, b.Filter) + fltr := b.Filter // Copy to make it immutable after assignment. + return func(_ context.Context, n N, a attribute.Set) { + fAttr, _ := a.Filter(fltr) + agg.Aggregate(n, fAttr) + } } return func(_ context.Context, n N, a attribute.Set) { agg.Aggregate(n, a) diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go new file mode 100644 index 00000000000..09c92ab9a28 --- /dev/null +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -0,0 +1,75 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/otel/attribute" +) + +var ( + keyUser = "user" + userAlice = attribute.String(keyUser, "Alice") + adminTrue = attribute.Bool("admin", true) + + alice = attribute.NewSet(userAlice, adminTrue) + + // Filtered. + attrFltr = func(kv attribute.KeyValue) bool { + return kv.Key == attribute.Key(keyUser) + } + fltrAlice = attribute.NewSet(userAlice) +) + +type inputTester[N int64 | float64] struct { + aggregator[N] + + value N + attr attribute.Set +} + +func (it *inputTester[N]) Aggregate(v N, a attribute.Set) { it.value, it.attr = v, a } + +func TestBuilderInput(t *testing.T) { + t.Run("Int64", testBuilderInput[int64]()) + t.Run("Float64", testBuilderInput[float64]()) +} + +func testBuilderInput[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + t.Helper() + + value, attr := N(1), alice + run := func(b Builder[N], wantA attribute.Set) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + + it := &inputTester[N]{} + meas := b.input(it) + meas(context.Background(), value, attr) + + assert.Equal(t, value, it.value, "measured incorrect value") + assert.Equal(t, wantA, it.attr, "measured incorrect attributes") + } + } + + t.Run("NoFilter", run(Builder[N]{}, attr)) + t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice)) + } +} diff --git a/sdk/metric/internal/aggregate/aggregator_test.go b/sdk/metric/internal/aggregate/aggregator_test.go index 3efc3a0ecba..4f322694729 100644 --- a/sdk/metric/internal/aggregate/aggregator_test.go +++ b/sdk/metric/internal/aggregate/aggregator_test.go @@ -34,9 +34,8 @@ const ( ) var ( - alice = attribute.NewSet(attribute.String("user", "alice"), attribute.Bool("admin", true)) - bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false)) - carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false)) + bob = attribute.NewSet(attribute.String(keyUser, "bob"), attribute.Bool("admin", false)) + carol = attribute.NewSet(attribute.String(keyUser, "carol"), attribute.Bool("admin", false)) // Sat Jan 01 2000 00:00:00 GMT+0000. staticTime = time.Unix(946684800, 0) diff --git a/sdk/metric/internal/aggregate/filter.go b/sdk/metric/internal/aggregate/filter.go deleted file mode 100644 index ea471149e75..00000000000 --- a/sdk/metric/internal/aggregate/filter.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" - -import ( - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// newFilter returns an Aggregator that wraps an agg with an attribute -// filtering function. Both pre-computed non-pre-computed Aggregators can be -// passed for agg. An appropriate Aggregator will be returned for the detected -// type. -func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggregator[N] { - if fn == nil { - return agg - } - return &filter[N]{ - filter: fn, - aggregator: agg, - } -} - -// filter wraps an aggregator with an attribute filter. All recorded -// measurements will have their attributes filtered before they are passed to -// the underlying aggregator's Aggregate method. -// -// This should not be used to wrap a pre-computed Aggregator. Use a -// precomputedFilter instead. -type filter[N int64 | float64] struct { - filter attribute.Filter - aggregator aggregator[N] -} - -// Aggregate records the measurement, scoped by attr, and aggregates it -// into an aggregation. -func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { - fAttr, _ := attr.Filter(f.filter) - f.aggregator.Aggregate(measurement, fAttr) -} - -// Aggregation returns an Aggregation, for all the aggregated -// measurements made and ends an aggregation cycle. -func (f *filter[N]) Aggregation() metricdata.Aggregation { - return f.aggregator.Aggregation() -} diff --git a/sdk/metric/internal/aggregate/filter_test.go b/sdk/metric/internal/aggregate/filter_test.go deleted file mode 100644 index b6544e3706b..00000000000 --- a/sdk/metric/internal/aggregate/filter_test.go +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -// This is an aggregator that has a stable output, used for testing. It does not -// follow any spec prescribed aggregation. -type testStableAggregator[N int64 | float64] struct { - sync.Mutex - values []metricdata.DataPoint[N] -} - -// Aggregate records the measurement, scoped by attr, and aggregates it -// into an aggregation. -func (a *testStableAggregator[N]) Aggregate(measurement N, attr attribute.Set) { - a.Lock() - defer a.Unlock() - - a.values = append(a.values, metricdata.DataPoint[N]{ - Attributes: attr, - Value: measurement, - }) -} - -// Aggregation returns an Aggregation, for all the aggregated -// measurements made and ends an aggregation cycle. -func (a *testStableAggregator[N]) Aggregation() metricdata.Aggregation { - return metricdata.Gauge[N]{ - DataPoints: a.values, - } -} - -func testNewFilterNoFilter[N int64 | float64](t *testing.T, agg aggregator[N]) { - filter := newFilter(agg, nil) - assert.Equal(t, agg, filter) -} - -func testNewFilter[N int64 | float64](t *testing.T, agg aggregator[N]) { - f := newFilter(agg, testAttributeFilter) - require.IsType(t, &filter[N]{}, f) - filt := f.(*filter[N]) - assert.Equal(t, agg, filt.aggregator) -} - -var testAttributeFilter = func(kv attribute.KeyValue) bool { - return kv.Key == "power-level" -} - -func TestNewFilter(t *testing.T) { - t.Run("int64", func(t *testing.T) { - agg := &testStableAggregator[int64]{} - testNewFilterNoFilter[int64](t, agg) - testNewFilter[int64](t, agg) - }) - t.Run("float64", func(t *testing.T) { - agg := &testStableAggregator[float64]{} - testNewFilterNoFilter[float64](t, agg) - testNewFilter[float64](t, agg) - }) -} - -func testDataPoint[N int64 | float64](attr attribute.Set) metricdata.DataPoint[N] { - return metricdata.DataPoint[N]{ - Attributes: attr, - Value: 1, - } -} - -func testFilterAggregate[N int64 | float64](t *testing.T) { - testCases := []struct { - name string - inputAttr []attribute.Set - output []metricdata.DataPoint[N] - }{ - { - name: "Will filter all out", - inputAttr: []attribute.Set{ - attribute.NewSet( - attribute.String("foo", "bar"), - attribute.Float64("lifeUniverseEverything", 42.0), - ), - }, - output: []metricdata.DataPoint[N]{ - testDataPoint[N](*attribute.EmptySet()), - }, - }, - { - name: "Will keep appropriate attributes", - inputAttr: []attribute.Set{ - attribute.NewSet( - attribute.String("foo", "bar"), - attribute.Int("power-level", 9001), - attribute.Float64("lifeUniverseEverything", 42.0), - ), - attribute.NewSet( - attribute.String("foo", "bar"), - attribute.Int("power-level", 9001), - ), - }, - output: []metricdata.DataPoint[N]{ - // A real Aggregator will combine these, the testAggregator doesn't for list stability. - testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))), - testDataPoint[N](attribute.NewSet(attribute.Int("power-level", 9001))), - }, - }, - { - name: "Will combine Aggregations", - inputAttr: []attribute.Set{ - attribute.NewSet( - attribute.String("foo", "bar"), - ), - attribute.NewSet( - attribute.Float64("lifeUniverseEverything", 42.0), - ), - }, - output: []metricdata.DataPoint[N]{ - // A real Aggregator will combine these, the testAggregator doesn't for list stability. - testDataPoint[N](*attribute.EmptySet()), - testDataPoint[N](*attribute.EmptySet()), - }, - }, - } - - for _, tt := range testCases { - t.Run(tt.name, func(t *testing.T) { - f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter) - for _, set := range tt.inputAttr { - f.Aggregate(1, set) - } - out := f.Aggregation().(metricdata.Gauge[N]) - assert.Equal(t, tt.output, out.DataPoints) - }) - } -} - -func TestFilterAggregate(t *testing.T) { - t.Run("int64", func(t *testing.T) { - testFilterAggregate[int64](t) - }) - t.Run("float64", func(t *testing.T) { - testFilterAggregate[float64](t) - }) -} - -func testFilterConcurrent[N int64 | float64](t *testing.T) { - f := newFilter[N](&testStableAggregator[N]{}, testAttributeFilter) - wg := &sync.WaitGroup{} - wg.Add(2) - - go func() { - f.Aggregate(1, attribute.NewSet( - attribute.String("foo", "bar"), - )) - wg.Done() - }() - - go func() { - f.Aggregate(1, attribute.NewSet( - attribute.Int("power-level", 9001), - )) - wg.Done() - }() - - wg.Wait() -} - -func TestFilterConcurrent(t *testing.T) { - t.Run("int64", func(t *testing.T) { - testFilterConcurrent[int64](t) - }) - t.Run("float64", func(t *testing.T) { - testFilterConcurrent[float64](t) - }) -}