Skip to content

Commit

Permalink
backport #11905 to k190 (#12013)
Browse files Browse the repository at this point in the history
Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan authored Feb 20, 2024
1 parent bdea0b6 commit 8bbb892
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 7 deletions.
51 changes: 50 additions & 1 deletion pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestMappingEquivalenceSketches(t *testing.T) {
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
t.Run(tc.query+"_range", func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
start,
Expand Down Expand Up @@ -178,6 +178,40 @@ func TestMappingEquivalenceSketches(t *testing.T) {

relativeError(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix), tc.realtiveError)
})
t.Run(tc.query+"_instant", func(t *testing.T) {
// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
params, err := NewLiteralParams(
tc.query,
time.Unix(0, int64(rounds+1)),
time.Unix(0, int64(rounds+1)),
0,
0,
logproto.FORWARD,
uint32(limit),
nil,
)
require.NoError(t, err)
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics, []string{ShardQuantileOverTime})
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: mapped,
})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError)
})
}
}

Expand Down Expand Up @@ -546,6 +580,21 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64)
}
}

func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha float64) {
require.Len(t, actual, len(expected))

e := make([]float64, len(expected))
a := make([]float64, len(expected))
for i := 0; i < len(expected); i++ {
require.Equal(t, expected[i].Metric, actual[i].Metric)

e[i] = expected[i].F
a[i] = expected[i].F
}
require.InEpsilonSlice(t, e, a, alpha)

}

func TestFormat_ShardedExpr(t *testing.T) {
oldMax := syntax.MaxCharsPerLine
syntax.MaxCharsPerLine = 20
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries)
case ProbabilisticQuantileVector:
return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params)
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
return nil, fmt.Errorf("unsupported result type: %T", r)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/logql/quantile_over_time_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,17 @@ func (r *quantileSketchBatchRangeVectorIterator) agg(samples []promql.FPoint) sk
return s
}

// JoinQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix.
func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator, params Params) (promql_parser.Value, error) {
// MergeQuantileSketchVector joins the results from stepEvaluator into a ProbabilisticQuantileMatrix.
func MergeQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluator, params Params) (promql_parser.Value, error) {
vec := r.QuantileSketchVec()
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}

if GetRangeType(params) == InstantType {
return ProbabilisticQuantileMatrix{vec}, nil
}

stepCount := int(math.Ceil(float64(params.End().Sub(params.Start()).Nanoseconds()) / float64(params.Step().Nanoseconds())))
if stepCount <= 0 {
stepCount = 1
Expand Down
6 changes: 4 additions & 2 deletions pkg/logql/quantile_over_time_sketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestJoinQuantileSketchVectorError(t *testing.T) {
ev := errorStepEvaluator{
err: errors.New("could not evaluate"),
}
_, err := JoinQuantileSketchVector(true, result, ev, LiteralParams{})
_, err := MergeQuantileSketchVector(true, result, ev, LiteralParams{})
require.ErrorContains(t, err, "could not evaluate")
}

Expand Down Expand Up @@ -136,7 +136,7 @@ func BenchmarkJoinQuantileSketchVector(b *testing.B) {
iter: iter,
}
_, _, r := ev.Next()
m, err := JoinQuantileSketchVector(true, r.QuantileSketchVec(), ev, params)
m, err := MergeQuantileSketchVector(true, r.QuantileSketchVec(), ev, params)
require.NoError(b, err)
m.(ProbabilisticQuantileMatrix).Release()
}
Expand All @@ -148,7 +148,9 @@ func BenchmarkQuantileBatchRangeVectorIteratorAt(b *testing.B) {
}{
{numberSamples: 1},
{numberSamples: 1_000},
{numberSamples: 10_000},
{numberSamples: 100_000},
{numberSamples: 1_000_000},
} {
b.Run(fmt.Sprintf("%d-samples", tc.numberSamples), func(b *testing.B) {
r := rand.New(rand.NewSource(42))
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/sketch/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const relativeAccuracy = 0.01
var ddsketchPool = sync.Pool{
New: func() any {
m, _ := mapping.NewCubicallyInterpolatedMapping(relativeAccuracy)
return ddsketch.NewDDSketchFromStoreProvider(m, store.SparseStoreConstructor)
return ddsketch.NewDDSketch(m, store.NewCollapsingLowestDenseStore(2048), store.NewCollapsingLowestDenseStore(2048))
},
}

Expand Down

0 comments on commit 8bbb892

Please sign in to comment.