feat(blooms): bloom integration in query planning #12208

Merged · 72 commits · Mar 26, 2024

Commits
b831d07
streaming GetShards on idx-gw + some refactoring
owen-d Mar 14, 2024
91ee79e
moves FingerprintBounds to logproto pkg to avoid circular imports
owen-d Mar 14, 2024
2db2852
proto alignment, wiring `GetShards()` through storage
owen-d Mar 14, 2024
6801fbb
tsdb index `ForSeries()` can signal a stop to iteration
owen-d Mar 14, 2024
c2200a0
ForSeries in tsdb.Index ifc
owen-d Mar 14, 2024
db1c830
pr feedback
owen-d Mar 14, 2024
693c48b
removes pointers in protos
owen-d Mar 14, 2024
808d88b
gateway `GetShards` impl
owen-d Mar 14, 2024
391093f
[wip] integrating bound ranges into `GetShards()`
owen-d Mar 14, 2024
42dccdd
generic result accumulator
owen-d Mar 15, 2024
cc105e0
indexclient getshards impl
owen-d Mar 15, 2024
b355b9f
comment
owen-d Mar 15, 2024
14e4815
thread plan+predicate
owen-d Mar 15, 2024
c42344e
[wip] HasForSeries wiring
owen-d Mar 15, 2024
3052e2f
finish HasForSeries impl, move ifc to index pkg
owen-d Mar 15, 2024
548fe30
moves HasForSeries to StatsReader
owen-d Mar 15, 2024
272e06f
new sharding pkg
owen-d Mar 15, 2024
cfb38a5
removes bounds from shardsrequest
owen-d Mar 15, 2024
8988232
HasForSeries accepts timerange + composite store impl
owen-d Mar 15, 2024
300d710
resultAccumulator doesnt run merge for single list
owen-d Mar 15, 2024
1ea82b1
sorts MultiIndex.GetChunkRefs during merge
owen-d Mar 15, 2024
6bad531
naive implementation of bloom-accelerated sharding calculation in gat…
owen-d Mar 16, 2024
09f1618
extracts & tests accumulating chunks to shards
owen-d Mar 16, 2024
6b8feac
moves common sharding logic to shared pkg, lets idx-gw fallback to St…
owen-d Mar 17, 2024
3629b7b
GatewayClient.poolDoWithStrategy to avoid cascading requests during r…
owen-d Mar 17, 2024
a6e0506
shardsrequest compat.go
owen-d Mar 18, 2024
1961eb8
shardsresonse extensions.go
owen-d Mar 18, 2024
27599b7
removes plan from ShardsRequest in favor of a query string which can …
owen-d Mar 18, 2024
fdbe1fa
queryrange+codec shardrequesting support + some cleanup to use query …
owen-d Mar 18, 2024
42fe9cf
ShardResolver.GetShardingRanges support
owen-d Mar 18, 2024
3704abc
querier, http wiring for shards request
owen-d Mar 18, 2024
8854fb4
json tags for shard protos
owen-d Mar 18, 2024
32d400d
[unrelated] fixes double-stats-requesting on quantiles
owen-d Mar 19, 2024
f8ff1fb
multiple logql shard reprs
owen-d Mar 19, 2024
8118844
logql sharding strategy
owen-d Mar 19, 2024
21a2edd
test alignment
owen-d Mar 19, 2024
b9f6121
shard parsing returns verison
owen-d Mar 19, 2024
29cb72a
integrating new shards into label injection
owen-d Mar 19, 2024
7bb381f
aligns inclusivity expectations between sharding impls for Fingerprin…
owen-d Mar 19, 2024
b5f2501
integrates shard-bounds into ingesters
owen-d Mar 19, 2024
e54db18
tsdb sharding strategy
owen-d Mar 19, 2024
48258a0
ingester fixes
owen-d Mar 19, 2024
e7b4930
fix test + docs
owen-d Mar 19, 2024
f476128
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 19, 2024
f4b4e86
fix ifc alignment
owen-d Mar 19, 2024
189b4e3
gen files
owen-d Mar 19, 2024
91277c8
linting, circular imports, default impls for mocks
owen-d Mar 20, 2024
3f14f1a
preserve old indexreaderwriter behavior
owen-d Mar 20, 2024
eda6ef8
goimports -s
owen-d Mar 20, 2024
1f22cee
legacy index reader getshards
owen-d Mar 20, 2024
f24c768
adds pre+postfilter chunkref histograms to idx-gws bloom-accelerated …
owen-d Mar 20, 2024
9049d97
corrects shard routing in middleware
owen-d Mar 20, 2024
5d0d1a0
proto response wrapping for new shards types
owen-d Mar 20, 2024
66e5998
promauto for indexgateway metrics
owen-d Mar 20, 2024
5300648
proto roundtripping + lint
owen-d Mar 20, 2024
5f82560
s/table-compaction-period/table-offset/g
owen-d Mar 21, 2024
60b305b
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 21, 2024
37edc8b
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 21, 2024
b0350ae
index statistics
owen-d Mar 22, 2024
5b70e96
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
db506cb
stats cleanup and proper propagation
owen-d Mar 22, 2024
5ca8d2f
routing stats through store
owen-d Mar 22, 2024
d43b3f7
gatewayclient propagates stats
owen-d Mar 22, 2024
52df554
logging fixes
owen-d Mar 22, 2024
ce33787
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
3e96e75
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 22, 2024
1e77cee
skip (unhealthy?) bloom-gws with no token range
owen-d Mar 22, 2024
fed5262
correctly merges stats in collector middleware for bloom data, some a…
owen-d Mar 22, 2024
2805339
v1 metrics wired up through read path
owen-d Mar 23, 2024
11d8b42
optional pool support for bloom pages
owen-d Mar 26, 2024
ade14a9
Merge remote-tracking branch 'upstream/main' into blooms/query-planni…
owen-d Mar 26, 2024
463dafd
test signature
owen-d Mar 26, 2024
1,454 changes: 1,430 additions & 24 deletions pkg/logproto/indexgateway.pb.go

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions pkg/logproto/indexgateway.proto
@@ -2,6 +2,7 @@ syntax = "proto3";

package indexgatewaypb;

import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";

option go_package = "github.com/grafana/loki/pkg/logproto";
@@ -25,4 +26,44 @@ service IndexGateway {
// Note: this MUST be the same as the variant defined in
// logproto.proto on the Querier service.
rpc GetVolume(logproto.VolumeRequest) returns (logproto.VolumeResponse) {}

// GetShards is an optimized shard-planning implementation that runs
// on the index gateway rather than on the ingester.
rpc GetShards(ShardsRequest) returns (stream ShardsResponse);
}

message ShardsRequest {
int64 from = 1 [
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
int64 through = 2 [
(gogoproto.customtype) = "github.com/prometheus/common/model.Time",
(gogoproto.nullable) = false
];
string matchers = 3;
uint64 target_bytes_per_shard = 4;
repeated FPBounds shards = 5;
}

message ShardsResponse {
repeated Shard shards = 1;
}

message Shard {
FPBounds bounds = 1 [
(gogoproto.nullable) = false
];
logproto.IndexStatsResponse stats = 2;
}

// FPBounds is identical to the definition in `pkg/storage/bloom/v1/bounds.FingerprintBounds`
// which ensures we can cast between them without allocations.
message FPBounds {
uint64 min = 1 [
(gogoproto.casttype) = "github.com/prometheus/common/model.Fingerprint"
];
uint64 max = 2 [
(gogoproto.casttype) = "github.com/prometheus/common/model.Fingerprint"
];
}
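
For orientation, here is a minimal sketch of how a caller might drain this server-streaming RPC into a flat shard list. It assumes the standard gRPC client codegen for the service above (an IndexGatewayClient whose GetShards returns a stream with a Recv method); the helper itself is not part of this diff.

package example

import (
	"context"
	"io"

	"github.com/grafana/loki/pkg/logproto"
)

// collectShards calls the streaming GetShards RPC and accumulates every
// response message into a single slice of shards.
func collectShards(ctx context.Context, client logproto.IndexGatewayClient, req *logproto.ShardsRequest) ([]*logproto.Shard, error) {
	stream, err := client.GetShards(ctx, req)
	if err != nil {
		return nil, err
	}
	var shards []*logproto.Shard
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return shards, nil // the gateway finished streaming
		}
		if err != nil {
			return nil, err
		}
		shards = append(shards, resp.Shards...)
	}
}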
27 changes: 21 additions & 6 deletions pkg/storage/bloom/v1/bounds.go
@@ -4,12 +4,13 @@ import (
"fmt"
"hash"
"strings"
"unsafe"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/encoding"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"golang.org/x/exp/slices"

"github.com/grafana/loki/pkg/util/encoding"
)

type BoundsCheck uint8
@@ -20,6 +21,24 @@ const (
After
)

type FingerprintBounds struct {
Min, Max model.Fingerprint
}

// Proto compat
// compiler check ensuring equal repr of underlying types
var _ FingerprintBounds = FingerprintBounds(logproto.FPBounds{})

func BoundsFromProto(pb logproto.FPBounds) FingerprintBounds {
return FingerprintBounds(pb)
}

// Unsafe cast to avoid allocation. This _requires_ that the underlying types are the same
// which is checked by the compiler above
func MultiBoundsFromProto(pb []logproto.FPBounds) MultiFingerprintBounds {
return MultiFingerprintBounds(*(*MultiFingerprintBounds)(unsafe.Pointer(&pb)))
}

// ParseBoundsFromAddr parses a fingerprint bounds from a string
func ParseBoundsFromAddr(s string) (FingerprintBounds, error) {
parts := strings.Split(s, "-")
@@ -40,10 +59,6 @@ func ParseBoundsFromParts(a, b string) (FingerprintBounds, error) {
return NewBounds(minFingerprint, maxFingerprint), nil
}

type FingerprintBounds struct {
Min, Max model.Fingerprint
}

func NewBounds(min, max model.Fingerprint) FingerprintBounds {
return FingerprintBounds{Min: min, Max: max}
}
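The compile-time identity check plus unsafe cast introduced above is a small reusable pattern: the blank-identifier conversion only compiles while the two types share an identical memory layout, which is exactly what makes the pointer cast allocation-free and safe. A self-contained sketch of the same idea, using hypothetical stand-in types rather than the PR's FPBounds/FingerprintBounds:

package main

import (
	"fmt"
	"unsafe"
)

// Two struct types with an identical field layout, standing in for
// logproto.FPBounds and FingerprintBounds.
type protoBounds struct{ Min, Max uint64 }
type bounds struct{ Min, Max uint64 }

// Compile-time check: this conversion only compiles while the two types
// keep the same underlying representation, which is what makes the
// slice-header cast below safe.
var _ bounds = bounds(protoBounds{})

// castSlice reinterprets the slice header in place, with no copy and no
// allocation, mirroring MultiBoundsFromProto.
func castSlice(in []protoBounds) []bounds {
	return *(*[]bounds)(unsafe.Pointer(&in))
}

func main() {
	out := castSlice([]protoBounds{{Min: 10, Max: 2000}})
	fmt.Println(out[0].Min, out[0].Max) // 10 2000
}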
20 changes: 20 additions & 0 deletions pkg/storage/bloom/v1/bounds_test.go
@@ -3,10 +3,30 @@ package v1
import (
"testing"

"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

func TestBoundsFromProto(t *testing.T) {
bounds := BoundsFromProto(logproto.FPBounds{
Min: 10,
Max: 2000,
})
assert.Equal(t, NewBounds(10, 2000), bounds)
}

func TestMultiBoundsFromProto(t *testing.T) {
bounds := MultiBoundsFromProto([]logproto.FPBounds{
{Min: 10, Max: 2000},
{Min: 2001, Max: 4000},
})
assert.Equal(t, MultiFingerprintBounds{
NewBounds(10, 2000),
NewBounds(2001, 4000),
}, bounds)
}

func Test_ParseFingerprint(t *testing.T) {
t.Parallel()
fp, err := model.ParseFingerprint("7d0")
35 changes: 35 additions & 0 deletions pkg/storage/stores/composite_store.go
@@ -206,6 +206,41 @@ func (c CompositeStore) Volume(ctx context.Context, userID string, from, through
return res, err
}

func (c CompositeStore) GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error) {
// TODO(owen-d): improve. Since shards aren't easily merge-able,
// we choose the store which returned the highest shard count
var groups [][]*logproto.Shard
err := c.forStores(ctx, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
shards, err := store.GetShards(innerCtx, userID, from, through, targetBytesPerShard, matchers...)
if err != nil {
return err
}
groups = append(groups, shards)
return nil
})

switch {
case err != nil:
return nil, err
Review comment (Contributor): I would move the error handling into a regular if err != nil statement.

case len(groups) == 1:
return groups[0], nil
case len(groups) == 0:
return nil, nil
default:
sort.Slice(groups, func(i, j int) bool {
Review comment (Contributor): Since you are looking for the biggest group, you don't need to sort this slice. Instead, you can use slices.MinFunc, which is O(n).

return len(groups[i]) > len(groups[j])
})
return groups[0], nil
}

}

func (c CompositeStore) GetChunkFetcher(tm model.Time) *fetcher.Fetcher {
// find the schema with the lowest start _after_ tm
j := sort.Search(len(c.stores), func(j int) bool {
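Picking up the two review comments above: the error case can be hoisted into a plain if err != nil check before the switch, and the largest group can be picked in O(n) without sorting. Note that the stdlib helper returning the maximal element is slices.MaxFunc (Go 1.21+), not MinFunc. A self-contained sketch with a generic stand-in for the shard groups; the helper name is illustrative:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// largestGroup returns the inner slice with the most elements in O(n),
// replacing the sort.Slice call in the default branch above. The empty
// case must still be handled by the caller, as the PR already does,
// because slices.MaxFunc panics on an empty slice.
func largestGroup[T any](groups [][]T) []T {
	return slices.MaxFunc(groups, func(a, b []T) int {
		return cmp.Compare(len(a), len(b))
	})
}

func main() {
	groups := [][]string{{"a"}, {"b", "c", "d"}, {"e", "f"}}
	fmt.Println(len(largestGroup(groups))) // 3
}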
15 changes: 15 additions & 0 deletions pkg/storage/stores/composite_store_entry.go
@@ -119,6 +119,21 @@ func (c *storeEntry) Stats(ctx context.Context, userID string, from, through mod
return c.indexReader.Stats(ctx, userID, from, through, matchers...)
}

func (c *storeEntry) GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error) {
_, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
if err != nil {
return nil, err
}

return c.indexReader.GetShards(ctx, userID, from, through, targetBytesPerShard, matchers...)
}

func (c *storeEntry) Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SeriesStore.Volume")
defer sp.Finish()
4 changes: 4 additions & 0 deletions pkg/storage/stores/composite_store_test.go
@@ -60,6 +60,10 @@ func (m mockStore) Volume(_ context.Context, _ string, _, _ model.Time, _ int32,
return nil, nil
}

func (m mockStore) GetShards(_ context.Context, _ string, _, _ model.Time, _ uint64, _ ...*labels.Matcher) ([]*logproto.Shard, error) {
return nil, nil
}

func (m mockStore) Stop() {}

func TestCompositeStore(t *testing.T) {
25 changes: 25 additions & 0 deletions pkg/storage/stores/index/index.go
@@ -28,6 +28,13 @@ type BaseReader interface {
type StatsReader interface {
Stats(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) (*stats.Stats, error)
Volume(ctx context.Context, userID string, from, through model.Time, limit int32, targetLabels []string, aggregateBy string, matchers ...*labels.Matcher) (*logproto.VolumeResponse, error)
GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error)
}

type Reader interface {
@@ -137,6 +144,24 @@ func (m MonitoredReaderWriter) Volume(ctx context.Context, userID string, from,
return vol, nil
}

func (m MonitoredReaderWriter) GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error) {
var shards []*logproto.Shard
if err := loki_instrument.TimeRequest(ctx, "shards", instrument.NewHistogramCollector(m.metrics.indexQueryLatency), instrument.ErrorCode, func(ctx context.Context) error {
var err error
shards, err = m.rw.GetShards(ctx, userID, from, through, targetBytesPerShard, matchers...)
return err
}); err != nil {
return nil, err
}
return shards, nil
}

func (m MonitoredReaderWriter) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
m.rw.SetChunkFilterer(chunkFilter)
}
36 changes: 34 additions & 2 deletions pkg/storage/stores/series/series_index_gateway_store.go
@@ -15,13 +15,26 @@ import (
"github.com/grafana/loki/pkg/storage/stores/index/stats"
)

// NB(owen-d): mostly modeled off of the proto-generated `logproto.IndexGatewayClient`,
// but decoupled from explicit GRPC dependencies to work well with streaming grpc methods
type GatewayClient interface {
GetChunkRef(ctx context.Context, in *logproto.GetChunkRefRequest) (*logproto.GetChunkRefResponse, error)
GetSeries(ctx context.Context, in *logproto.GetSeriesRequest) (*logproto.GetSeriesResponse, error)
LabelNamesForMetricName(ctx context.Context, in *logproto.LabelNamesForMetricNameRequest) (*logproto.LabelResponse, error)
LabelValuesForMetricName(ctx context.Context, in *logproto.LabelValuesForMetricNameRequest) (*logproto.LabelResponse, error)
GetStats(ctx context.Context, in *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error)
GetVolume(ctx context.Context, in *logproto.VolumeRequest) (*logproto.VolumeResponse, error)

GetShards(ctx context.Context, in *logproto.ShardsRequest) (*logproto.ShardsResponse, error)
}

// IndexGatewayClientStore implements pkg/storage/stores/index.ReaderWriter
type IndexGatewayClientStore struct {
client logproto.IndexGatewayClient
client GatewayClient
logger log.Logger
}

func NewIndexGatewayClientStore(client logproto.IndexGatewayClient, logger log.Logger) *IndexGatewayClientStore {
func NewIndexGatewayClientStore(client GatewayClient, logger log.Logger) *IndexGatewayClientStore {
return &IndexGatewayClientStore{
client: client,
logger: logger,
@@ -111,6 +124,25 @@ func (c *IndexGatewayClientStore) Volume(ctx context.Context, _ string, from, th
})
}

func (c *IndexGatewayClientStore) GetShards(
ctx context.Context,
_ string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error) {
resp, err := c.client.GetShards(ctx, &logproto.ShardsRequest{
From: from,
Through: through,
Matchers: (&syntax.MatchersExpr{Mts: matchers}).String(),
TargetBytesPerShard: targetBytesPerShard,
})
if err != nil {
return nil, err
}
return resp.Shards, nil
}

func (c *IndexGatewayClientStore) SetChunkFilterer(_ chunk.RequestChunkFilterer) {
level.Warn(c.logger).Log("msg", "SetChunkFilterer called on index gateway client store, but it does not support it")
}
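One payoff of the local GatewayClient interface above (per the NB comment) is that implementations need not involve gRPC at all, so streaming gateway clients and plain test doubles can both satisfy it. A hedged sketch with a hypothetical fake, not part of the PR; only GetShards is shown, and the interface's other methods would be stubbed the same way:

package example

import (
	"context"

	"github.com/grafana/loki/pkg/logproto"
)

// fakeGateway is an illustrative test double for the GatewayClient interface.
type fakeGateway struct {
	shards []*logproto.Shard
}

// GetShards returns a canned response with no gRPC machinery involved.
func (f *fakeGateway) GetShards(_ context.Context, _ *logproto.ShardsRequest) (*logproto.ShardsResponse, error) {
	return &logproto.ShardsResponse{Shards: f.shards}, nil
}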
15 changes: 13 additions & 2 deletions pkg/storage/stores/series/series_index_store.go
@@ -2,6 +2,7 @@ package series

import (
"context"
"errors"
"fmt"
"sort"
"sync"
@@ -751,10 +752,20 @@ func (c *IndexReaderWriter) convertChunkIDsToChunkRefs(_ context.Context, userID

// old index stores do not implement stats -- skip
func (c *IndexReaderWriter) Stats(_ context.Context, _ string, _, _ model.Time, _ ...*labels.Matcher) (*stats.Stats, error) {
return nil, nil
return nil, errors.New("unimplemented Stats() on legacy index stores")
}

// old index stores do not implement label volume -- skip
func (c *IndexReaderWriter) Volume(_ context.Context, _ string, _, _ model.Time, _ int32, _ []string, _ string, _ ...*labels.Matcher) (*logproto.VolumeResponse, error) {
return nil, nil
return nil, errors.New("unimplemented Volume() on legacy index stores")
}

func (c *IndexReaderWriter) GetShards(
ctx context.Context,
userID string,
from, through model.Time,
targetBytesPerShard uint64,
matchers ...*labels.Matcher,
) ([]*logproto.Shard, error) {
return nil, errors.New("unimplemented GetShards() on legacy index stores")
}