Skip to content

Commit

Permalink
Randomize sessions after aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 9, 2023
1 parent 4486d81 commit 6d8426c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
10 changes: 8 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ func (d *Distributor) PushParsed(ctx context.Context, req *distributormodel.Push
req.TotalBytesUncompressed += int64(len(lbs.Value))
}
profName := phlaremodel.Labels(series.Labels).Get(ProfileName)
series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels)
for _, raw := range series.Samples {
usagestats.NewCounter(fmt.Sprintf("distributor_profile_type_%s_received", profName)).Inc(1)
d.profileReceivedStats.Inc(1)
Expand Down Expand Up @@ -385,6 +384,11 @@ func (d *Distributor) sendAggregatedProfile(ctx context.Context, req *distributo
}

func (d *Distributor) sendRequests(ctx context.Context, req *distributormodel.PushRequest, tenantID string) (resp *connect.Response[pushv1.PushResponse], err error) {
// Reduce cardinality of session_id label.
for _, series := range req.Series {
series.Labels = d.limitMaxSessionsPerSeries(tenantID, series.Labels)
}

// Next we split profiles by labels. Newly allocated profiles should be closed after use.
profileSeries, newProfiles := extractSampleSeries(req)
defer func() {
Expand Down Expand Up @@ -509,7 +513,9 @@ func (d *Distributor) maybeAggregate(tenantID string, labels phlaremodel.Labels,
if !ok {
return nil, false, nil
}
r, ok, err := a.Aggregate(labels.Hash(), profile.TimeNanos, mergeProfile(profile))

k, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), phlaremodel.LabelNameSessionID)
r, ok, err := a.Aggregate(k, profile.TimeNanos, mergeProfile(profile))
if err != nil {
return nil, false, err
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -789,6 +790,7 @@ func TestPush_ShuffleSharding(t *testing.T) {
}

func TestPush_Aggregation(t *testing.T) {
const maxSessions = 16
ingesterClient := newFakeIngester(t, false)
d, err := New(
Config{DistributorRing: ringConfig, PushTimeout: time.Second * 10},
Expand All @@ -798,6 +800,7 @@ func TestPush_Aggregation(t *testing.T) {
l := validation.MockDefaultLimits()
l.DistributorAggregationPeriod = model.Duration(time.Second)
l.DistributorAggregationWindow = model.Duration(time.Second)
l.MaxSessionsPerSeries = maxSessions
tenantLimits["user-1"] = l
}),
nil, log.NewLogfmtLogger(os.Stdout),
Expand All @@ -824,6 +827,10 @@ func TestPush_Aggregation(t *testing.T) {
{Name: "cluster", Value: "us-central1"},
{Name: "client", Value: strconv.Itoa(i)},
{Name: "__name__", Value: "cpu"},
{
Name: phlaremodel.LabelNameSessionID,
Value: phlaremodel.SessionID(rand.Uint64()).String(),
},
},
Samples: []*distributormodel.ProfileSample{
{
Expand All @@ -844,11 +851,13 @@ func TestPush_Aggregation(t *testing.T) {
d.asyncRequests.Wait()

var sum int64
sessions := make(map[string]struct{})
assert.GreaterOrEqual(t, len(ingesterClient.requests), 20)
assert.Less(t, len(ingesterClient.requests), 100)
for _, r := range ingesterClient.requests {
for _, s := range r.Series {
require.Len(t, s.Samples, 1)
sessionID := phlaremodel.Labels(s.Labels).Get(phlaremodel.LabelNameSessionID)
sessions[sessionID] = struct{}{}
p, err := pprof2.RawFromBytes(s.Samples[0].RawProfile)
require.NoError(t, err)
for _, x := range p.Sample {
Expand All @@ -859,6 +868,8 @@ func TestPush_Aggregation(t *testing.T) {

// RF * samples_per_profile * clients * requests
assert.Equal(t, int64(3*2*clients*requests), sum)
assert.GreaterOrEqual(t, len(sessions), clients)
assert.LessOrEqual(t, len(sessions), maxSessions+1)
}

func testProfile(t int64) *profilev1.Profile {
Expand Down

0 comments on commit 6d8426c

Please sign in to comment.