Skip to content

Commit

Permalink
chore: Make dedicated iterator package (#13273)
Browse files Browse the repository at this point in the history
Iterators of various types are widely used throughout the Loki code base. With the recent code additions of the bloom filters, a new set of utility functions for iterators emerged in `github.com/grafana/loki/v3/pkg/storage/bloom/v1`. The package defines interfaces for various common types, but also provides implementations to create and compose new iterators that implement these interfaces. This new package uses Go Generics.

- However, at the current state, there are multiple iterator interfaces and implementations, which adds cognitive overhead in understanding how they work and which ones should be used. The idea is to unify them into a single iterator lib for Loki.
- As a first step towards a single iterator library for the Loki code base, this PR moves the utilities from the `pkg/storage/bloom/v1` package to the `pkg/iter/v2` package.
- Second, it changes the existing `EntryIterator` and `SampleIterator` iterators to "inherit" `v2.CloseIterator` in order to expose the same function names.
- And lastly, naming conventions of iterator interfaces and structs are unified.

The basic iterator interface (defined in `pkg/iter/v2/interface.go`) looks like so:

```go
// Usage:
//
//	for it.Next() {
//	    curr := it.At()
//	    // do something
//	}
//	if it.Err() != nil {
//	    // do something
//	}
type Iterator[T any] interface {
	Next() bool
	Err() error
	At() T
}
```

---

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jul 3, 2024
1 parent 3ac2317 commit d8cc1ce
Show file tree
Hide file tree
Showing 92 changed files with 1,102 additions and 1,160 deletions.
39 changes: 20 additions & 19 deletions pkg/bloombuild/builder/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
logql_log "github.com/grafana/loki/v3/pkg/logql/log"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
Expand Down Expand Up @@ -168,9 +169,9 @@ func newBatchedBlockLoader(
}

// compiler checks
var _ v1.Iterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ v1.CloseableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ v1.ResettableIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ iter.Iterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ iter.CloseIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}
var _ iter.ResetIterator[*v1.SeriesWithBlooms] = &blockLoadingIter{}

// TODO(chaudum): testware
func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {
Expand All @@ -189,14 +190,14 @@ type blockLoadingIter struct {
ctx context.Context
fetcher Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]
inputs []bloomshipper.BlockRef
overlapping v1.Iterator[[]bloomshipper.BlockRef]
overlapping iter.Iterator[[]bloomshipper.BlockRef]
batchSize int
// optional arguments
filter func(*bloomshipper.CloseableBlockQuerier) bool
// internals
initialized bool
err error
iter v1.Iterator[*v1.SeriesWithBlooms]
iter iter.Iterator[*v1.SeriesWithBlooms]
loader *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
loaded map[io.Closer]struct{}
}
Expand Down Expand Up @@ -229,7 +230,7 @@ func (i *blockLoadingIter) init() {
i.overlapping = overlappingBlocksIter(i.inputs)

// set initial iter
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
i.iter = iter.NewEmptyIter[*v1.SeriesWithBlooms]()

// set "match all" filter function if not present
if i.filter == nil {
Expand All @@ -247,24 +248,24 @@ func (i *blockLoadingIter) loadNext() bool {
blockRefs := i.overlapping.At()

loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
filtered := iter.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

iters := make([]v1.PeekingIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
iters := make([]iter.PeekIterator[*v1.SeriesWithBlooms], 0, len(blockRefs))
for filtered.Next() {
bq := filtered.At()
i.loaded[bq] = struct{}{}
iter, err := bq.SeriesIter()
itr, err := bq.SeriesIter()
if err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
i.iter = iter.NewEmptyIter[*v1.SeriesWithBlooms]()
return false
}
iters = append(iters, iter)
iters = append(iters, itr)
}

if err := filtered.Err(); err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
i.iter = iter.NewEmptyIter[*v1.SeriesWithBlooms]()
return false
}

Expand All @@ -278,23 +279,23 @@ func (i *blockLoadingIter) loadNext() bool {
// two overlapping blocks can conceivably have the same series, so we need to dedupe,
// preferring the one with the most chunks already indexed since we'll have
// to add fewer chunks to the bloom
i.iter = v1.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
i.iter = iter.NewDedupingIter[*v1.SeriesWithBlooms, *v1.SeriesWithBlooms](
func(a, b *v1.SeriesWithBlooms) bool {
return a.Series.Fingerprint == b.Series.Fingerprint
},
v1.Identity[*v1.SeriesWithBlooms],
iter.Identity[*v1.SeriesWithBlooms],
func(a, b *v1.SeriesWithBlooms) *v1.SeriesWithBlooms {
if len(a.Series.Chunks) > len(b.Series.Chunks) {
return a
}
return b
},
v1.NewPeekingIter(mergedBlocks),
iter.NewPeekIter(mergedBlocks),
)
return i.iter.Next()
}

i.iter = v1.NewEmptyIter[*v1.SeriesWithBlooms]()
i.iter = iter.NewEmptyIter[*v1.SeriesWithBlooms]()
i.err = i.overlapping.Err()
return false
}
Expand Down Expand Up @@ -335,11 +336,11 @@ func (i *blockLoadingIter) Filter(filter func(*bloomshipper.CloseableBlockQuerie
i.filter = filter
}

func overlappingBlocksIter(inputs []bloomshipper.BlockRef) v1.Iterator[[]bloomshipper.BlockRef] {
func overlappingBlocksIter(inputs []bloomshipper.BlockRef) iter.Iterator[[]bloomshipper.BlockRef] {
// can we assume sorted blocks?
peekIter := v1.NewPeekingIter(v1.NewSliceIter(inputs))
peekIter := iter.NewPeekIter(iter.NewSliceIter(inputs))

return v1.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
return iter.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
minFp := b[0].Bounds.Min
maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/builder/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestBatchedLoader(t *testing.T) {
tc.batchSize,
)

got, err := v1.Collect[int](loader)
got, err := v2.Collect[int](loader)
if tc.err {
require.Error(t, err)
return
Expand Down
5 changes: 3 additions & 2 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand Down Expand Up @@ -334,7 +335,7 @@ func (b *Builder) processTask(
// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
// with a counter iterator to count the number of times Next() is called on it.
// This is used to observe the number of series that are being processed.
seriesItrWithCounter := v1.NewCounterIter[*v1.Series](seriesItr)
seriesItrWithCounter := iter.NewCounterIter[*v1.Series](seriesItr)

gen := NewSimpleBloomGenerator(
tenant,
Expand Down Expand Up @@ -429,7 +430,7 @@ func (b *Builder) loadWorkForGap(
tenant string,
id tsdb.Identifier,
gap protos.GapWithBlocks,
) (v1.Iterator[*v1.Series], v1.CloseableResettableIterator[*v1.SeriesWithBlooms], error) {
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
// load a series iterator for the gap
seriesItr, err := b.tsdbStore.LoadTSDB(ctx, table, tenant, id, gap.Bounds)
if err != nil {
Expand Down
25 changes: 13 additions & 12 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"

iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logproto"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk"
Expand All @@ -36,15 +37,15 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results v1.Iterator[*v1.Block], err error)
Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, toClose []io.Closer, results iter.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
userID string
store v1.Iterator[*v1.Series]
store iter.Iterator[*v1.Series]
chunkLoader ChunkLoader
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms]
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms]

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -65,9 +66,9 @@ type SimpleBloomGenerator struct {
func NewSimpleBloomGenerator(
userID string,
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
store iter.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter v1.ResettableIterator[*v1.SeriesWithBlooms],
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *v1.Metrics,
Expand Down Expand Up @@ -100,7 +101,7 @@ func NewSimpleBloomGenerator(
func (s *SimpleBloomGenerator) populator(ctx context.Context) v1.BloomPopulatorFunc {
return func(
series *v1.Series,
srcBlooms v1.SizedIterator[*v1.Bloom],
srcBlooms iter.SizedIterator[*v1.Bloom],
toAdd v1.ChunkRefs,
ch chan *v1.BloomCreation,
) {
Expand All @@ -126,7 +127,7 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) v1.BloomPopulatorF
func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIterator {
level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "schema", fmt.Sprintf("%+v", s.opts.Schema))

series := v1.NewPeekingIter(s.store)
series := iter.NewPeekIter(s.store)

// TODO: Use interface
impl, ok := s.blocksIter.(*blockLoadingIter)
Expand Down Expand Up @@ -166,8 +167,8 @@ type LazyBlockBuilderIterator struct {
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks v1.ResettableIterator[*v1.SeriesWithBlooms]
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -180,8 +181,8 @@ func NewLazyBlockBuilderIterator(
metrics *v1.Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks v1.ResettableIterator[*v1.SeriesWithBlooms],
series iter.PeekIterator[*v1.Series],
blocks iter.ResetIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
Expand Down Expand Up @@ -250,7 +251,7 @@ type indexLoader interface {
// ChunkItersByFingerprint models the chunks belonging to a fingerprint
type ChunkItersByFingerprint struct {
fp model.Fingerprint
itr v1.Iterator[v1.ChunkRefWithIter]
itr iter.Iterator[v1.ChunkRefWithIter]
}

// ChunkLoader loads chunks from a store
Expand Down
11 changes: 6 additions & 5 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/chunkenc"
v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util/mempool"
Expand Down Expand Up @@ -47,7 +48,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro

minIdx, maxIdx := i*seriesPerBlock, (i+1)*seriesPerBlock

itr := v1.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
itr := v2.NewSliceIter[v1.SeriesWithBlooms](data[minIdx:maxIdx])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

Expand All @@ -66,11 +67,11 @@ type dummyChunkLoader struct{}
func (dummyChunkLoader) Load(_ context.Context, _ string, series *v1.Series) *ChunkItersByFingerprint {
return &ChunkItersByFingerprint{
fp: series.Fingerprint,
itr: v1.NewEmptyIter[v1.ChunkRefWithIter](),
itr: v2.NewEmptyIter[v1.ChunkRefWithIter](),
}
}

func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v2.Iterator[*v1.Series], blocks []*v1.Block, refs []bloomshipper.BlockRef) *SimpleBloomGenerator {
bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks))
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
Expand Down Expand Up @@ -133,8 +134,8 @@ func TestSimpleBloomGenerator(t *testing.T) {
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
sourceBlocks, data, refs := blocksFromSchemaWithRange(t, 2, tc.fromSchema, 0x00000, 0x6ffff)
storeItr := v1.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
v1.NewSliceIter[v1.SeriesWithBlooms](data),
storeItr := v2.NewMapIter[v1.SeriesWithBlooms, *v1.Series](
v2.NewSliceIter[v1.SeriesWithBlooms](data),
func(swb v1.SeriesWithBlooms) *v1.Series {
return swb.Series
},
Expand Down
13 changes: 7 additions & 6 deletions pkg/bloombuild/common/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/chunkenc"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
baseStore "github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
Expand All @@ -38,7 +39,7 @@ type TSDBStore interface {
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.Iterator[*v1.Series], error)
) (iter.Iterator[*v1.Series], error)
}

// BloomTSDBStore is a wrapper around the storage.Client interface which
Expand Down Expand Up @@ -90,7 +91,7 @@ func (b *BloomTSDBStore) LoadTSDB(
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.Iterator[*v1.Series], error) {
) (iter.Iterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
Expand Down Expand Up @@ -126,7 +127,7 @@ func (b *BloomTSDBStore) LoadTSDB(
return NewTSDBSeriesIter(ctx, tenant, idx, bounds)
}

func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (v1.Iterator[*v1.Series], error) {
func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) {
// TODO(salvacorts): Create a pool
series := make([]*v1.Series, 0, 100)

Expand Down Expand Up @@ -163,9 +164,9 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b

select {
case <-ctx.Done():
return v1.NewEmptyIter[*v1.Series](), ctx.Err()
return iter.NewEmptyIter[*v1.Series](), ctx.Err()
default:
return v1.NewCancelableIter[*v1.Series](ctx, v1.NewSliceIter[*v1.Series](series)), nil
return iter.NewCancelableIter[*v1.Series](ctx, iter.NewSliceIter[*v1.Series](series)), nil
}
}

Expand Down Expand Up @@ -251,7 +252,7 @@ func (s *TSDBStores) LoadTSDB(
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.Iterator[*v1.Series], error) {
) (iter.Iterator[*v1.Series], error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/bloombuild/common/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

v2 "github.com/grafana/loki/v3/pkg/iter/v2"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func TestTSDBSeriesIter(t *testing.T) {
},
},
}
srcItr := v1.NewSliceIter(input)
srcItr := v2.NewSliceIter(input)
itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64))
require.NoError(t, err)

Expand Down
Loading

0 comments on commit d8cc1ce

Please sign in to comment.