From f2a9f2f2beaefa7c5e0b22026a2b0ea77e4da7af Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 19 Jul 2023 07:31:11 -0700 Subject: [PATCH] Restrict Meters to only register and collect instruments it created (#4333) * Add acceptance test * Update Meter Register and collect only inst from itself * Add change to changelog * Fix spelling error * Update changelog entry wording * Simplify the partial success code path --- CHANGELOG.md | 1 + sdk/metric/instrument.go | 20 ++++--- sdk/metric/meter.go | 106 ++++++++++++++++-------------------- sdk/metric/provider_test.go | 51 +++++++++++++++++ 4 files changed, 109 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64d34115a39..75d463e4eb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) - Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) +- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) ### Fixed diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index b4d6fa8b35c..83652c6e97f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{} var _ metric.Float64ObservableUpDownCounter = float64Observable{} var _ metric.Float64ObservableGauge = float64Observable{} -func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { +func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable { return float64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -296,9 +296,9 @@ var _ metric.Int64ObservableCounter = int64Observable{} var _ metric.Int64ObservableUpDownCounter = int64Observable{} var _ metric.Int64ObservableGauge = int64Observable{} -func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { +func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable { return int64Observable{ - observable: newObservable(scope, kind, name, desc, u, meas), + observable: newObservable(m, kind, name, desc, u, meas), } } @@ -306,18 +306,20 @@ type observable[N int64 | float64] struct { metric.Observable observablID[N] + meter *meter measures []aggregate.Measure[N] } -func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { +func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] { return &observable[N]{ observablID: observablID[N]{ name: name, description: desc, kind: kind, unit: u, - scope: scope, + scope: m.scope, }, + meter: m, measures: meas, } } @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument") // and nil if it should. An errEmptyAgg error is returned if o is effectively a // no-op because it does not have any aggregators. Also, an error is returned // if scope defines a Meter other than the one o was created by. -func (o *observable[N]) registerable(scope instrumentation.Scope) error { +func (o *observable[N]) registerable(m *meter) error { if len(o.measures) == 0 { return errEmptyAgg } - if scope != o.scope { + if m != o.meter { return fmt.Errorf( "invalid registration: observable %q from Meter %q, registered with Meter %q", o.name, o.scope.Name, - scope.Name, + m.scope.Name, ) } return nil diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 7e1d32be249..f76d5190413 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -42,8 +42,8 @@ type meter struct { scope instrumentation.Scope pipes pipelines - int64IP *int64InstProvider - float64IP *float64InstProvider + int64Resolver resolver[int64] + float64Resolver resolver[float64] } func newMeter(s instrumentation.Scope, p pipelines) *meter { @@ -52,10 +52,10 @@ func newMeter(s instrumentation.Scope, p pipelines) *meter { var viewCache cache[string, streamID] return &meter{ - scope: s, - pipes: p, - int64IP: newInt64InstProvider(s, p, &viewCache), - float64IP: newFloat64InstProvider(s, p, &viewCache), + scope: s, + pipes: p, + int64Resolver: newResolver[int64](p, &viewCache), + float64Resolver: newResolver[float64](p, &viewCache), } } @@ -68,7 +68,8 @@ var _ metric.Meter = (*meter)(nil) func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) { cfg := metric.NewInt64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -82,7 +83,8 @@ func (m *meter) Int64Counter(name string, options ...metric.Int64CounterOption) func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) { cfg := metric.NewInt64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -96,7 +98,8 @@ func (m *meter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCou func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) { cfg := metric.NewInt64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.int64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := int64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -111,7 +114,7 @@ func (m *meter) Int64Histogram(name string, options ...metric.Int64HistogramOpti func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) { cfg := metric.NewInt64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -127,7 +130,7 @@ func (m *meter) Int64ObservableCounter(name string, options ...metric.Int64Obser func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) { cfg := metric.NewInt64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -143,7 +146,7 @@ func (m *meter) Int64ObservableUpDownCounter(name string, options ...metric.Int6 func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) { cfg := metric.NewInt64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := int64ObservProvider{m.int64IP} + p := int64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -158,7 +161,8 @@ func (m *meter) Int64ObservableGauge(name string, options ...metric.Int64Observa func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) { cfg := metric.NewFloat64CounterConfig(options...) const kind = InstrumentKindCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -172,7 +176,8 @@ func (m *meter) Float64Counter(name string, options ...metric.Float64CounterOpti func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) { cfg := metric.NewFloat64UpDownCounterConfig(options...) const kind = InstrumentKindUpDownCounter - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -186,7 +191,8 @@ func (m *meter) Float64UpDownCounter(name string, options ...metric.Float64UpDow func (m *meter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) { cfg := metric.NewFloat64HistogramConfig(options...) const kind = InstrumentKindHistogram - i, err := m.float64IP.lookup(kind, name, cfg.Description(), cfg.Unit()) + p := float64InstProvider{m} + i, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return i, err } @@ -201,7 +207,7 @@ func (m *meter) Float64Histogram(name string, options ...metric.Float64Histogram func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) { cfg := metric.NewFloat64ObservableCounterConfig(options...) const kind = InstrumentKindObservableCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -217,7 +223,7 @@ func (m *meter) Float64ObservableCounter(name string, options ...metric.Float64O func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) { cfg := metric.NewFloat64ObservableUpDownCounterConfig(options...) const kind = InstrumentKindObservableUpDownCounter - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -233,7 +239,7 @@ func (m *meter) Float64ObservableUpDownCounter(name string, options ...metric.Fl func (m *meter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) { cfg := metric.NewFloat64ObservableGaugeConfig(options...) const kind = InstrumentKindObservableGauge - p := float64ObservProvider{m.float64IP} + p := float64ObservProvider{m} inst, err := p.lookup(kind, name, cfg.Description(), cfg.Unit()) if err != nil { return nil, err @@ -301,7 +307,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) switch o := inst.(type) { case int64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -309,7 +315,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } reg.registerInt64(o.observablID) case float64Observable: - if err := o.registerable(m.scope); err != nil { + if err := o.registerable(m); err != nil { if !errors.Is(err, errEmptyAgg) { errs.append(err) } @@ -322,19 +328,15 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable) } } - if err := errs.errorOrNil(); err != nil { - return nil, err - } - + err := errs.errorOrNil() if reg.len() == 0 { - // All insts use drop aggregation. - return noopRegister{}, nil + // All insts use drop aggregation or are invalid. + return noopRegister{}, err } - cback := func(ctx context.Context) error { - return f(ctx, reg) - } - return m.pipes.registerMultiCallback(cback), nil + // Some or all instruments were valid. + cback := func(ctx context.Context) error { return f(ctx, reg) } + return m.pipes.registerMultiCallback(cback), err } type observer struct { @@ -441,17 +443,9 @@ func (noopRegister) Unregister() error { } // int64InstProvider provides int64 OpenTelemetry instruments. -type int64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[int64] -} +type int64InstProvider struct{ *meter } -func newInt64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *int64InstProvider { - return &int64InstProvider{scope: s, pipes: p, resolve: newResolver[int64](p, c)} -} - -func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { +func (p int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[int64], error) { inst := Instrument{ Name: name, Description: desc, @@ -459,27 +453,19 @@ func (p *int64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]a Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.int64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p *int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { +func (p int64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*int64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &int64Inst{measures: aggs}, err } // float64InstProvider provides float64 OpenTelemetry instruments. -type float64InstProvider struct { - scope instrumentation.Scope - pipes pipelines - resolve resolver[float64] -} - -func newFloat64InstProvider(s instrumentation.Scope, p pipelines, c *cache[string, streamID]) *float64InstProvider { - return &float64InstProvider{scope: s, pipes: p, resolve: newResolver[float64](p, c)} -} +type float64InstProvider struct{ *meter } -func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { +func (p float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([]aggregate.Measure[float64], error) { inst := Instrument{ Name: name, Description: desc, @@ -487,20 +473,20 @@ func (p *float64InstProvider) aggs(kind InstrumentKind, name, desc, u string) ([ Kind: kind, Scope: p.scope, } - return p.resolve.Aggregators(inst) + return p.float64Resolver.Aggregators(inst) } // lookup returns the resolved instrumentImpl. -func (p *float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { +func (p float64InstProvider) lookup(kind InstrumentKind, name, desc, u string) (*float64Inst, error) { aggs, err := p.aggs(kind, name, desc, u) return &float64Inst{measures: aggs}, err } -type int64ObservProvider struct{ *int64InstProvider } +type int64ObservProvider struct{ *meter } func (p int64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (int64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newInt64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (int64InstProvider)(p).aggs(kind, name, desc, u) + return newInt64Observable(p.meter, kind, name, desc, u, aggs), err } func (p int64ObservProvider) registerCallbacks(inst int64Observable, cBacks []metric.Int64Callback) { @@ -529,11 +515,11 @@ func (o int64Observer) Observe(val int64, opts ...metric.ObserveOption) { o.observe(val, c.Attributes()) } -type float64ObservProvider struct{ *float64InstProvider } +type float64ObservProvider struct{ *meter } func (p float64ObservProvider) lookup(kind InstrumentKind, name, desc, u string) (float64Observable, error) { - aggs, err := p.aggs(kind, name, desc, u) - return newFloat64Observable(p.scope, kind, name, desc, u, aggs), err + aggs, err := (float64InstProvider)(p).aggs(kind, name, desc, u) + return newFloat64Observable(p.meter, kind, name, desc, u, aggs), err } func (p float64ObservProvider) registerCallbacks(inst float64Observable, cBacks []metric.Float64Callback) { diff --git a/sdk/metric/provider_test.go b/sdk/metric/provider_test.go index 5f02b130570..774e026ad87 100644 --- a/sdk/metric/provider_test.go +++ b/sdk/metric/provider_test.go @@ -21,11 +21,14 @@ import ( "testing" "github.com/go-logr/logr/funcr" + "github.com/go-logr/logr/testr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + api "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestMeterConcurrentSafe(t *testing.T) { @@ -120,3 +123,51 @@ func TestMeterProviderReturnsNoopMeterAfterShutdown(t *testing.T) { _, ok = m.(noop.Meter) assert.Truef(t, ok, "Meter from shutdown MeterProvider is not NoOp: %T", m) } + +func TestMeterProviderMixingOnRegisterErrors(t *testing.T) { + otel.SetLogger(testr.New(t)) + + rdr0 := NewManualReader() + mp0 := NewMeterProvider(WithReader(rdr0)) + + rdr1 := NewManualReader() + mp1 := NewMeterProvider(WithReader(rdr1)) + + // Meters with the same scope but different MeterProviders. + m0 := mp0.Meter("TestMeterProviderMixingOnRegisterErrors") + m1 := mp1.Meter("TestMeterProviderMixingOnRegisterErrors") + + m0Gauge, err := m0.Float64ObservableGauge("float64Gauge") + require.NoError(t, err) + + m1Gauge, err := m1.Int64ObservableGauge("int64Gauge") + require.NoError(t, err) + + _, err = m0.RegisterCallback( + func(_ context.Context, o api.Observer) error { + o.ObserveFloat64(m0Gauge, 2) + // Observe an instrument from a different MeterProvider. + o.ObserveInt64(m1Gauge, 1) + + return nil + }, + m0Gauge, m1Gauge, + ) + assert.Error( + t, + err, + "Instrument registered with Meter from different MeterProvider", + ) + + var data metricdata.ResourceMetrics + _ = rdr0.Collect(context.Background(), &data) + // Only the metrics from mp0 should be produced. + assert.Len(t, data.ScopeMetrics, 1) + + err = rdr1.Collect(context.Background(), &data) + assert.NoError(t, err, "Errored when collect should be a noop") + assert.Len( + t, data.ScopeMetrics, 0, + "Metrics produced for instrument collected by different MeterProvider", + ) +}