diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index d5411f8b1c9d..c1ad2e29eff9 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2362,6 +2362,11 @@ bloom_shipper: # CLI flag: -bloom.shipper.working-directory [working_directory: | default = "bloom-shipper"] + # Maximum size of bloom pages that should be queried. Larger pages than this + # limit are skipped when querying blooms to limit memory usage. + # CLI flag: -bloom.max-query-page-size + [max_query_page_size: | default = 64MiB] + blocks_downloading_queue: # The count of parallel workers that download Bloom Blocks. # CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index fe1b2a09b5a0..e9a403ac6929 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false), + BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false) + bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 48d0f99f9004..9bd158219e13 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -334,7 +334,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // } // } querier := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(block, false), + BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize), BlockRef: blockRef, } queriers = append(queriers, querier) diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index b1e534df731b..91ba171b272c 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -117,11 +117,11 @@ type BlockQuerier struct { // will be returned to the pool for efficiency. This can only safely be used // when the underlying bloom bytes don't escape the decoder, i.e. // when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor). -func NewBlockQuerier(b *Block, noCapture bool) *BlockQuerier { +func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier { return &BlockQuerier{ block: b, series: NewLazySeriesIter(b), - blooms: NewLazyBloomIter(b, noCapture), + blooms: NewLazyBloomIter(b, noCapture, maxPageSize), } } diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index d740c170fcc9..058ac68818d5 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -15,10 +15,9 @@ import ( // NB(chaudum): Some block pages are way bigger than others (400MiB and // bigger), and loading multiple pages into memory in parallel can cause the // gateways to OOM. -// Figure out a decent maximum page size that we can process. -// TODO(chaudum): Make max page size configurable -var maxPageSize = 64 << 20 // 64MB -var ErrPageTooLarge = errors.Errorf("bloom page too large: size limit is %.1fMiB", float64(maxPageSize)/float64(1<<20)) +// Figure out a decent default maximum page size that we can process. +var DefaultMaxPageSize = 64 << 20 // 64MB +var ErrPageTooLarge = errors.Errorf("bloom page too large") type Bloom struct { filter.ScalableBloomFilter @@ -276,7 +275,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { return checksum, nil } -func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Metrics) (res *BloomPageDecoder, err error) { +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, err error) { if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen)) @@ -292,7 +291,7 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, metrics *Met return nil, ErrPageTooLarge } - if _, err := r.Seek(int64(page.Offset), io.SeekStart); err != nil { + if _, err = r.Seek(int64(page.Offset), io.SeekStart); err != nil { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonErr).Add(float64(page.DecompressedLen)) return nil, errors.Wrap(err, "seeking to bloom page") diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index 535e2a379250..01c0216c36f0 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -10,6 +10,7 @@ type LazyBloomIter struct { usePool bool b *Block + m int // max page size in bytes // state initialized bool @@ -23,10 +24,11 @@ type LazyBloomIter struct { // will be returned to the pool for efficiency. // This can only safely be used when the underlying bloom // bytes don't escape the decoder. -func NewLazyBloomIter(b *Block, pool bool) *LazyBloomIter { +func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter { return &LazyBloomIter{ usePool: pool, b: b, + m: maxSize, } } @@ -58,7 +60,7 @@ func (it *LazyBloomIter) Seek(offset BloomOffset) { it.err = errors.Wrap(err, "getting blooms reader") return } - decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.b.metrics) + decoder, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics) if err != nil { it.err = errors.Wrap(err, "loading bloom page") return @@ -97,6 +99,7 @@ func (it *LazyBloomIter) next() bool { it.curPage, err = it.b.blooms.BloomPageDecoder( r, it.curPageIndex, + it.m, it.b.metrics, ) if err != nil { diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 540d0a768ca7..481c8ec9f915 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { } block := NewBlock(tc.reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) err = block.LoadHeaders() require.Nil(t, err) @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) { itr := NewSliceIter[SeriesWithBloom](data[min:max]) _, err = builder.BuildFrom(itr) require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false))) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize))) } // We're not testing the ability to extend a bloom in this test @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) EqualIterators[*SeriesWithBloom]( t, @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) rounds := make([][]model.Fingerprint, 2) @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false) + querier := NewBlockQuerier(block, false, DefaultMaxPageSize) // rather than use the block querier directly, collect it's data // so we can use it in a few places later @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data - mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false) + mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize) sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs)) EqualIterators[*SeriesWithBloom]( diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 7ca7267b6ecc..5c9f2f06f047 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -49,7 +49,7 @@ func TestFusedQuerier(t *testing.T) { require.NoError(t, err) require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true) + querier := NewBlockQuerier(block, true, DefaultMaxPageSize) n := 2 nReqs := numSeries / n @@ -143,7 +143,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou _, err = builder.BuildFrom(itr) require.Nil(b, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true) + querier := NewBlockQuerier(block, true, DefaultMaxPageSize) numRequestChains := 100 seriesPerRequest := 100 diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index ebd2d6a53ff8..3097822fccf7 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -120,13 +120,15 @@ func (b *BlockDirectory) resolveSize() error { // BlockQuerier returns a new block querier from the directory. // The passed function `close` is called when the the returned querier is closed. + func (b BlockDirectory) BlockQuerier( usePool bool, close func() error, + maxPageSize int, metrics *v1.Metrics, ) *CloseableBlockQuerier { return &CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool), + BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize), BlockRef: b.BlockRef, close: close, } diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index a37a3028e66e..eda55e8fbbeb 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -14,6 +14,7 @@ import ( type Config struct { WorkingDirectory string `yaml:"working_directory"` + MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"` BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"` BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` @@ -31,6 +32,8 @@ func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *fla func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.") + _ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go + f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.") c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f) c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index f2e40534a095..74fb9a177a66 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -243,6 +243,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc func() error { return f.blocksCache.Release(ctx, key) }, + f.cfg.maxBloomPageSize, f.bloomMetrics, ) } @@ -277,6 +278,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc func() error { return f.blocksCache.Release(ctx, key) }, + f.cfg.maxBloomPageSize, f.bloomMetrics, ) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index c8a162dbd66b..8daa94bddf00 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -41,8 +41,9 @@ type StoreWithMetrics interface { } type bloomStoreConfig struct { - workingDir string - numWorkers int + workingDir string + numWorkers int + maxBloomPageSize int } // Compiler check to ensure bloomStoreEntry implements the Store interface @@ -192,8 +193,9 @@ func NewBloomStore( // TODO(chaudum): Remove wrapper cfg := bloomStoreConfig{ - workingDir: storageConfig.BloomShipperConfig.WorkingDirectory, - numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, + workingDir: storageConfig.BloomShipperConfig.WorkingDirectory, + numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, + maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize), } if err := util.EnsureDirectory(cfg.workingDir); err != nil { diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index d7f1a7c89bf3..36d152371429 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -18,7 +18,7 @@ func main() { r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, true) + q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize) md, err := q.Metadata() if err != nil {