chore: [k195] fix: don't shard quantile_over_time if it's not the top level aggregation for a query (#12343)

Co-authored-by: Callum Styan <[email protected]>
grafanabot and cstyan authored Mar 25, 2024
1 parent bab7cfe commit a0caadf
Showing 3 changed files with 109 additions and 61 deletions.
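This commit threads a new topLevel bool through ShardMapper.Map and its helpers (mapBinOpExpr, mapVectorAggregationExpr, mapLabelReplaceExpr, mapRangeAggregationExpr) and into expr.Shardable(topLevel), so that quantile_over_time is only eligible for probabilistic sharding when it is the outermost aggregation of a query. The two call shapes, taken directly from the hunks below:

	// Entry point: the parsed query as a whole is the top level.
	mapped, bytesPerShard, err := m.Map(parsed, recorder, true)

	// Recursing into the child of an aggregation: no longer the top level.
	subMapped, bytesPerShard, err := m.Map(expr.Left, r, false)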
40 changes: 20 additions & 20 deletions pkg/logql/shardmapper.go
@@ -56,7 +56,7 @@ func NewShardMapperMetrics(registerer prometheus.Registerer) *MapperMetrics {
 func (m ShardMapper) Parse(parsed syntax.Expr) (noop bool, bytesPerShard uint64, expr syntax.Expr, err error) {
 	recorder := m.metrics.downstreamRecorder()
 
-	mapped, bytesPerShard, err := m.Map(parsed, recorder)
+	mapped, bytesPerShard, err := m.Map(parsed, recorder, true)
 	if err != nil {
 		m.metrics.ParsedQueries.WithLabelValues(FailureKey).Inc()
 		return false, 0, nil, err
@@ -74,7 +74,7 @@ func (m ShardMapper) Parse(parsed syntax.Expr) (noop bool, bytesPerShard uint64,
 	return noop, bytesPerShard, mapped, err
 }
 
-func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr, uint64, error) {
+func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder, topLevel bool) (syntax.Expr, uint64, error) {
 	// immediately clone the passed expr to avoid mutating the original
 	expr, err := syntax.Clone(expr)
 	if err != nil {
@@ -89,19 +89,19 @@ func (m ShardMapper) Map(expr syntax.Expr, r *downstreamRecorder) (syntax.Expr,
 	case *syntax.MatchersExpr, *syntax.PipelineExpr:
 		return m.mapLogSelectorExpr(e.(syntax.LogSelectorExpr), r)
 	case *syntax.VectorAggregationExpr:
-		return m.mapVectorAggregationExpr(e, r)
+		return m.mapVectorAggregationExpr(e, r, topLevel)
 	case *syntax.LabelReplaceExpr:
-		return m.mapLabelReplaceExpr(e, r)
+		return m.mapLabelReplaceExpr(e, r, topLevel)
 	case *syntax.RangeAggregationExpr:
-		return m.mapRangeAggregationExpr(e, r)
+		return m.mapRangeAggregationExpr(e, r, topLevel)
 	case *syntax.BinOpExpr:
-		return m.mapBinOpExpr(e, r)
+		return m.mapBinOpExpr(e, r, topLevel)
 	default:
 		return nil, 0, errors.Errorf("unexpected expr type (%T) for ASTMapper type (%T) ", expr, m)
 	}
 }
 
-func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*syntax.BinOpExpr, uint64, error) {
+func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder, topLevel bool) (*syntax.BinOpExpr, uint64, error) {
 	// In a BinOp expression both sides need to be either executed locally or wrapped
 	// into a downstream expression to be executed on the querier, since the default
 	// evaluator on the query frontend cannot select logs or samples.
@@ -110,7 +110,7 @@ func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*
 	// check if LHS is shardable by mapping the tree
 	// only wrap in downstream expression if the mapping is a no-op and the
 	// expression is a vector or literal
-	lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r)
+	lhsMapped, lhsBytesPerShard, err := m.Map(e.SampleExpr, r, topLevel)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -124,7 +124,7 @@ func (m ShardMapper) mapBinOpExpr(e *syntax.BinOpExpr, r *downstreamRecorder) (*
 	// check if RHS is shardable by mapping the tree
 	// only wrap in downstream expression if the mapping is a no-op and the
 	// expression is a vector or literal
-	rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r)
+	rhsMapped, rhsBytesPerShard, err := m.Map(e.RHS, r, topLevel)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -232,8 +232,8 @@ func (m ShardMapper) wrappedShardedVectorAggr(expr *syntax.VectorAggregationExpr
 
 // technically, std{dev,var} are also parallelizable if there is no cross-shard merging
 // in descendent nodes in the AST. This optimization is currently avoided for simplicity.
-func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
-	if expr.Shardable() {
+func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr, r *downstreamRecorder, topLevel bool) (syntax.SampleExpr, uint64, error) {
+	if expr.Shardable(topLevel) {
 
 		switch expr.Operation {
 
@@ -258,7 +258,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
 			Left:      expr.Left,
 			Grouping:  expr.Grouping,
 			Operation: syntax.OpTypeSum,
-		}, r)
+		}, r, false)
 		if err != nil {
 			return nil, 0, err
 		}
@@ -267,7 +267,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
 			Left:      expr.Left,
 			Grouping:  expr.Grouping,
 			Operation: syntax.OpTypeCount,
-		}, r)
+		}, r, false)
 		if err != nil {
 			return nil, 0, err
 		}
@@ -318,7 +318,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
 
 	// if this AST contains unshardable operations, don't shard this at this level,
 	// but attempt to shard a child node.
-	subMapped, bytesPerShard, err := m.Map(expr.Left, r)
+	subMapped, bytesPerShard, err := m.Map(expr.Left, r, false)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -336,8 +336,8 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *syntax.VectorAggregationExpr
 
 }
 
-func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
-	subMapped, bytesPerShard, err := m.Map(expr.Left, r)
+func (m ShardMapper) mapLabelReplaceExpr(expr *syntax.LabelReplaceExpr, r *downstreamRecorder, topLevel bool) (syntax.SampleExpr, uint64, error) {
+	subMapped, bytesPerShard, err := m.Map(expr.Left, r, topLevel)
 	if err != nil {
 		return nil, 0, err
 	}
@@ -364,8 +364,8 @@ var rangeMergeMap = map[string]string{
 	syntax.OpRangeTypeMax: syntax.OpTypeMax,
 }
 
-func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder) (syntax.SampleExpr, uint64, error) {
-	if !expr.Shardable() {
+func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr, r *downstreamRecorder, topLevel bool) (syntax.SampleExpr, uint64, error) {
+	if !expr.Shardable(topLevel) {
 		return noOp(expr, m.shards)
 	}
 
@@ -418,7 +418,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
 			},
 			Grouping:  expr.Grouping,
 			Operation: syntax.OpTypeSum,
-		}, r)
+		}, r, false)
 		if err != nil {
 			return nil, 0, err
 		}
@@ -436,7 +436,7 @@ func (m ShardMapper) mapRangeAggregationExpr(expr *syntax.RangeAggregationExpr,
 			},
 			Grouping:  expr.Grouping,
 			Operation: syntax.OpTypeSum,
-		}, r)
+		}, r, false)
 		if err != nil {
 			return nil, 0, err
 		}
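The Shardable signature change itself lives in the third changed file, which does not render above (presumably the syntax package, since expr.Shardable(topLevel) is called here). Below is a minimal, self-contained sketch of the rule the mapper now applies; the types are toys for illustration, not Loki's actual AST:

	package main

	import "fmt"

	// Toy model of the new topLevel flag: a node is either an aggregation over
	// a child or a leaf range aggregation such as quantile_over_time. These are
	// illustrative types, not Loki's actual AST.
	type node struct {
		op    string
		child *node
	}

	// shardable mirrors the idea behind expr.Shardable(topLevel): probabilistic
	// sharding of quantile_over_time is only attempted when it is the outermost
	// aggregation of the query.
	func shardable(n *node, topLevel bool) bool {
		if n.op == "quantile_over_time" {
			return topLevel
		}
		return true
	}

	// mapNode mirrors how ShardMapper.Map threads the flag: the moment we
	// descend into the child of an aggregation, topLevel becomes false.
	func mapNode(n *node, topLevel bool) string {
		if !shardable(n, topLevel) {
			return "noop(" + n.op + ")"
		}
		if n.child == nil {
			return "shard(" + n.op + ")"
		}
		return n.op + "(" + mapNode(n.child, false) + ")"
	}

	func main() {
		quantile := &node{op: "quantile_over_time"}
		fmt.Println(mapNode(quantile, true))                          // shard(quantile_over_time)
		fmt.Println(mapNode(&node{op: "max", child: quantile}, true)) // max(noop(quantile_over_time))
	}

Running it prints shard(quantile_over_time) for the standalone quantile but max(noop(quantile_over_time)) once an aggregation sits above it, matching the NOTE in the tests below, which defers any sharding of an inner quantile until that further optimization is decided.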
69 changes: 55 additions & 14 deletions pkg/logql/shardmapper_test.go
@@ -118,6 +118,19 @@ func TestMappingStrings(t *testing.T) {
 		in  string
 		out string
 	}{
+		// NOTE (callum): These two queries containing quantile_over_time should result
+		// in the same unsharded mapping of the max, without sharding the inner quantile,
+		// regardless of whether quantile sharding is turned on. This should be the case
+		// even if the inner quantile does not contain a grouping, until we decide whether
+		// to do the further optimization of sharding the inner quantile.
+		{
+			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]))`,
+			out: `maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s]))`,
+		},
+		{
+			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (baz))`,
+			out: `maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])by(baz))`,
+		},
 		{
 			in: `{foo="bar"}`,
 			out: `downstream<{foo="bar"}, shard=0_of_2>
@@ -363,10 +376,6 @@ func TestMappingStrings(t *testing.T) {
 				)
 			)`,
 		},
-		{ // This should result in the same downstream sharding of the max and not the inner quantile regardless of whether quantile sharding is turned on.
-			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]))`,
-			out: `maxby(status)(downstream<maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])),shard=0_of_2>++downstream<maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])),shard=1_of_2>)`,
-		},
 		// should be noop if VectorExpr
 		{
 			in:  `vector(0)`,
@@ -413,7 +422,7 @@ func TestMappingStrings(t *testing.T) {
 			ast, err := syntax.ParseExpr(tc.in)
 			require.Nil(t, err)
 
-			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder())
+			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder(), true)
 			require.Nil(t, err)
 
 			require.Equal(t, removeWhiteSpace(tc.out), removeWhiteSpace(mapped.String()))
@@ -424,28 +433,44 @@
 // Test that mapping of queries for operation types that have probabilistic
 // sharding options, but whose sharding is turned off, are not sharded on those operations.
 func TestMappingStrings_NoProbabilisticSharding(t *testing.T) {
-	m := NewShardMapper(ConstantShards(2), nilShardMetrics, []string{})
 	for _, tc := range []struct {
 		in  string
 		out string
 	}{
-		{ // This should result in the same downstream sharding of the max and not the inner quantile regardless of whether quantile sharding is turned on.
-			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]))`,
-			out: `maxby(status)(downstream<maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])),shard=0_of_2>++downstream<maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])),shard=1_of_2>)`,
-		},
+		// NOTE (callum): These two queries containing quantile_over_time should result
+		// in the same unsharded mapping of the max, without sharding the inner quantile,
+		// regardless of whether quantile sharding is turned on. This should be the case
+		// even if the inner quantile does not contain a grouping, until we decide whether
+		// to do the further optimization of sharding the inner quantile.
+		{
+			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (baz))`,
+			out: `maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])by(baz))`,
+		},
 		{
-			in:  `quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s])`,
-			out: `quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s])`,
+			in:  `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]))`,
+			out: `maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s]))`,
 		},
 	} {
 		t.Run(tc.in, func(t *testing.T) {
+			shardedMapper := NewShardMapper(ConstantShards(2), nilShardMetrics, []string{ShardQuantileOverTime})
+
 			ast, err := syntax.ParseExpr(tc.in)
 			require.Nil(t, err)
 
-			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder())
+			sharded, _, err := shardedMapper.Map(ast, nilShardMetrics.downstreamRecorder(), true)
 			require.Nil(t, err)
 
-			require.Equal(t, removeWhiteSpace(tc.out), removeWhiteSpace(mapped.String()))
+			require.Equal(t, removeWhiteSpace(tc.out), removeWhiteSpace(sharded.String()))
+
+			unshardedMapper := NewShardMapper(ConstantShards(2), nilShardMetrics, []string{})
+
+			ast, err = syntax.ParseExpr(tc.in)
+			require.Nil(t, err)
+
+			unsharded, _, err := unshardedMapper.Map(ast, nilShardMetrics.downstreamRecorder(), true)
+			require.Nil(t, err)
+
+			require.Equal(t, removeWhiteSpace(tc.out), removeWhiteSpace(unsharded.String()))
 		})
 	}
 }
@@ -1373,6 +1398,22 @@ func TestMapping(t *testing.T) {
 				},
 			},
 		},
+		{
+			in: `quantile_over_time(0.8, {foo="bar"} | unwrap bytes [5m])`,
+			expr: &syntax.RangeAggregationExpr{
+				Operation: syntax.OpRangeTypeQuantile,
+				Params:    float64p(0.8),
+				Left: &syntax.LogRange{
+					Left: &syntax.MatchersExpr{
+						Mts: []*labels.Matcher{mustNewMatcher(labels.MatchEqual, "foo", "bar")},
+					},
+					Unwrap: &syntax.UnwrapExpr{
+						Identifier: "bytes",
+					},
+					Interval: 5 * time.Minute,
+				},
+			},
+		},
 		{
 			in: `quantile_over_time(0.8, {foo="bar"} | unwrap bytes [5m]) by (cluster)`,
 			expr: &syntax.RangeAggregationExpr{
@@ -1546,7 +1587,7 @@ func TestMapping(t *testing.T) {
 			ast, err := syntax.ParseExpr(tc.in)
 			require.Equal(t, tc.err, err)
 
-			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder())
+			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder(), true)
 			switch e := mapped.(type) {
 			case syntax.SampleExpr:
 				optimized, err := optimizeSampleExpr(e)
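For reference, a condensed variant of the updated TestMappingStrings_NoProbabilisticSharding: it reuses the identifiers from the diff above and would need to live inside package logql for the unexported helpers (NewShardMapper, ConstantShards, nilShardMetrics, removeWhiteSpace) to resolve. Treat it as a sketch, not the committed test.

	func TestQuantileNotTopLevelStaysUnsharded(t *testing.T) {
		in := `max by (status)(quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]))`
		out := `maxby(status)(quantile_over_time(0.7,{a=~".+"}|logfmt|unwrapvalue[1s]))`

		// Run once with probabilistic quantile sharding enabled and once without.
		for _, shardable := range [][]string{{ShardQuantileOverTime}, {}} {
			m := NewShardMapper(ConstantShards(2), nilShardMetrics, shardable)

			ast, err := syntax.ParseExpr(in)
			require.Nil(t, err)

			mapped, _, err := m.Map(ast, nilShardMetrics.downstreamRecorder(), true)
			require.Nil(t, err)

			// The enclosing max is the top-level aggregation, so the inner
			// quantile_over_time must come back unchanged in both configurations.
			require.Equal(t, removeWhiteSpace(out), removeWhiteSpace(mapped.String()))
		}
	}

Both configurations are asserted against the same unsharded output, which is the point of the fix: with max on top, the inner quantile_over_time is never the top-level aggregation, so enabling ShardQuantileOverTime must not change the query plan.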