diff --git a/.gitignore b/.gitignore index 2d6dc7eff7b..101858b1890 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ .idea .vscode *.test +*.pprof /bin /cmd/tempo-cli/tempo-cli /cmd/tempo-query/tempo-query diff --git a/CHANGELOG.md b/CHANGELOG.md index eaa5255fba9..91ab807cc06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [ENHANCEMENT] Send semver version in api/stattus/buildinfo for cloud deployments [#4110](https://github.com/grafana/tempo/pull/4110) [@Aki0x137] * [ENHANCEMENT] Speedup tempo-query trace search by allowing parallel queries [#4159](https://github.com/grafana/tempo/pull/4159) (@pavolloffay) +* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero) * [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero) **BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default, please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions. diff --git a/modules/frontend/combiner/search_tag_values.go b/modules/frontend/combiner/search_tag_values.go index 29d8ec4163f..1b1e9ba3dac 100644 --- a/modules/frontend/combiner/search_tag_values.go +++ b/modules/frontend/combiner/search_tag_values.go @@ -13,7 +13,7 @@ var ( func NewSearchTagValues(limitBytes int) Combiner { // Distinct collector with no limit - d := collector.NewDistinctString(limitBytes) + d := collector.NewDistinctStringWithDiff(limitBytes) c := &genericCombiner[*tempopb.SearchTagValuesResponse]{ httpStatusCode: 200, @@ -33,7 +33,11 @@ func NewSearchTagValues(limitBytes int) Combiner { return d.Exceeded() }, diff: func(response *tempopb.SearchTagValuesResponse) (*tempopb.SearchTagValuesResponse, error) { - response.TagValues = d.Diff() + resp, err := d.Diff() + if err != nil { + return nil, err + } + response.TagValues = resp return response, nil }, } @@ -72,7 +76,10 @@ func NewSearchTagValuesV2(limitBytes int) Combiner { return d.Exceeded() }, diff: func(response *tempopb.SearchTagValuesV2Response) (*tempopb.SearchTagValuesV2Response, error) { - diff := d.Diff() + diff, err := d.Diff() + if err != nil { + return nil, err + } response.TagValues = make([]*tempopb.TagValue, 0, len(diff)) for _, v := range diff { v2 := v diff --git a/modules/frontend/combiner/search_tags.go b/modules/frontend/combiner/search_tags.go index 47af9b5a1e6..a8546da7213 100644 --- a/modules/frontend/combiner/search_tags.go +++ b/modules/frontend/combiner/search_tags.go @@ -12,7 +12,7 @@ var ( ) func NewSearchTags(limitBytes int) Combiner { - d := collector.NewDistinctString(limitBytes) + d := collector.NewDistinctStringWithDiff(limitBytes) c := &genericCombiner[*tempopb.SearchTagsResponse]{ httpStatusCode: 200, @@ -32,7 +32,12 @@ func NewSearchTags(limitBytes int) Combiner { return d.Exceeded() }, diff: func(response *tempopb.SearchTagsResponse) (*tempopb.SearchTagsResponse, error) { - response.TagNames = d.Diff() + resp, err := d.Diff() + if err != nil { + return nil, err + } + + response.TagNames = resp return response, nil }, } @@ -46,7 +51,7 @@ func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse func NewSearchTagsV2(limitBytes int) Combiner { // Distinct collector map to collect scopes and scope values - distinctValues := collector.NewScopedDistinctString(limitBytes) + distinctValues := collector.NewScopedDistinctStringWithDiff(limitBytes) c := &genericCombiner[*tempopb.SearchTagsV2Response]{ httpStatusCode: 200, @@ -76,7 +81,10 @@ func NewSearchTagsV2(limitBytes int) Combiner { return distinctValues.Exceeded() }, diff: func(response *tempopb.SearchTagsV2Response) (*tempopb.SearchTagsV2Response, error) { - collected := distinctValues.Diff() + collected, err := distinctValues.Diff() + if err != nil { + return nil, err + } response.Scopes = make([]*tempopb.SearchTagsV2Scope, 0, len(collected)) for scope, vals := range collected { diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 107d9cabaa9..4a84a333ae6 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -265,8 +265,7 @@ func (i *instance) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsRequ }) return engine.ExecuteTagNames(ctx, attributeScope, query, func(tag string, scope traceql.AttributeScope) bool { - distinctValues.Collect(scope.String(), tag) - return distinctValues.Exceeded() + return distinctValues.Collect(scope.String(), tag) }, fetcher) } @@ -376,7 +375,7 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop } if distinctValues.Exceeded() { - level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize()) + level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID, "limit", limit, "size", distinctValues.Size()) } return &tempopb.SearchTagValuesResponse{ diff --git a/modules/querier/querier.go b/modules/querier/querier.go index b5602462164..1a3accb897c 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -564,7 +564,7 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest } if distinctValues.Exceeded() { - level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize()) + level.Warn(log.Logger).Log("msg", "size of tags in instance exceeded limit, reduce cardinality or size of tags", "userID", userID, "limit", limit, "size", distinctValues.Size()) } resp := &tempopb.SearchTagsResponse{ @@ -591,8 +591,7 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque } for _, res := range resp.Scopes { for _, tag := range res.Tags { - distinctValues.Collect(res.Name, tag) - if distinctValues.Exceeded() { + if distinctValues.Collect(res.Name, tag) { return nil } } @@ -659,7 +658,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal } if distinctValues.Exceeded() { - level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "total", distinctValues.TotalDataSize()) + level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID, "limit", limit, "size", distinctValues.Size()) } resp := &tempopb.SearchTagValuesResponse{ @@ -970,8 +969,7 @@ func (q *Querier) internalTagsSearchBlockV2(ctx context.Context, req *tempopb.Se valueCollector := collector.NewScopedDistinctString(q.limits.MaxBytesPerTagValuesQuery(tenantID)) err = q.engine.ExecuteTagNames(ctx, scope, query, func(tag string, scope traceql.AttributeScope) bool { - valueCollector.Collect(scope.String(), tag) - return valueCollector.Exceeded() + return valueCollector.Collect(scope.String(), tag) }, fetcher) if err != nil { return nil, err diff --git a/pkg/collector/distinct_string_collector.go b/pkg/collector/distinct_string_collector.go index a1b4a6c95c0..ca881703e26 100644 --- a/pkg/collector/distinct_string_collector.go +++ b/pkg/collector/distinct_string_collector.go @@ -7,12 +7,13 @@ import ( ) type DistinctString struct { - values map[string]struct{} - new map[string]struct{} - maxLen int - currLen int - totalLen int - mtx sync.Mutex + values map[string]struct{} + new map[string]struct{} + maxLen int + currLen int + diffEnabled bool + limExceeded bool + mtx sync.Mutex } // NewDistinctString with the given maximum data size. This is calculated @@ -20,38 +21,54 @@ type DistinctString struct { // is interpreted as unlimited. func NewDistinctString(maxDataSize int) *DistinctString { return &DistinctString{ - values: make(map[string]struct{}), - new: make(map[string]struct{}), - maxLen: maxDataSize, + values: make(map[string]struct{}), + maxLen: maxDataSize, + diffEnabled: false, // disable diff to make it faster } } -// Collect adds a new value to the distinct string collector. -// return indicates if the value was added or not. -func (d *DistinctString) Collect(s string) bool { +// NewDistinctStringWithDiff is like NewDistinctString but with diff support enabled. +func NewDistinctStringWithDiff(maxDataSize int) *DistinctString { + return &DistinctString{ + values: make(map[string]struct{}), + new: make(map[string]struct{}), + maxLen: maxDataSize, + diffEnabled: true, + } +} + +// Collect adds a new value to the distinct string collector +// and returns a boolean indicating whether the value was successfully added or not. +// To check if the limit has been reached, you must call the Exceeded method separately. +func (d *DistinctString) Collect(s string) (added bool) { d.mtx.Lock() defer d.mtx.Unlock() + if d.limExceeded { + return false + } + if _, ok := d.values[s]; ok { // Already present return false } - // New entry - d.totalLen += len(s) - + valueLen := len(s) // Can it fit? - if d.maxLen > 0 && d.currLen+len(s) > d.maxLen { - // No + if d.maxLen > 0 && d.currLen+valueLen > d.maxLen { + // No, it can't fit + d.limExceeded = true return false } // Clone instead of referencing original s = strings.Clone(s) - d.new[s] = struct{}{} + if d.diffEnabled { + d.new[s] = struct{}{} + } d.values[s] = struct{}{} - d.currLen += len(s) + d.currLen += valueLen return true } @@ -76,19 +93,24 @@ func (d *DistinctString) Exceeded() bool { d.mtx.Lock() defer d.mtx.Unlock() - return d.totalLen > d.currLen + return d.limExceeded } -// TotalDataSize is the total size of all distinct strings encountered. -func (d *DistinctString) TotalDataSize() int { +// Size is the total size of all distinct strings encountered. +func (d *DistinctString) Size() int { d.mtx.Lock() defer d.mtx.Unlock() - return d.totalLen + return d.currLen } // Diff returns all new strings collected since the last time diff was called -func (d *DistinctString) Diff() []string { +func (d *DistinctString) Diff() ([]string, error) { + // can check diffEnabled without lock because it is not modified after creation + if !d.diffEnabled { + return nil, errDiffNotEnabled + } + d.mtx.Lock() defer d.mtx.Unlock() @@ -100,5 +122,5 @@ func (d *DistinctString) Diff() []string { clear(d.new) sort.Strings(ss) - return ss + return ss, nil } diff --git a/pkg/collector/distinct_string_collector_test.go b/pkg/collector/distinct_string_collector_test.go index 06d0b1277da..ec778e3012c 100644 --- a/pkg/collector/distinct_string_collector_test.go +++ b/pkg/collector/distinct_string_collector_test.go @@ -2,6 +2,7 @@ package collector import ( "fmt" + "strconv" "sync" "testing" @@ -17,23 +18,34 @@ func TestDistinctStringCollector(t *testing.T) { d.Collect("11") require.True(t, d.Exceeded()) - require.Equal(t, []string{"123", "4567", "890"}, d.Strings()) + stringsSlicesEqual(t, []string{"123", "4567", "890"}, d.Strings()) + + // diff fails when diff is not enabled + res, err := d.Diff() + require.Nil(t, res) + require.Error(t, err, errDiffNotEnabled) } func TestDistinctStringCollectorDiff(t *testing.T) { - d := NewDistinctString(0) + d := NewDistinctStringWithDiff(0) d.Collect("123") d.Collect("4567") - require.Equal(t, []string{"123", "4567"}, d.Diff()) - require.Equal(t, []string{}, d.Diff()) + stringsSlicesEqual(t, []string{"123", "4567"}, readDistinctStringDiff(t, d)) + stringsSlicesEqual(t, []string{}, readDistinctStringDiff(t, d)) d.Collect("123") d.Collect("890") - require.Equal(t, []string{"890"}, d.Diff()) - require.Equal(t, []string{}, d.Diff()) + stringsSlicesEqual(t, []string{"890"}, readDistinctStringDiff(t, d)) + stringsSlicesEqual(t, []string{}, readDistinctStringDiff(t, d)) +} + +func readDistinctStringDiff(t *testing.T, d *DistinctString) []string { + res, err := d.Diff() + require.NoError(t, err) + return res } func TestDistinctStringCollectorIsSafe(t *testing.T) { @@ -53,3 +65,54 @@ func TestDistinctStringCollectorIsSafe(t *testing.T) { require.Equal(t, len(d.Strings()), 10*100) require.False(t, d.Exceeded()) } + +func BenchmarkDistinctStringCollect(b *testing.B) { + // simulate 100 ingesters, each returning 10_000 tag values + numIngesters := 100 + numTagValuesPerIngester := 10_000 + ingesterStrings := make([][]string, numIngesters) + for i := 0; i < numIngesters; i++ { + strings := make([]string, numTagValuesPerIngester) + for j := 0; j < numTagValuesPerIngester; j++ { + strings[j] = fmt.Sprintf("string_%d_%d", i, j) + } + ingesterStrings[i] = strings + } + + limits := []int{ + 0, // no limit + 100_000, // 100KB + 1_000_000, // 1MB + 10_000_000, // 10MB + } + + b.ResetTimer() // to exclude the setup time for generating tag values + for _, lim := range limits { + b.Run("uniques_limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + distinctStrings := NewDistinctString(lim) + for _, values := range ingesterStrings { + for _, v := range values { + if distinctStrings.Collect(v) { + break // stop early if limit is reached + } + } + } + } + }) + + b.Run("duplicates_limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + distinctStrings := NewDistinctString(lim) + for i := 0; i < numIngesters; i++ { + for j := 0; j < numTagValuesPerIngester; j++ { + // collect first item to simulate duplicates + if distinctStrings.Collect(ingesterStrings[i][0]) { + break // stop early if limit is reached + } + } + } + } + }) + } +} diff --git a/pkg/collector/distinct_value_collector.go b/pkg/collector/distinct_value_collector.go index 930241ef796..a8359667639 100644 --- a/pkg/collector/distinct_value_collector.go +++ b/pkg/collector/distinct_value_collector.go @@ -1,9 +1,12 @@ package collector import ( + "errors" "sync" ) +var errDiffNotEnabled = errors.New("diff not enabled") + type DistinctValue[T comparable] struct { values map[T]struct{} new map[T]struct{} @@ -22,7 +25,6 @@ type DistinctValue[T comparable] struct { func NewDistinctValue[T comparable](maxDataSize int, len func(T) int) *DistinctValue[T] { return &DistinctValue[T]{ values: make(map[T]struct{}), - new: make(map[T]struct{}), maxLen: maxDataSize, diffEnabled: false, // disable diff to make it faster len: len, @@ -108,19 +110,20 @@ func (d *DistinctValue[T]) Size() int { // Diff returns all new strings collected since the last time diff was called // returns nil if diff is not enabled -func (d *DistinctValue[T]) Diff() []T { - d.mtx.Lock() - defer d.mtx.Unlock() - +func (d *DistinctValue[T]) Diff() ([]T, error) { + // can check diffEnabled without lock because it is not modified after creation if !d.diffEnabled { - return nil + return nil, errDiffNotEnabled } + d.mtx.Lock() + defer d.mtx.Unlock() + ss := make([]T, 0, len(d.new)) for k := range d.new { ss = append(ss, k) } clear(d.new) - return ss + return ss, nil } diff --git a/pkg/collector/distinct_value_collector_test.go b/pkg/collector/distinct_value_collector_test.go index b17b1c3e680..d0bd470db97 100644 --- a/pkg/collector/distinct_value_collector_test.go +++ b/pkg/collector/distinct_value_collector_test.go @@ -10,20 +10,47 @@ import ( "github.com/stretchr/testify/require" ) +func TestDistinctValueCollector(t *testing.T) { + d := NewDistinctValue[string](10, func(s string) int { return len(s) }) + + var stop bool + stop = d.Collect("123") + require.False(t, stop) + stop = d.Collect("4567") + require.False(t, stop) + stop = d.Collect("890") + require.True(t, stop) + + require.True(t, d.Exceeded()) + require.Equal(t, stop, d.Exceeded()) // final stop should be same as Exceeded + stringsSlicesEqual(t, []string{"123", "4567"}, d.Values()) + + // diff fails when diff is not enabled + res, err := d.Diff() + require.Nil(t, res) + require.Error(t, err, errDiffNotEnabled) +} + func TestDistinctValueCollectorDiff(t *testing.T) { d := NewDistinctValueWithDiff[string](0, func(s string) int { return len(s) }) d.Collect("123") d.Collect("4567") - stringsSlicesEqual(t, []string{"123", "4567"}, d.Diff()) - stringsSlicesEqual(t, []string{}, d.Diff()) + stringsSlicesEqual(t, []string{"123", "4567"}, readDistinctValueDiff(t, d)) + stringsSlicesEqual(t, []string{}, readDistinctValueDiff(t, d)) d.Collect("123") d.Collect("890") - stringsSlicesEqual(t, []string{"890"}, d.Diff()) - stringsSlicesEqual(t, []string{}, d.Diff()) + stringsSlicesEqual(t, []string{"890"}, readDistinctValueDiff(t, d)) + stringsSlicesEqual(t, []string{}, readDistinctValueDiff(t, d)) +} + +func readDistinctValueDiff(t *testing.T, d *DistinctValue[string]) []string { + res, err := d.Diff() + require.NoError(t, err) + return res } func stringsSlicesEqual(t *testing.T, a, b []string) { @@ -32,7 +59,7 @@ func stringsSlicesEqual(t *testing.T, a, b []string) { require.Equal(t, a, b) } -func BenchmarkCollect(b *testing.B) { +func BenchmarkDistinctValueCollect(b *testing.B) { // simulate 100 ingesters, each returning 10_000 tag values numIngesters := 100 numTagValuesPerIngester := 10_000 @@ -45,7 +72,6 @@ func BenchmarkCollect(b *testing.B) { Value: fmt.Sprintf("value_%d_%d", i, j), } } - ingesterTagValues[i] = tagValues } limits := []int{ @@ -57,9 +83,8 @@ func BenchmarkCollect(b *testing.B) { b.ResetTimer() // to exclude the setup time for generating tag values for _, lim := range limits { - b.Run("limit:"+strconv.Itoa(lim), func(b *testing.B) { + b.Run("uniques_limit:"+strconv.Itoa(lim), func(b *testing.B) { for n := 0; n < b.N; n++ { - // NewDistinctValue is collecting tag values without diff support distinctValues := NewDistinctValue(lim, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) }) for _, tagValues := range ingesterTagValues { for _, v := range tagValues { @@ -70,5 +95,19 @@ func BenchmarkCollect(b *testing.B) { } } }) + + b.Run("duplicates_limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + distinctValues := NewDistinctValue(lim, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) }) + for i := 0; i < numIngesters; i++ { + for j := 0; j < numTagValuesPerIngester; j++ { + // collect first item to simulate duplicates + if distinctValues.Collect(ingesterTagValues[i][0]) { + break // stop early if limit is reached + } + } + } + } + }) } } diff --git a/pkg/collector/scoped_distinct_string.go b/pkg/collector/scoped_distinct_string.go index aa948607c3a..abe902122c7 100644 --- a/pkg/collector/scoped_distinct_string.go +++ b/pkg/collector/scoped_distinct_string.go @@ -1,44 +1,68 @@ package collector -import "sync" +import ( + "sync" +) type ScopedDistinctString struct { - cols map[string]*DistinctString - maxLen int - curLen int - exceeded bool - mtx sync.Mutex + cols map[string]*DistinctString + newCol func(int) *DistinctString + maxLen int + curLen int + limExceeded bool + diffEnabled bool + mtx sync.Mutex } -func NewScopedDistinctString(sz int) *ScopedDistinctString { +func NewScopedDistinctString(maxDataSize int) *ScopedDistinctString { return &ScopedDistinctString{ - cols: map[string]*DistinctString{}, - maxLen: sz, + cols: map[string]*DistinctString{}, + newCol: NewDistinctString, + maxLen: maxDataSize, + diffEnabled: false, } } -func (d *ScopedDistinctString) Collect(scope string, val string) { +func NewScopedDistinctStringWithDiff(maxDataSize int) *ScopedDistinctString { + return &ScopedDistinctString{ + cols: map[string]*DistinctString{}, + newCol: NewDistinctStringWithDiff, + maxLen: maxDataSize, + diffEnabled: true, + } +} + +// Collect adds a new value to the distinct string collector. +// returns true when it reaches the limits and can't fit more values. +// can be used to stop early during Collect without calling Exceeded. +func (d *ScopedDistinctString) Collect(scope string, val string) (exceeded bool) { d.mtx.Lock() defer d.mtx.Unlock() + if d.limExceeded { + return true + } + + valueLen := len(val) // can it fit? - if d.maxLen > 0 && d.curLen+len(val) > d.maxLen { - d.exceeded = true + if d.maxLen > 0 && d.curLen+valueLen > d.maxLen { // No - return + d.limExceeded = true + return true } // get or create collector col, ok := d.cols[scope] if !ok { - col = NewDistinctString(0) + col = d.newCol(0) d.cols[scope] = col } - added := col.Collect(val) - if added { - d.curLen += len(val) + // add valueLen if we successfully added the value + if col.Collect(val) { + d.curLen += valueLen } + return false } // Strings returns the final list of distinct values collected and sorted. @@ -60,22 +84,30 @@ func (d *ScopedDistinctString) Exceeded() bool { d.mtx.Lock() defer d.mtx.Unlock() - return d.exceeded + return d.limExceeded } -// Diff returns all new strings collected since the last time diff was called -func (d *ScopedDistinctString) Diff() map[string][]string { +// Diff returns all new strings collected since the last time Diff was called +func (d *ScopedDistinctString) Diff() (map[string][]string, error) { + if !d.diffEnabled { + return nil, errDiffNotEnabled + } + d.mtx.Lock() defer d.mtx.Unlock() ss := map[string][]string{} for k, v := range d.cols { - diff := v.Diff() + diff, err := v.Diff() + if err != nil { + return nil, err + } + if len(diff) > 0 { ss[k] = diff } } - return ss + return ss, nil } diff --git a/pkg/collector/scoped_distinct_string_test.go b/pkg/collector/scoped_distinct_string_test.go index 2e28191db6e..9b090cef442 100644 --- a/pkg/collector/scoped_distinct_string_test.go +++ b/pkg/collector/scoped_distinct_string_test.go @@ -3,6 +3,7 @@ package collector import ( "fmt" "slices" + "strconv" "sync" "testing" @@ -60,14 +61,18 @@ func TestScopedDistinct(t *testing.T) { } slices.Sort(keys) + var stop bool for _, k := range keys { v := tc.in[k] for _, val := range v { - c.Collect(k, val) + stop = c.Collect(k, val) } } + // check if we exceeded the limit, and Collect and Exceeded return the same value require.Equal(t, tc.exceeded, c.Exceeded()) + require.Equal(t, tc.exceeded, stop) + require.Equal(t, stop, c.Exceeded()) actual := c.Strings() assertMaps(t, tc.expected, actual) @@ -75,35 +80,35 @@ func TestScopedDistinct(t *testing.T) { } func TestScopedDistinctDiff(t *testing.T) { - c := NewScopedDistinctString(0) + c := NewScopedDistinctStringWithDiff(0) c.Collect("scope1", "val1") expected := map[string][]string{ "scope1": {"val1"}, } - assertMaps(t, expected, c.Diff()) + assertMaps(t, expected, readScopedDistinctStringDiff(t, c)) // no diff c.Collect("scope1", "val1") expected = map[string][]string{} - assertMaps(t, expected, c.Diff()) - assertMaps(t, map[string][]string{}, c.Diff()) + assertMaps(t, expected, readScopedDistinctStringDiff(t, c)) + assertMaps(t, map[string][]string{}, readScopedDistinctStringDiff(t, c)) // new value c.Collect("scope1", "val2") expected = map[string][]string{ "scope1": {"val2"}, } - assertMaps(t, expected, c.Diff()) - assertMaps(t, map[string][]string{}, c.Diff()) + assertMaps(t, expected, readScopedDistinctStringDiff(t, c)) + assertMaps(t, map[string][]string{}, readScopedDistinctStringDiff(t, c)) // new scope c.Collect("scope2", "val1") expected = map[string][]string{ "scope2": {"val1"}, } - assertMaps(t, expected, c.Diff()) - assertMaps(t, map[string][]string{}, c.Diff()) + assertMaps(t, expected, readScopedDistinctStringDiff(t, c)) + assertMaps(t, map[string][]string{}, readScopedDistinctStringDiff(t, c)) // all c.Collect("scope2", "val1") @@ -113,8 +118,21 @@ func TestScopedDistinctDiff(t *testing.T) { "scope1": {"val3"}, "scope2": {"val2"}, } - assertMaps(t, expected, c.Diff()) - assertMaps(t, map[string][]string{}, c.Diff()) + assertMaps(t, expected, readScopedDistinctStringDiff(t, c)) + assertMaps(t, map[string][]string{}, readScopedDistinctStringDiff(t, c)) + + // diff should error when diff is not enabled + col := NewScopedDistinctString(0) + col.Collect("scope1", "val1") + res, err := col.Diff() + require.Nil(t, res) + require.Error(t, err, errDiffNotEnabled) +} + +func readScopedDistinctStringDiff(t *testing.T, d *ScopedDistinctString) map[string][]string { + res, err := d.Diff() + require.NoError(t, err) + return res } func assertMaps(t *testing.T, expected, actual map[string][]string) { @@ -148,3 +166,60 @@ func TestScopedDistinctStringCollectorIsSafe(t *testing.T) { require.Equal(t, totalStrings, 10*100) require.False(t, d.Exceeded()) } + +func BenchmarkScopedDistinctStringCollect(b *testing.B) { + // simulate 100 ingesters, each returning 10_000 tags with various scopes + numIngesters := 100 + numTagsPerIngester := 10_000 + ingesterTags := make([]map[string][]string, numIngesters) + scopeTypes := []string{"resource", "span", "event", "instrumentation"} + + for i := 0; i < numIngesters; i++ { + tags := make(map[string][]string) + for j := 0; j < numTagsPerIngester; j++ { + scope := scopeTypes[j%len(scopeTypes)] + value := fmt.Sprintf("tag_%d_%d", i, j) + tags[scope] = append(tags[scope], value) + } + ingesterTags[i] = tags + } + + limits := []int{ + 0, // no limit + 100_000, // 100KB + 1_000_000, // 1MB + 10_000_000, // 10MB + } + + b.ResetTimer() // to exclude the setup time for generating tags + for _, lim := range limits { + b.Run("uniques_limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + scopedDistinctStrings := NewScopedDistinctString(lim) + for _, tags := range ingesterTags { + for scope, values := range tags { + for _, v := range values { + if scopedDistinctStrings.Collect(scope, v) { + break // stop early if limit is reached + } + } + } + } + } + }) + + b.Run("duplicates_limit:"+strconv.Itoa(lim), func(b *testing.B) { + for n := 0; n < b.N; n++ { + scopedDistinctStrings := NewScopedDistinctString(lim) + for i := 0; i < numIngesters; i++ { + for scope := range ingesterTags[i] { + // collect first item to simulate duplicates + if scopedDistinctStrings.Collect(scope, ingesterTags[i][scope][0]) { + break // stop early if limit is reached + } + } + } + } + }) + } +} diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index 5c93c26975d..1f917b8b770 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -1457,8 +1457,7 @@ func tagNamesRunner(t *testing.T, _ *tempopb.Trace, _ *tempopb.TraceSearchMetada valueCollector := collector.NewScopedDistinctString(0) err := e.ExecuteTagNames(ctx, traceql.AttributeScopeFromString(tc.scope), tc.query, func(tag string, scope traceql.AttributeScope) bool { - valueCollector.Collect(scope.String(), tag) - return valueCollector.Exceeded() + return valueCollector.Collect(scope.String(), tag) }, fetcher) if errors.Is(err, common.ErrUnsupported) { return