Skip to content

Commit

Permalink
Calculate block options checksums and use that too
Browse files Browse the repository at this point in the history
  • Loading branch information
paul1r committed Jan 23, 2024
1 parent f575a6b commit c038b49
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 1 deletion.
31 changes: 30 additions & 1 deletion pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1

import (
"bytes"
"encoding/binary"
"fmt"
"hash"
"io"
Expand Down Expand Up @@ -88,6 +89,34 @@ func (b *BlockBuilder) BuildFrom(itr Iterator[SeriesWithBloom]) (uint32, error)
return checksum + checksum2, nil
}

// writeInt32 feeds value into h as four bytes in big-endian order.
//
// Changing the byte order would change every hash computed through this
// helper, so it must stay big-endian.
func writeInt32(h hash.Hash32, value int32) {
	// A fixed-size array avoids shadowing the imported "bytes" package.
	var buf [4]byte
	binary.BigEndian.PutUint32(buf[:], uint32(value))
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write(buf[:])
}

// writeUInt64 feeds value into h as eight bytes in big-endian order.
//
// Changing the byte order would change every hash computed through this
// helper, so it must stay big-endian.
func writeUInt64(h hash.Hash32, value uint64) {
	// A fixed-size array avoids shadowing the imported "bytes" package.
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], value)
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write(buf[:])
}

// calcuateBlockOptionsHash returns a 32-bit checksum over the builder's block
// options, using a hasher borrowed from Crc32HashPool.
//
// The fields are fed to the hasher in a fixed order: schema version, schema
// encoding, n-gram length, n-gram skip, series page size, bloom page size and
// block size. The field order and widths determine the result, so they must
// not be rearranged.
func (b *BlockBuilder) calcuateBlockOptionsHash() uint32 {
	h := Crc32HashPool.Get()
	defer Crc32HashPool.Put(h)

	opts := &b.opts

	// Schema version and encoding are hashed as 32-bit values.
	writeInt32(h, int32(opts.schema.version))
	writeInt32(h, int32(opts.schema.encoding))

	// The remaining options are hashed as 64-bit values.
	for _, v := range []uint64{
		opts.schema.nGramLength,
		opts.schema.nGramSkip,
		uint64(opts.SeriesPageSize),
		uint64(opts.BloomPageSize),
		uint64(opts.BlockSize),
	} {
		writeUInt64(h, v)
	}

	return h.Sum32()
}

func (b *BlockBuilder) AddSeries(series SeriesWithBloom) error {
offset, err := b.blooms.Append(series)
if err != nil {
Expand Down Expand Up @@ -582,5 +611,5 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
if err != nil {
return 0, errors.Wrap(err, "closing series file")
}
return checksum + checksum2, nil
return checksum + checksum2 + builder.calcuateBlockOptionsHash(), nil
}
150 changes: 150 additions & 0 deletions pkg/storage/bloom/v1/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,156 @@ func TestBlockReset(t *testing.T) {
require.Equal(t, rounds[0], rounds[1])
}

func TestBlockBuilderBlockOptionsChecksums(t *testing.T) {
defaultSchema := Schema{
version: DefaultSchemaVersion,
encoding: chunkenc.EncSnappy,
nGramLength: 4,
nGramSkip: 0,
}
defaultOptions := BlockOptions{
schema: defaultSchema,
SeriesPageSize: 100,
BloomPageSize: 10 << 10,
BlockSize: 0,
}

testCases := []struct {
name string
blockOpts BlockOptions
expectEqual bool
}{
{
name: "differentVersion",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version + 1,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentEncoding",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: chunkenc.EncGZIP,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentNGramLength",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength + 1,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentNGramSkip",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip + 1,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentSeriesPageSize",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize + 1,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentBloomPageSize",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize + 1,
BlockSize: defaultOptions.BlockSize,
},
expectEqual: false,
},
{
name: "differentBlockSize",
blockOpts: BlockOptions{
schema: Schema{
version: defaultSchema.version,
encoding: defaultSchema.encoding,
nGramLength: defaultSchema.nGramLength,
nGramSkip: defaultSchema.nGramSkip,
},
SeriesPageSize: defaultOptions.SeriesPageSize,
BloomPageSize: defaultOptions.BloomPageSize,
BlockSize: defaultOptions.BlockSize + 1,
},
expectEqual: false,
},
}

for _, tc := range testCases {

t.Run(tc.name, func(t *testing.T) {
defaultBlockBuilder := &BlockBuilder{
opts: defaultOptions,
}

testCaseBlockBuilder := &BlockBuilder{
opts: tc.blockOpts,
}

checksum1 := defaultBlockBuilder.calcuateBlockOptionsHash()
checksum2 := testCaseBlockBuilder.calcuateBlockOptionsHash()

if tc.expectEqual {
require.Equal(t, checksum1, checksum2, "checksums should be equal")
} else {
require.NotEqual(t, checksum1, checksum2, "checksums should not be equal")
}

})
}
}

func TestBlockChecksums(t *testing.T) {
testCases := []struct {
name string
Expand Down

0 comments on commit c038b49

Please sign in to comment.