chore(blooms): Make max bloom page size for querying configurable (#12337)

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Mar 26, 2024
1 parent 9810e8e commit 19c046f
Showing 13 changed files with 43 additions and 27 deletions.
5 changes: 5 additions & 0 deletions docs/sources/configure/_index.md
@@ -2362,6 +2362,11 @@ bloom_shipper:
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]

+# Maximum size of bloom pages that should be queried. Larger pages than this
+# limit are skipped when querying blooms to limit memory usage.
+# CLI flag: -bloom.max-query-page-size
+[max_query_page_size: <int> | default = 64MiB]
+
blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
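For illustration, the new setting sits alongside the existing `bloom_shipper` options as in this minimal sketch; the 128MiB value is just an example override of the documented 64MiB default:

```yaml
# Example Loki config excerpt (values illustrative).
bloom_shipper:
  working_directory: "bloom-shipper"
  # Bloom pages larger than this are skipped at query time to bound memory use.
  max_query_page_size: 128MiB
```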
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
@@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser
for i, b := range blocks {
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockRef: refs[i],
-BlockQuerier: v1.NewBlockQuerier(b, false),
+BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize),
})
}

@@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) {
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
-bq := v1.NewBlockQuerier(block, false)
+bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
@@ -334,7 +334,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
// }
// }
querier := &bloomshipper.CloseableBlockQuerier{
-BlockQuerier: v1.NewBlockQuerier(block, false),
+BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize),
BlockRef: blockRef,
}
queriers = append(queriers, querier)
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/block.go
@@ -117,11 +117,11 @@ type BlockQuerier struct {
// will be returned to the pool for efficiency. This can only safely be used
// when the underlying bloom bytes don't escape the decoder, i.e.
// when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor).
-func NewBlockQuerier(b *Block, noCapture bool) *BlockQuerier {
+func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier {
return &BlockQuerier{
block: b,
series: NewLazySeriesIter(b),
-blooms: NewLazyBloomIter(b, noCapture),
+blooms: NewLazyBloomIter(b, noCapture, maxPageSize),
}
}

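A minimal sketch of a call site for the updated constructor, assuming the `v1` import path from Loki's module layout at this commit; the 32MiB bound is an arbitrary example, not a recommendation:

```go
package example

import (
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

// openQuerier shows the new third argument: a cap, in bytes, on the
// decompressed size of bloom pages the querier is willing to load.
func openQuerier(block *v1.Block) *v1.BlockQuerier {
	// Package default (64MiB):
	_ = v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize)
	// Or a tighter, caller-chosen bound, e.g. 32MiB:
	return v1.NewBlockQuerier(block, false, 32<<20)
}
```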
11 changes: 5 additions & 6 deletions pkg/storage/bloom/v1/bloom.go
@@ -15,10 +15,9 @@ import (
// NB(chaudum): Some block pages are way bigger than others (400MiB and
// bigger), and loading multiple pages into memory in parallel can cause the
// gateways to OOM.
-// Figure out a decent maximum page size that we can process.
-// TODO(chaudum): Make max page size configurable
-var maxPageSize = 64 << 20 // 64MB
-var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20))
+// Figure out a decent default maximum page size that we can process.
+var DefaultMaxPageSize = 64 << 20 // 64MB
+var ErrPageTooLarge = errors.Errorf("bloom page too large")

type Bloom struct {
filter.ScalableBloomFilter
@@ -276,7 +275,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
return checksum, nil
}

-func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Metrics) (res *BloomPageDecoder, err error) {
+func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) {
if pageIdx < 0 || pageIdx >= len(b.pageHeaders) {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen))
@@ -292,7 +291,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Met
return nil, ErrPageTooLarge
}

-if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil {
+if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil {
metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen))
return nil, errors.Wrap(err, "seeking to bloom page")
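The effect of the new `maxPageSize` argument is that oversized pages are skipped with `ErrPageTooLarge` instead of being decoded. A self-contained sketch of that semantics, using illustrative types and names rather than Loki's own:

```go
package main

import (
	"errors"
	"fmt"
)

var errPageTooLarge = errors.New("bloom page too large")

type pageHeader struct{ DecompressedLen int }

// decodePage mimics the guard in BloomPageDecoder: refuse pages whose
// decompressed size exceeds the limit rather than allocating for them.
func decodePage(h pageHeader, maxPageSize int) ([]byte, error) {
	if h.DecompressedLen > maxPageSize {
		return nil, errPageTooLarge
	}
	return make([]byte, h.DecompressedLen), nil
}

func main() {
	const limit = 64 << 20 // 64MiB, matching DefaultMaxPageSize
	for _, h := range []pageHeader{{32 << 20}, {400 << 20}} {
		if _, err := decodePage(h, limit); err != nil {
			fmt.Printf("%d-byte page skipped: %v\n", h.DecompressedLen, err)
			continue
		}
		fmt.Printf("%d-byte page loaded\n", h.DecompressedLen)
	}
}
```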
7 changes: 5 additions & 2 deletions pkg/storage/bloom/v1/bloom_querier.go
@@ -10,6 +10,7 @@ type LazyBloomIter struct {
usePool bool

b *Block
+m int // max page size in bytes

// state
initialized bool
@@ -23,10 +24,11 @@ type LazyBloomIter struct {
// will be returned to the pool for efficiency.
// This can only safely be used when the underlying bloom
// bytes don't escape the decoder.
-func NewLazyBloomIter(b *Block, pool bool) *LazyBloomIter {
+func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter {
return &LazyBloomIter{
usePool: pool,
b: b,
+m: maxSize,
}
}

@@ -58,7 +60,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) {
it.err = errors.Wrap(err, "getting blooms reader")
return
}
-decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.b.metrics)
+decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics)
if err != nil {
it.err = errors.Wrap(err, "loading bloom page")
return
@@ -97,6 +99,7 @@ func (it *LazyBloomIter) next() bool {
it.curPage, err = it.b.blooms.BloomPageDecoder(
r,
it.curPageIndex,
+it.m,
it.b.metrics,
)
if err != nil {
12 changes: 6 additions & 6 deletions pkg/storage/bloom/v1/builder_test.go
@@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) {
}

block := NewBlock(tc.reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

err = block.LoadHeaders()
require.Nil(t, err)
@@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) {
itr := NewSliceIter[SeriesWithBloom](data[min:max])
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
-blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false)))
+blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)))
}

// We're not testing the ability to extend a bloom in this test
@@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) {
require.Nil(t, err)

block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

EqualIterators[*SeriesWithBloom](
t,
@@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

rounds := make([][]model.Fingerprint, 2)

@@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
_, err = builder.BuildFrom(itr)
require.Nil(t, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, false)
+querier := NewBlockQuerier(block, false, DefaultMaxPageSize)

// rather than use the block querier directly, collect it's data
// so we can use it in a few places later
@@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {

// ensure the new block contains one copy of all the data
// by comparing it against an iterator over the source data
-mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false)
+mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize)
sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs))

EqualIterators[*SeriesWithBloom](
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -49,7 +49,7 @@ func TestFusedQuerier(t *testing.T) {
require.NoError(t, err)
require.False(t, itr.Next())
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, true)
+querier := NewBlockQuerier(block, true, DefaultMaxPageSize)

n := 2
nReqs := numSeries / n
@@ -143,7 +143,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
_, err = builder.BuildFrom(itr)
require.Nil(b, err)
block := NewBlock(reader, NewMetrics(nil))
-querier := NewBlockQuerier(block, true)
+querier := NewBlockQuerier(block, true, DefaultMaxPageSize)

numRequestChains := 100
seriesPerRequest := 100
4 changes: 3 additions & 1 deletion pkg/storage/stores/shipper/bloomshipper/cache.go
@@ -120,13 +120,15 @@ func (b *BlockDirectory) resolveSize() error {

// BlockQuerier returns a new block querier from the directory.
// The passed function `close` is called when the the returned querier is closed.

func (b BlockDirectory) BlockQuerier(
usePool bool,
close func() error,
+maxPageSize int,
metrics *v1.Metrics,
) *CloseableBlockQuerier {
return &CloseableBlockQuerier{
-BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool),
+BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize),
BlockRef: b.BlockRef,
close: close,
}
3 changes: 3 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
@@ -14,6 +14,7 @@ import (

type Config struct {
WorkingDirectory string `yaml:"working_directory"`
+MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
@@ -31,6 +32,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.")
+_ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
+f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
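A runnable sketch of the registration pattern above: `flagext.Bytes` (from `github.com/grafana/dskit/flagext`, which Loki uses) parses human-readable sizes, and the default is set before the flag is registered. The flag name mirrors the one above; the 128MiB override is an example:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/dskit/flagext"
)

func main() {
	var maxPageSize flagext.Bytes
	_ = maxPageSize.Set("64MiB") // default, kept in sync with v1.DefaultMaxPageSize

	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	fs.Var(&maxPageSize, "bloom.max-query-page-size",
		"Maximum size of bloom pages that should be queried.")

	// Operators can raise or lower the limit on the command line:
	_ = fs.Parse([]string{"-bloom.max-query-page-size=128MiB"})
	fmt.Println(int(maxPageSize)) // 134217728
}
```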
2 changes: 2 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
@@ -243,6 +243,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
func() error {
return f.blocksCache.Release(ctx, key)
},
+f.cfg.maxBloomPageSize,
f.bloomMetrics,
)
}
@@ -277,6 +278,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
func() error {
return f.blocksCache.Release(ctx, key)
},
+f.cfg.maxBloomPageSize,
f.bloomMetrics,
)
}
10 changes: 6 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store.go
@@ -41,8 +41,9 @@ type StoreWithMetrics interface {
}

type bloomStoreConfig struct {
-workingDir string
-numWorkers int
+workingDir string
+numWorkers int
+maxBloomPageSize int
}

// Compiler check to ensure bloomStoreEntry implements the Store interface
Expand Down Expand Up @@ -192,8 +193,9 @@ func NewBloomStore(

// TODO(chaudum): Remove wrapper
cfg := bloomStoreConfig{
-workingDir: storageConfig.BloomShipperConfig.WorkingDirectory,
-numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+workingDir: storageConfig.BloomShipperConfig.WorkingDirectory,
+numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
+maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
}

if err := util.EnsureDirectory(cfg.workingDir); err != nil {
2 changes: 1 addition & 1 deletion tools/bloom/inspector/main.go
@@ -18,7 +18,7 @@ func main() {

r := v1.NewDirectoryBlockReader(path)
b := v1.NewBlock(r, v1.NewMetrics(nil))
-q := v1.NewBlockQuerier(b, true)
+q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize)

md, err := q.Metadata()
if err != nil {
