From 6d8426cea0a0cf187482ec8813aa972950302b27 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Thu, 9 Nov 2023 13:48:07 +0800 Subject: [PATCH] Randomize sessions after aggregation --- pkg/distributor/distributor.go | 10 ++++++++-- pkg/distributor/distributor_test.go | 13 ++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 843f352bc2..47ed7bf63a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -266,7 +266,6 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push req.TotalBytesUncompressed += int64(len(lbs.Value)) } profName := phlaremodel.Labels(series.Labels).Get(ProfileName) - series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels) for _, raw := range series.Samples { usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1) d.profileReceivedStats.Inc(1) @@ -385,6 +384,11 @@ func (d *Distributor) sendAggregatedProfile(ctx context.Context, req *distributo } func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.PushRequest, tenantID string) (resp *connect.Response[pushv1.PushResponse], err error) { + // Reduce cardinality of session_id label. + for _, series := range req.Series { + series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels) + } + // Next we split profiles by labels. Newly allocated profiles should be closed after use. profileSeries, newProfiles := extractSampleSeries(req) defer func() { @@ -509,7 +513,9 @@ func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels, if !ok { return nil, false, nil } - r, ok, err := a.Aggregate(labels.Hash(), profile.TimeNanos, mergeProfile(profile)) + + k, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), phlaremodel.LabelNameSessionID) + r, ok, err := a.Aggregate(k, profile.TimeNanos, mergeProfile(profile)) if err != nil { return nil, false, err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 3306e9bf7b..27fb00236f 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "math/rand" "net/http" "net/http/httptest" "os" @@ -789,6 +790,7 @@ func TestPush_ShuffleSharding(t *testing.T) { } func TestPush_Aggregation(t *testing.T) { + const maxSessions = 16 ingesterClient := newFakeIngester(t, false) d, err := New( Config{DistributorRing: ringConfig, PushTimeout: time.Second * 10}, @@ -798,6 +800,7 @@ func TestPush_Aggregation(t *testing.T) { l := validation.MockDefaultLimits() l.DistributorAggregationPeriod = model.Duration(time.Second) l.DistributorAggregationWindow = model.Duration(time.Second) + l.MaxSessionsPerSeries = maxSessions tenantLimits["user-1"] = l }), nil, log.NewLogfmtLogger(os.Stdout), @@ -824,6 +827,10 @@ func TestPush_Aggregation(t *testing.T) { {Name: "cluster", Value: "us-central1"}, {Name: "client", Value: strconv.Itoa(i)}, {Name: "__name__", Value: "cpu"}, + { + Name: phlaremodel.LabelNameSessionID, + Value: phlaremodel.SessionID(rand.Uint64()).String(), + }, }, Samples: []*distributormodel.ProfileSample{ { @@ -844,11 +851,13 @@ func TestPush_Aggregation(t *testing.T) { d.asyncRequests.Wait() var sum int64 + sessions := make(map[string]struct{}) assert.GreaterOrEqual(t, len(ingesterClient.requests), 20) assert.Less(t, len(ingesterClient.requests), 100) for _, r := range ingesterClient.requests { for _, s := range r.Series { - require.Len(t, s.Samples, 1) + sessionID := phlaremodel.Labels(s.Labels).Get(phlaremodel.LabelNameSessionID) + sessions[sessionID] = struct{}{} p, err := pprof2.RawFromBytes(s.Samples[0].RawProfile) require.NoError(t, err) for _, x := range p.Sample { @@ -859,6 +868,8 @@ func TestPush_Aggregation(t *testing.T) { // RF * samples_per_profile * clients * requests assert.Equal(t, int64(3*2*clients*requests), sum) + assert.GreaterOrEqual(t, len(sessions), clients) + assert.LessOrEqual(t, len(sessions), maxSessions+1) } func testProfile(t int64) *profilev1.Profile {