Skip to content

Commit

Permalink
Merge branch 'main' into simplify_filtered
Browse files Browse the repository at this point in the history
  • Loading branch information
MadVikingGod authored Jul 19, 2023
2 parents 85e2d50 + f2a9f2f commit 645c548
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 230 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)
- Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333)

### Fixed

- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
- Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307)
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)

## [1.16.0/0.39.0] 2023-05-18

Expand Down
20 changes: 11 additions & 9 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
func newFloat64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

Expand All @@ -296,28 +296,30 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
func newInt64Observable(m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, meas),
observable: newObservable(m, kind, name, desc, u, meas),
}
}

type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

meter *meter
measures []aggregate.Measure[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
func newObservable[N int64 | float64](m *meter, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
description: desc,
kind: kind,
unit: u,
scope: scope,
scope: m.scope,
},
meter: m,
measures: meas,
}
}
Expand All @@ -335,16 +337,16 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
// and nil if it should. An errEmptyAgg error is returned if o is effectively a
// no-op because it does not have any aggregators. Also, an error is returned
// if scope defines a Meter other than the one o was created by.
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
func (o *observable[N]) registerable(m *meter) error {
if len(o.measures) == 0 {
return errEmptyAgg
}
if scope != o.scope {
if m != o.meter {
return fmt.Errorf(
"invalid registration: observable %q from Meter %q, registered with Meter %q",
o.name,
o.scope.Name,
scope.Name,
m.scope.Name,
)
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {

// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) {
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) {
var h aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
h = newDeltaHistogram[N](cfg)
h = newDeltaHistogram[N](cfg, noSum)
default:
h = newCumulativeHistogram[N](cfg)
h = newCumulativeHistogram[N](cfg, noSum)
}
return b.input(h), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
Expand Down
111 changes: 0 additions & 111 deletions sdk/metric/internal/aggregate/aggregator_example_test.go

This file was deleted.

28 changes: 19 additions & 9 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type buckets[N int64 | float64] struct {
counts []uint64
count uint64
sum N
total N
min, max N
}

Expand All @@ -36,10 +36,11 @@ func newBuckets[N int64 | float64](n int) *buckets[N] {
return &buckets[N]{counts: make([]uint64, n)}
}

func (b *buckets[N]) sum(value N) { b.total += value }

func (b *buckets[N]) bin(idx int, value N) {
b.counts[idx]++
b.count++
b.sum += value
if value < b.min {
b.min = value
} else if value > b.max {
Expand All @@ -50,13 +51,14 @@ func (b *buckets[N]) bin(idx int, value N) {
// histValues summarizes a set of measurements as an histValues with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand All @@ -65,6 +67,7 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
copy(b, bounds)
sort.Float64s(b)
return &histValues[N]{
noSum: noSum,
bounds: b,
values: make(map[attribute.Set]*buckets[N]),
}
Expand Down Expand Up @@ -98,6 +101,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
s.values[attr] = b
}
b.bin(idx, value)
if !s.noSum {
b.sum(value)
}
}

// newDeltaHistogram returns an Aggregator that summarizes a set of
Expand All @@ -107,9 +113,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &deltaHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
Expand Down Expand Up @@ -148,7 +154,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
Count: b.count,
Bounds: bounds,
BucketCounts: b.counts,
Sum: b.sum,
}
if !s.noSum {
hdp.Sum = b.total
}
if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
Expand All @@ -170,9 +178,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
// Each aggregation cycle builds from the previous, the histogram counts are
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &cumulativeHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
Expand Down Expand Up @@ -219,7 +227,9 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
Count: b.count,
Bounds: bounds,
BucketCounts: counts,
Sum: b.sum,
}
if !s.noSum {
hdp.Sum = b.total
}
if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
Expand Down
Loading

0 comments on commit 645c548

Please sign in to comment.