Skip to content

Commit

Permalink
Make accumulator generic
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed Jan 23, 2024
1 parent 37bc19f commit 1b7b8be
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,32 @@ func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, t
return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...)
}

// resultAccumulator gathers per-index partial results of type V under a
// mutex and combines them into a single result of type R via the
// user-supplied merge function. The zero value is not usable on its own;
// construct instances with newResultAccumulator.
type resultAccumulator[V, R any] struct {
	mtx   sync.Mutex // guards items against concurrent Add/Merge calls
	items []V        // partial results collected via Add
	merge func(xs []V) (R, error) // combines all collected items; invoked by Merge
}

func newResultAccumulator(merge func(xs []interface{}) (interface{}, error)) *resultAccumulator {
return &resultAccumulator{
func newResultAccumulator[V, R any](merge func(xs []V) (R, error)) *resultAccumulator[V, R] {
return &resultAccumulator[V, R]{
merge: merge,
}
}

func (acc *resultAccumulator) Add(item interface{}) {
func (acc *resultAccumulator[V, R]) Add(item V) {
acc.mtx.Lock()
defer acc.mtx.Unlock()
acc.items = append(acc.items, item)

}

func (acc *resultAccumulator) Merge() (interface{}, error) {
func (acc *resultAccumulator[V, R]) Merge() (R, error) {
acc.mtx.Lock()
defer acc.mtx.Unlock()

if len(acc.items) == 0 {
return nil, ErrEmptyAccumulator
var empty R
return empty, ErrEmptyAccumulator
}

return acc.merge(acc.items)
Expand Down
34 changes: 15 additions & 19 deletions pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model
}

func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
acc := newResultAccumulator(func(xs [][]ChunkRef) ([]ChunkRef, error) {
if res == nil {
res = ChunkRefsPool.Get()
}
Expand All @@ -144,16 +144,15 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
// TODO(owen-d): Do this more efficiently,
// not all indices overlap each other
for _, group := range xs {
g := group.([]ChunkRef)
for _, ref := range g {
for _, ref := range group {
_, ok := seen[ref]
if ok {
continue
}
seen[ref] = struct{}{}
res = append(res, ref)
}
ChunkRefsPool.Put(g)
ChunkRefsPool.Put(group)

}

Expand Down Expand Up @@ -183,21 +182,20 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro
}
return nil, err
}
return merged.([]ChunkRef), nil
return merged, nil

}

func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
acc := newResultAccumulator(func(xs [][]Series) ([]Series, error) {
if res == nil {
res = SeriesPool.Get()
}
res = res[:0]

seen := make(map[model.Fingerprint]struct{})

for _, x := range xs {
seriesSet := x.([]Series)
for _, seriesSet := range xs {
for _, s := range seriesSet {
_, ok := seen[s.Fingerprint]
if ok {
Expand Down Expand Up @@ -235,19 +233,18 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo
}
return nil, err
}
return merged.([]Series), nil
return merged, nil
}

func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
acc := newResultAccumulator(func(xs [][]string) ([]string, error) {
var (
maxLn int // maximum number of lNames, assuming no duplicates
lists [][]string
)
for _, group := range xs {
x := group.([]string)
maxLn += len(x)
lists = append(lists, x)
maxLn += len(group)
lists = append(lists, group)
}

// optimistically allocate the maximum length slice
Expand Down Expand Up @@ -293,19 +290,18 @@ func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, throug
}
return nil, err
}
return merged.([]string), nil
return merged, nil
}

func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) {
acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) {
acc := newResultAccumulator(func(xs [][]string) ([]string, error) {
var (
maxLn int // maximum number of lValues, assuming no duplicates
lists [][]string
)
for _, group := range xs {
x := group.([]string)
maxLn += len(x)
lists = append(lists, x)
maxLn += len(group)
lists = append(lists, group)
}

// optimistically allocate the maximum length slice
Expand Down Expand Up @@ -351,7 +347,7 @@ func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, throu
}
return nil, err
}
return merged.([]string), nil
return merged, nil
}

func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
Expand Down

0 comments on commit 1b7b8be

Please sign in to comment.