diff --git a/CHANGELOG.md b/CHANGELOG.md index d11886adf59..fd2e673fb42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,11 +32,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) - If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289) +- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) +- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) ### Fixed - Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143) - Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307) +- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317) ## [1.16.0/0.39.0] 2023-05-18 diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index b4d6fa8b35c..83652c6e97f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{} var _ metric.Float64ObservableUpDownCounter = float64Observable{} var _ metric.Float64ObservableGauge = float64Observable{} -func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { +func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { return float64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -296,9 +296,9 @@ var _ metric.Int64ObservableCounter = int64Observable{} var _ metric.Int64ObservableUpDownCounter = int64Observable{} var _ metric.Int64ObservableGauge = int64Observable{} -func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { +func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { return int64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -306,18 +306,20 @@ type observable[N int64 | float64] struct { metric.Observable observablID[N] + meter *meter measures []aggregate.Measure[N] } -func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { +func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { return &observable[N]{ observablID: observablID[N]{ name: name, description: desc, kind: kind, unit: u, - scope: scope, + scope: m.scope, }, + meter: m, measures: meas, } } @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument") // and nil if it should. An errEmptyAgg error is returned if o is effectively a // no-op because it does not have any aggregators. 
Also, an error is returned // if scope defines a Meter other than the one o was created by. -func (o *observable[N]) registerable(scope instrumentation.Scope) error { +func (o *observable[N]) registerable(m *meter) error { if len(o.measures) == 0 { return errEmptyAgg } - if scope != o.scope { + if m != o.meter { return fmt.Errorf( "invalid registration: observable %q from Meter %q, registered with Meter %q", o.name, o.scope.Name, - scope.Name, + m.scope.Name, ) } return nil diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index f386c337135..2c501f45f13 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -109,13 +109,13 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { // ExplicitBucketHistogram returns a histogram aggregate function input and // output. -func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) { +func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) { var h aggregator[N] switch b.Temporality { case metricdata.DeltaTemporality: - h = newDeltaHistogram[N](cfg) + h = newDeltaHistogram[N](cfg, noSum) default: - h = newCumulativeHistogram[N](cfg) + h = newCumulativeHistogram[N](cfg, noSum) } return b.input(h), func(dest *metricdata.Aggregation) int { // TODO (#4220): optimize memory reuse here. diff --git a/sdk/metric/internal/aggregate/aggregator_example_test.go b/sdk/metric/internal/aggregate/aggregator_example_test.go deleted file mode 100644 index 7b566b957d9..00000000000 --- a/sdk/metric/internal/aggregate/aggregator_example_test.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package aggregate - -import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/embedded" - "go.opentelemetry.io/otel/sdk/metric/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metricdata" -) - -type meter struct { - // When a reader initiates a collection, the meter would collect - // aggregations from each of these functions. - aggregations []metricdata.Aggregation -} - -func (p *meter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64Counter, error) { - // This is an example of how a meter would create an aggregator for a new - // counter. At this point the provider would determine the aggregation and - // temporality to used based on the Reader and View configuration. Assume - // here these are determined to be a cumulative sum. 
- - aggregator := newCumulativeSum[int64](true) - count := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for counter\n", aggregator) - - return count, nil -} - -func (p *meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { - // This is an example of how a meter would create an aggregator for a new - // up-down counter. At this point the provider would determine the - // aggregation and temporality to used based on the Reader and View - // configuration. Assume here these are determined to be a last-value - // aggregation (the temporality does not affect the produced aggregations). - - aggregator := newLastValue[int64]() - upDownCount := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for up-down counter\n", aggregator) - - return upDownCount, nil -} - -func (p *meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { - // This is an example of how a meter would create an aggregator for a new - // histogram. At this point the provider would determine the aggregation - // and temporality to used based on the Reader and View configuration. - // Assume here these are determined to be a delta explicit-bucket - // histogram. - - aggregator := newDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{ - Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, - NoMinMax: false, - }) - hist := inst{aggregateFunc: aggregator.Aggregate} - - p.aggregations = append(p.aggregations, aggregator.Aggregation()) - - fmt.Printf("using %T aggregator for histogram\n", aggregator) - - return hist, nil -} - -// inst is a generalized int64 synchronous counter, up-down counter, and -// histogram used for demonstration purposes only. -type inst struct { - aggregateFunc func(int64, attribute.Set) - - embedded.Int64Counter - embedded.Int64UpDownCounter - embedded.Int64Histogram -} - -func (inst) Add(context.Context, int64, ...metric.AddOption) {} -func (inst) Record(context.Context, int64, ...metric.RecordOption) {} - -func Example() { - m := meter{} - - _, _ = m.Int64Counter("counter example") - _, _ = m.Int64UpDownCounter("up-down counter example") - _, _ = m.Int64Histogram("histogram example") - - // Output: - // using *aggregate.cumulativeSum[int64] aggregator for counter - // using *aggregate.lastValue[int64] aggregator for up-down counter - // using *aggregate.deltaHistogram[int64] aggregator for histogram -} diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index a7a7780c307..0683ff2eb23 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -27,7 +27,7 @@ import ( type buckets[N int64 | float64] struct { counts []uint64 count uint64 - sum N + total N min, max N } @@ -36,10 +36,11 @@ func newBuckets[N int64 | float64](n int) *buckets[N] { return &buckets[N]{counts: make([]uint64, n)} } +func (b *buckets[N]) sum(value N) { b.total += value } + func (b *buckets[N]) bin(idx int, value N) { b.counts[idx]++ b.count++ - b.sum += value if value < b.min { b.min = value } else if value > b.max { @@ -50,13 +51,14 @@ func (b *buckets[N]) bin(idx int, value N) { // histValues summarizes a set of measurements as an histValues with // explicitly defined buckets. 
type histValues[N int64 | float64] struct { + noSum bool bounds []float64 values map[attribute.Set]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -65,6 +67,7 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] { copy(b, bounds) sort.Float64s(b) return &histValues[N]{ + noSum: noSum, bounds: b, values: make(map[attribute.Set]*buckets[N]), } @@ -98,6 +101,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { s.values[attr] = b } b.bin(idx, value) + if !s.noSum { + b.sum(value) + } } // newDeltaHistogram returns an Aggregator that summarizes a set of @@ -107,9 +113,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) { // Each aggregation cycle is treated independently. When the returned // Aggregator's Aggregations method is called it will reset all histogram // counts to zero. -func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &deltaHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -148,7 +154,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: b.counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) @@ -170,9 +178,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation { // Each aggregation cycle builds from the previous, the histogram counts are // the bucketed counts of all values aggregated since the returned Aggregator // was created. 
-func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] { +func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] { return &cumulativeHistogram[N]{ - histValues: newHistValues[N](cfg.Boundaries), + histValues: newHistValues[N](cfg.Boundaries, noSum), noMinMax: cfg.NoMinMax, start: now(), } @@ -219,7 +227,9 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation { Count: b.count, Bounds: bounds, BucketCounts: counts, - Sum: b.sum, + } + if !s.noSum { + hdp.Sum = b.total } if !s.noMinMax { hdp.Min = metricdata.NewExtrema(b.min) diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 3656be9e988..8c75562198d 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -49,10 +49,25 @@ func testHistogram[N int64 | float64](t *testing.T) { } incr := monoIncr[N]() - eFunc := deltaHistExpecter[N](incr) - t.Run("Delta", tester.Run(newDeltaHistogram[N](histConf), incr, eFunc)) + eFunc := deltaHistSummedExpecter[N](incr) + t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc)) + eFunc = deltaHistExpecter[N](incr) + t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc)) + eFunc = cumuHistSummedExpecter[N](incr) + t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc)) eFunc = cumuHistExpecter[N](incr) - t.Run("Cumulative", tester.Run(newCumulativeHistogram[N](histConf), incr, eFunc)) + t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc)) +} + +func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality} + return func(m int) metricdata.Aggregation { + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m))) + } + return h + } } func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { @@ -66,6 +81,19 @@ func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc { + var cycle int + h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} + return func(m int) metricdata.Aggregation { + cycle++ + h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr)) + for a, v := range incr { + h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m))) + } + return h + } +} + func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { var cycle int h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality} @@ -79,6 +107,25 @@ func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc { } } +// hPointSummed returns an HistogramDataPoint that started and ended now with +// multi number of measurements values v. It includes a min and max (set to v). 
+func hPointSummed[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { + idx := sort.SearchFloat64s(bounds, float64(v)) + counts := make([]uint64, len(bounds)+1) + counts[idx] += multi + return metricdata.HistogramDataPoint[N]{ + Attributes: a, + StartTime: now(), + Time: now(), + Count: multi, + Bounds: bounds, + BucketCounts: counts, + Min: metricdata.NewExtrema(v), + Max: metricdata.NewExtrema(v), + Sum: v * N(multi), + } +} + // hPoint returns an HistogramDataPoint that started and ended now with multi // number of measurements values v. It includes a min and max (set to v). func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] { @@ -94,7 +141,6 @@ func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.Hi BucketCounts: counts, Min: metricdata.NewExtrema(v), Max: metricdata.NewExtrema(v), - Sum: v * N(multi), } } @@ -106,28 +152,50 @@ func TestBucketsBin(t *testing.T) { func testBucketsBin[N int64 | float64]() func(t *testing.T) { return func(t *testing.T) { b := newBuckets[N](3) - assertB := func(counts []uint64, count uint64, sum, min, max N) { + assertB := func(counts []uint64, count uint64, min, max N) { + t.Helper() assert.Equal(t, counts, b.counts) assert.Equal(t, count, b.count) - assert.Equal(t, sum, b.sum) assert.Equal(t, min, b.min) assert.Equal(t, max, b.max) } - assertB([]uint64{0, 0, 0}, 0, 0, 0, 0) + assertB([]uint64{0, 0, 0}, 0, 0, 0) b.bin(1, 2) - assertB([]uint64{0, 1, 0}, 1, 2, 0, 2) + assertB([]uint64{0, 1, 0}, 1, 0, 2) b.bin(0, -1) - assertB([]uint64{1, 1, 0}, 2, 1, -1, 2) + assertB([]uint64{1, 1, 0}, 2, -1, 2) + } +} + +func TestBucketsSum(t *testing.T) { + t.Run("Int64", testBucketsSum[int64]()) + t.Run("Float64", testBucketsSum[float64]()) +} + +func testBucketsSum[N int64 | float64]() func(t *testing.T) { + return func(t *testing.T) { + b := newBuckets[N](3) + + var want N + assert.Equal(t, want, b.total) + + b.sum(2) + want = 2 + assert.Equal(t, want, b.total) + + b.sum(-1) + want = 1 + assert.Equal(t, want, b.total) } } -func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { +func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) { b := []float64{0, 1, 2} cpB := make([]float64, len(b)) copy(cpB, b) - a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}) + a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false) return func(t *testing.T) { require.Equal(t, cpB, getBounds(a)) @@ -160,7 +228,7 @@ func TestHistogramImmutableBounds(t *testing.T) { } func TestCumulativeHistogramImutableCounts(t *testing.T) { - a := newCumulativeHistogram[int64](histConf) + a := newCumulativeHistogram[int64](histConf, false) a.Aggregate(5, alice) hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0] @@ -176,12 +244,12 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) { func TestDeltaHistogramReset(t *testing.T) { t.Cleanup(mockTime(now)) - a := newDeltaHistogram[int64](histConf) + a := newDeltaHistogram[int64](histConf, false) assert.Nil(t, a.Aggregation()) a.Aggregate(1, alice) expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality} - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](alice, 1, 1)} + expect.DataPoints = 
[]metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) // The attr set should be forgotten once Aggregations is called. @@ -190,15 +258,15 @@ func TestDeltaHistogramReset(t *testing.T) { // Aggregating another set should not affect the original (alice). a.Aggregate(1, bob) - expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](bob, 1, 1)} + expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)} metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation()) } func TestEmptyHistogramNilAggregation(t *testing.T) { - assert.Nil(t, newCumulativeHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newCumulativeHistogram[float64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[int64](histConf).Aggregation()) - assert.Nil(t, newDeltaHistogram[float64](histConf).Aggregation()) + assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation()) + assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation()) } func BenchmarkHistogram(b *testing.B) { @@ -207,8 +275,8 @@ func BenchmarkHistogram(b *testing.B) { } func benchmarkHistogram[N int64 | float64](b *testing.B) { - factory := func() aggregator[N] { return newDeltaHistogram[N](histConf) } + factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) } b.Run("Delta", benchmarkAggregator(factory)) - factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf) } + factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) } b.Run("Cumulative", benchmarkAggregator(factory)) } diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7e1d32be249..f76d5190413 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -42,8 +42,8 @@ type meter struct { scope instrumentation.Scope pipes pipelines - int64IP *int64InstProvider - float64IP *float64InstProvider + int64Resolver resolver[int64] + float64Resolver resolver[float64] } func newMeter(s instrumentation.Scope, p pipelines) *meter { @@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { var viewCache cache[string, streamID] return &meter{ - scope: s, - pipes: p, - int64IP: newInt64InstProvider(s, p, &viewCache), - float64IP: newFloat64InstProvider(s, p, &viewCache), + scope: s, + pipes: p, + int64Resolver: newResolver[int64](p, &viewCache), + float64Resolver: newResolver[float64](p, &viewCache), } } @@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil) func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) { cfg := metric.NewInt64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { cfg := metric.NewInt64UpDownCounterConfig(options...) 
const kind = InstrumentKindUpDownCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { cfg := metric.NewInt64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { cfg := metric.NewInt64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) { cfg := metric.NewFloat64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { cfg := metric.NewFloat64UpDownCounterConfig(options...) 
const kind = InstrumentKindUpDownCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { cfg := metric.NewFloat64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { cfg := metric.NewFloat64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -301,7 +307,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) switch o := inst.(type) { case int64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -309,7 +315,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } reg.registerInt64(o.observablID) case float64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } } - if err := errs.errorOrNil(); err != nil { - return nil, err - } - + err := errs.errorOrNil() if reg.len() == 0 { - // All insts use drop aggregation. - return noopRegister{}, nil + // All insts use drop aggregation or are invalid. + return noopRegister{}, err } - cback := func(ctx context.Context) error { - return f(ctx, reg) - } - return m.pipes.registerMultiCallback(cback), nil + // Some or all instruments were valid. 
+ cback := func(ctx context.Context) error { return f(ctx, reg) } + return m.pipes.registerMultiCallback(cback), err } type observer struct { @@ -441,17 +443,9 @@ func (noopRegister) Unregister() error { } // int64InstProvider provides int64 OpenTelemetry instruments. -type int64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[int64] -} +type int64InstProvider struct{ *meter } -func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider { - return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)} -} - -func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { +func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { inst := Instrument{ Name: name, Description: desc, @@ -459,27 +453,19 @@ func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]a Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.int64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { +func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &int64Inst{measures: aggs}, err } // float64InstProvider provides float64 OpenTelemetry instruments. -type float64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[float64] -} - -func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider { - return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)} -} +type float64InstProvider struct{ *meter } -func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { +func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { inst := Instrument{ Name: name, Description: desc, @@ -487,20 +473,20 @@ func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([ Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.float64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. 
-func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { +func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &float64Inst{measures: aggs}, err } -type int64ObservProvider struct{ *int64InstProvider } +type int64ObservProvider struct{ *meter } func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newInt64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) + return newInt64Observable(p.meter, kind, name, desc, u, aggs), err } func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) { @@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { o.observe(val, c.Attributes()) } -type float64ObservProvider struct{ *float64InstProvider } +type float64ObservProvider struct{ *meter } func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) + return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err } func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) { diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index a6a6c8ca44c..5989e0c9575 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -412,7 +412,15 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr meas, comp = b.Sum(false) } case aggregation.ExplicitBucketHistogram: - meas, comp = b.ExplicitBucketHistogram(a) + var noSum bool + switch kind { + case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge: + // The sum should not be collected for any instrument that can make + // negative measurements: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations + noSum = true + } + meas, comp = b.ExplicitBucketHistogram(a, noSum) default: err = errUnknownAggregation } @@ -425,23 +433,28 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // // | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram | // |--------------------------|------|-----------|-----|-----------|-----------------------| -// | Counter | X | | X | X | X | -// | UpDownCounter | X | | X | | | -// | Histogram | X | | X | X | X | -// | Observable Counter | X | | X | | | -// | Observable UpDownCounter | X | | X | | | -// | Observable Gauge | X | X | | | |. +// | Counter | ✓ | | ✓ | ✓ | ✓ | +// | UpDownCounter | ✓ | | ✓ | ✓ | | +// | Histogram | ✓ | | ✓ | ✓ | ✓ | +// | Observable Counter | ✓ | | ✓ | ✓ | | +// | Observable UpDownCounter | ✓ | | ✓ | ✓ | | +// | Observable Gauge | ✓ | ✓ | | ✓ | |. 
func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { switch agg.(type) { case aggregation.Default: return nil case aggregation.ExplicitBucketHistogram: - if kind == InstrumentKindCounter || kind == InstrumentKindHistogram { + switch kind { + case InstrumentKindCounter, + InstrumentKindUpDownCounter, + InstrumentKindHistogram, + InstrumentKindObservableCounter, + InstrumentKindObservableUpDownCounter, + InstrumentKindObservableGauge: return nil + default: + return errIncompatibleAggregation } - // TODO: review need for aggregation check after - // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 - return errIncompatibleAggregation case aggregation.Sum: switch kind { case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter: diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 3e1c86dc654..c10bbc36803 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -498,7 +498,7 @@ func TestPipelineRegistryResource(t *testing.T) { } func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { - testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Sum{} })) readers := []Reader{testRdrHistogram} views := []View{defaultView} @@ -643,7 +643,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "SyncUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "SyncHistogram and Drop", @@ -686,7 +685,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableUpDownCounter and Drop", @@ -708,7 +706,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableUpDownCounter, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "ObservableGauge and Drop", @@ -730,7 +727,6 @@ func TestIsAggregatorCompatible(t *testing.T) { name: "ObservableGauge and ExplicitBucketHistogram", kind: InstrumentKindObservableGauge, agg: aggregation.ExplicitBucketHistogram{}, - want: errIncompatibleAggregation, }, { name: "unknown kind with Sum should error", diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index 5f02b130570..774e026ad87 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -21,11 +21,14 @@ import ( "testing" "github.com/go-logr/logr/funcr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + api "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestMeterConcurrentSafe(t *testing.T) { @@ -120,3 +123,51 @@ func TestMeterProviderReturnsNoopMeterAfterShutdown(t *testing.T) { _, ok = m.(noop.Meter) assert.Truef(t, ok, "Meter from shutdown MeterProvider is not NoOp: %T", m) } + +func 
TestMeterProviderMixingOnRegisterErrors(t *testing.T) { + otel.SetLogger(testr.New(t)) + + rdr0 := NewManualReader() + mp0 := NewMeterProvider(WithReader(rdr0)) + + rdr1 := NewManualReader() + mp1 := NewMeterProvider(WithReader(rdr1)) + + // Meters with the same scope but different MeterProviders. + m0 := mp0.Meter("TestMeterProviderMixingOnRegisterErrors") + m1 := mp1.Meter("TestMeterProviderMixingOnRegisterErrors") + + m0Gauge, err := m0.Float64ObservableGauge("float64Gauge") + require.NoError(t, err) + + m1Gauge, err := m1.Int64ObservableGauge("int64Gauge") + require.NoError(t, err) + + _, err = m0.RegisterCallback( + func(_ context.Context, o api.Observer) error { + o.ObserveFloat64(m0Gauge, 2) + // Observe an instrument from a different MeterProvider. + o.ObserveInt64(m1Gauge, 1) + + return nil + }, + m0Gauge, m1Gauge, + ) + assert.Error( + t, + err, + "Instrument registered with Meter from different MeterProvider", + ) + + var data metricdata.ResourceMetrics + _ = rdr0.Collect(context.Background(), &data) + // Only the metrics from mp0 should be produced. + assert.Len(t, data.ScopeMetrics, 1) + + err = rdr1.Collect(context.Background(), &data) + assert.NoError(t, err, "Errored when collect should be a noop") + assert.Len( + t, data.ScopeMetrics, 0, + "Metrics produced for instrument collected by different MeterProvider", + ) +} diff --git a/sdk/resource/host_id_readfile.go b/sdk/resource/host_id_readfile.go index f92c6dad0f9..721e3ca6e7d 100644 --- a/sdk/resource/host_id_readfile.go +++ b/sdk/resource/host_id_readfile.go @@ -21,7 +21,7 @@ import "os" func readFile(filename string) (string, error) { b, err := os.ReadFile(filename) if err != nil { - return "", nil + return "", err } return string(b), nil diff --git a/sdk/resource/host_id_readfile_test.go b/sdk/resource/host_id_readfile_test.go new file mode 100644 index 00000000000..f071a05cc0c --- /dev/null +++ b/sdk/resource/host_id_readfile_test.go @@ -0,0 +1,53 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux || dragonfly || freebsd || netbsd || openbsd || solaris + +package resource + +import ( + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestReadFileExistent(t *testing.T) { + fileContents := "foo" + + f, err := os.CreateTemp("", "readfile_") + require.NoError(t, err) + + defer os.Remove(f.Name()) + + _, err = f.WriteString(fileContents) + require.NoError(t, err) + require.NoError(t, f.Close()) + + result, err := readFile(f.Name()) + require.NoError(t, err) + require.Equal(t, result, fileContents) +} + +func TestReadFileNonExistent(t *testing.T) { + // create unique filename + f, err := os.CreateTemp("", "readfile_") + require.NoError(t, err) + + // make file non-existent + require.NoError(t, os.Remove(f.Name())) + + _, err = readFile(f.Name()) + require.ErrorIs(t, err, os.ErrNotExist) +}
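For reference, a minimal sketch of how the `resource.WithHostID()` fix above surfaces to users: with `readFile` now returning the error, a failure to read the machine ID is reported by the detector and `host.id` is left unset, rather than being silently recorded as an empty value. The error handling below is illustrative; how an application reacts to a detection failure is its own choice.

package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/sdk/resource"
)

func main() {
	// WithHostID attempts to read the machine ID (e.g. /etc/machine-id on
	// Linux). With this fix, a read failure propagates instead of producing
	// an empty host.id attribute.
	res, err := resource.New(context.Background(), resource.WithHostID())
	if err != nil {
		log.Printf("host ID detection failed: %v", err)
	}
	if res != nil {
		fmt.Println(res.Attributes())
	}
}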