diff --git a/pkg/bloomcompactor/v2spec.go b/pkg/bloomcompactor/v2spec.go index dbf7ecc179a82..610e9f7f6fbfd 100644 --- a/pkg/bloomcompactor/v2spec.go +++ b/pkg/bloomcompactor/v2spec.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/chunk" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -37,15 +38,13 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck { return v1.Overlap } -// placeholder -type Block struct{} - // Store is likely bound within. This allows specifying impls like ShardedStore // to only request the shard-range needed from the existing store. type BloomGenerator interface { - Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[Block], err error) + Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) } +// Simple implementation of a BloomGenerator. type SimpleBloomGenerator struct { store v1.Iterator[*v1.Series] // TODO(owen-d): blocks need not be all downloaded prior. Consider implementing @@ -59,13 +58,20 @@ type SimpleBloomGenerator struct { logger log.Logger readWriterFn func() (v1.BlockWriter, v1.BlockReader) + + tokenizer *v1.BloomTokenizer } +// SimpleBloomGenerator is a foundational implementation of BloomGenerator. 
+// It mainly wires up a few different components to generate bloom filters for a set of blocks +// and handles schema compatibility: +// Blocks which are incompatible with the schema are skipped and will have their chunks reindexed func NewSimpleBloomGenerator( opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block, readWriterFn func() (v1.BlockWriter, v1.BlockReader), + metrics *Metrics, logger log.Logger, ) *SimpleBloomGenerator { return &SimpleBloomGenerator{ @@ -74,12 +80,23 @@ func NewSimpleBloomGenerator( blocks: blocks, logger: logger, readWriterFn: readWriterFn, + metrics: metrics, + + tokenizer: v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), metrics.bloomMetrics), } } func (s *SimpleBloomGenerator) populate(series *v1.Series, bloom *v1.Bloom) error { // TODO(owen-d): impl after threading in store - return nil + var chunkItr v1.Iterator[[]chunk.Chunk] = v1.NewEmptyIter[[]chunk.Chunk](nil) + + return s.tokenizer.PopulateSeriesWithBloom( + &v1.SeriesWithBloom{ + Series: series, + Bloom: bloom, + }, + chunkItr, + ) } func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) { diff --git a/pkg/bloomcompactor/v2spec_test.go b/pkg/bloomcompactor/v2spec_test.go new file mode 100644 index 0000000000000..35ea5df4af47e --- /dev/null +++ b/pkg/bloomcompactor/v2spec_test.go @@ -0,0 +1,122 @@ +package bloomcompactor + +import ( + "bytes" + "context" + "testing" + + "github.com/go-kit/log" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom) { + return blocksFromSchemaWithRange(t, n, options, 0, 0xffff) +} + +// splits 100 series across `n` non-overlapping blocks. +// uses options to build blocks with. 
+func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom) { + if 100%n != 0 { + panic("100 series must be evenly divisible by n") + } + + numSeries := 100 + numKeysPerSeries := 10000 + data, _ = v1.MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFP, throughFp, 0, 10000) + + seriesPerBlock := 100 / n + + for i := 0; i < n; i++ { + // references for linking in memory reader+writer + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := v1.NewByteReader(indexBuf, bloomsBuf) + + builder, err := v1.NewBlockBuilder( + options, + writer, + ) + require.Nil(t, err) + + itr := v1.NewSliceIter[v1.SeriesWithBloom](data[i*seriesPerBlock : (i+1)*seriesPerBlock]) + _, err = builder.BuildFrom(itr) + require.Nil(t, err) + + res = append(res, v1.NewBlock(reader)) + } + + return res, data +} + +func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator { + return NewSimpleBloomGenerator( + opts, + store, + blocks, + func() (v1.BlockWriter, v1.BlockReader) { + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf) + }, + NewMetrics(nil, v1.NewMetrics(nil)), + log.NewNopLogger(), + ) +} + +func TestSimpleBloomGenerator(t *testing.T) { + for _, tc := range []struct { + desc string + fromSchema, toSchema v1.BlockOptions + sourceBlocks, numSkipped int + }{ + { + desc: "SkipsIncompatibleSchemas", + fromSchema: v1.NewBlockOptions(3, 0), + toSchema: v1.NewBlockOptions(4, 0), + sourceBlocks: 2, + numSkipped: 2, + }, + { + desc: "CombinesBlocks", + fromSchema: v1.NewBlockOptions(4, 0), + toSchema: v1.NewBlockOptions(4, 0), + sourceBlocks: 2, + numSkipped: 0, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + sourceBlocks, data := 
blocksFromSchema(t, tc.sourceBlocks, tc.fromSchema) + storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series]( + v1.NewSliceIter[v1.SeriesWithBloom](data), + func(swb v1.SeriesWithBloom) *v1.Series { + return swb.Series + }, + ) + + gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks) + skipped, results, err := gen.Generate(context.Background()) + require.Nil(t, err) + require.Equal(t, tc.numSkipped, len(skipped)) + + require.True(t, results.Next()) + block := results.At() + require.False(t, results.Next()) + + refs := v1.PointerSlice[v1.SeriesWithBloom](data) + + v1.EqualIterators[*v1.SeriesWithBloom]( + t, + func(a, b *v1.SeriesWithBloom) { + // TODO(owen-d): better equality check + // once chunk fetching is implemented + require.Equal(t, a.Series, b.Series) + }, + v1.NewSliceIter[*v1.SeriesWithBloom](refs), + block.Querier(), + ) + }) + } +} diff --git a/pkg/storage/bloom/v1/archive_test.go b/pkg/storage/bloom/v1/archive_test.go index e1216899f8eeb..e0c9407210f20 100644 --- a/pkg/storage/bloom/v1/archive_test.go +++ b/pkg/storage/bloom/v1/archive_test.go @@ -17,7 +17,7 @@ func TestArchive(t *testing.T) { numSeries := 100 numKeysPerSeries := 10000 - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) builder, err := NewBlockBuilder( BlockOptions{ diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index d3d1609aa03fe..8afcdba2d02ad 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -62,6 +62,11 @@ func (b *Block) LoadHeaders() error { } +// convenience method +func (b *Block) Querier() *BlockQuerier { + return NewBlockQuerier(b) +} + func (b *Block) Series() *LazySeriesIter { return NewLazySeriesIter(b) } diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index da1fca7ca33b8..22e44fdf3d96c 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ 
b/pkg/storage/bloom/v1/builder_test.go @@ -11,21 +11,10 @@ import ( "github.com/grafana/loki/pkg/chunkenc" ) -func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) { - for expected.Next() { - require.True(t, actual.Next()) - a, b := expected.At(), actual.At() - test(a, b) - } - require.False(t, actual.Next()) - require.Nil(t, expected.Err()) - require.Nil(t, actual.Err()) -} - func TestBlockBuilderRoundTrip(t *testing.T) { numSeries := 100 numKeysPerSeries := 10000 - data, keys := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, keys := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) // references for linking in memory reader+writer indexBuf := bytes.NewBuffer(nil) @@ -115,7 +104,7 @@ func TestMergeBuilder(t *testing.T) { numSeries := 100 numKeysPerSeries := 100 blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks) - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) blockOpts := BlockOptions{ Schema: Schema{ version: DefaultSchemaVersion, @@ -198,7 +187,7 @@ func TestMergeBuilder(t *testing.T) { func TestBlockReset(t *testing.T) { numSeries := 100 numKeysPerSeries := 10000 - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 1, 0xffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 1, 0xffff, 0, 10000) indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) @@ -250,7 +239,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { numSeries := 100 numKeysPerSeries := 100 minTs, maxTs := model.Time(0), model.Time(10000) - xs, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, minTs, maxTs) + xs, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, minTs, maxTs) var data [][]*SeriesWithBloom diff --git a/pkg/storage/bloom/v1/dedupe_test.go 
b/pkg/storage/bloom/v1/dedupe_test.go index f30578c13cd1b..443d8e3e3750e 100644 --- a/pkg/storage/bloom/v1/dedupe_test.go +++ b/pkg/storage/bloom/v1/dedupe_test.go @@ -10,7 +10,7 @@ func TestMergeDedupeIter(t *testing.T) { var ( numSeries = 100 numKeysPerSeries = 10000 - data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) dataPtr = PointerSlice(data) queriers = make([]PeekingIterator[*SeriesWithBloom], 4) ) diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index c867431f3d8f7..1b51320e1566b 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -20,7 +20,7 @@ func TestFusedQuerier(t *testing.T) { reader := NewByteReader(indexBuf, bloomsBuf) numSeries := 100 numKeysPerSeries := 10000 - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) builder, err := NewBlockBuilder( BlockOptions{ @@ -112,7 +112,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou reader := NewByteReader(indexBuf, bloomsBuf) numSeries := 10000 numKeysPerSeries := 100 - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffffff, 0, 10000) builder, err := NewBlockBuilder( BlockOptions{ diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index 6d74eea64bb22..10c1e41fd1139 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -21,6 +21,14 @@ func (s Schema) Compatible(other Schema) bool { return s == other } +func (s Schema) NGramLen() int { + return int(s.nGramLength) +} + +func (s Schema) NGramSkip() int { + return int(s.nGramSkip) +} + // byte length func (s Schema) Len() int { // magic number + version + 
encoding + ngram length + ngram skip @@ -78,10 +86,6 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { return dec.Err() } -func (s Schema) NGramLen() int { - return int(s.nGramLength) -} - // Block index is a set of series pages along with // the headers for each page type BlockIndex struct { diff --git a/pkg/storage/bloom/v1/merge_test.go b/pkg/storage/bloom/v1/merge_test.go index 8f2bf02f4c2de..1b91366c2cb6d 100644 --- a/pkg/storage/bloom/v1/merge_test.go +++ b/pkg/storage/bloom/v1/merge_test.go @@ -12,7 +12,7 @@ func TestMergeBlockQuerier_NonOverlapping(t *testing.T) { numKeysPerSeries = 10000 numQueriers = 4 queriers []PeekingIterator[*SeriesWithBloom] - data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) ) for i := 0; i < numQueriers; i++ { var ptrs []*SeriesWithBloom @@ -39,7 +39,7 @@ func TestMergeBlockQuerier_Duplicate(t *testing.T) { numKeysPerSeries = 10000 numQueriers = 2 queriers []PeekingIterator[*SeriesWithBloom] - data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) ) for i := 0; i < numQueriers; i++ { queriers = append( @@ -69,7 +69,7 @@ func TestMergeBlockQuerier_Overlapping(t *testing.T) { numKeysPerSeries = 10000 numQueriers = 4 queriers []PeekingIterator[*SeriesWithBloom] - data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) + data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000) slices = make([][]*SeriesWithBloom, numQueriers) ) for i := 0; i < numSeries; i++ { diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index c79fcf70f76a9..fb9c8a0e4a389 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -13,6 +13,8 @@ import ( "github.com/grafana/loki/pkg/storage/bloom/v1/filter" ) 
+// TODO(owen-d): this should probably be in its own testing-util package + func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (*BlockQuerier, []SeriesWithBloom) { // references for linking in memory reader+writer indexBuf := bytes.NewBuffer(nil) @@ -21,7 +23,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, reader := NewByteReader(indexBuf, bloomsBuf) numSeries := int(throughFp - fromFp) numKeysPerSeries := 1000 - data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs) + data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs) builder, err := NewBlockBuilder( BlockOptions{ @@ -44,7 +46,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, return NewBlockQuerier(block), data } -func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) { +func MkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) { seriesList = make([]SeriesWithBloom, 0, nSeries) keysList = make([][][]byte, 0, nSeries) @@ -85,3 +87,14 @@ func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model } return } + +func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) { + for expected.Next() { + require.True(t, actual.Next()) + a, b := expected.At(), actual.At() + test(a, b) + } + require.False(t, actual.Next()) + require.Nil(t, expected.Err()) + require.Nil(t, actual.Err()) +}