Skip to content

Commit

Permalink
Blooms: Align compactor to shipper (grafana#11855)
Browse files Browse the repository at this point in the history
Does some alignment work between the `bloomcompactor` and the
`bloomshipper` pkgs. Notably:

* Uses `bloomshipper.BlockRef` everywhere (removes old bloomshipper
struct
* Integrates `v1.FingerprintBounds` in `Ref` struct
* `Location` interface to distinguish local paths vs paths in object
storage for certain types (`{Meta,Bloom}Ref`s)
* Introduces `KeyResolver` interface to generate locations from these
structs
* Integrates `KeyResolver` into our bloom store. In the future, this
will allow us to change key structures across schema boundaries when we
want to change|improve them.
* Removes `BlockPath` from `BlockRef` in favor of the new resolving
functionality. This is also beneficial because it lets us _calculate_
locations from the pure Ref objects, rather than tie some arbitrary
state to them which can change (or not be populated by accident).
  • Loading branch information
owen-d authored and rhnasc committed Apr 12, 2024
1 parent 0e5aace commit 68a58d6
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 324 deletions.
21 changes: 11 additions & 10 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)

Expand Down Expand Up @@ -178,7 +179,7 @@ func (s *SimpleBloomController) loadWorkForGap(id tsdb.Identifier, gap gapWithBl

type gapWithBlocks struct {
bounds v1.FingerprintBounds
blocks []BlockRef
blocks []bloomshipper.BlockRef
}

// blockPlan is a plan for all the work needed to build a meta.json
Expand Down Expand Up @@ -220,7 +221,7 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {
}

for _, block := range meta.Blocks {
if block.OwnershipRange.Intersection(gap) == nil {
if block.Bounds.Intersection(gap) == nil {
// this block doesn't overlap the gap, skip
continue
}
Expand All @@ -232,27 +233,27 @@ func blockPlansForGaps(tsdbs []tsdbGaps, metas []Meta) ([]blockPlan, error) {

// ensure we sort blocks so deduping iterator works as expected
sort.Slice(planGap.blocks, func(i, j int) bool {
return planGap.blocks[i].OwnershipRange.Less(planGap.blocks[j].OwnershipRange)
return planGap.blocks[i].Bounds.Less(planGap.blocks[j].Bounds)
})

peekingBlocks := v1.NewPeekingIter[BlockRef](
v1.NewSliceIter[BlockRef](
peekingBlocks := v1.NewPeekingIter[bloomshipper.BlockRef](
v1.NewSliceIter[bloomshipper.BlockRef](
planGap.blocks,
),
)
// dedupe blocks which could be in multiple metas
itr := v1.NewDedupingIter[BlockRef, BlockRef](
func(a, b BlockRef) bool {
itr := v1.NewDedupingIter[bloomshipper.BlockRef, bloomshipper.BlockRef](
func(a, b bloomshipper.BlockRef) bool {
return a == b
},
v1.Identity[BlockRef],
func(a, _ BlockRef) BlockRef {
v1.Identity[bloomshipper.BlockRef],
func(a, _ bloomshipper.BlockRef) bloomshipper.BlockRef {
return a
},
peekingBlocks,
)

deduped, err := v1.Collect[BlockRef](itr)
deduped, err := v1.Collect[bloomshipper.BlockRef](itr)
if err != nil {
return nil, errors.Wrap(err, "failed to dedupe blocks")
}
Expand Down
49 changes: 26 additions & 23 deletions pkg/bloomcompactor/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/require"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
)

Expand Down Expand Up @@ -119,7 +120,7 @@ func tsdbID(n int) tsdb.SingleTenantTSDBIdentifier {
}
}

func genMeta(min, max model.Fingerprint, sources []int, blocks []BlockRef) Meta {
func genMeta(min, max model.Fingerprint, sources []int, blocks []bloomshipper.BlockRef) Meta {
m := Meta{
OwnershipRange: v1.NewBounds(min, max),
Blocks: blocks,
Expand Down Expand Up @@ -224,10 +225,12 @@ func Test_gapsBetweenTSDBsAndMetas(t *testing.T) {
}
}

func genBlockRef(min, max model.Fingerprint) BlockRef {
func genBlockRef(min, max model.Fingerprint) bloomshipper.BlockRef {
bounds := v1.NewBounds(min, max)
return BlockRef{
OwnershipRange: bounds,
return bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
Bounds: bounds,
},
}
}

Expand All @@ -245,7 +248,7 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(11, 20)}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(11, 20)}),
},
exp: []blockPlan{
{
Expand All @@ -263,15 +266,15 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}),
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}),
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []gapWithBlocks{
{
bounds: v1.NewBounds(0, 10),
blocks: []BlockRef{genBlockRef(9, 20)},
blocks: []bloomshipper.BlockRef{genBlockRef(9, 20)},
},
},
},
Expand All @@ -285,8 +288,8 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(9, 20, []int{1}, []BlockRef{genBlockRef(9, 20)}), // block for different tsdb
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
Expand All @@ -304,16 +307,16 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
genMeta(9, 20, []int{0}, []BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(5, 20, []int{1}, []BlockRef{genBlockRef(5, 20)}), // block for different tsdb
genMeta(9, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(9, 20)}), // block for same tsdb
genMeta(5, 20, []int{1}, []bloomshipper.BlockRef{genBlockRef(5, 20)}), // block for different tsdb
},
exp: []blockPlan{
{
tsdb: tsdbID(0),
gaps: []gapWithBlocks{
{
bounds: v1.NewBounds(0, 8),
blocks: []BlockRef{genBlockRef(5, 20)},
blocks: []bloomshipper.BlockRef{genBlockRef(5, 20)},
},
},
},
Expand All @@ -324,14 +327,14 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0), tsdbID(1)}, // generate for both tsdbs
metas: []Meta{
genMeta(0, 2, []int{0}, []BlockRef{
genMeta(0, 2, []int{0}, []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
}), // tsdb_0
genMeta(6, 8, []int{0}, []BlockRef{genBlockRef(6, 8)}), // tsdb_0
genMeta(6, 8, []int{0}, []bloomshipper.BlockRef{genBlockRef(6, 8)}), // tsdb_0

genMeta(3, 5, []int{1}, []BlockRef{genBlockRef(3, 5)}), // tsdb_1
genMeta(8, 10, []int{1}, []BlockRef{genBlockRef(8, 10)}), // tsdb_1
genMeta(3, 5, []int{1}, []bloomshipper.BlockRef{genBlockRef(3, 5)}), // tsdb_1
genMeta(8, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(8, 10)}), // tsdb_1
},
exp: []blockPlan{
{
Expand All @@ -340,11 +343,11 @@ func Test_blockPlansForGaps(t *testing.T) {
// tsdb (id=0) can source chunks from the blocks built from tsdb (id=1)
{
bounds: v1.NewBounds(3, 5),
blocks: []BlockRef{genBlockRef(3, 5)},
blocks: []bloomshipper.BlockRef{genBlockRef(3, 5)},
},
{
bounds: v1.NewBounds(9, 10),
blocks: []BlockRef{genBlockRef(8, 10)},
blocks: []bloomshipper.BlockRef{genBlockRef(8, 10)},
},
},
},
Expand All @@ -354,14 +357,14 @@ func Test_blockPlansForGaps(t *testing.T) {
gaps: []gapWithBlocks{
{
bounds: v1.NewBounds(0, 2),
blocks: []BlockRef{
blocks: []bloomshipper.BlockRef{
genBlockRef(0, 1),
genBlockRef(1, 2),
},
},
{
bounds: v1.NewBounds(6, 7),
blocks: []BlockRef{genBlockRef(6, 8)},
blocks: []bloomshipper.BlockRef{genBlockRef(6, 8)},
},
},
},
Expand All @@ -372,11 +375,11 @@ func Test_blockPlansForGaps(t *testing.T) {
ownershipRange: v1.NewBounds(0, 10),
tsdbs: []tsdb.Identifier{tsdbID(0)},
metas: []Meta{
genMeta(9, 20, []int{1}, []BlockRef{
genMeta(9, 20, []int{1}, []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(9, 20),
}), // blocks for first diff tsdb
genMeta(5, 20, []int{2}, []BlockRef{
genMeta(5, 20, []int{2}, []bloomshipper.BlockRef{
genBlockRef(5, 10),
genBlockRef(9, 20), // same block references in prior meta (will be deduped)
}), // block for second diff tsdb
Expand All @@ -387,7 +390,7 @@ func Test_blockPlansForGaps(t *testing.T) {
gaps: []gapWithBlocks{
{
bounds: v1.NewBounds(0, 10),
blocks: []BlockRef{
blocks: []bloomshipper.BlockRef{
genBlockRef(1, 4),
genBlockRef(5, 10),
genBlockRef(9, 20),
Expand Down
30 changes: 4 additions & 26 deletions pkg/bloomcompactor/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,20 @@ package bloomcompactor

import (
"fmt"
"hash"
"path"

"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/pkg/util/encoding"
)

const (
BloomPrefix = "bloom"
MetasPrefix = "metas"
)

// TODO(owen-d): Probably want to integrate against the block shipper
// instead of defining here, but only (min,max,fp) should be required for
// the ref. Things like index-paths, etc are not needed and possibly harmful
// in the case we want to do migrations. It's easier to load a block-ref or similar
// within the context of a specific tenant+period+index path and not couple them.
type BlockRef struct {
OwnershipRange v1.FingerprintBounds
Checksum uint32
}

func (r BlockRef) Hash(h hash.Hash32) error {
if err := r.OwnershipRange.Hash(h); err != nil {
return err
}

var enc encoding.Encbuf
enc.PutBE32(r.Checksum)
_, err := h.Write(enc.Get())
return errors.Wrap(err, "writing BlockRef")
}

type MetaRef struct {
OwnershipRange v1.FingerprintBounds
Checksum uint32
Expand All @@ -63,13 +41,13 @@ type Meta struct {
OwnershipRange v1.FingerprintBounds

// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
Tombstones []BlockRef
Tombstones []bloomshipper.BlockRef

// The specific TSDB files used to generate the block.
Sources []tsdb.SingleTenantTSDBIdentifier

// A list of blocks that were generated
Blocks []BlockRef
Blocks []bloomshipper.BlockRef
}

// Generate MetaRef from Meta
Expand Down Expand Up @@ -131,6 +109,6 @@ type MetaStore interface {

type BlockStore interface {
// TODO(owen-d): flesh out|integrate against bloomshipper.Client
GetBlocks([]BlockRef) ([]*v1.Block, error)
GetBlocks([]bloomshipper.BlockRef) ([]*v1.Block, error)
PutBlock(interface{}) error
}
7 changes: 1 addition & 6 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bloomgateway

import (
"context"
"fmt"
"math/rand"
"testing"
"time"
Expand Down Expand Up @@ -343,9 +342,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
EndTimestamp: through,
}
block := bloomshipper.BlockRef{
Ref: ref,
IndexPath: "index.tsdb.gz",
BlockPath: fmt.Sprintf("block-%d", i),
Ref: ref,
}
meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Expand Down Expand Up @@ -459,8 +456,6 @@ func createBlockRefsFromBlockData(t *testing.T, tenant string, data []bloomshipp
EndTimestamp: 0,
Checksum: 0,
},
IndexPath: fmt.Sprintf("index-%d", i),
BlockPath: fmt.Sprintf("block-%d", i),
})
}
return res
Expand Down
Loading

0 comments on commit 68a58d6

Please sign in to comment.