Skip to content

Commit

Permalink
fix(blooms): logql shard embeds tsdb (#12396)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Mar 29, 2024
1 parent 3922d38 commit 6e1547f
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 117 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/index/bitprefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (ii *BitPrefixInvertedIndex) getShards(shard *logql.Shard) ([]*indexShard,

switch shard.Variant() {
case logql.PowerOfTwoVersion:
if shard.PowerOfTwo.Of <= len(ii.shards) {
if int(shard.PowerOfTwo.Of) <= len(ii.shards) {
filter = false
}
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (ii *BitPrefixInvertedIndex) validateShard(shard *logql.Shard) error {

switch shard.Variant() {
case logql.PowerOfTwoVersion:
return shard.PowerOfTwo.TSDB().Validate()
return shard.PowerOfTwo.Validate()
}
return nil

Expand Down
39 changes: 19 additions & 20 deletions pkg/ingester/index/bitprefix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

Expand All @@ -24,20 +23,20 @@ func Test_BitPrefixGetShards(t *testing.T) {
expected []uint32
}{
// equal factors
{16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}},
{16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{4}},
{16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{15}},
{16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}},
{16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{4}},
{16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{15}},

// idx factor a larger factor of 2
{32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0, 1}},
{32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{8, 9}},
{32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{30, 31}},
{64, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{60, 61, 62, 63}},
{32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0, 1}},
{32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{8, 9}},
{32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{30, 31}},
{64, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{60, 61, 62, 63}},

// // idx factor a smaller factor of 2
{8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}},
{8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{2}},
{8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{7}},
{8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}},
{8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{2}},
{8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{7}},
} {
tt := tt
t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) {
Expand Down Expand Up @@ -151,8 +150,8 @@ func Test_BitPrefixGetShards_Bounded(t *testing.T) {
func Test_BitPrefixValidateShards(t *testing.T) {
ii, err := NewBitPrefixWithShards(32)
require.Nil(t, err)
require.NoError(t, ii.validateShard(logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 16}).Ptr()))
require.Error(t, ii.validateShard(logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 15}).Ptr()))
require.NoError(t, ii.validateShard(logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 16}).Ptr()))
require.Error(t, ii.validateShard(logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 15}).Ptr()))
}

func Test_BitPrefixCreation(t *testing.T) {
Expand Down Expand Up @@ -212,9 +211,9 @@ func Test_BitPrefix_hash_mapping(t *testing.T) {
[]*labels.Matcher{{Type: labels.MatchEqual,
Name: "compose_project",
Value: "loki-tsdb-storage-s3"}},
logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{
Shard: int(expShard),
Of: requestedFactor,
logql.NewPowerOfTwoShard(index.ShardAnnotation{
Shard: expShard,
Of: uint32(requestedFactor),
}).Ptr(),
)
require.NoError(t, err)
Expand Down Expand Up @@ -243,7 +242,7 @@ func Test_BitPrefixNoMatcherLookup(t *testing.T) {
require.Nil(t, err)
expShard := uint32(fp >> (64 - index.NewShard(0, 16).RequiredBits()))
ii.Add(logproto.FromLabelsToLabelAdapters(lbs), fp)
ids, err = ii.Lookup(nil, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(expShard), Of: 16}).Ptr())
ids, err = ii.Lookup(nil, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: expShard, Of: 16}).Ptr())
require.Nil(t, err)
require.Equal(t, fp, ids[0])
}
Expand All @@ -265,9 +264,9 @@ func Test_BitPrefixConsistentMapping(t *testing.T) {
b.Add(logproto.FromLabelsToLabelAdapters(lbs), fp)
}

shardMax := 8
for i := 0; i < shardMax; i++ {
shard := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{
shardMax := uint32(8)
for i := uint32(0); i < shardMax; i++ {
shard := logql.NewPowerOfTwoShard(index.ShardAnnotation{
Shard: i,
Of: shardMax,
}).Ptr()
Expand Down
12 changes: 6 additions & 6 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/series"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

const DefaultIndexShards = 32
Expand Down Expand Up @@ -56,23 +56,23 @@ func NewWithShards(totalShards uint32) *InvertedIndex {
}
}

func (ii *InvertedIndex) getShards(shard *astmapper.ShardAnnotation) []*indexShard {
func (ii *InvertedIndex) getShards(shard *index.ShardAnnotation) []*indexShard {
if shard == nil {
return ii.shards
}

totalRequested := int(ii.totalShards) / shard.Of
totalRequested := ii.totalShards / shard.Of
result := make([]*indexShard, totalRequested)
var j int
for i := 0; i < totalRequested; i++ {
for i := uint32(0); i < totalRequested; i++ {
subShard := ((shard.Shard) + (i * shard.Of))
result[j] = ii.shards[subShard]
j++
}
return result
}

func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*astmapper.ShardAnnotation, error) {
func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*index.ShardAnnotation, error) {
if shard == nil {
return nil, nil
}
Expand All @@ -82,7 +82,7 @@ func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*astmapper.ShardAnno
return nil, errors.New("inverted index only supports shard annotations with `PowerOfTwo`")
}

if int(ii.totalShards)%s.Of != 0 || uint32(s.Of) > ii.totalShards {
if ii.totalShards%s.Of != 0 || s.Of > ii.totalShards {
return nil, fmt.Errorf("%w index_shard:%d query_shard:%v", ErrInvalidShardQuery, ii.totalShards, s)
}
return s, nil
Expand Down
30 changes: 15 additions & 15 deletions pkg/ingester/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/pkg/util"
)

func Test_GetShards(t *testing.T) {
for _, tt := range []struct {
total uint32
shard *astmapper.ShardAnnotation
shard *index.ShardAnnotation
expected []uint32
}{
// equal factors
{16, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0}},
{16, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4}},
{16, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}},
{16, &index.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0}},
{16, &index.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4}},
{16, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}},

// idx factor a larger multiple of schema factor
{32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}},
{32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}},
{32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}},
{64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}},
{32, &index.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}},
{32, &index.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}},
{32, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}},
{64, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}},
} {
tt := tt
t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) {
Expand All @@ -48,7 +48,7 @@ func Test_GetShards(t *testing.T) {
func Test_ValidateShards(t *testing.T) {
ii := NewWithShards(32)
_, err := ii.validateShard(
logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 16}).Ptr(),
logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 16}).Ptr(),
)
require.NoError(t, err)
}
Expand Down Expand Up @@ -112,7 +112,7 @@ func Test_hash_mapping(t *testing.T) {
ii := NewWithShards(shard)
ii.Add(logproto.FromLabelsToLabelAdapters(lbs), 1)

x := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16})
x := logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: labelsSeriesIDHash(lbs) % 16, Of: 16})
res, err := ii.Lookup([]*labels.Matcher{{Type: labels.MatchEqual, Name: "compose_project", Value: "loki-tsdb-storage-s3"}}, &x)
require.NoError(t, err)
require.Len(t, res, 1)
Expand All @@ -136,7 +136,7 @@ func Test_NoMatcherLookup(t *testing.T) {
// with shard param
ii = NewWithShards(16)
ii.Add(logproto.FromLabelsToLabelAdapters(lbs), 1)
x := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16})
x := logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: labelsSeriesIDHash(lbs) % 16, Of: 16})
ids, err = ii.Lookup(nil, &x)
require.Nil(t, err)
require.Equal(t, model.Fingerprint(1), ids[0])
Expand All @@ -157,9 +157,9 @@ func Test_ConsistentMapping(t *testing.T) {

shardMax := 8
for i := 0; i < shardMax; i++ {
shard := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{
Shard: i,
Of: shardMax,
shard := logql.NewPowerOfTwoShard(index.ShardAnnotation{
Shard: uint32(i),
Of: uint32(shardMax),
}).Ptr()

aIDs, err := a.Lookup([]*labels.Matcher{
Expand Down
5 changes: 2 additions & 3 deletions pkg/ingester/index/multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)
Expand Down Expand Up @@ -131,7 +130,7 @@ func TestMultiIndex(t *testing.T) {
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
logql.NewPowerOfTwoShard(
astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)},
index.ShardAnnotation{Shard: expShard, Of: factor},
).Ptr(),
)

Expand All @@ -147,7 +146,7 @@ func TestMultiIndex(t *testing.T) {
[]*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"),
},
logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)}).Ptr(),
logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: expShard, Of: factor}).Ptr(),
)

require.Nil(t, err)
Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

var nilShardMetrics = NewShardMapperMetrics(nil)
Expand Down Expand Up @@ -624,7 +624,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
name: "ConcatSampleExpr",
in: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 0,
Of: 3,
}).Ptr(),
Expand All @@ -640,7 +640,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 1,
Of: 3,
}).Ptr(),
Expand All @@ -656,7 +656,7 @@ func TestFormat_ShardedExpr(t *testing.T) {
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{
shard: NewPowerOfTwoShard(index.ShardAnnotation{
Shard: 1,
Of: 3,
}).Ptr(),
Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
util_log "github.com/grafana/loki/pkg/util/log"
)

Expand Down Expand Up @@ -464,9 +464,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
downstreams := make([]DownstreamSampleExpr, 0, shards)
expr.Operation = syntax.OpRangeTypeQuantileSketch
for shard := shards - 1; shard >= 0; shard-- {
s := NewPowerOfTwoShard(astmapper.ShardAnnotation{
Shard: shard,
Of: shards,
s := NewPowerOfTwoShard(index.ShardAnnotation{
Shard: uint32(shard),
Of: uint32(shards),
})
downstreams = append(downstreams, DownstreamSampleExpr{
shard: &s,
Expand Down
Loading

0 comments on commit 6e1547f

Please sign in to comment.