From ac476a04fd08167c7b3ba0c8275e901480c4c771 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Fri, 2 Feb 2024 13:18:00 -0800
Subject: [PATCH] Blooms/block metadata (#11859)

A few updates to the bloom library:

* Uses `FingerprintBounds` in series headers.
* Encodes `BlockOptions` in the series file so we can later read the
  target page & block sizes the block was generated with, in addition
  to the schema.
* Introduces the `BlockMetadata` struct and loads it correctly from
  blocks. This struct will later be converted into the `BlockRef`s of
  the `bloomshipper` pkg and used in the bloom compactor + bloom
  gateway.
* Integrates checksums better into block building: the header-metadata
  checksums from each file (blooms, series) are XORed together to
  generate a final checksum for the block (a combination of both
  files).
---
 pkg/storage/bloom/v1/block.go         |  46 ++++++++---
 pkg/storage/bloom/v1/bloom.go         |  19 ++---
 pkg/storage/bloom/v1/builder.go       | 109 +++++++++++++-------------
 pkg/storage/bloom/v1/builder_test.go  |  26 +++++-
 pkg/storage/bloom/v1/index.go         |  56 ++++++-------
 pkg/storage/bloom/v1/index_querier.go |   2 +-
 6 files changed, 157 insertions(+), 101 deletions(-)

diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index b1b08be008405..6d13349855f40 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -7,21 +7,23 @@ import (
 	"github.com/prometheus/common/model"
 )
 
+type BlockMetadata struct {
+	Options  BlockOptions
+	Series   SeriesHeader
+	Checksum uint32
+}
+
 type Block struct {
 	// covers series pages
 	index BlockIndex
 	// covers bloom pages
 	blooms BloomBlock
 
-	// TODO(owen-d): implement
-	// synthetic header for the entire block
-	// built from all the pages in the index
-	header SeriesHeader
+	metadata BlockMetadata
 
 	reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)?
 
 	initialized bool
-	dataRange   SeriesHeader
 }
 
 func NewBlock(reader BlockReader) *Block {
@@ -38,30 +40,49 @@ func (b *Block) LoadHeaders() error {
 			return errors.Wrap(err, "getting index reader")
 		}
 
-		if err := b.index.DecodeHeaders(idx); err != nil {
+		indexChecksum, err := b.index.DecodeHeaders(idx)
+		if err != nil {
 			return errors.Wrap(err, "decoding index")
 		}
 
+		b.metadata.Options = b.index.opts
+
 		// TODO(owen-d): better pattern
 		xs := make([]SeriesHeader, 0, len(b.index.pageHeaders))
 		for _, h := range b.index.pageHeaders {
 			xs = append(xs, h.SeriesHeader)
 		}
-		b.dataRange = aggregateHeaders(xs)
+		b.metadata.Series = aggregateHeaders(xs)
 
 		blooms, err := b.reader.Blooms()
 		if err != nil {
 			return errors.Wrap(err, "getting blooms reader")
 		}
-		if err := b.blooms.DecodeHeaders(blooms); err != nil {
+		bloomChecksum, err := b.blooms.DecodeHeaders(blooms)
+		if err != nil {
 			return errors.Wrap(err, "decoding blooms")
 		}
 		b.initialized = true
+
+		if !b.metadata.Options.Schema.Compatible(b.blooms.schema) {
+			return fmt.Errorf(
+				"schema mismatch: index (%v) vs blooms (%v)",
+				b.metadata.Options.Schema, b.blooms.schema,
+			)
+		}
+
+		b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum)
 	}
 	return nil
}
 
+// XOR checksums as a simple checksum combiner with the benefit that
+// each part can be recomputed by XORing the result against the other
+func combineChecksums(index, blooms uint32) uint32 {
+	return index ^ blooms
+}
+
 // convenience method
 func (b *Block) Querier() *BlockQuerier {
 	return NewBlockQuerier(b)
@@ -75,11 +96,18 @@ func (b *Block) Blooms() *LazyBloomIter {
 	return NewLazyBloomIter(b)
 }
 
+func (b *Block) Metadata() (BlockMetadata, error) {
+	if err := b.LoadHeaders(); err != nil {
+		return BlockMetadata{}, err
+	}
+	return b.metadata, nil
+}
+
 func (b *Block) Schema() (Schema, error) {
 	if err := b.LoadHeaders(); err != nil {
 		return Schema{}, err
 	}
-	return b.index.schema, nil
+	return b.metadata.Options.Schema, nil
 }
 
 type BlockQuerier struct {
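Note on the checksum combiner above: XOR is self-inverse, so given the block
checksum and either file's checksum, the other file's checksum can be
recovered. A minimal, self-contained sketch of that property (illustrative
only, not part of the patch):

// Illustrative only: mirrors the combineChecksums helper added in block.go.
package main

import "fmt"

func combineChecksums(index, blooms uint32) uint32 {
	return index ^ blooms
}

func main() {
	index, blooms := uint32(0xDEADBEEF), uint32(0x0BADF00D)
	combined := combineChecksums(index, blooms)

	// XORing the combined value against one part recovers the other.
	fmt.Println(combined^blooms == index) // true
	fmt.Println(combined^index == blooms) // true
}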
diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go
index be95f96862eac..20c310ef695c0 100644
--- a/pkg/storage/bloom/v1/bloom.go
+++ b/pkg/storage/bloom/v1/bloom.go
@@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock {
 	}
 }
 
-func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
+func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 	if err := b.schema.DecodeFrom(r); err != nil {
-		return errors.Wrap(err, "decoding schema")
+		return 0, errors.Wrap(err, "decoding schema")
 	}
 
 	var (
@@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
 	)
 	// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
 	if _, err := r.Seek(-12, io.SeekEnd); err != nil {
-		return errors.Wrap(err, "seeking to bloom headers metadata")
+		return 0, errors.Wrap(err, "seeking to bloom headers metadata")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading bloom headers metadata")
+		return 0, errors.Wrap(err, "reading bloom headers metadata")
 	}
 
 	headerOffset := dec.Be64()
+	checksum := dec.Be32()
 	if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
-		return errors.Wrap(err, "seeking to bloom headers")
+		return 0, errors.Wrap(err, "seeking to bloom headers")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading bloom page headers")
+		return 0, errors.Wrap(err, "reading bloom page headers")
 	}
 
 	if err := dec.CheckCrc(castagnoliTable); err != nil {
-		return errors.Wrap(err, "checksumming page headers")
+		return 0, errors.Wrap(err, "checksumming page headers")
 	}
 
 	b.pageHeaders = make([]BloomPageHeader, dec.Uvarint())
 	for i := 0; i < len(b.pageHeaders); i++ {
 		header := &b.pageHeaders[i]
 		if err := header.Decode(&dec); err != nil {
-			return errors.Wrapf(err, "decoding %dth series header", i)
+			return 0, errors.Wrapf(err, "decoding %dth series header", i)
 		}
 	}
 
-	return nil
+	return checksum, nil
 }
 
 func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
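Both the series and bloom files end with the same trailer read above: the
last 12 bytes hold a big-endian u64 offset to the page headers followed by a
big-endian u32 checksum. A sketch of that layout using only the standard
library (`readTrailer` is a hypothetical helper, not part of the patch):

// Illustrative only: decodes the assumed trailer layout
// (headers offset: 8 byte u64, checksum: 4 byte u32), both big-endian.
package main

import "encoding/binary"

func readTrailer(file []byte) (headerOffset uint64, checksum uint32) {
	trailer := file[len(file)-12:]
	headerOffset = binary.BigEndian.Uint64(trailer[0:8])
	checksum = binary.BigEndian.Uint32(trailer[8:12])
	return headerOffset, checksum
}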
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index 26b9a39cfd7bf..ac7a83baad374 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"hash"
 	"io"
-	"sort"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -21,15 +20,46 @@ var (
 
 type BlockOptions struct {
 	// Schema determines the Schema of the block and cannot be changed
+	// without recreating the block from underlying data
 	Schema Schema
 
 	// The following options can be changed on the fly.
 	// For instance, adding another page to a block with
-	// a different target page size is supported.
+	// a different target page size is supported, although
+	// the block will store the original sizes it was created with
 
 	// target size in bytes (decompressed)
 	// of each page type
-	SeriesPageSize, BloomPageSize, BlockSize int
+	SeriesPageSize, BloomPageSize, BlockSize uint64
+}
+
+func (b BlockOptions) Len() int {
+	return 3*8 + b.Schema.Len()
+}
+
+func (b *BlockOptions) DecodeFrom(r io.ReadSeeker) error {
+	buf := make([]byte, b.Len())
+	_, err := io.ReadFull(r, buf)
+	if err != nil {
+		return errors.Wrap(err, "reading block options")
+	}
+
+	dec := encoding.DecWith(buf)
+
+	if err := b.Schema.Decode(&dec); err != nil {
+		return errors.Wrap(err, "decoding schema")
+	}
+	b.SeriesPageSize = dec.Be64()
+	b.BloomPageSize = dec.Be64()
+	b.BlockSize = dec.Be64()
+	return nil
+}
+
+func (b BlockOptions) Encode(enc *encoding.Encbuf) {
+	b.Schema.Encode(enc)
+	enc.PutBE64(b.SeriesPageSize)
+	enc.PutBE64(b.BloomPageSize)
+	enc.PutBE64(b.BlockSize)
 }
 
 type BlockBuilder struct {
@@ -90,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error)
 		return 0, errors.Wrap(err, "iterating series with blooms")
 	}
 
-	checksum, err := b.blooms.Close()
+	return b.Close()
+}
+
+func (b *BlockBuilder) Close() (uint32, error) {
+	bloomChecksum, err := b.blooms.Close()
 	if err != nil {
 		return 0, errors.Wrap(err, "closing bloom file")
 	}
-	if err := b.index.Close(); err != nil {
+	indexCheckSum, err := b.index.Close()
+	if err != nil {
 		return 0, errors.Wrap(err, "closing series file")
 	}
-	return checksum, nil
+	return combineChecksums(indexCheckSum, bloomChecksum), nil
 }
 
 func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
@@ -131,7 +166,7 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
 	return &BloomBlockBuilder{
 		opts:    opts,
 		writer:  writer,
-		page:    NewPageWriter(opts.BloomPageSize),
+		page:    NewPageWriter(int(opts.BloomPageSize)),
 		scratch: &encoding.Encbuf{},
 	}
 }
@@ -307,16 +342,16 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
 	return &IndexBuilder{
 		opts:    opts,
 		writer:  writer,
-		page:    NewPageWriter(opts.SeriesPageSize),
+		page:    NewPageWriter(int(opts.SeriesPageSize)),
 		scratch: &encoding.Encbuf{},
 	}
 }
 
-func (b *IndexBuilder) WriteSchema() error {
+func (b *IndexBuilder) WriteOpts() error {
 	b.scratch.Reset()
-	b.opts.Schema.Encode(b.scratch)
+	b.opts.Encode(b.scratch)
 	if _, err := b.writer.Write(b.scratch.Get()); err != nil {
-		return errors.Wrap(err, "writing schema")
+		return errors.Wrap(err, "writing opts+schema")
 	}
 	b.writtenSchema = true
 	b.offset += b.scratch.Len()
@@ -325,8 +360,8 @@ func (b *IndexBuilder) Append(series SeriesWithOffset) error {
 	if !b.writtenSchema {
-		if err := b.WriteSchema(); err != nil {
-			return errors.Wrap(err, "writing schema")
+		if err := b.WriteOpts(); err != nil {
+			return errors.Wrap(err, "appending series")
 		}
 	}
 
@@ -408,8 +443,7 @@ func (b *IndexBuilder) flushPage() error {
 		DecompressedLen: decompressedLen,
 		SeriesHeader: SeriesHeader{
 			NumSeries: b.page.Count(),
-			FromFp:    b.fromFp,
-			ThroughFp: b.previousFp,
+			Bounds:    NewBounds(b.fromFp, b.previousFp),
 			FromTs:    b.fromTs,
 			ThroughTs: b.throughTs,
 		},
@@ -428,10 +462,10 @@ func (b *IndexBuilder) flushPage() error {
 	return nil
 }
 
-func (b *IndexBuilder) Close() error {
+func (b *IndexBuilder) Close() (uint32, error) {
 	if b.page.Count() > 0 {
 		if err := b.flushPage(); err != nil {
-			return errors.Wrap(err, "flushing final series page")
+			return 0, errors.Wrap(err, "flushing final series page")
 		}
 	}
 
@@ -451,39 +485,9 @@ func (b *IndexBuilder) Close() error {
 	b.scratch.PutHash(crc32Hash)
 	_, err := b.writer.Write(b.scratch.Get())
 	if err != nil {
-		return errors.Wrap(err, "writing series page headers")
+		return 0, errors.Wrap(err, "writing series page headers")
 	}
-	return errors.Wrap(b.writer.Close(), "closing series writer")
-}
-
-// SortBlocksIntoOverlappingGroups sorts a list of blocks into a sorted list of lists,
-// where each list contains blocks that overlap with each other.
-// TODO(owen-d): implement as an iterator so we don't have to load all blocks at once
-// NB: unused now, but likely useful when we want to optimize compaction. I wrote this expecting to need it now
-// but it feels unsavory to remove it
-func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) {
-	sort.Slice(xs, func(i, j int) bool {
-		a, b := xs[i].index, xs[j].index
-		return a.pageHeaders[0].FromFp <= b.pageHeaders[0].FromFp
-	})
-
-	var curGroup []*Block
-	for _, x := range xs {
-		switch {
-		case len(curGroup) == 0:
-			curGroup = append(curGroup, x)
-		case curGroup[len(curGroup)-1].dataRange.OverlapFingerprintRange(x.dataRange):
-			curGroup = append(curGroup, x)
-		default:
-			groups = append(groups, curGroup)
-			curGroup = []*Block{x}
-		}
-	}
-
-	if len(curGroup) > 0 {
-		groups = append(groups, curGroup)
-	}
-	return groups
+	return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer")
 }
 
 // Simplistic implementation of a merge builder that builds a single block
@@ -586,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 		}
 	}
 
-	checksum, err := builder.blooms.Close()
+	checksum, err := builder.Close()
 	if err != nil {
-		return 0, errors.Wrap(err, "closing bloom file")
-	}
-	if err := builder.index.Close(); err != nil {
-		return 0, errors.Wrap(err, "closing series file")
+		return 0, errors.Wrap(err, "closing block")
 	}
 	return checksum, nil
 }
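The options record written by `WriteOpts` above is fixed-size, which is what
lets `BlockOptions.DecodeFrom` read exactly `Len() = 3*8 + b.Schema.Len()`
bytes: the encoded schema followed by three big-endian u64s. A sketch of the
tail layout (`encodeOptsTail` is a hypothetical stand-in for the `Encbuf`
calls, not part of the patch):

// Illustrative only: appends the three size fields that follow the
// schema bytes in the serialized BlockOptions record.
package main

import "encoding/binary"

func encodeOptsTail(buf []byte, seriesPageSize, bloomPageSize, blockSize uint64) []byte {
	buf = binary.BigEndian.AppendUint64(buf, seriesPageSize)
	buf = binary.BigEndian.AppendUint64(buf, bloomPageSize)
	buf = binary.BigEndian.AppendUint64(buf, blockSize)
	return buf
}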
diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go
index cb28f0cb53357..6bf2c26e7b585 100644
--- a/pkg/storage/bloom/v1/builder_test.go
+++ b/pkg/storage/bloom/v1/builder_test.go
@@ -9,8 +9,32 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/pkg/chunkenc"
+	"github.com/grafana/loki/pkg/util/encoding"
 )
 
+func TestBlockOptionsRoundTrip(t *testing.T) {
+	opts := BlockOptions{
+		Schema: Schema{
+			version:     V1,
+			encoding:    chunkenc.EncSnappy,
+			nGramLength: 10,
+			nGramSkip:   2,
+		},
+		SeriesPageSize: 100,
+		BloomPageSize:  10 << 10,
+		BlockSize:      10 << 20,
+	}
+
+	var enc encoding.Encbuf
+	opts.Encode(&enc)
+
+	var got BlockOptions
+	err := got.DecodeFrom(bytes.NewReader(enc.Get()))
+	require.Nil(t, err)
+
+	require.Equal(t, opts, got)
+}
+
 func TestBlockBuilderRoundTrip(t *testing.T) {
 	numSeries := 100
 	numKeysPerSeries := 10000
@@ -334,7 +358,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 
 	checksum, err := mb.Build(builder)
 	require.Nil(t, err)
-	require.Equal(t, uint32(0x2ec4fd6a), checksum)
+	require.Equal(t, uint32(0xe306ec6e), checksum)
 
 	// ensure the new block contains one copy of all the data
 	// by comparing it against an iterator over the source data
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index 10c1e41fd1139..e3a14dc5453ea 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -2,6 +2,7 @@ package v1
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 
 	"github.com/pkg/errors"
@@ -17,6 +18,10 @@ type Schema struct {
 	nGramLength, nGramSkip uint64
 }
 
+func (s Schema) String() string {
+	return fmt.Sprintf("v%d,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip)
+}
+
 func (s Schema) Compatible(other Schema) bool {
 	return s == other
 }
@@ -89,19 +94,14 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error {
 // Block index is a set of series pages along with
 // the headers for each page
 type BlockIndex struct {
-	schema      Schema
-	pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
-}
+	opts BlockOptions
 
-func NewBlockIndex(encoding chunkenc.Encoding) BlockIndex {
-	return BlockIndex{
-		schema: Schema{version: DefaultSchemaVersion, encoding: encoding},
-	}
+	pageHeaders []SeriesPageHeaderWithOffset // headers for each series page
 }
 
-func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
-	if err := b.schema.DecodeFrom(r); err != nil {
-		return errors.Wrap(err, "decoding schema")
+func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
+	if err := b.opts.DecodeFrom(r); err != nil {
+		return 0, errors.Wrap(err, "decoding block options")
 	}
 
 	var (
@@ -111,24 +111,25 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
 
 	// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
 	if _, err := r.Seek(-12, io.SeekEnd); err != nil {
-		return errors.Wrap(err, "seeking to bloom headers metadata")
+		return 0, errors.Wrap(err, "seeking to bloom headers metadata")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading bloom headers metadata")
+		return 0, errors.Wrap(err, "reading bloom headers metadata")
 	}
 
 	headerOffset := dec.Be64()
+	checksum := dec.Be32()
 
 	if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
-		return errors.Wrap(err, "seeking to index headers")
+		return 0, errors.Wrap(err, "seeking to index headers")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading index page headers")
+		return 0, errors.Wrap(err, "reading index page headers")
 	}
 
 	if err := dec.CheckCrc(castagnoliTable); err != nil {
-		return errors.Wrap(err, "checksumming page headers")
+		return 0, errors.Wrap(err, "checksumming page headers")
 	}
 
 	b.pageHeaders = make(
@@ -139,12 +140,12 @@ func (b *BlockIndex) DecodeHeaders(r io.ReadSeeker) error {
 	for i := 0; i < len(b.pageHeaders); i++ {
 		var s SeriesPageHeaderWithOffset
 		if err := s.Decode(&dec); err != nil {
-			return errors.Wrapf(err, "decoding %dth series header", i)
+			return 0, errors.Wrapf(err, "decoding %dth series header", i)
 		}
 		b.pageHeaders[i] = s
 	}
 
-	return nil
+	return checksum, nil
 }
 
 // decompress page and return an iterator over the bytes
@@ -167,7 +168,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
 		return nil, errors.Wrap(err, "checksumming series page")
 	}
 
-	decompressor, err := b.schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get()))
+	decompressor, err := b.opts.Schema.DecompressorPool().GetReader(bytes.NewReader(dec.Get()))
 	if err != nil {
 		return nil, errors.Wrap(err, "getting decompressor")
 	}
@@ -213,12 +214,12 @@ func (h *SeriesPageHeaderWithOffset) Decode(dec *encoding.Decbuf) error {
 
 type SeriesHeader struct {
 	NumSeries         int
-	FromFp, ThroughFp model.Fingerprint
+	Bounds            FingerprintBounds
 	FromTs, ThroughTs model.Time
 }
 
 func (h SeriesHeader) OverlapFingerprintRange(other SeriesHeader) bool {
-	return h.ThroughFp >= other.FromFp && h.FromFp <= other.ThroughFp
+	return h.Bounds.Overlaps(other.Bounds)
 }
 
 // build one aggregated header for the entire block
@@ -227,9 +228,10 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
 		return SeriesHeader{}
 	}
 
+	fromFp, _ := xs[0].Bounds.GetFromThrough()
+	_, throughFP := xs[len(xs)-1].Bounds.GetFromThrough()
 	res := SeriesHeader{
-		FromFp:    xs[0].FromFp,
-		ThroughFp: xs[len(xs)-1].ThroughFp,
+		Bounds: NewBounds(fromFp, throughFP),
 	}
 
 	for _, x := range xs {
@@ -245,16 +247,16 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
 
 func (h *SeriesHeader) Encode(enc *encoding.Encbuf) {
 	enc.PutUvarint(h.NumSeries)
-	enc.PutUvarint64(uint64(h.FromFp))
-	enc.PutUvarint64(uint64(h.ThroughFp))
+	enc.PutUvarint64(uint64(h.Bounds.Min))
+	enc.PutUvarint64(uint64(h.Bounds.Max))
 	enc.PutVarint64(int64(h.FromTs))
 	enc.PutVarint64(int64(h.ThroughTs))
 }
 
 func (h *SeriesHeader) Decode(dec *encoding.Decbuf) error {
 	h.NumSeries = dec.Uvarint()
-	h.FromFp = model.Fingerprint(dec.Uvarint64())
-	h.ThroughFp = model.Fingerprint(dec.Uvarint64())
+	h.Bounds.Min = model.Fingerprint(dec.Uvarint64())
+	h.Bounds.Max = model.Fingerprint(dec.Uvarint64())
 	h.FromTs = model.Time(dec.Varint64())
 	h.ThroughTs = model.Time(dec.Varint64())
 	return dec.Err()
@@ -305,7 +307,7 @@ func (d *SeriesPageDecoder) Next() bool {
 }
 
 func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) {
-	if fp > d.header.ThroughFp {
+	if fp > d.header.Bounds.Max {
 		// shortcut: we know the fingerprint is too large so nothing in this page
 		// will match the seek call, which returns the first found fingerprint >= fp.
 		// so masquerade the index as if we've already iterated through
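Because the index builder flushes pages in fingerprint order,
`aggregateHeaders` above can take the block-wide bounds from the first page's
lower bound and the last page's upper bound. A hypothetical package-local
test sketching that behavior (not part of the patch; assumes the
`FingerprintBounds` helpers used above):

func TestAggregateHeadersBounds(t *testing.T) {
	// pages arrive sorted by fingerprint, so the block-level bounds
	// span the first page's min through the last page's max
	pages := []SeriesHeader{
		{NumSeries: 2, Bounds: NewBounds(0, 0x0fff)},
		{NumSeries: 3, Bounds: NewBounds(0x1000, 0xffff)},
	}

	agg := aggregateHeaders(pages)
	require.Equal(t, NewBounds(0, 0xffff), agg.Bounds)
}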
diff --git a/pkg/storage/bloom/v1/index_querier.go b/pkg/storage/bloom/v1/index_querier.go
index 005f480e68e9c..142b6423185b6 100644
--- a/pkg/storage/bloom/v1/index_querier.go
+++ b/pkg/storage/bloom/v1/index_querier.go
@@ -49,7 +49,7 @@ func (it *LazySeriesIter) Seek(fp model.Fingerprint) error {
 	// first potentially relevant page
 	desiredPage := sort.Search(len(it.b.index.pageHeaders), func(i int) bool {
 		header := it.b.index.pageHeaders[i]
-		return header.ThroughFp >= fp
+		return header.Bounds.Max >= fp
 	})
 
 	switch {
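With these changes, a consumer can read everything needed to build a
`BlockRef` in one call. A hypothetical usage sketch (`blockRefInputs` is not
part of the patch; reader construction and surrounding wiring are assumed):

// Illustrative only: gathers the block-level metadata a caller would
// convert into a bloomshipper BlockRef.
func blockRefInputs(reader BlockReader) (BlockMetadata, error) {
	block := NewBlock(reader)
	md, err := block.Metadata()
	if err != nil {
		return BlockMetadata{}, err
	}
	// md.Options: schema plus the page/block sizes the block was built with
	// md.Series: the block's full fingerprint and time range
	// md.Checksum: XOR of the series- and bloom-file checksums
	return md, nil
}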