Skip to content

Commit

Permalink
Revert to having a valueMap
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Jul 24, 2023
1 parent c755236 commit 6b3a4f7
Showing 1 changed file with 38 additions and 36 deletions.
74 changes: 38 additions & 36 deletions sdk/metric/internal/aggregate/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,37 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
values map[attribute.Set]N
}

func newValueMap[N int64 | float64]() *valueMap[N] {
return &valueMap[N]{values: make(map[attribute.Set]N)}
}

func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
s.Unlock()
}

// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64]() *sum[N] {
return &sum[N]{
values: make(map[attribute.Set]N),
start: now(),
valueMap: newValueMap[N](),
start: now(),
}
}

// sum summarizes a set of measurements made as their arithmetic sum.
type sum[N int64 | float64] struct {
sync.Mutex
*valueMap[N]

values map[attribute.Set]N
start time.Time
}

func (s *sum[N]) measure(ctx context.Context, value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
s.Unlock()
start time.Time
}

func (s *sum[N]) delta(dest *[]metricdata.DataPoint[N]) {
Expand All @@ -57,11 +66,11 @@ func (s *sum[N]) delta(dest *[]metricdata.DataPoint[N]) {
*dest = reset(*dest, n, n)

var i int
for a, val := range s.values {
for a, value := range s.values {
(*dest)[i].Attributes = a
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = val
(*dest)[i].Value = value
// Do not report stale values.
delete(s.values, a)
i++
Expand All @@ -80,11 +89,11 @@ func (s *sum[N]) cumulative(dest *[]metricdata.DataPoint[N]) {
*dest = reset(*dest, n, n)

var i int
for a, val := range s.values {
(*dest)[i].Attributes = a
for attr, value := range s.values {
(*dest)[i].Attributes = attr
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = val
(*dest)[i].Value = value
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
Expand All @@ -98,26 +107,19 @@ func (s *sum[N]) cumulative(dest *[]metricdata.DataPoint[N]) {
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64]() *precomputedSum[N] {
return &precomputedSum[N]{
values: make(map[attribute.Set]N),
start: now(),
valueMap: newValueMap[N](),
start: now(),
}
}

// precomputedSum summarizes a set of observatrions as their arithmetic sum.
type precomputedSum[N int64 | float64] struct {
sync.Mutex
*valueMap[N]

values map[attribute.Set]N
reported map[attribute.Set]N
start time.Time
}

func (s *precomputedSum[N]) measure(ctx context.Context, value N, attr attribute.Set) {
s.Lock()
s.values[attr] += value
s.Unlock()
}

func (s *precomputedSum[N]) delta(dest *[]metricdata.DataPoint[N]) {
t := now()
newReported := make(map[attribute.Set]N)
Expand All @@ -129,17 +131,17 @@ func (s *precomputedSum[N]) delta(dest *[]metricdata.DataPoint[N]) {
*dest = reset(*dest, n, n)

var i int
for a, val := range s.values {
delta := val - s.reported[a]
for attr, value := range s.values {
delta := value - s.reported[attr]

(*dest)[i].Attributes = a
(*dest)[i].Attributes = attr
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = delta

newReported[a] = val
newReported[attr] = value
// Unused attribute sets do not report.
delete(s.values, a)
delete(s.values, attr)
i++
}
// Unused attribute sets are forgotten.
Expand All @@ -158,14 +160,14 @@ func (s *precomputedSum[N]) cumulative(dest *[]metricdata.DataPoint[N]) {
*dest = reset(*dest, n, n)

var i int
for a, val := range s.values {
(*dest)[i].Attributes = a
for attr, value := range s.values {
(*dest)[i].Attributes = attr
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = val
s.values[a] = val
(*dest)[i].Value = value
s.values[attr] = value
// Unused attribute sets do not report.
delete(s.values, a)
delete(s.values, attr)
i++
}
}

0 comments on commit 6b3a4f7

Please sign in to comment.