From 1b7b8be0c960ae95d49ad94fe8897677c1dfeb2f Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 23 Jan 2024 17:45:05 +0100 Subject: [PATCH] Make accumulator generic --- .../tsdb/index_shipper_querier.go | 17 +++++----- .../indexshipper/tsdb/multi_file_index.go | 34 ++++++++----------- 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go index c07add72b671..5cb8338fc916 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go @@ -134,31 +134,32 @@ func (i *indexShipperQuerier) Volume(ctx context.Context, userID string, from, t return idx.Volume(ctx, userID, from, through, acc, shard, shouldIncludeChunk, targetLabels, aggregateBy, matchers...) } -type resultAccumulator struct { +type resultAccumulator[V, R any] struct { mtx sync.Mutex - items []interface{} - merge func(xs []interface{}) (interface{}, error) + items []V + merge func(xs []V) (R, error) } -func newResultAccumulator(merge func(xs []interface{}) (interface{}, error)) *resultAccumulator { - return &resultAccumulator{ +func newResultAccumulator[V, R any](merge func(xs []V) (R, error)) *resultAccumulator[V, R] { + return &resultAccumulator[V, R]{ merge: merge, } } -func (acc *resultAccumulator) Add(item interface{}) { +func (acc *resultAccumulator[V, R]) Add(item V) { acc.mtx.Lock() defer acc.mtx.Unlock() acc.items = append(acc.items, item) } -func (acc *resultAccumulator) Merge() (interface{}, error) { +func (acc *resultAccumulator[V, R]) Merge() (R, error) { acc.mtx.Lock() defer acc.mtx.Unlock() if len(acc.items) == 0 { - return nil, ErrEmptyAccumulator + var empty R + return empty, ErrEmptyAccumulator } return acc.merge(acc.items) diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go index 01935a842d53..2f51fbad3369 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/multi_file_index.go @@ -132,7 +132,7 @@ func (i *MultiIndex) forMatchingIndices(ctx context.Context, from, through model } func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) { - acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { + acc := newResultAccumulator(func(xs [][]ChunkRef) ([]ChunkRef, error) { if res == nil { res = ChunkRefsPool.Get() } @@ -144,8 +144,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro // TODO(owen-d): Do this more efficiently, // not all indices overlap each other for _, group := range xs { - g := group.([]ChunkRef) - for _, ref := range g { + for _, ref := range group { _, ok := seen[ref] if ok { continue @@ -153,7 +152,7 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro seen[ref] = struct{}{} res = append(res, ref) } - ChunkRefsPool.Put(g) + ChunkRefsPool.Put(group) } @@ -183,12 +182,12 @@ func (i *MultiIndex) GetChunkRefs(ctx context.Context, userID string, from, thro } return nil, err } - return merged.([]ChunkRef), nil + return merged, nil } func (i *MultiIndex) Series(ctx context.Context, userID string, from, through model.Time, res []Series, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]Series, error) { - acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { + acc := newResultAccumulator(func(xs [][]Series) ([]Series, error) { if res == nil { res = SeriesPool.Get() } @@ -196,8 +195,7 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo seen := make(map[model.Fingerprint]struct{}) - for _, x := range xs { - seriesSet := x.([]Series) + for _, seriesSet := range xs { for _, s := range seriesSet { _, ok := seen[s.Fingerprint] if ok { @@ -235,19 +233,18 @@ func (i *MultiIndex) Series(ctx context.Context, userID string, from, through mo } return nil, err } - return merged.([]Series), nil + return merged, nil } func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) { - acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { + acc := newResultAccumulator(func(xs [][]string) ([]string, error) { var ( maxLn int // maximum number of lNames, assuming no duplicates lists [][]string ) for _, group := range xs { - x := group.([]string) - maxLn += len(x) - lists = append(lists, x) + maxLn += len(group) + lists = append(lists, group) } // optimistically allocate the maximum length slice @@ -293,19 +290,18 @@ func (i *MultiIndex) LabelNames(ctx context.Context, userID string, from, throug } return nil, err } - return merged.([]string), nil + return merged, nil } func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, through model.Time, name string, matchers ...*labels.Matcher) ([]string, error) { - acc := newResultAccumulator(func(xs []interface{}) (interface{}, error) { + acc := newResultAccumulator(func(xs [][]string) ([]string, error) { var ( maxLn int // maximum number of lValues, assuming no duplicates lists [][]string ) for _, group := range xs { - x := group.([]string) - maxLn += len(x) - lists = append(lists, x) + maxLn += len(group) + lists = append(lists, group) } // optimistically allocate the maximum length slice @@ -351,7 +347,7 @@ func (i *MultiIndex) LabelValues(ctx context.Context, userID string, from, throu } return nil, err } - return merged.([]string), nil + return merged, nil } func (i *MultiIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {