refactoring+bloom_gen testware
Signed-off-by: Owen Diehl <[email protected]>
owen-d committed Jan 24, 2024
1 parent 05340bd commit 7702ec0
Showing 10 changed files with 183 additions and 33 deletions.
27 changes: 22 additions & 5 deletions pkg/bloomcompactor/v2spec.go
@@ -7,6 +7,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@@ -37,15 +38,13 @@ func (k Keyspace) Cmp(other Keyspace) v1.BoundsCheck {
return v1.Overlap
}

// placeholder
type Block struct{}

// Store is likely bound within. This allows specifying impls like ShardedStore<Store>
// to only request the shard-range needed from the existing store.
type BloomGenerator interface {
Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[Block], err error)
Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error)
}

// Simple implementation of a BloomGenerator.
type SimpleBloomGenerator struct {
store v1.Iterator[*v1.Series]
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
@@ -59,13 +58,20 @@ type SimpleBloomGenerator struct {
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)

tokenizer *v1.BloomTokenizer
}

// SimpleBloomGenerator is a foundational implementation of BloomGenerator.
// It mainly wires up a few different components to generate bloom filters for a set of blocks
// and handles schema compatibility:
// blocks that are incompatible with the schema are skipped and will have their chunks reindexed.
func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
blocks []*v1.Block,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
) *SimpleBloomGenerator {
return &SimpleBloomGenerator{
@@ -74,12 +80,23 @@ func NewSimpleBloomGenerator(
blocks: blocks,
logger: logger,
readWriterFn: readWriterFn,
metrics: metrics,

tokenizer: v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), metrics.bloomMetrics),
}
}

func (s *SimpleBloomGenerator) populate(series *v1.Series, bloom *v1.Bloom) error {
// TODO(owen-d): impl after threading in store
return nil
var chunkItr v1.Iterator[[]chunk.Chunk] = v1.NewEmptyIter[[]chunk.Chunk](nil)

return s.tokenizer.PopulateSeriesWithBloom(
&v1.SeriesWithBloom{
Series: series,
Bloom: bloom,
},
chunkItr,
)
}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []*v1.Block, results v1.Iterator[*v1.Block], err error) {
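The interface contract above is worth spelling out: Generate reports the blocks it skipped (for example, on schema incompatibility) and hands back the newly built blocks as an iterator. The sketch below is not part of this commit; it shows how a caller might drain that contract, assuming only the Next/At/Err iterator methods already used elsewhere in this diff.

```go
package bloomcompactor

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// drainGenerator is a hypothetical helper (not in this commit) showing how the
// BloomGenerator contract is meant to be consumed: note the skipped blocks,
// then iterate the results until exhaustion and check the iterator error.
func drainGenerator(ctx context.Context, gen BloomGenerator, logger log.Logger) ([]*v1.Block, error) {
	skipped, results, err := gen.Generate(ctx)
	if err != nil {
		return nil, err
	}
	level.Debug(logger).Log("msg", "bloom generation finished", "skippedBlocks", len(skipped))

	var built []*v1.Block
	for results.Next() {
		built = append(built, results.At())
	}
	return built, results.Err()
}
```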
122 changes: 122 additions & 0 deletions pkg/bloomcompactor/v2spec_test.go
@@ -0,0 +1,122 @@
package bloomcompactor

import (
"bytes"
"context"
"testing"

"github.com/go-kit/log"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)

func blocksFromSchema(t *testing.T, n int, options v1.BlockOptions) (res []*v1.Block, data []v1.SeriesWithBloom) {
return blocksFromSchemaWithRange(t, n, options, 0, 0xffff)
}

// splits 100 series across `n` non-overlapping blocks.
// uses `options` to build the blocks.
func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fromFP, throughFp model.Fingerprint) (res []*v1.Block, data []v1.SeriesWithBloom) {
if 100%n != 0 {
panic("100 series must be evenly divisible by n")
}

numSeries := 100
numKeysPerSeries := 10000
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFP, throughFp, 0, 10000)

seriesPerBlock := 100 / n

for i := 0; i < n; i++ {
// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

builder, err := v1.NewBlockBuilder(
options,
writer,
)
require.Nil(t, err)

itr := v1.NewSliceIter[v1.SeriesWithBloom](data[i*seriesPerBlock : (i+1)*seriesPerBlock])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)

res = append(res, v1.NewBlock(reader))
}

return res, data
}

func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator {
return NewSimpleBloomGenerator(
opts,
store,
blocks,
func() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
},
NewMetrics(nil, v1.NewMetrics(nil)),
log.NewNopLogger(),
)
}

func TestSimpleBloomGenerator(t *testing.T) {
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped int
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(3, 0),
toSchema: v1.NewBlockOptions(4, 0),
sourceBlocks: 2,
numSkipped: 2,
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(4, 0),
toSchema: v1.NewBlockOptions(4, 0),
sourceBlocks: 2,
numSkipped: 0,
},
} {
t.Run(tc.desc, func(t *testing.T) {
sourceBlocks, data := blocksFromSchema(t, tc.sourceBlocks, tc.fromSchema)
storeItr := v1.NewMapIter[v1.SeriesWithBloom, *v1.Series](
v1.NewSliceIter[v1.SeriesWithBloom](data),
func(swb v1.SeriesWithBloom) *v1.Series {
return swb.Series
},
)

gen := dummyBloomGen(tc.toSchema, storeItr, sourceBlocks)
skipped, results, err := gen.Generate(context.Background())
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))

require.True(t, results.Next())
block := results.At()
require.False(t, results.Next())

refs := v1.PointerSlice[v1.SeriesWithBloom](data)

v1.EqualIterators[*v1.SeriesWithBloom](
t,
func(a, b *v1.SeriesWithBloom) {
// TODO(owen-d): better equality check
// once chunk fetching is implemented
require.Equal(t, a.Series, b.Series)
},
v1.NewSliceIter[*v1.SeriesWithBloom](refs),
block.Querier(),
)
})
}
}
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/archive_test.go
@@ -17,7 +17,7 @@ func TestArchive(t *testing.T) {

numSeries := 100
numKeysPerSeries := 10000
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)

builder, err := NewBlockBuilder(
BlockOptions{
5 changes: 5 additions & 0 deletions pkg/storage/bloom/v1/block.go
@@ -62,6 +62,11 @@ func (b *Block) LoadHeaders() error {

}

// convenience method
func (b *Block) Querier() *BlockQuerier {
return NewBlockQuerier(b)
}

func (b *Block) Series() *LazySeriesIter {
return NewLazySeriesIter(b)
}
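The new Querier helper above is purely a convenience wrapper around NewBlockQuerier. The sketch below is not part of this commit; it shows how an external package such as the new bloomcompactor tests can use it, assuming the Next/At/Err iterator behaviour exercised in v2spec_test.go.

```go
package bloomcompactor

import (
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// countSeries is a hypothetical example (not in this commit): block.Querier()
// is shorthand for v1.NewBlockQuerier(block) and yields *v1.SeriesWithBloom.
func countSeries(block *v1.Block) (int, error) {
	q := block.Querier()
	n := 0
	for q.Next() {
		_ = q.At() // *v1.SeriesWithBloom, as used with EqualIterators in v2spec_test.go
		n++
	}
	return n, q.Err()
}
```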
19 changes: 4 additions & 15 deletions pkg/storage/bloom/v1/builder_test.go
@@ -11,21 +11,10 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
)

func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) {
for expected.Next() {
require.True(t, actual.Next())
a, b := expected.At(), actual.At()
test(a, b)
}
require.False(t, actual.Next())
require.Nil(t, expected.Err())
require.Nil(t, actual.Err())
}

func TestBlockBuilderRoundTrip(t *testing.T) {
numSeries := 100
numKeysPerSeries := 10000
data, keys := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, keys := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)

// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
@@ -115,7 +104,7 @@ func TestMergeBuilder(t *testing.T) {
numSeries := 100
numKeysPerSeries := 100
blocks := make([]PeekingIterator[*SeriesWithBloom], 0, nBlocks)
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
blockOpts := BlockOptions{
Schema: Schema{
version: DefaultSchemaVersion,
@@ -198,7 +187,7 @@ func TestMergeBuilder(t *testing.T) {
func TestBlockReset(t *testing.T) {
numSeries := 100
numKeysPerSeries := 10000
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 1, 0xffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 1, 0xffff, 0, 10000)

indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
@@ -250,7 +239,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
numSeries := 100
numKeysPerSeries := 100
minTs, maxTs := model.Time(0), model.Time(10000)
xs, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, minTs, maxTs)
xs, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, minTs, maxTs)

var data [][]*SeriesWithBloom

2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/dedupe_test.go
@@ -10,7 +10,7 @@ func TestMergeDedupeIter(t *testing.T) {
var (
numSeries = 100
numKeysPerSeries = 10000
data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
dataPtr = PointerSlice(data)
queriers = make([]PeekingIterator[*SeriesWithBloom], 4)
)
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -20,7 +20,7 @@ func TestFusedQuerier(t *testing.T) {
reader := NewByteReader(indexBuf, bloomsBuf)
numSeries := 100
numKeysPerSeries := 10000
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)

builder, err := NewBlockBuilder(
BlockOptions{
@@ -112,7 +112,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
reader := NewByteReader(indexBuf, bloomsBuf)
numSeries := 10000
numKeysPerSeries := 100
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffffff, 0, 10000)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffffff, 0, 10000)

builder, err := NewBlockBuilder(
BlockOptions{
12 changes: 8 additions & 4 deletions pkg/storage/bloom/v1/index.go
@@ -21,6 +21,14 @@ func (s Schema) Compatible(other Schema) bool {
return s == other
}

func (s Schema) NGramLen() int {
return int(s.nGramLength)
}

func (s Schema) NGramSkip() int {
return int(s.nGramSkip)
}

// byte length
func (s Schema) Len() int {
// magic number + version + encoding + ngram length + ngram skip
@@ -78,10 +86,6 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error {
return dec.Err()
}

func (s Schema) NGramLen() int {
return int(s.nGramLength)
}

// Block index is a set of series pages along with
// the headers for each page
type BlockIndex struct {
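Exporting NGramLen and NGramSkip (and moving NGramLen next to its sibling accessor) lets the compactor derive tokenizer settings from a block schema without touching unexported fields. Below is a minimal sketch, not part of this commit, mirroring the wiring in v2spec.go earlier in this diff; the *v1.Metrics parameter type is an assumption based on how v1.NewMetrics is passed around in the new test.

```go
package bloomcompactor

import (
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// newTokenizerFor mirrors the constructor wiring added in v2spec.go: the
// exported schema accessors configure a tokenizer that matches the target
// block options. The bloomMetrics parameter type is assumed here.
func newTokenizerFor(opts v1.BlockOptions, bloomMetrics *v1.Metrics) *v1.BloomTokenizer {
	return v1.NewBloomTokenizer(opts.Schema.NGramLen(), opts.Schema.NGramSkip(), bloomMetrics)
}
```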
6 changes: 3 additions & 3 deletions pkg/storage/bloom/v1/merge_test.go
@@ -12,7 +12,7 @@ func TestMergeBlockQuerier_NonOverlapping(t *testing.T) {
numKeysPerSeries = 10000
numQueriers = 4
queriers []PeekingIterator[*SeriesWithBloom]
data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
)
for i := 0; i < numQueriers; i++ {
var ptrs []*SeriesWithBloom
@@ -39,7 +39,7 @@ func TestMergeBlockQuerier_Duplicate(t *testing.T) {
numKeysPerSeries = 10000
numQueriers = 2
queriers []PeekingIterator[*SeriesWithBloom]
data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
)
for i := 0; i < numQueriers; i++ {
queriers = append(
@@ -69,7 +69,7 @@ func TestMergeBlockQuerier_Overlapping(t *testing.T) {
numKeysPerSeries = 10000
numQueriers = 4
queriers []PeekingIterator[*SeriesWithBloom]
data, _ = mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
data, _ = MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, 0, 0xffff, 0, 10000)
slices = make([][]*SeriesWithBloom, numQueriers)
)
for i := 0; i < numSeries; i++ {
17 changes: 15 additions & 2 deletions pkg/storage/bloom/v1/test_util.go
@@ -13,6 +13,8 @@ import (
"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
)

// TODO(owen-d): this should probably be in its own testing-util package

func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (*BlockQuerier, []SeriesWithBloom) {
// references for linking in memory reader+writer
indexBuf := bytes.NewBuffer(nil)
@@ -21,7 +23,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs,
reader := NewByteReader(indexBuf, bloomsBuf)
numSeries := int(throughFp - fromFp)
numKeysPerSeries := 1000
data, _ := mkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs)
data, _ := MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFp, throughFp, fromTs, throughTs)

builder, err := NewBlockBuilder(
BlockOptions{
Expand All @@ -44,7 +46,7 @@ func MakeBlockQuerier(t testing.TB, fromFp, throughFp model.Fingerprint, fromTs,
return NewBlockQuerier(block), data
}

func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) {
func MkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model.Fingerprint, fromTs, throughTs model.Time) (seriesList []SeriesWithBloom, keysList [][][]byte) {
seriesList = make([]SeriesWithBloom, 0, nSeries)
keysList = make([][][]byte, 0, nSeries)

@@ -85,3 +87,14 @@ func mkBasicSeriesWithBlooms(nSeries, keysPerSeries int, fromFp, throughFp model
}
return
}

func EqualIterators[T any](t *testing.T, test func(a, b T), expected, actual Iterator[T]) {
for expected.Next() {
require.True(t, actual.Next())
a, b := expected.At(), actual.At()
test(a, b)
}
require.False(t, actual.Next())
require.Nil(t, expected.Err())
require.Nil(t, actual.Err())
}
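mkBasicSeriesWithBlooms is exported (and EqualIterators relocated here from builder_test.go) so that packages outside v1, notably the new bloomcompactor test, can reuse them. The sketch below is not part of this commit; it exercises the exported helpers from an external test package, using the same argument values as the tests above.

```go
package bloomcompactor

import (
	"testing"

	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/stretchr/testify/require"
)

// TestExportedHelpersSketch is a hypothetical test (not in this commit) using
// the newly exported v1 test utilities from outside package v1.
func TestExportedHelpersSketch(t *testing.T) {
	// 100 series, 10000 keys per series, fingerprints 0-0xffff, timestamps 0-10000,
	// matching the values used throughout the tests in this commit.
	data, keys := v1.MkBasicSeriesWithBlooms(100, 10000, 0, 0xffff, 0, 10000)
	require.Equal(t, len(data), len(keys))

	// EqualIterators walks both iterators in lock-step and applies the comparison
	// func to each pair; two independent iterators over the same slice must match.
	refs := v1.PointerSlice[v1.SeriesWithBloom](data)
	v1.EqualIterators[*v1.SeriesWithBloom](
		t,
		func(a, b *v1.SeriesWithBloom) { require.Equal(t, a.Series, b.Series) },
		v1.NewSliceIter[*v1.SeriesWithBloom](refs),
		v1.NewSliceIter[*v1.SeriesWithBloom](refs),
	)
}
```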
