Skip to content

Commit

Permalink
Speedup DistinctString and ScopedDistinctString collectors (grafana#4109)
Browse files Browse the repository at this point in the history

* add local pprof files in .gitignore

* fast distinct_string_collector

* faster scoped_distinct_string collector

* Update the usage of the combiners

* add a benchmark for duplicates

* Add benchmarks

* Update CHANGELOG.md

* return error on calls to Diff with diff disabled

* return exceeded bool on Collect in ScopedDistinctString

* test collect return and Exceeded

* Add a note about DistinctString collector's Collect return bool

* only new in withDiff versions

* remove extra note

* move the lock down in the Diff method

* fix flaky test
  • Loading branch information
electron0zero authored Oct 11, 2024
1 parent 3df4a7a commit eca7f9c
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 97 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.idea
.vscode
*.test
*.pprof
/bin
/cmd/tempo-cli/tempo-cli
/cmd/tempo-query/tempo-query
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [ENHANCEMENT] Send semver version in api/status/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137]
* [ENHANCEMENT] Speedup tempo-query trace search by allowing parallel queries [#4159](https://github.com/grafana/tempo/pull/4159) (@pavolloffay)
* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions.
Expand Down
13 changes: 10 additions & 3 deletions modules/frontend/combiner/search_tag_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ var (

func NewSearchTagValues(limitBytes int) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctString(limitBytes)
d := collector.NewDistinctStringWithDiff(limitBytes)

c := &genericCombiner[*tempopb.SearchTagValuesResponse]{
httpStatusCode: 200,
Expand All @@ -33,7 +33,11 @@ func NewSearchTagValues(limitBytes int) Combiner {
return d.Exceeded()
},
diff: func(response *tempopb.SearchTagValuesResponse) (*tempopb.SearchTagValuesResponse, error) {
response.TagValues = d.Diff()
resp, err := d.Diff()
if err != nil {
return nil, err
}
response.TagValues = resp
return response, nil
},
}
Expand Down Expand Up @@ -72,7 +76,10 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
return d.Exceeded()
},
diff: func(response *tempopb.SearchTagValuesV2Response) (*tempopb.SearchTagValuesV2Response, error) {
diff := d.Diff()
diff, err := d.Diff()
if err != nil {
return nil, err
}
response.TagValues = make([]*tempopb.TagValue, 0, len(diff))
for _, v := range diff {
v2 := v
Expand Down
16 changes: 12 additions & 4 deletions modules/frontend/combiner/search_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var (
)

func NewSearchTags(limitBytes int) Combiner {
d := collector.NewDistinctString(limitBytes)
d := collector.NewDistinctStringWithDiff(limitBytes)

c := &genericCombiner[*tempopb.SearchTagsResponse]{
httpStatusCode: 200,
Expand All @@ -32,7 +32,12 @@ func NewSearchTags(limitBytes int) Combiner {
return d.Exceeded()
},
diff: func(response *tempopb.SearchTagsResponse) (*tempopb.SearchTagsResponse, error) {
response.TagNames = d.Diff()
resp, err := d.Diff()
if err != nil {
return nil, err
}

response.TagNames = resp
return response, nil
},
}
Expand All @@ -46,7 +51,7 @@ func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse

func NewSearchTagsV2(limitBytes int) Combiner {
// Distinct collector map to collect scopes and scope values
distinctValues := collector.NewScopedDistinctString(limitBytes)
distinctValues := collector.NewScopedDistinctStringWithDiff(limitBytes)

c := &genericCombiner[*tempopb.SearchTagsV2Response]{
httpStatusCode: 200,
Expand Down Expand Up @@ -76,7 +81,10 @@ func NewSearchTagsV2(limitBytes int) Combiner {
return distinctValues.Exceeded()
},
diff: func(response *tempopb.SearchTagsV2Response) (*tempopb.SearchTagsV2Response, error) {
collected := distinctValues.Diff()
collected, err := distinctValues.Diff()
if err != nil {
return nil, err
}
response.Scopes = make([]*tempopb.SearchTagsV2Scope, 0, len(collected))

for scope, vals := range collected {
Expand Down
5 changes: 2 additions & 3 deletions modules/ingester/instance_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ
})

return engine.ExecuteTagNames(ctx, attributeScope, query, func(tag string, scope traceql.AttributeScope) bool {
distinctValues.Collect(scope.String(), tag)
return distinctValues.Exceeded()
return distinctValues.Collect(scope.String(), tag)
}, fetcher)
}

Expand Down Expand Up @@ -376,7 +375,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
}

return &tempopb.SearchTagValuesResponse{
Expand Down
10 changes: 4 additions & 6 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "size", distinctValues.Size())
}

resp := &tempopb.SearchTagsResponse{
Expand All @@ -591,8 +591,7 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque
}
for _, res := range resp.Scopes {
for _, tag := range res.Tags {
distinctValues.Collect(res.Name, tag)
if distinctValues.Exceeded() {
if distinctValues.Collect(res.Name, tag) {
return nil
}
}
Expand Down Expand Up @@ -659,7 +658,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
}

if distinctValues.Exceeded() {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize())
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", distinctValues.Size())
}

resp := &tempopb.SearchTagValuesResponse{
Expand Down Expand Up @@ -970,8 +969,7 @@ func (q *Querier) internalTagsSearchBlockV2(ctx context.Context, req *tempopb.Se

valueCollector := collector.NewScopedDistinctString(q.limits.MaxBytesPerTagValuesQuery(tenantID))
err = q.engine.ExecuteTagNames(ctx, scope, query, func(tag string, scope traceql.AttributeScope) bool {
valueCollector.Collect(scope.String(), tag)
return valueCollector.Exceeded()
return valueCollector.Collect(scope.String(), tag)
}, fetcher)
if err != nil {
return nil, err
Expand Down
72 changes: 47 additions & 25 deletions pkg/collector/distinct_string_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,51 +7,68 @@ import (
)

type DistinctString struct {
values map[string]struct{}
new map[string]struct{}
maxLen int
currLen int
totalLen int
mtx sync.Mutex
values map[string]struct{}
new map[string]struct{}
maxLen int
currLen int
diffEnabled bool
limExceeded bool
mtx sync.Mutex
}

// NewDistinctString with the given maximum data size. This is calculated
// as the total length of the recorded strings. For ease of use, maximum=0
// is interpreted as unlimited.
func NewDistinctString(maxDataSize int) *DistinctString {
return &DistinctString{
values: make(map[string]struct{}),
new: make(map[string]struct{}),
maxLen: maxDataSize,
values: make(map[string]struct{}),
maxLen: maxDataSize,
diffEnabled: false, // disable diff to make it faster
}
}

// Collect adds a new value to the distinct string collector.
// return indicates if the value was added or not.
func (d *DistinctString) Collect(s string) bool {
// NewDistinctStringWithDiff behaves like NewDistinctString but additionally
// tracks values collected since the last Diff call, enabling the Diff method.
// As with NewDistinctString, maxDataSize of 0 means unlimited.
func NewDistinctStringWithDiff(maxDataSize int) *DistinctString {
	c := &DistinctString{
		values:      map[string]struct{}{},
		new:         map[string]struct{}{},
		maxLen:      maxDataSize,
		diffEnabled: true,
	}
	return c
}

// Collect adds a new value to the distinct string collector
// and returns a boolean indicating whether the value was successfully added or not.
// To check if the limit has been reached, you must call the Exceeded method separately.
func (d *DistinctString) Collect(s string) (added bool) {
d.mtx.Lock()
defer d.mtx.Unlock()

if d.limExceeded {
return false
}

if _, ok := d.values[s]; ok {
// Already present
return false
}

// New entry
d.totalLen += len(s)

valueLen := len(s)
// Can it fit?
if d.maxLen > 0 && d.currLen+len(s) > d.maxLen {
// No
if d.maxLen > 0 && d.currLen+valueLen > d.maxLen {
// No, it can't fit
d.limExceeded = true
return false
}

// Clone instead of referencing original
s = strings.Clone(s)

d.new[s] = struct{}{}
if d.diffEnabled {
d.new[s] = struct{}{}
}
d.values[s] = struct{}{}
d.currLen += len(s)
d.currLen += valueLen

return true
}
Expand All @@ -76,19 +93,24 @@ func (d *DistinctString) Exceeded() bool {
d.mtx.Lock()
defer d.mtx.Unlock()

return d.totalLen > d.currLen
return d.limExceeded
}

// TotalDataSize is the total size of all distinct strings encountered.
func (d *DistinctString) TotalDataSize() int {
// Size is the total size of all distinct strings encountered.
func (d *DistinctString) Size() int {
d.mtx.Lock()
defer d.mtx.Unlock()

return d.totalLen
return d.currLen
}

// Diff returns all new strings collected since the last time diff was called
func (d *DistinctString) Diff() []string {
func (d *DistinctString) Diff() ([]string, error) {
// can check diffEnabled without lock because it is not modified after creation
if !d.diffEnabled {
return nil, errDiffNotEnabled
}

d.mtx.Lock()
defer d.mtx.Unlock()

Expand All @@ -100,5 +122,5 @@ func (d *DistinctString) Diff() []string {

clear(d.new)
sort.Strings(ss)
return ss
return ss, nil
}
75 changes: 69 additions & 6 deletions pkg/collector/distinct_string_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"fmt"
"strconv"
"sync"
"testing"

Expand All @@ -17,23 +18,34 @@ func TestDistinctStringCollector(t *testing.T) {
d.Collect("11")

require.True(t, d.Exceeded())
require.Equal(t, []string{"123", "4567", "890"}, d.Strings())
stringsSlicesEqual(t, []string{"123", "4567", "890"}, d.Strings())

// diff fails when diff is not enabled
res, err := d.Diff()
require.Nil(t, res)
require.Error(t, err, errDiffNotEnabled)
}

func TestDistinctStringCollectorDiff(t *testing.T) {
d := NewDistinctString(0)
d := NewDistinctStringWithDiff(0)

d.Collect("123")
d.Collect("4567")

require.Equal(t, []string{"123", "4567"}, d.Diff())
require.Equal(t, []string{}, d.Diff())
stringsSlicesEqual(t, []string{"123", "4567"}, readDistinctStringDiff(t, d))
stringsSlicesEqual(t, []string{}, readDistinctStringDiff(t, d))

d.Collect("123")
d.Collect("890")

require.Equal(t, []string{"890"}, d.Diff())
require.Equal(t, []string{}, d.Diff())
stringsSlicesEqual(t, []string{"890"}, readDistinctStringDiff(t, d))
stringsSlicesEqual(t, []string{}, readDistinctStringDiff(t, d))
}

// readDistinctStringDiff fetches the collector's diff, failing the test
// immediately if Diff returns an error, and hands back the new strings.
func readDistinctStringDiff(t *testing.T, d *DistinctString) []string {
	diffed, err := d.Diff()
	require.NoError(t, err)
	return diffed
}

func TestDistinctStringCollectorIsSafe(t *testing.T) {
Expand All @@ -53,3 +65,54 @@ func TestDistinctStringCollectorIsSafe(t *testing.T) {
require.Equal(t, len(d.Strings()), 10*100)
require.False(t, d.Exceeded())
}

// BenchmarkDistinctStringCollect measures DistinctString.Collect under two
// workloads — all-unique values and heavily duplicated values — across a
// range of byte limits (0 = unlimited).
func BenchmarkDistinctStringCollect(b *testing.B) {
	// simulate 100 ingesters, each returning 10_000 tag values
	numIngesters := 100
	numTagValuesPerIngester := 10_000
	// NOTE: renamed from "strings" to avoid shadowing the stdlib strings package.
	ingesterStrings := make([][]string, numIngesters)
	for i := 0; i < numIngesters; i++ {
		values := make([]string, numTagValuesPerIngester)
		for j := 0; j < numTagValuesPerIngester; j++ {
			values[j] = fmt.Sprintf("string_%d_%d", i, j)
		}
		ingesterStrings[i] = values
	}

	limits := []int{
		0,          // no limit
		100_000,    // 100KB
		1_000_000,  // 1MB
		10_000_000, // 10MB
	}

	// No b.ResetTimer() needed here: each b.Run sub-benchmark manages its own
	// timer, so the setup above is never counted against the sub-benchmarks.
	for _, lim := range limits {
		b.Run("uniques_limit:"+strconv.Itoa(lim), func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				distinctStrings := NewDistinctString(lim)
				for _, values := range ingesterStrings {
					for _, v := range values {
						// Collect reports whether the value was added, not
						// whether the limit was hit; stop only once the
						// collector reports the limit exceeded.
						if !distinctStrings.Collect(v) && distinctStrings.Exceeded() {
							break // stop early if limit is reached
						}
					}
				}
			}
		})

		b.Run("duplicates_limit:"+strconv.Itoa(lim), func(b *testing.B) {
			for n := 0; n < b.N; n++ {
				distinctStrings := NewDistinctString(lim)
				for i := 0; i < numIngesters; i++ {
					for j := 0; j < numTagValuesPerIngester; j++ {
						// collect the first item repeatedly to simulate duplicates
						if !distinctStrings.Collect(ingesterStrings[i][0]) && distinctStrings.Exceeded() {
							break // stop early if limit is reached
						}
					}
				}
			}
		})
	}
}
Loading

0 comments on commit eca7f9c

Please sign in to comment.