Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel: conceal unwrapping for global async instrument registration #5881

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Fix timer channel drain to avoid hanging on Go 1.23. (#5868)
- Fix delegation for global meter providers. (#5827)
Change the `reflect.TypeOf` to use a nil pointer to not allocate on the heap unless necessary.
- Global MeterProvider registration correctly unwraps global instrument stubs. (#5881)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
175 changes: 175 additions & 0 deletions internal/global/alt_sdk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package global // import "go.opentelemetry.io/otel/internal/global"

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/embedded"
)

type altMeterProvider struct {
t *testing.T
meters []*altMeter
embedded.MeterProvider
}

var _ metric.MeterProvider = &altMeterProvider{}

func (amp *altMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter {
am := &altMeter{
provider: amp,
}
amp.meters = append(amp.meters, am)
return am
}

type altMeter struct {
provider *altMeterProvider
cbs []metric.Callback
embedded.Meter
}

var _ metric.Meter = &altMeter{}

type testAsyncCounter struct {
meter *altMeter
embedded.Int64ObservableCounter
metric.Int64Observable
}

var _ metric.Int64ObservableCounter = &testAsyncCounter{}

type altRegistration struct {
cb metric.Callback
embedded.Registration
}

type altObserver struct {
t *testing.T
embedded.Observer
}

func (*altRegistration) Unregister() error {
return nil
}

func (am *altMeter) Int64Counter(name string, options ...metric.Int64CounterOption) (metric.Int64Counter, error) {
return nil, nil
}

func (am *altMeter) Int64UpDownCounter(name string, options ...metric.Int64UpDownCounterOption) (metric.Int64UpDownCounter, error) {
return nil, nil
}

func (am *altMeter) Int64Histogram(name string, options ...metric.Int64HistogramOption) (metric.Int64Histogram, error) {
return nil, nil
}

func (am *altMeter) Int64Gauge(name string, options ...metric.Int64GaugeOption) (metric.Int64Gauge, error) {
return nil, nil
}

func (am *altMeter) Int64ObservableCounter(name string, options ...metric.Int64ObservableCounterOption) (metric.Int64ObservableCounter, error) {
return &testAsyncCounter{
meter: am,
}, nil
}

func (am *altMeter) Int64ObservableUpDownCounter(name string, options ...metric.Int64ObservableUpDownCounterOption) (metric.Int64ObservableUpDownCounter, error) {
return nil, nil
}

func (am *altMeter) Int64ObservableGauge(name string, options ...metric.Int64ObservableGaugeOption) (metric.Int64ObservableGauge, error) {
return nil, nil
}

func (am *altMeter) Float64Counter(name string, options ...metric.Float64CounterOption) (metric.Float64Counter, error) {
return nil, nil
}

func (am *altMeter) Float64UpDownCounter(name string, options ...metric.Float64UpDownCounterOption) (metric.Float64UpDownCounter, error) {
return nil, nil
}

func (am *altMeter) Float64Histogram(name string, options ...metric.Float64HistogramOption) (metric.Float64Histogram, error) {
return nil, nil
}

func (am *altMeter) Float64Gauge(name string, options ...metric.Float64GaugeOption) (metric.Float64Gauge, error) {
return nil, nil
}

func (am *altMeter) Float64ObservableCounter(name string, options ...metric.Float64ObservableCounterOption) (metric.Float64ObservableCounter, error) {
return nil, nil
}

func (am *altMeter) Float64ObservableUpDownCounter(name string, options ...metric.Float64ObservableUpDownCounterOption) (metric.Float64ObservableUpDownCounter, error) {
return nil, nil
}

func (am *altMeter) Float64ObservableGauge(name string, options ...metric.Float64ObservableGaugeOption) (metric.Float64ObservableGauge, error) {
// Note: The global delegation also breaks when we return nil in one of these!
return nil, nil
}

func (am *altMeter) RegisterCallback(f metric.Callback, instruments ...metric.Observable) (metric.Registration, error) {
for _, inst := range instruments {
switch inst.(type) {
case *testAsyncCounter:
// OK!
default:
am.provider.t.Errorf("unexpected type %T", inst)
}
}
am.cbs = append(am.cbs, f)
return &altRegistration{cb: f}, nil
}

func (ao *altObserver) ObserveFloat64(inst metric.Float64Observable, _ float64, _ ...metric.ObserveOption) {
ao.observe(inst)
}

func (ao *altObserver) ObserveInt64(inst metric.Int64Observable, _ int64, _ ...metric.ObserveOption) {
ao.observe(inst)
}

func (ao *altObserver) observe(inst any) {
switch inst.(type) {
case *testAsyncCounter:
// OK!
default:
ao.t.Errorf("unexpected type %T", inst)
}
}

func TestMeterDelegation(t *testing.T) {
ResetForTest(t)

amp := &altMeterProvider{t: t}

gm := MeterProvider().Meter("test")
ai, err := gm.Int64ObservableCounter("test_counter")
require.NoError(t, err)

_, err = gm.RegisterCallback(func(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(ai, 10)
return nil
}, ai)
require.NoError(t, err)

SetMeterProvider(amp)

ctx := context.Background()
ao := &altObserver{t: t}
for _, meter := range amp.meters {
for _, cb := range meter.cbs {
require.NoError(t, cb(ctx, ao))
}
}
}
14 changes: 7 additions & 7 deletions internal/global/instruments.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// unwrapper unwraps to return the underlying instrument implementation.
type unwrapper interface {
Unwrap() metric.Observable
unwrap() metric.Observable
}

type afCounter struct {
Expand All @@ -40,7 +40,7 @@ func (i *afCounter) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *afCounter) Unwrap() metric.Observable {
func (i *afCounter) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Float64ObservableCounter)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func (i *afUpDownCounter) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *afUpDownCounter) Unwrap() metric.Observable {
func (i *afUpDownCounter) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Float64ObservableUpDownCounter)
}
Expand Down Expand Up @@ -102,7 +102,7 @@ func (i *afGauge) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *afGauge) Unwrap() metric.Observable {
func (i *afGauge) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Float64ObservableGauge)
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (i *aiCounter) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *aiCounter) Unwrap() metric.Observable {
func (i *aiCounter) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Int64ObservableCounter)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (i *aiUpDownCounter) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *aiUpDownCounter) Unwrap() metric.Observable {
func (i *aiUpDownCounter) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Int64ObservableUpDownCounter)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (i *aiGauge) setDelegate(m metric.Meter) {
i.delegate.Store(ctr)
}

func (i *aiGauge) Unwrap() metric.Observable {
func (i *aiGauge) unwrap() metric.Observable {
if ctr := i.delegate.Load(); ctr != nil {
return ctr.(metric.Int64ObservableGauge)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/global/instruments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,19 @@ func TestAsyncInstrumentSetDelegateConcurrentSafe(t *testing.T) {
t.Run("Float64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &afCounter{}
f := func(float64) { _ = delegate.Unwrap() }
f := func(float64) { _ = delegate.unwrap() }
testFloat64ConcurrentSafe(f, delegate.setDelegate)
})

t.Run("UpDownCounter", func(t *testing.T) {
delegate := &afUpDownCounter{}
f := func(float64) { _ = delegate.Unwrap() }
f := func(float64) { _ = delegate.unwrap() }
testFloat64ConcurrentSafe(f, delegate.setDelegate)
})

t.Run("Gauge", func(t *testing.T) {
delegate := &afGauge{}
f := func(float64) { _ = delegate.Unwrap() }
f := func(float64) { _ = delegate.unwrap() }
testFloat64ConcurrentSafe(f, delegate.setDelegate)
})
})
Expand All @@ -79,19 +79,19 @@ func TestAsyncInstrumentSetDelegateConcurrentSafe(t *testing.T) {
t.Run("Int64", func(t *testing.T) {
t.Run("Counter", func(t *testing.T) {
delegate := &aiCounter{}
f := func(int64) { _ = delegate.Unwrap() }
f := func(int64) { _ = delegate.unwrap() }
testInt64ConcurrentSafe(f, delegate.setDelegate)
})

t.Run("UpDownCounter", func(t *testing.T) {
delegate := &aiUpDownCounter{}
f := func(int64) { _ = delegate.Unwrap() }
f := func(int64) { _ = delegate.unwrap() }
testInt64ConcurrentSafe(f, delegate.setDelegate)
})

t.Run("Gauge", func(t *testing.T) {
delegate := &aiGauge{}
f := func(int64) { _ = delegate.Unwrap() }
f := func(int64) { _ = delegate.unwrap() }
testInt64ConcurrentSafe(f, delegate.setDelegate)
})
})
Expand Down
40 changes: 30 additions & 10 deletions internal/global/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package global // import "go.opentelemetry.io/otel/internal/global"

import (
"container/list"
"context"
"reflect"
"sync"

Expand Down Expand Up @@ -472,8 +473,7 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
defer m.mtx.Unlock()

if m.delegate != nil {
insts = unwrapInstruments(insts)
return m.delegate.RegisterCallback(f, insts...)
return m.delegate.RegisterCallback(unwrapCallback(f), unwrapInstruments(insts)...)
}

reg := &registration{instruments: insts, function: f}
Expand All @@ -487,15 +487,11 @@ func (m *meter) RegisterCallback(f metric.Callback, insts ...metric.Observable)
return reg, nil
}

type wrapped interface {
unwrap() metric.Observable
}

func unwrapInstruments(instruments []metric.Observable) []metric.Observable {
out := make([]metric.Observable, 0, len(instruments))

for _, inst := range instruments {
if in, ok := inst.(wrapped); ok {
if in, ok := inst.(unwrapper); ok {
out = append(out, in.unwrap())
} else {
out = append(out, inst)
Expand All @@ -515,9 +511,33 @@ type registration struct {
unregMu sync.Mutex
}

func (c *registration) setDelegate(m metric.Meter) {
insts := unwrapInstruments(c.instruments)
type unwrapObs struct {
embedded.Observer
obs metric.Observer
}

func (uo *unwrapObs) ObserveFloat64(inst metric.Float64Observable, value float64, opts ...metric.ObserveOption) {
if un, ok := inst.(unwrapper); ok {
inst = un.unwrap().(metric.Float64Observable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will panic if this is an invalid type conversion, whereas before it would be logged as an error.

Can we log this as an error? Or, better yet, include a specific unwrappableFloat64Observable type?

type unwrapFloat64Observabler interface {
	unwrapFloat64Observable() metric.Float64Observable
}

and similar for Int64Observable?

}

uo.obs.ObserveFloat64(inst, value, opts...)
}

func (uo *unwrapObs) ObserveInt64(inst metric.Int64Observable, value int64, opts ...metric.ObserveOption) {
if un, ok := inst.(unwrapper); ok {
inst = un.unwrap().(metric.Int64Observable)
}
uo.obs.ObserveInt64(inst, value, opts...)
}

func unwrapCallback(f metric.Callback) metric.Callback {
return func(ctx context.Context, obs metric.Observer) error {
return f(ctx, &unwrapObs{obs: obs})
}
}

func (c *registration) setDelegate(m metric.Meter) {
c.unregMu.Lock()
defer c.unregMu.Unlock()

Expand All @@ -526,7 +546,7 @@ func (c *registration) setDelegate(m metric.Meter) {
return
}

reg, err := m.RegisterCallback(c.function, insts...)
reg, err := m.RegisterCallback(unwrapCallback(c.function), unwrapInstruments(c.instruments)...)
if err != nil {
GetErrorHandler().Handle(err)
return
Expand Down
Loading
Loading