Skip to content

Commit

Permalink
Support sharding of binary operation for non shardable side. (#11635)
Browse files Browse the repository at this point in the history
Binary expressions with one un-shardable side have been broken for quite some time. The error `unimplemented` however was shadowed by some other code. It surfaced only with a fix for an issue with `quantile_over_time` with was solved with #11629  

---------

Signed-off-by: Christian Haudum <[email protected]>
Co-authored-by: Christian Haudum <[email protected]>
  • Loading branch information
jeschkies and chaudum authored Jan 10, 2024
1 parent 8b48a18 commit 03156ed
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,11 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre
type errorQuerier struct{}

func (errorQuerier) SelectLogs(_ context.Context, _ SelectLogParams) (iter.EntryIterator, error) {
return nil, errors.New("unimplemented")
return nil, errors.New("SelectLogs unimplemented: the query-frontend cannot evaluate an expression that selects logs. this is likely a bug in the query engine. please contact your system operator")
}

func (errorQuerier) SelectSamples(_ context.Context, _ SelectSampleParams) (iter.SampleIterator, error) {
return nil, errors.New("unimplemented")
return nil, errors.New("SelectSamples unimplemented: the query-frontend cannot evaluate an expression that selects samples. this is likely a bug in the query engine. please contact your system operator")
}

func NewDownstreamEvaluator(downstreamer Downstreamer) *DownstreamEvaluator {
Expand Down
8 changes: 8 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ func TestMappingEquivalence(t *testing.T) {
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`avg_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, true},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s])`, true},
{
`
(quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a) > 1)
and
avg by (a) (rate({a=~".+"}[1s]))
`,
false,
},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down
104 changes: 73 additions & 31 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ func (m ShardMapper) Parse(parsed syntax.Expr) (noop bool, bytesPerShard uint64,
return false, 0, nil, err
}

originalStr := parsed.String()
mappedStr := mapped.String()
noop = originalStr == mappedStr
noop = isNoOp(parsed, mapped)
if noop {
m.metrics.ParsedQueries.WithLabelValues(NoopKey).Inc()
} else {
Expand Down Expand Up @@ -97,32 +95,62 @@ func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr,
case *syntax.RangeAggregationExpr:
return m.mapRangeAggregationExpr(e, r)
case *syntax.BinOpExpr:
lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r)
if err != nil {
return nil, 0, err
}
rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r)
if err != nil {
return nil, 0, err
}
lhsSampleExpr, ok := lhsMapped.(syntax.SampleExpr)
if !ok {
return nil, 0, badASTMapping(lhsMapped)
}
rhsSampleExpr, ok := rhsMapped.(syntax.SampleExpr)
if !ok {
return nil, 0, badASTMapping(rhsMapped)
return m.mapBinOpExpr(e, r)
default:
return nil, 0, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m)
}
}

func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*syntax.BinOpExpr, uint64, error) {
// In a BinOp expression both sides need to be either executed locally or wrapped
// into a downstream expression to be executed on the querier, since the default
// evaluator on the query frontend cannot select logs or samples.
// However, it can evaluate literals and vectors.

// check if LHS is shardable by mapping the tree
// only wrap in downstream expression if the mapping is a no-op and the
// expression is a vector or literal
lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r)
if err != nil {
return nil, 0, err
}
if isNoOp(e.SampleExpr, lhsMapped) && !isLiteralOrVector(lhsMapped) {
lhsMapped = DownstreamSampleExpr{
shard: nil,
SampleExpr: e.SampleExpr,
}
e.SampleExpr = lhsSampleExpr
e.RHS = rhsSampleExpr
}

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))
// check if RHS is shardable by mapping the tree
// only wrap in downstream expression if the mapping is a no-op and the
// expression is a vector or literal
rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r)
if err != nil {
return nil, 0, err
}
if isNoOp(e.SampleExpr, rhsMapped) && !isLiteralOrVector(rhsMapped) {
// TODO: check if literal or vector
rhsMapped = DownstreamSampleExpr{
shard: nil,
SampleExpr: e.RHS,
}
}

return e, bytesPerShard, nil
default:
return nil, 0, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m)
lhsSampleExpr, ok := lhsMapped.(syntax.SampleExpr)
if !ok {
return nil, 0, badASTMapping(lhsMapped)
}
rhsSampleExpr, ok := rhsMapped.(syntax.SampleExpr)
if !ok {
return nil, 0, badASTMapping(rhsMapped)
}
e.SampleExpr = lhsSampleExpr
e.RHS = rhsSampleExpr

// We take the maximum bytes per shard of both sides of the operation
bytesPerShard := uint64(math.Max(int(lhsBytesPerShard), int(rhsBytesPerShard)))

return e, bytesPerShard, nil
}

func (m ShardMapper) mapLogSelectorExpr(expr syntax.LogSelectorExpr, r *downstreamRecorder) (syntax.LogSelectorExpr, uint64, error) {
Expand Down Expand Up @@ -338,7 +366,7 @@ var rangeMergeMap = map[string]string{

func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
if !expr.Shardable() {
return m.noOp(expr)
return noOp(expr, m.shards)
}

switch expr.Operation {
Expand Down Expand Up @@ -433,7 +461,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
return nil, 0, err
}
if shards == 0 || !m.quantileOverTimeSharding {
return m.noOp(expr)
return noOp(expr, m.shards)
}

// quantile_over_time() by (foo) ->
Expand Down Expand Up @@ -461,18 +489,32 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,

default:
// don't shard if there's not an appropriate optimization
return m.noOp(expr)
return noOp(expr, m.shards)
}
}

func (m ShardMapper) noOp(expr *syntax.RangeAggregationExpr) (syntax.SampleExpr, uint64, error) {
exprStats, err := m.shards.GetStats(expr)
func noOp[E syntax.Expr](expr E, shards ShardResolver) (E, uint64, error) {
exprStats, err := shards.GetStats(expr)
if err != nil {
return nil, 0, err
var empty E
return empty, 0, err
}
return expr, exprStats.Bytes, nil
}

func isNoOp(left syntax.Expr, right syntax.Expr) bool {
return left.String() == right.String()
}

func isLiteralOrVector(e syntax.Expr) bool {
switch e.(type) {
case *syntax.VectorExpr, *syntax.LiteralExpr:
return true
default:
return false
}
}

func badASTMapping(got syntax.Expr) error {
return fmt.Errorf("bad AST mapping: expected SampleExpr, but got (%T)", got)
}
91 changes: 89 additions & 2 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,93 @@ func TestMapping(t *testing.T) {
},
},
},
{
in: `
quantile_over_time(0.99, {a="foo"} | unwrap bytes [1s]) by (b)
and
sum by (b) (rate({a="bar"}[1s]))
`,
expr: &syntax.BinOpExpr{
SampleExpr: DownstreamSampleExpr{
SampleExpr: &syntax.RangeAggregationExpr{
Operation: syntax.OpRangeTypeQuantile,
Params: float64p(0.99),
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "foo")},
},
Unwrap: &syntax.UnwrapExpr{
Identifier: "bytes",
},
Interval: 1 * time.Second,
},
Grouping: &syntax.Grouping{
Groups: []string{"b"},
},
},
},
RHS: &syntax.VectorAggregationExpr{
Left: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 0,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "bar")},
},
Interval: 1 * time.Second,
},
Operation: syntax.OpRangeTypeRate,
},
Grouping: &syntax.Grouping{
Groups: []string{"b"},
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: &ConcatSampleExpr{
DownstreamSampleExpr: DownstreamSampleExpr{
shard: &astmapper.ShardAnnotation{
Shard: 1,
Of: 2,
},
SampleExpr: &syntax.VectorAggregationExpr{
Left: &syntax.RangeAggregationExpr{
Left: &syntax.LogRange{
Left: &syntax.MatchersExpr{
Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "a", "bar")},
},
Interval: 1 * time.Second,
},
Operation: syntax.OpRangeTypeRate,
},
Grouping: &syntax.Grouping{
Groups: []string{"b"},
},
Params: 0,
Operation: syntax.OpTypeSum,
},
},
next: nil,
},
},
Grouping: &syntax.Grouping{
Groups: []string{"b"},
},
Operation: syntax.OpTypeSum,
},
Op: syntax.OpTypeAnd,
Opts: &syntax.BinOpOptions{
ReturnBool: false,
VectorMatching: &syntax.VectorMatching{},
},
},
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := syntax.ParseExpr(tc.in)
Expand All @@ -1367,8 +1454,8 @@ func TestMapping(t *testing.T) {
mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder())

require.Equal(t, tc.err, err)
require.Equal(t, tc.expr.String(), mapped.String())
require.Equal(t, tc.expr, mapped)
require.Equal(t, mapped.String(), tc.expr.String())
require.Equal(t, mapped, tc.expr)
})
}
}
Expand Down
1 change: 0 additions & 1 deletion tools/dev/loki-boltdb-storage-s3/config/loki.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ ingester_client:
remote_timeout: 1s
limits_config:
cardinality_limit: 100000
enforce_metric_name: false
ingestion_burst_size_mb: 5
ingestion_rate_mb: 2
ingestion_rate_strategy: global
Expand Down

0 comments on commit 03156ed

Please sign in to comment.