From 6e1547fe62403132d1453519a4750ce6746c86e8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 28 Mar 2024 17:59:15 -0700 Subject: [PATCH] fix(blooms): logql shard embeds tsdb (#12396) --- pkg/ingester/index/bitprefix.go | 4 +- pkg/ingester/index/bitprefix_test.go | 39 +++++----- pkg/ingester/index/index.go | 12 +-- pkg/ingester/index/index_test.go | 30 +++---- pkg/ingester/index/multi_test.go | 5 +- pkg/logql/downstream_test.go | 8 +- pkg/logql/shardmapper.go | 8 +- pkg/logql/shardmapper_test.go | 86 ++++++++++----------- pkg/logql/shards.go | 14 ++-- pkg/logql/shards_test.go | 12 +-- pkg/logql/test_utils.go | 8 +- pkg/querier/queryrange/downstreamer_test.go | 8 +- 12 files changed, 117 insertions(+), 117 deletions(-) diff --git a/pkg/ingester/index/bitprefix.go b/pkg/ingester/index/bitprefix.go index 524bd56a6985..fe24a885917b 100644 --- a/pkg/ingester/index/bitprefix.go +++ b/pkg/ingester/index/bitprefix.go @@ -69,7 +69,7 @@ func (ii *BitPrefixInvertedIndex) getShards(shard *logql.Shard) ([]*indexShard, switch shard.Variant() { case logql.PowerOfTwoVersion: - if shard.PowerOfTwo.Of <= len(ii.shards) { + if int(shard.PowerOfTwo.Of) <= len(ii.shards) { filter = false } } @@ -114,7 +114,7 @@ func (ii *BitPrefixInvertedIndex) validateShard(shard *logql.Shard) error { switch shard.Variant() { case logql.PowerOfTwoVersion: - return shard.PowerOfTwo.TSDB().Validate() + return shard.PowerOfTwo.Validate() } return nil diff --git a/pkg/ingester/index/bitprefix_test.go b/pkg/ingester/index/bitprefix_test.go index 4c67ac4c5e32..fbb297bd9c26 100644 --- a/pkg/ingester/index/bitprefix_test.go +++ b/pkg/ingester/index/bitprefix_test.go @@ -12,7 +12,6 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -24,20 +23,20 @@ func Test_BitPrefixGetShards(t *testing.T) { expected []uint32 }{ // equal factors - {16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}}, - {16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{4}}, - {16, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{15}}, + {16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}}, + {16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{4}}, + {16, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{15}}, // idx factor a larger factor of 2 - {32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0, 1}}, - {32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{8, 9}}, - {32, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{30, 31}}, - {64, false, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{60, 61, 62, 63}}, + {32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0, 1}}, + {32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{8, 9}}, + {32, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{30, 31}}, + {64, false, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{60, 61, 62, 63}}, // // idx factor a smaller factor of 2 - {8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}}, - {8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{2}}, - {8, true, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{7}}, + {8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 16}).Ptr(), []uint32{0}}, + {8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 4, Of: 16}).Ptr(), []uint32{2}}, + {8, true, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 15, Of: 16}).Ptr(), []uint32{7}}, } { tt := tt t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) { @@ -151,8 +150,8 @@ func Test_BitPrefixGetShards_Bounded(t *testing.T) { func Test_BitPrefixValidateShards(t *testing.T) { ii, err := NewBitPrefixWithShards(32) require.Nil(t, err) - require.NoError(t, ii.validateShard(logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 16}).Ptr())) - require.Error(t, ii.validateShard(logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 15}).Ptr())) + require.NoError(t, ii.validateShard(logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 16}).Ptr())) + require.Error(t, ii.validateShard(logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 15}).Ptr())) } func Test_BitPrefixCreation(t *testing.T) { @@ -212,9 +211,9 @@ func Test_BitPrefix_hash_mapping(t *testing.T) { []*labels.Matcher{{Type: labels.MatchEqual, Name: "compose_project", Value: "loki-tsdb-storage-s3"}}, - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{ - Shard: int(expShard), - Of: requestedFactor, + logql.NewPowerOfTwoShard(index.ShardAnnotation{ + Shard: expShard, + Of: uint32(requestedFactor), }).Ptr(), ) require.NoError(t, err) @@ -243,7 +242,7 @@ func Test_BitPrefixNoMatcherLookup(t *testing.T) { require.Nil(t, err) expShard := uint32(fp >> (64 - index.NewShard(0, 16).RequiredBits())) ii.Add(logproto.FromLabelsToLabelAdapters(lbs), fp) - ids, err = ii.Lookup(nil, logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(expShard), Of: 16}).Ptr()) + ids, err = ii.Lookup(nil, logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: expShard, Of: 16}).Ptr()) require.Nil(t, err) require.Equal(t, fp, ids[0]) } @@ -265,9 +264,9 @@ func Test_BitPrefixConsistentMapping(t *testing.T) { b.Add(logproto.FromLabelsToLabelAdapters(lbs), fp) } - shardMax := 8 - for i := 0; i < shardMax; i++ { - shard := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shardMax := uint32(8) + for i := uint32(0); i < shardMax; i++ { + shard := logql.NewPowerOfTwoShard(index.ShardAnnotation{ Shard: i, Of: shardMax, }).Ptr() diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 064c0ddc45ba..6536cc7f7c44 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -19,8 +19,8 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/stores/series" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) const DefaultIndexShards = 32 @@ -56,15 +56,15 @@ func NewWithShards(totalShards uint32) *InvertedIndex { } } -func (ii *InvertedIndex) getShards(shard *astmapper.ShardAnnotation) []*indexShard { +func (ii *InvertedIndex) getShards(shard *index.ShardAnnotation) []*indexShard { if shard == nil { return ii.shards } - totalRequested := int(ii.totalShards) / shard.Of + totalRequested := ii.totalShards / shard.Of result := make([]*indexShard, totalRequested) var j int - for i := 0; i < totalRequested; i++ { + for i := uint32(0); i < totalRequested; i++ { subShard := ((shard.Shard) + (i * shard.Of)) result[j] = ii.shards[subShard] j++ @@ -72,7 +72,7 @@ func (ii *InvertedIndex) getShards(shard *astmapper.ShardAnnotation) []*indexSha return result } -func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*astmapper.ShardAnnotation, error) { +func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*index.ShardAnnotation, error) { if shard == nil { return nil, nil } @@ -82,7 +82,7 @@ func (ii *InvertedIndex) validateShard(shard *logql.Shard) (*astmapper.ShardAnno return nil, errors.New("inverted index only supports shard annotations with `PowerOfTwo`") } - if int(ii.totalShards)%s.Of != 0 || uint32(s.Of) > ii.totalShards { + if ii.totalShards%s.Of != 0 || s.Of > ii.totalShards { return nil, fmt.Errorf("%w index_shard:%d query_shard:%v", ErrInvalidShardQuery, ii.totalShards, s) } return s, nil diff --git a/pkg/ingester/index/index_test.go b/pkg/ingester/index/index_test.go index 06625a357970..f34633c0c6b5 100644 --- a/pkg/ingester/index/index_test.go +++ b/pkg/ingester/index/index_test.go @@ -11,26 +11,26 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/pkg/util" ) func Test_GetShards(t *testing.T) { for _, tt := range []struct { total uint32 - shard *astmapper.ShardAnnotation + shard *index.ShardAnnotation expected []uint32 }{ // equal factors - {16, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0}}, - {16, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4}}, - {16, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}}, + {16, &index.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0}}, + {16, &index.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4}}, + {16, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15}}, // idx factor a larger multiple of schema factor - {32, &astmapper.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}}, - {32, &astmapper.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}}, - {32, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}}, - {64, &astmapper.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}}, + {32, &index.ShardAnnotation{Shard: 0, Of: 16}, []uint32{0, 16}}, + {32, &index.ShardAnnotation{Shard: 4, Of: 16}, []uint32{4, 20}}, + {32, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31}}, + {64, &index.ShardAnnotation{Shard: 15, Of: 16}, []uint32{15, 31, 47, 63}}, } { tt := tt t.Run(tt.shard.String()+fmt.Sprintf("_total_%d", tt.total), func(t *testing.T) { @@ -48,7 +48,7 @@ func Test_GetShards(t *testing.T) { func Test_ValidateShards(t *testing.T) { ii := NewWithShards(32) _, err := ii.validateShard( - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 16}).Ptr(), + logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 16}).Ptr(), ) require.NoError(t, err) } @@ -112,7 +112,7 @@ func Test_hash_mapping(t *testing.T) { ii := NewWithShards(shard) ii.Add(logproto.FromLabelsToLabelAdapters(lbs), 1) - x := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16}) + x := logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: labelsSeriesIDHash(lbs) % 16, Of: 16}) res, err := ii.Lookup([]*labels.Matcher{{Type: labels.MatchEqual, Name: "compose_project", Value: "loki-tsdb-storage-s3"}}, &x) require.NoError(t, err) require.Len(t, res, 1) @@ -136,7 +136,7 @@ func Test_NoMatcherLookup(t *testing.T) { // with shard param ii = NewWithShards(16) ii.Add(logproto.FromLabelsToLabelAdapters(lbs), 1) - x := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(labelsSeriesIDHash(lbs) % 16), Of: 16}) + x := logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: labelsSeriesIDHash(lbs) % 16, Of: 16}) ids, err = ii.Lookup(nil, &x) require.Nil(t, err) require.Equal(t, model.Fingerprint(1), ids[0]) @@ -157,9 +157,9 @@ func Test_ConsistentMapping(t *testing.T) { shardMax := 8 for i := 0; i < shardMax; i++ { - shard := logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{ - Shard: i, - Of: shardMax, + shard := logql.NewPowerOfTwoShard(index.ShardAnnotation{ + Shard: uint32(i), + Of: uint32(shardMax), }).Ptr() aIDs, err := a.Lookup([]*labels.Matcher{ diff --git a/pkg/ingester/index/multi_test.go b/pkg/ingester/index/multi_test.go index 50d5db945edb..6be07effea8d 100644 --- a/pkg/ingester/index/multi_test.go +++ b/pkg/ingester/index/multi_test.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) @@ -131,7 +130,7 @@ func TestMultiIndex(t *testing.T) { labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"), }, logql.NewPowerOfTwoShard( - astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)}, + index.ShardAnnotation{Shard: expShard, Of: factor}, ).Ptr(), ) @@ -147,7 +146,7 @@ func TestMultiIndex(t *testing.T) { []*labels.Matcher{ labels.MustNewMatcher(labels.MatchEqual, "foo", "foo"), }, - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: int(expShard), Of: int(factor)}).Ptr(), + logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: expShard, Of: factor}).Ptr(), ) require.Nil(t, err) diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index 68afe83cceea..9dbf261668a4 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -15,7 +15,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) var nilShardMetrics = NewShardMapperMetrics(nil) @@ -624,7 +624,7 @@ func TestFormat_ShardedExpr(t *testing.T) { name: "ConcatSampleExpr", in: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 3, }).Ptr(), @@ -640,7 +640,7 @@ func TestFormat_ShardedExpr(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 3, }).Ptr(), @@ -656,7 +656,7 @@ func TestFormat_ShardedExpr(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 3, }).Ptr(), diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index a1c17c86da03..3095fc0a1aaf 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -8,7 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -464,9 +464,9 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, downstreams := make([]DownstreamSampleExpr, 0, shards) expr.Operation = syntax.OpRangeTypeQuantileSketch for shard := shards - 1; shard >= 0; shard-- { - s := NewPowerOfTwoShard(astmapper.ShardAnnotation{ - Shard: shard, - Of: shards, + s := NewPowerOfTwoShard(index.ShardAnnotation{ + Shard: uint32(shard), + Of: uint32(shards), }) downstreams = append(downstreams, DownstreamSampleExpr{ shard: &s, diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index c6b8e9c4b34c..9f5757b7d8ee 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -10,7 +10,7 @@ import ( "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) func TestShardedStringer(t *testing.T) { @@ -21,7 +21,7 @@ func TestShardedStringer(t *testing.T) { { in: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -31,7 +31,7 @@ func TestShardedStringer(t *testing.T) { }, next: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -72,7 +72,7 @@ func TestMapSampleExpr(t *testing.T) { }, out: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -88,7 +88,7 @@ func TestMapSampleExpr(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -493,7 +493,7 @@ func TestMapping(t *testing.T) { in: `{foo="bar"}`, expr: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -503,7 +503,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -519,7 +519,7 @@ func TestMapping(t *testing.T) { in: `{foo="bar"} |= "error"`, expr: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -540,7 +540,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatLogSelectorExpr{ DownstreamLogSelectorExpr: DownstreamLogSelectorExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -567,7 +567,7 @@ func TestMapping(t *testing.T) { in: `rate({foo="bar"}[5m])`, expr: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -583,7 +583,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -605,7 +605,7 @@ func TestMapping(t *testing.T) { in: `count_over_time({foo="bar"}[5m])`, expr: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -621,7 +621,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -646,7 +646,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -666,7 +666,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -697,7 +697,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeTopK, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -713,7 +713,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -739,7 +739,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -759,7 +759,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -791,7 +791,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -811,7 +811,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -838,7 +838,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -858,7 +858,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -898,7 +898,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -920,7 +920,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -960,7 +960,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -980,7 +980,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1015,7 +1015,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1035,7 +1035,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1077,7 +1077,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1099,7 +1099,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1129,7 +1129,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1149,7 +1149,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1191,7 +1191,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1213,7 +1213,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1242,7 +1242,7 @@ func TestMapping(t *testing.T) { Grouping: &syntax.Grouping{}, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1262,7 +1262,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1297,7 +1297,7 @@ func TestMapping(t *testing.T) { Operation: syntax.OpTypeSum, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1322,7 +1322,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1356,7 +1356,7 @@ func TestMapping(t *testing.T) { }, Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1378,7 +1378,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), @@ -1467,7 +1467,7 @@ func TestMapping(t *testing.T) { RHS: &syntax.VectorAggregationExpr{ Left: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 0, Of: 2, }).Ptr(), @@ -1490,7 +1490,7 @@ func TestMapping(t *testing.T) { }, next: &ConcatSampleExpr{ DownstreamSampleExpr: DownstreamSampleExpr{ - shard: NewPowerOfTwoShard(astmapper.ShardAnnotation{ + shard: NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }).Ptr(), diff --git a/pkg/logql/shards.go b/pkg/logql/shards.go index 7d35cea26d76..9265dac5f0e8 100644 --- a/pkg/logql/shards.go +++ b/pkg/logql/shards.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/querier/astmapper" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" ) @@ -133,7 +134,7 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) (Shards, uint64, error) { res := make(Shards, 0, factor) for i := 0; i < factor; i++ { - res = append(res, NewPowerOfTwoShard(astmapper.ShardAnnotation{Of: factor, Shard: i})) + res = append(res, NewPowerOfTwoShard(index.ShardAnnotation{Of: uint32(factor), Shard: uint32(i)})) } return res, bytesPerShard, nil } @@ -141,7 +142,7 @@ func (s PowerOfTwoStrategy) Shards(expr syntax.Expr) (Shards, uint64, error) { // Shard represents a shard annotation // It holds either a power of two shard (legacy) or a bounded shard type Shard struct { - PowerOfTwo *astmapper.ShardAnnotation + PowerOfTwo *index.ShardAnnotation Bounded *logproto.Shard } @@ -159,7 +160,7 @@ func (s *Shard) Match(fp model.Fingerprint) bool { return v1.BoundsFromProto(s.Bounded.Bounds).Match(fp) } - return s.PowerOfTwo.TSDB().Match(fp) + return s.PowerOfTwo.Match(fp) } func (s *Shard) GetFromThrough() (model.Fingerprint, model.Fingerprint) { @@ -167,7 +168,7 @@ func (s *Shard) GetFromThrough() (model.Fingerprint, model.Fingerprint) { return v1.BoundsFromProto(s.Bounded.Bounds).GetFromThrough() } - return s.PowerOfTwo.TSDB().GetFromThrough() + return s.PowerOfTwo.GetFromThrough() } // convenience method for unaddressability concerns using constructors in literals (tests) @@ -179,7 +180,7 @@ func NewBoundedShard(shard logproto.Shard) Shard { return Shard{Bounded: &shard} } -func NewPowerOfTwoShard(shard astmapper.ShardAnnotation) Shard { +func NewPowerOfTwoShard(shard index.ShardAnnotation) Shard { return Shard{PowerOfTwo: &shard} } @@ -236,8 +237,9 @@ func ParseShard(s string) (Shard, ShardVersion, error) { } old, v1Err := astmapper.ParseShard(s) + casted := old.TSDB() if v1Err == nil { - return Shard{PowerOfTwo: &old}, PowerOfTwoVersion, nil + return Shard{PowerOfTwo: &casted}, PowerOfTwoVersion, nil } err := errors.Wrap( diff --git a/pkg/logql/shards_test.go b/pkg/logql/shards_test.go index fd0adb35f881..1a2d78889cc5 100644 --- a/pkg/logql/shards_test.go +++ b/pkg/logql/shards_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) func TestShardString(t *testing.T) { @@ -17,7 +17,7 @@ func TestShardString(t *testing.T) { }{ { shard: Shard{ - PowerOfTwo: &astmapper.ShardAnnotation{ + PowerOfTwo: &index.ShardAnnotation{ Shard: 1, Of: 2, }, @@ -62,7 +62,7 @@ func TestShardString(t *testing.T) { Max: 2, }, }, - PowerOfTwo: &astmapper.ShardAnnotation{ + PowerOfTwo: &index.ShardAnnotation{ Shard: 1, Of: 2, }, @@ -86,7 +86,7 @@ func TestParseShard(t *testing.T) { str: "1_of_2", version: PowerOfTwoVersion, exp: Shard{ - PowerOfTwo: &astmapper.ShardAnnotation{ + PowerOfTwo: &index.ShardAnnotation{ Shard: 1, Of: 2, }, @@ -140,11 +140,11 @@ func TestParseShards(t *testing.T) { strs: []string{"1_of_2", "1_of_2"}, version: PowerOfTwoVersion, exp: Shards{ - NewPowerOfTwoShard(astmapper.ShardAnnotation{ + NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }), - NewPowerOfTwoShard(astmapper.ShardAnnotation{ + NewPowerOfTwoShard(index.ShardAnnotation{ Shard: 1, Of: 2, }), diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 7f41e45be60d..8154b18fb691 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logqlmodel" - "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) func NewMockQuerier(shards int, streams []logproto.Stream) MockQuerier { @@ -34,7 +34,7 @@ type MockQuerier struct { streams []logproto.Stream } -func (q MockQuerier) extractOldShard(xs []string) (*astmapper.ShardAnnotation, error) { +func (q MockQuerier) extractOldShard(xs []string) (*index.ShardAnnotation, error) { parsed, version, err := ParseShards(xs) if err != nil { return nil, err @@ -60,7 +60,7 @@ func (q MockQuerier) SelectLogs(_ context.Context, req SelectLogParams) (iter.En matchers := expr.Matchers() - var shard *astmapper.ShardAnnotation + var shard *index.ShardAnnotation if len(req.Shards) > 0 { shard, err = q.extractOldShard(req.Shards) if err != nil { @@ -185,7 +185,7 @@ func (q MockQuerier) SelectSamples(_ context.Context, req SelectSampleParams) (i matchers := selector.Matchers() - var shard *astmapper.ShardAnnotation + var shard *index.ShardAnnotation if len(req.Shards) > 0 { shard, err = q.extractOldShard(req.Shards) if err != nil { diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index 8a305176b687..95b79d72d30a 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -22,8 +22,8 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" - "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) func testSampleStreams() []queryrangebase.SampleStream { @@ -291,7 +291,7 @@ func TestInstanceFor(t *testing.T) { Params: logql.ParamsWithShardsOverride{ Params: newParams(), ShardsOverride: logql.Shards{ - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 2}), + logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 2}), }.Encode(), }, }, @@ -299,7 +299,7 @@ func TestInstanceFor(t *testing.T) { Params: logql.ParamsWithShardsOverride{ Params: newParams(), ShardsOverride: logql.Shards{ - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 1, Of: 2}), + logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 1, Of: 2}), }.Encode(), }, }, @@ -366,7 +366,7 @@ func TestInstanceDownstream(t *testing.T) { Params: logql.ParamsWithShardsOverride{ Params: logql.ParamsWithExpressionOverride{Params: params, ExpressionOverride: expr}, ShardsOverride: logql.Shards{ - logql.NewPowerOfTwoShard(astmapper.ShardAnnotation{Shard: 0, Of: 2}), + logql.NewPowerOfTwoShard(index.ShardAnnotation{Shard: 0, Of: 2}), }.Encode(), }, },