Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Introduce shardable probabilistic topk for instant queries. #14243

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
c917201
feat: shard instant topk queries with count min sketch.
jeschkies Sep 23, 2024
66dc3f0
Make it compile
jeschkies Sep 23, 2024
215754c
Update range
jeschkies Sep 23, 2024
436302f
Fix a few testing bugs
jeschkies Sep 24, 2024
6e53380
Calculate width and depth
jeschkies Sep 24, 2024
91133e7
Format code
jeschkies Sep 24, 2024
9a07276
Remove redundant return
jeschkies Sep 24, 2024
6dab358
Keep a limited number of labels
jeschkies Sep 27, 2024
7849984
Save one metric to string
jeschkies Sep 27, 2024
c9726cc
Convert limited label vector
jeschkies Sep 27, 2024
947a924
Rename vector
jeschkies Sep 27, 2024
33acd3b
Remove todo
jeschkies Sep 27, 2024
9ac2030
Checkin heap test
jeschkies Sep 27, 2024
acb7754
Fix vector name
jeschkies Sep 30, 2024
ad742f1
Fix marshalling
jeschkies Oct 2, 2024
6762512
Remove commented code from shard_resolver.
jeschkies Oct 2, 2024
c7c9ad4
Actually merge metrics
jeschkies Oct 2, 2024
3c3024e
Validate that approx_topk is not using groups
jeschkies Oct 2, 2024
a6f9053
support unsharded approx_topk
jeschkies Oct 3, 2024
09f7ee1
Support per tenant activation
jeschkies Oct 3, 2024
71c02fa
Make the max heap size configurable
jeschkies Oct 3, 2024
7d71609
Test shard mapper
jeschkies Oct 3, 2024
ba2ecec
Test protobuf serialization
jeschkies Oct 3, 2024
5ae7998
Remove nil group
jeschkies Oct 3, 2024
385e161
Test approx_topk integration
jeschkies Oct 3, 2024
4ccead2
Merge remote-tracking branch 'grafana/main' into karsten/ptopk
jeschkies Oct 3, 2024
ac494bd
Satisfy linter
jeschkies Oct 3, 2024
43cf741
Define separate approx topk equivalence test
jeschkies Oct 3, 2024
ecc9050
Correct error string
jeschkies Oct 3, 2024
24edc7b
Add more test cases to show how accurate the result for each item in the
cstyan Oct 4, 2024
681cd2e
Add another test case
jeschkies Oct 7, 2024
de7d83c
Remove deadline
jeschkies Oct 7, 2024
2da4270
No timeout limit
jeschkies Oct 7, 2024
00a7e1f
Test intersection
jeschkies Oct 7, 2024
524a10f
Remove test
jeschkies Oct 8, 2024
026b573
add hll cardinality estimate to CMS sketch and proto
cstyan Oct 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (
CountMinSketchVectorType = "CountMinSketchVector"

epsilon = 0.0001
// delta of 0.05 results in a sketch size of 325KB, 0.01 gives a sketch size of 543KB
delta = 0.05
// delta of 0.01 results in a sketch size of 27183 * 7 * 4 bytes = 761,124 bytes, 0.05 would yield 543,660 bytes
delta = 0.01
)

// CountMinSketchVector tracks the count or sum of values of a metric, i.e. a list of label value pairs. Its storage for
Expand Down
22 changes: 18 additions & 4 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,12 @@ func TestApproxTopkSketches(t *testing.T) {
limit = 100
)

limits := &fakeLimits{
maxSeries: math.MaxInt64,
// timeout: 5 * time.Minute,
timeout: time.Hour,
}

for _, tc := range []struct {
labelShards int
totalStreams int
Expand Down Expand Up @@ -334,8 +340,14 @@ func TestApproxTopkSketches(t *testing.T) {
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0015,
},
{
labelShards: 10, // increasing this will make the test too slow
totalStreams: 1_000_000,
shardedQuery: `approx_topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
regularQuery: `topk(100, sum by (a) (sum_over_time ({a=~".+"} | logfmt | unwrap value [1s])))`,
realtiveError: 0.0015,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is one of the things that is making some of these test results seem better than they would be in reality: here you've got labelShards: 10, which means that there will only be 10 unique values for the label a, but we're doing approx_topk(100, ...) as the query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. So this test is actually no good. What we need are 1 million unique values...

} {

t.Run(fmt.Sprintf("%s/%d/%d", tc.shardedQuery, tc.labelShards, tc.totalStreams), func(t *testing.T) {
streams := randomStreams(tc.totalStreams, rounds+1, tc.labelShards, []string{"a", "b", "c", "d"}, true)

Expand All @@ -347,8 +359,8 @@ func TestApproxTopkSketches(t *testing.T) {
opts := EngineOpts{
MaxCountMinSketchHeapSize: 10_000,
}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())
regular := NewEngine(opts, q, limits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, limits, log.NewNopLogger())

// for an instant query we set the start and end to the same timestamp
// plus set step and interval to 0
Expand All @@ -365,7 +377,9 @@ func TestApproxTopkSketches(t *testing.T) {
)
require.NoError(t, err)
qry := regular.Query(params.Copy())
ctx := user.InjectOrgID(context.Background(), "fake")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
ctx = user.InjectOrgID(ctx, "fake")

strategy := NewPowerOfTwoStrategy(ConstantShards(tc.labelShards))
mapper := NewShardMapper(strategy, nilShardMetrics, []string{ShardQuantileOverTime, SupportApproxTopk})
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
}, nil
}

// NewCountMinSketchFromErrorAndProbability creates a new CMS for a given epsilon and delta. The sketch width and depth
// are calculated according to the RedisBloom implementation.
// See https://github.com/RedisBloom/RedisBloom/blob/7bc047d1ea4113419b60eb6446ac3d4e61877a7b/src/cms.c#L38-L39
func NewCountMinSketchFromErrorAndProbability(epsilon float64, delta float64) (*CountMinSketch, error) {
width := math.Ceil(math.E / epsilon)
depth := math.Ceil(math.Log(1.0 / delta))
depth := math.Ceil(math.Log(delta) / math.Log(0.5))
return NewCountMinSketch(uint32(width), uint32(depth))
}

Expand Down
40 changes: 26 additions & 14 deletions pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
"math/rand"
"sort"
"strings"
"sync"
"time"
"unicode"

"github.com/cespare/xxhash/v2"
"github.com/grafana/dskit/concurrency"
"github.com/prometheus/prometheus/model/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"

Expand All @@ -21,6 +23,8 @@
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

const CON_CURRENCY = 100

Check warning on line 26 in pkg/logql/test_utils.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

var-naming: don't use ALL_CAPS in Go names; use CamelCase (revive)

func NewMockQuerier(shards int, streams []logproto.Stream) MockQuerier {
return MockQuerier{
shards: shards,
Expand All @@ -45,7 +49,6 @@
}

return parsed[0].PowerOfTwo, nil

}

func (q MockQuerier) SelectLogs(_ context.Context, req SelectLogParams) (iter.EntryIterator, error) {
Expand Down Expand Up @@ -113,8 +116,8 @@
resByStream := map[string]*logproto.Stream{}

for _, stream := range in {
sp := pipeline.ForStream(mustParseLabels(stream.Labels))
for _, e := range stream.Entries {
sp := pipeline.ForStream(mustParseLabels(stream.Labels))
if l, out, matches := sp.Process(e.Timestamp.UnixNano(), []byte(e.Line)); matches {
var s *logproto.Stream
var found bool
Expand All @@ -137,12 +140,12 @@
return streams
}

func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Series {
func processSeries(in []logproto.Stream, ex log.SampleExtractor) ([]logproto.Series, error) {
resBySeries := map[string]*logproto.Series{}

for _, stream := range in {
exs := ex.ForStream(mustParseLabels(stream.Labels))
for _, e := range stream.Entries {
exs := ex.ForStream(mustParseLabels(stream.Labels))
if f, lbs, ok := exs.Process(e.Timestamp.UnixNano(), []byte(e.Line)); ok {
var s *logproto.Series
var found bool
Expand All @@ -151,6 +154,7 @@
s = &logproto.Series{Labels: lbs.String(), StreamHash: exs.BaseLabels().Hash()}
resBySeries[lbs.String()] = s
}

s.Samples = append(s.Samples, logproto.Sample{
Timestamp: e.Timestamp.UnixNano(),
Value: f,
Expand All @@ -159,15 +163,16 @@
}
}
}

series := []logproto.Series{}
for _, s := range resBySeries {
sort.Sort(s)
series = append(series, *s)
}
return series
return series, nil
}

func (q MockQuerier) SelectSamples(_ context.Context, req SelectSampleParams) (iter.SampleIterator, error) {
func (q MockQuerier) SelectSamples(ctx context.Context, req SelectSampleParams) (iter.SampleIterator, error) {

Check warning on line 175 in pkg/logql/test_utils.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
selector, err := req.LogSelector()
if err != nil {
return nil, err
Expand Down Expand Up @@ -212,7 +217,10 @@
matched = append(matched, stream)
}

filtered := processSeries(matched, extractor)
filtered, err := processSeries(matched, extractor)
if err != nil {
return nil, err
}

return iter.NewTimeRangedSampleIterator(
iter.NewMultiSeriesIterator(filtered),
Expand All @@ -228,15 +236,19 @@
func (m MockDownstreamer) Downstreamer(_ context.Context) Downstreamer { return m }

func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQuery, acc Accumulator) ([]logqlmodel.Result, error) {
for i, query := range queries {
res, err := m.Query(query.Params).Exec(ctx)
mu := sync.Mutex{}
err := concurrency.ForEachJob(ctx, len(queries), CON_CURRENCY, func(ctx context.Context, idx int) error {
res, err := m.Query(queries[idx].Params).Exec(ctx)
if err != nil {
return nil, err
}
err = acc.Accumulate(ctx, res, i)
if err != nil {
return nil, err
return err
}
mu.Lock()
defer mu.Unlock()
err = acc.Accumulate(ctx, res, idx)
return err
})
if err != nil {
return nil, err
}

return acc.Result(), nil
Expand Down
Loading