Blooms/block metadata (grafana#11859)
A few updates to the bloom library:
* Uses `FingerprintBounds` in series headers
* Encodes `BlockOptions` in the series file so we can later read the
target page & block sizes the block was generated with in addition to
the schema.
* Introduces a `BlockMetadata` struct and loads it correctly from blocks.
This struct will be used to build the `BlockRef`s from the
`bloomshipper` pkg and will be used in the bloom compactor + bloom gateway
* Integrates checksums more tightly into block building and XORs the header
metadata checksums from each file (blooms, series) together to generate a final
checksum for the block (a combination of both files); see the sketch below.
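
As an illustration of the last bullet, here is a minimal, self-contained Go sketch (not part of the commit) of why XOR works as a checksum combiner: the combined block checksum can be XORed against either file's checksum to recover the other.

package main

import (
	"fmt"
	"hash/crc32"
)

// combineChecksums mirrors the helper added in this commit: XOR is
// commutative and self-inverting, so each input is recoverable from
// the combined result and the other input.
func combineChecksums(index, blooms uint32) uint32 {
	return index ^ blooms
}

func main() {
	// Placeholder contents; real checksums come from the block's files.
	table := crc32.MakeTable(crc32.Castagnoli)
	index := crc32.Checksum([]byte("series file"), table)
	blooms := crc32.Checksum([]byte("bloom file"), table)

	block := combineChecksums(index, blooms)
	fmt.Println(block^blooms == index) // true
	fmt.Println(block^index == blooms) // true
}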
owen-d authored and rhnasc committed Apr 12, 2024
1 parent b953493 commit ac476a0
Showing 6 changed files with 157 additions and 101 deletions.
46 changes: 37 additions & 9 deletions pkg/storage/bloom/v1/block.go
@@ -7,21 +7,23 @@ import (
 	"github.com/prometheus/common/model"
 )
 
+type BlockMetadata struct {
+	Options  BlockOptions
+	Series   SeriesHeader
+	Checksum uint32
+}
+
 type Block struct {
 	// covers series pages
 	index BlockIndex
 	// covers bloom pages
 	blooms BloomBlock
 
-	// TODO(owen-d): implement
-	// synthetic header for the entire block
-	// built from all the pages in the index
-	header SeriesHeader
+	metadata BlockMetadata
 
 	reader BlockReader // should this be decoupled from the struct (accepted as method arg instead)?
 
 	initialized bool
-	dataRange   SeriesHeader
 }
 
 func NewBlock(reader BlockReader) *Block {
@@ -38,30 +40,49 @@ func (b *Block) LoadHeaders() error {
 			return errors.Wrap(err, "getting index reader")
 		}
 
-		if err := b.index.DecodeHeaders(idx); err != nil {
+		indexChecksum, err := b.index.DecodeHeaders(idx)
+		if err != nil {
 			return errors.Wrap(err, "decoding index")
 		}
 
+		b.metadata.Options = b.index.opts
+
 		// TODO(owen-d): better pattern
 		xs := make([]SeriesHeader, 0, len(b.index.pageHeaders))
 		for _, h := range b.index.pageHeaders {
 			xs = append(xs, h.SeriesHeader)
 		}
-		b.dataRange = aggregateHeaders(xs)
+		b.metadata.Series = aggregateHeaders(xs)
 
 		blooms, err := b.reader.Blooms()
 		if err != nil {
 			return errors.Wrap(err, "getting blooms reader")
 		}
-		if err := b.blooms.DecodeHeaders(blooms); err != nil {
+		bloomChecksum, err := b.blooms.DecodeHeaders(blooms)
+		if err != nil {
 			return errors.Wrap(err, "decoding blooms")
 		}
 		b.initialized = true
+
+		if !b.metadata.Options.Schema.Compatible(b.blooms.schema) {
+			return fmt.Errorf(
+				"schema mismatch: index (%v) vs blooms (%v)",
+				b.metadata.Options.Schema, b.blooms.schema,
+			)
+		}
+
+		b.metadata.Checksum = combineChecksums(indexChecksum, bloomChecksum)
 	}
 	return nil
 
 }
 
+// XOR checksums as a simple checksum combiner with the benefit that
+// each part can be recomputed by XORing the result against the other
+func combineChecksums(index, blooms uint32) uint32 {
+	return index ^ blooms
+}
+
 // convenience method
 func (b *Block) Querier() *BlockQuerier {
 	return NewBlockQuerier(b)
@@ -75,11 +96,18 @@ func (b *Block) Blooms() *LazyBloomIter {
 	return NewLazyBloomIter(b)
 }
 
+func (b *Block) Metadata() (BlockMetadata, error) {
+	if err := b.LoadHeaders(); err != nil {
+		return BlockMetadata{}, err
+	}
+	return b.metadata, nil
+}
+
 func (b *Block) Schema() (Schema, error) {
 	if err := b.LoadHeaders(); err != nil {
 		return Schema{}, err
 	}
-	return b.index.schema, nil
+	return b.metadata.Options.Schema, nil
 }
 
 type BlockQuerier struct {
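For context, a hypothetical caller-side sketch (not part of this diff) of the new accessor: Metadata() lazily decodes headers on first use and returns the cached struct. Names follow the diff above; the fmt import and a concrete BlockReader are assumed.

func describeBlock(reader BlockReader) error {
	block := NewBlock(reader)
	// Metadata() triggers LoadHeaders() if the block is not yet initialized.
	md, err := block.Metadata()
	if err != nil {
		return err
	}
	fmt.Printf("series=%d bounds=%v checksum=%08x\n",
		md.Series.NumSeries, md.Series.Bounds, md.Checksum)
	return nil
}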
19 changes: 10 additions & 9 deletions pkg/storage/bloom/v1/bloom.go
@@ -171,9 +171,9 @@ func NewBloomBlock(encoding chunkenc.Encoding) BloomBlock {
 	}
 }
 
-func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
+func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) {
 	if err := b.schema.DecodeFrom(r); err != nil {
-		return errors.Wrap(err, "decoding schema")
+		return 0, errors.Wrap(err, "decoding schema")
 	}
 
 	var (
@@ -182,35 +182,36 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) error {
 	)
 	// last 12 bytes are (headers offset: 8 byte u64, checksum: 4 byte u32)
 	if _, err := r.Seek(-12, io.SeekEnd); err != nil {
-		return errors.Wrap(err, "seeking to bloom headers metadata")
+		return 0, errors.Wrap(err, "seeking to bloom headers metadata")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading bloom headers metadata")
+		return 0, errors.Wrap(err, "reading bloom headers metadata")
 	}
 
 	headerOffset := dec.Be64()
+	checksum := dec.Be32()
 
 	if _, err := r.Seek(int64(headerOffset), io.SeekStart); err != nil {
-		return errors.Wrap(err, "seeking to bloom headers")
+		return 0, errors.Wrap(err, "seeking to bloom headers")
 	}
 	dec.B, err = io.ReadAll(r)
 	if err != nil {
-		return errors.Wrap(err, "reading bloom page headers")
+		return 0, errors.Wrap(err, "reading bloom page headers")
 	}
 
 	if err := dec.CheckCrc(castagnoliTable); err != nil {
-		return errors.Wrap(err, "checksumming page headers")
+		return 0, errors.Wrap(err, "checksumming page headers")
 	}
 
 	b.pageHeaders = make([]BloomPageHeader, dec.Uvarint())
 	for i := 0; i < len(b.pageHeaders); i++ {
 		header := &b.pageHeaders[i]
 		if err := header.Decode(&dec); err != nil {
-			return errors.Wrapf(err, "decoding %dth series header", i)
+			return 0, errors.Wrapf(err, "decoding %dth series header", i)
 		}
 	}
-	return nil
+	return checksum, nil
 }
 
 func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int) (*BloomPageDecoder, error) {
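For reference, a standalone sketch (standard library only, not from the commit) of the footer layout the decoder above relies on: the file ends with an 8-byte big-endian headers offset followed by a 4-byte big-endian CRC32 checksum.

import (
	"encoding/binary"
	"io"
)

// readFooter reads the trailing 12 bytes described in the comment above:
// (headers offset: 8 byte u64, checksum: 4 byte u32), both big-endian.
func readFooter(r io.ReadSeeker) (headerOffset uint64, checksum uint32, err error) {
	if _, err = r.Seek(-12, io.SeekEnd); err != nil {
		return 0, 0, err
	}
	buf := make([]byte, 12)
	if _, err = io.ReadFull(r, buf); err != nil {
		return 0, 0, err
	}
	return binary.BigEndian.Uint64(buf[:8]), binary.BigEndian.Uint32(buf[8:]), nil
}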
109 changes: 55 additions & 54 deletions pkg/storage/bloom/v1/builder.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"hash"
 	"io"
-	"sort"
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -21,15 +20,46 @@ var (
 
 type BlockOptions struct {
 	// Schema determines the Schema of the block and cannot be changed
+	// without recreating the block from underlying data
 	Schema Schema
 
 	// The following options can be changed on the fly.
 	// For instance, adding another page to a block with
-	// a different target page size is supported.
+	// a different target page size is supported, although
+	// the block will store the original sizes it was created with
 
 	// target size in bytes (decompressed)
 	// of each page type
-	SeriesPageSize, BloomPageSize, BlockSize int
+	SeriesPageSize, BloomPageSize, BlockSize uint64
 }
 
+func (b BlockOptions) Len() int {
+	return 3*8 + b.Schema.Len()
+}
+
+func (b *BlockOptions) DecodeFrom(r io.ReadSeeker) error {
+	buf := make([]byte, b.Len())
+	_, err := io.ReadFull(r, buf)
+	if err != nil {
+		return errors.Wrap(err, "reading block options")
+	}
+
+	dec := encoding.DecWith(buf)
+
+	if err := b.Schema.Decode(&dec); err != nil {
+		return errors.Wrap(err, "decoding schema")
+	}
+	b.SeriesPageSize = dec.Be64()
+	b.BloomPageSize = dec.Be64()
+	b.BlockSize = dec.Be64()
+	return nil
+}
+
+func (b BlockOptions) Encode(enc *encoding.Encbuf) {
+	b.Schema.Encode(enc)
+	enc.PutBE64(b.SeriesPageSize)
+	enc.PutBE64(b.BloomPageSize)
+	enc.PutBE64(b.BlockSize)
+}
+
 type BlockBuilder struct {
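
To make the new layout concrete, a small sketch (standard library only; treating the encoded schema as an opaque prefix is an assumption for illustration) of the fixed-width tail that Encode/DecodeFrom write and read:

import "encoding/binary"

// appendOptionsTail appends the three big-endian uint64 sizes that follow
// the encoded schema, i.e. the 3*8 bytes accounted for in BlockOptions.Len().
func appendOptionsTail(dst []byte, seriesPageSize, bloomPageSize, blockSize uint64) []byte {
	dst = binary.BigEndian.AppendUint64(dst, seriesPageSize)
	dst = binary.BigEndian.AppendUint64(dst, bloomPageSize)
	dst = binary.BigEndian.AppendUint64(dst, blockSize)
	return dst
}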
@@ -90,14 +120,19 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error) {
 		return 0, errors.Wrap(err, "iterating series with blooms")
 	}
 
-	checksum, err := b.blooms.Close()
+	return b.Close()
+}
+
+func (b *BlockBuilder) Close() (uint32, error) {
+	bloomChecksum, err := b.blooms.Close()
 	if err != nil {
 		return 0, errors.Wrap(err, "closing bloom file")
 	}
-	if err := b.index.Close(); err != nil {
+	indexCheckSum, err := b.index.Close()
+	if err != nil {
 		return 0, errors.Wrap(err, "closing series file")
 	}
-	return checksum, nil
+	return combineChecksums(indexCheckSum, bloomChecksum), nil
 }
 
 func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
@@ -131,7 +166,7 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockBuilder {
 	return &BloomBlockBuilder{
 		opts:    opts,
 		writer:  writer,
-		page:    NewPageWriter(opts.BloomPageSize),
+		page:    NewPageWriter(int(opts.BloomPageSize)),
 		scratch: &encoding.Encbuf{},
 	}
 }
@@ -307,16 +342,16 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder {
 	return &IndexBuilder{
 		opts:    opts,
 		writer:  writer,
-		page:    NewPageWriter(opts.SeriesPageSize),
+		page:    NewPageWriter(int(opts.SeriesPageSize)),
 		scratch: &encoding.Encbuf{},
 	}
 }
 
-func (b *IndexBuilder) WriteSchema() error {
+func (b *IndexBuilder) WriteOpts() error {
 	b.scratch.Reset()
-	b.opts.Schema.Encode(b.scratch)
+	b.opts.Encode(b.scratch)
 	if _, err := b.writer.Write(b.scratch.Get()); err != nil {
-		return errors.Wrap(err, "writing schema")
+		return errors.Wrap(err, "writing opts+schema")
 	}
 	b.writtenSchema = true
 	b.offset += b.scratch.Len()
@@ -325,8 +360,8 @@
 
 func (b *IndexBuilder) Append(series SeriesWithOffset) error {
 	if !b.writtenSchema {
-		if err := b.WriteSchema(); err != nil {
-			return errors.Wrap(err, "writing schema")
+		if err := b.WriteOpts(); err != nil {
+			return errors.Wrap(err, "appending series")
 		}
 	}
 
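The Append change keeps the existing lazy-write behavior: the options header is written exactly once, immediately before the first series. A generic sketch of that pattern (hypothetical, not from the codebase):

import "io"

// lazyHeaderWriter emits a fixed header before the first payload write,
// mirroring how IndexBuilder defers WriteOpts until the first Append.
type lazyHeaderWriter struct {
	w       io.Writer
	header  []byte
	written bool
}

func (l *lazyHeaderWriter) Write(p []byte) (int, error) {
	if !l.written {
		if _, err := l.w.Write(l.header); err != nil {
			return 0, err
		}
		l.written = true
	}
	return l.w.Write(p)
}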
@@ -408,8 +443,7 @@ func (b *IndexBuilder) flushPage() error {
 		DecompressedLen: decompressedLen,
 		SeriesHeader: SeriesHeader{
 			NumSeries: b.page.Count(),
-			FromFp:    b.fromFp,
-			ThroughFp: b.previousFp,
+			Bounds:    NewBounds(b.fromFp, b.previousFp),
 			FromTs:    b.fromTs,
 			ThroughTs: b.throughTs,
 		},
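
The Bounds field collapses the separate from/through fingerprints into a single range value. A hypothetical mini version (not the real FingerprintBounds API) showing what an inclusive fingerprint range enables, such as the overlap checks used when grouping blocks:

import "github.com/prometheus/common/model"

// bounds is an inclusive [Min, Max] fingerprint range.
type bounds struct{ Min, Max model.Fingerprint }

func newBounds(min, max model.Fingerprint) bounds { return bounds{min, max} }

// overlaps reports whether two inclusive ranges intersect.
func (b bounds) overlaps(other bounds) bool {
	return b.Min <= other.Max && other.Min <= b.Max
}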
@@ -428,10 +462,10 @@ func (b *IndexBuilder) flushPage() error {
 	return nil
 }
 
-func (b *IndexBuilder) Close() error {
+func (b *IndexBuilder) Close() (uint32, error) {
 	if b.page.Count() > 0 {
 		if err := b.flushPage(); err != nil {
-			return errors.Wrap(err, "flushing final series page")
+			return 0, errors.Wrap(err, "flushing final series page")
 		}
 	}
 
@@ -451,39 +485,9 @@ func (b *IndexBuilder) Close() error {
 	b.scratch.PutHash(crc32Hash)
 	_, err := b.writer.Write(b.scratch.Get())
 	if err != nil {
-		return errors.Wrap(err, "writing series page headers")
+		return 0, errors.Wrap(err, "writing series page headers")
 	}
-	return errors.Wrap(b.writer.Close(), "closing series writer")
-}
-
-// SortBlocksIntoOverlappingGroups sorts a list of blocks into a sorted list of lists,
-// where each list contains blocks that overlap with each other.
-// TODO(owen-d): implement as an iterator so we don't have to load all blocks at once
-// NB: unused now, but likely useful when we want to optimize compaction. I wrote this expecting to need it now
-// but it feels unsavory to remove it
-func SortBlocksIntoOverlappingGroups(xs []*Block) (groups [][]*Block) {
-	sort.Slice(xs, func(i, j int) bool {
-		a, b := xs[i].index, xs[j].index
-		return a.pageHeaders[0].FromFp <= b.pageHeaders[0].FromFp
-	})
-
-	var curGroup []*Block
-	for _, x := range xs {
-		switch {
-		case len(curGroup) == 0:
-			curGroup = append(curGroup, x)
-		case curGroup[len(curGroup)-1].dataRange.OverlapFingerprintRange(x.dataRange):
-			curGroup = append(curGroup, x)
-		default:
-			groups = append(groups, curGroup)
-			curGroup = []*Block{x}
-		}
-	}
-
-	if len(curGroup) > 0 {
-		groups = append(groups, curGroup)
-	}
-	return groups
-}
+	return crc32Hash.Sum32(), errors.Wrap(b.writer.Close(), "closing series writer")
 }
 
 // Simplistic implementation of a merge builder that builds a single block
@@ -586,12 +590,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 		}
 	}
 
-	checksum, err := builder.blooms.Close()
+	checksum, err := builder.Close()
 	if err != nil {
-		return 0, errors.Wrap(err, "closing bloom file")
-	}
-	if err := builder.index.Close(); err != nil {
-		return 0, errors.Wrap(err, "closing series file")
+		return 0, errors.Wrap(err, "closing block")
 	}
 	return checksum, nil
 }