Skip to content

Commit

Permalink
chore: Use filesystem backed writer for blooms (#13522)
Browse files Browse the repository at this point in the history
The DirectoryBlockWriter and DirectoryBlockReader are used to avoid OOMing of compactors/builders.

The tradeoff is that the writer/reader needs to be cleaned up and that it is I/O bound.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jul 17, 2024
1 parent aeb23bb commit 7aa7c15
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 62 deletions.
1 change: 0 additions & 1 deletion integration/bloom_building_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ func checkForTimestampMetric(t *testing.T, cliPlanner *client.Client, metricName

func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore {
logger := log.NewNopLogger()
//logger := log.NewLogfmtLogger(os.Stdout)

schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{
Expand Down
25 changes: 18 additions & 7 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package builder

import (
"bytes"
"context"
"fmt"
"os"
"sync"
"time"

Expand Down Expand Up @@ -356,7 +356,7 @@ func (b *Builder) processTask(
seriesItrWithCounter,
b.chunkLoader,
blocksIter,
b.rwFn,
b.writerReaderFunc,
nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
b.bloomStore.BloomMetrics(),
logger,
Expand All @@ -372,6 +372,9 @@ func (b *Builder) processTask(
built, err := bloomshipper.BlockFrom(tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}
return nil, fmt.Errorf("failed to build block: %w", err)
}

Expand All @@ -382,10 +385,17 @@ func (b *Builder) processTask(
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}
return nil, fmt.Errorf("failed to write block: %w", err)
}
b.metrics.blocksCreated.Inc()

if err := blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}

totalGapKeyspace := gap.Bounds.Max - gap.Bounds.Min
progress := built.Bounds.Max - gap.Bounds.Min
pct := float64(progress) / float64(totalGapKeyspace) * 100
Expand Down Expand Up @@ -477,9 +487,10 @@ func (b *Builder) loadWorkForGap(
return seriesItr, blocksIter, nil
}

// TODO(owen-d): pool, evaluate if memory-only is the best choice
func (b *Builder) rwFn() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) {
dir, err := os.MkdirTemp(b.cfg.WorkingDir, "bloom-block-")
if err != nil {
panic(err)
}
return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir)
}
2 changes: 2 additions & 0 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type Config struct {
GrpcConfig grpcclient.Config `yaml:"grpc_config"`
PlannerAddress string `yaml:"planner_address"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
WorkingDir string `yaml:"working_directory" doc:"hidden"`
}

// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.PlannerAddress, prefix+".planner-address", "", "Hostname (and port) of the bloom planner")
cfg.GrpcConfig.RegisterFlagsWithPrefix(prefix+".grpc", f)
cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+".backoff", f)
f.StringVar(&cfg.WorkingDir, prefix+".working-directory", "", "Working directory to which blocks are temporarily written to. Empty string defaults to the operating system's temp directory.")
}

func (cfg *Config) Validate() error {
Expand Down
48 changes: 25 additions & 23 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct {
metrics *v1.Metrics
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)

tokenizer *v1.BloomTokenizer
}
Expand All @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator(
store iter.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *v1.Metrics,
logger log.Logger,
Expand All @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator(
"component", "bloom_generator",
"org_id", userID,
),
readWriterFn: readWriterFn,
metrics: metrics,
reporter: reporter,
writerReaderFunc: writerReaderFunc,
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
Expand Down Expand Up @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator(
opts v1.BlockOptions,
metrics *v1.Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
blocks iter.ResetIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
blocks: blocks,
}
}

Expand All @@ -221,16 +221,18 @@ func (b *LazyBlockBuilderIterator) Next() bool {
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
writer, reader := b.readWriterFn()
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
_ = writer.Cleanup()
b.err = errors.Wrap(err, "failed to create bloom block builder")
return false
}
_, sourceBytes, err := mergeBuilder.Build(blockBuilder)
b.bytesAdded += sourceBytes

if err != nil {
_ = writer.Cleanup()
b.err = errors.Wrap(err, "failed to build bloom block")
return false
}
Expand Down
15 changes: 8 additions & 7 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package bloomcompactor

import (
"bytes"
"context"
"fmt"
"math"
"os"
"sort"
"sync"

Expand Down Expand Up @@ -49,11 +49,12 @@ func NewSimpleBloomController(
}
}

// TODO(owen-d): pool, evaluate if memory-only is the best choice
func (s *SimpleBloomController) rwFn() (v1.BlockWriter, v1.BlockReader) {
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
return v1.NewMemoryBlockWriter(indexBuf, bloomsBuf), v1.NewByteReader(indexBuf, bloomsBuf)
func (s *SimpleBloomController) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) {
dir, err := os.MkdirTemp("", "bloom-block-")
if err != nil {
panic(err)
}
return v1.NewDirectoryBlockWriter(dir), v1.NewDirectoryBlockReader(dir)
}

/*
Expand Down Expand Up @@ -409,7 +410,7 @@ func (s *SimpleBloomController) buildGaps(
seriesItrWithCounter,
s.chunkLoader,
blocksIter,
s.rwFn,
s.writerReaderFunc,
reporter,
s.metrics,
logger,
Expand Down
46 changes: 23 additions & 23 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ type SimpleBloomGenerator struct {
metrics *Metrics
logger log.Logger

readWriterFn func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
reporter func(model.Fingerprint)

tokenizer *v1.BloomTokenizer
}
Expand All @@ -69,7 +69,7 @@ func NewSimpleBloomGenerator(
store iter.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocksIter iter.ResetIterator[*v1.SeriesWithBlooms],
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
reporter func(model.Fingerprint),
metrics *Metrics,
logger log.Logger,
Expand All @@ -85,9 +85,9 @@ func NewSimpleBloomGenerator(
"component", "bloom_generator",
"org_id", userID,
),
readWriterFn: readWriterFn,
metrics: metrics,
reporter: reporter,
writerReaderFunc: writerReaderFunc,
metrics: metrics,
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
Expand Down Expand Up @@ -161,19 +161,19 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate v1.BloomPopulatorFunc
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]
ctx context.Context
opts v1.BlockOptions
metrics *Metrics
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
blocks iter.ResetIterator[*v1.SeriesWithBlooms]

bytesAdded int
curr *v1.Block
Expand All @@ -185,18 +185,18 @@ func NewLazyBlockBuilderIterator(
opts v1.BlockOptions,
metrics *Metrics,
populate v1.BloomPopulatorFunc,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
blocks iter.ResetIterator[*v1.SeriesWithBlooms],
) *LazyBlockBuilderIterator {
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,
ctx: ctx,
opts: opts,
metrics: metrics,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
blocks: blocks,
}
}

Expand All @@ -221,7 +221,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
writer, reader := b.readWriterFn()
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
b.err = errors.Wrap(err, "failed to create bloom block builder")
Expand Down
19 changes: 19 additions & 0 deletions pkg/storage/bloom/v1/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/pkg/errors"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
)

Expand All @@ -22,6 +24,7 @@ type BlockWriter interface {
Blooms() (io.WriteCloser, error)
Size() (int, error) // byte size of accumualted index & blooms
Full(maxSize uint64) (full bool, size int, err error)
Cleanup() error
}

// in memory impl
Expand All @@ -39,6 +42,7 @@ func NewMemoryBlockWriter(index, blooms *bytes.Buffer) MemoryBlockWriter {
func (b MemoryBlockWriter) Index() (io.WriteCloser, error) {
return NewNoopCloser(b.index), nil
}

func (b MemoryBlockWriter) Blooms() (io.WriteCloser, error) {
return NewNoopCloser(b.blooms), nil
}
Expand All @@ -60,6 +64,12 @@ func (b MemoryBlockWriter) Full(maxSize uint64) (full bool, size int, err error)
return uint64(size) >= maxSize, size, nil
}

func (b MemoryBlockWriter) Cleanup() error {
b.index.Reset()
b.blooms.Reset()
return nil
}

// Directory based impl
type DirectoryBlockWriter struct {
dir string
Expand Down Expand Up @@ -139,3 +149,12 @@ func (b *DirectoryBlockWriter) Full(maxSize uint64) (full bool, size int, err er

return uint64(size) >= maxSize, size, nil
}

func (b *DirectoryBlockWriter) Cleanup() error {
b.initialized = false
err := multierror.New()
err.Add(os.Remove(b.index.Name()))
err.Add(os.Remove(b.blooms.Name()))
err.Add(os.RemoveAll(b.dir))
return err.Err()
}
Loading

0 comments on commit 7aa7c15

Please sign in to comment.