Skip to content

Commit

Permalink
feat: move pprof split to segment writer (#3561)
Browse files Browse the repository at this point in the history
* feat: move pprof split to segment writer
  • Loading branch information
kolesnikovae authored Sep 18, 2024
1 parent 8bd07ea commit fcc3dfd
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 248 deletions.
287 changes: 64 additions & 223 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sort"
"sync"
"time"
"unsafe"

"connectrpc.com/connect"
"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -41,6 +40,7 @@ import (
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
writepath "github.com/grafana/pyroscope/pkg/distributor/write_path"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
"github.com/grafana/pyroscope/pkg/model/relabel"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/slices"
Expand Down Expand Up @@ -567,18 +567,6 @@ func (d *Distributor) sendRequestsToIngester(ctx context.Context, req *distribut
}
}

// sampleSize reports the combined size, in bytes, of the given samples.
// Each sample contributes its serialized size (SizeVT) plus the lengths
// of the string-table entries referenced by its labels: the key, the
// string value, and the numeric unit.
func sampleSize(stringTable []string, samplesSlice []*profilev1.Sample) int64 {
	var total int64
	for _, sample := range samplesSlice {
		total += int64(sample.SizeVT())
		for _, label := range sample.Label {
			keyLen := len(stringTable[label.Key])
			strLen := len(stringTable[label.Str])
			unitLen := len(stringTable[label.NumUnit])
			total += int64(keyLen + strLen + unitLen)
		}
	}
	return total
}

// profileSizeBytes returns the size of symbols and samples in bytes.
func profileSizeBytes(p *profilev1.Profile) (symbols, samples int64) {
fullSize := p.SizeVT()
Expand Down Expand Up @@ -703,190 +691,6 @@ func (d *Distributor) HealthyInstancesCount() int {
return int(d.healthyInstancesCount.Load())
}

// sampleKey identifies a sample for deduplication purposes: samples that
// share the same stack trace and span ID are considered the same sample.
type sampleKey struct {
	// stacktrace aliases the raw bytes of the sample's LocationId slice
	// (see sampleKeyFromSample); it is a cheap, unsafe map key.
	stacktrace string
	// note this is an index into the string table, rather than span ID
	spanIDIdx int64
}

// sampleKeyFromSample derives the deduplication key of a sample: the raw
// bytes of its stack trace (location IDs) plus its span ID label, if any.
func sampleKeyFromSample(stringTable []string, s *profilev1.Sample) sampleKey {
	var k sampleKey

	// populate spanID if present
	for _, l := range s.Label {
		if stringTable[int(l.Key)] == pprof.SpanIDLabelName {
			// l.Str is the string-table index of the span ID value,
			// not the span ID itself.
			k.spanIDIdx = l.Str
		}
	}
	if len(s.LocationId) > 0 {
		// View the []uint64 location IDs as a string without copying
		// (8 bytes per ID). The key is only valid while s.LocationId
		// stays alive and unmodified.
		k.stacktrace = unsafe.String(
			(*byte)(unsafe.Pointer(&s.LocationId[0])),
			len(s.LocationId)*8,
		)
	}
	return k
}

// lazyGroup accumulates the samples belonging to a single label set.
// Merging with previously collected samples is deferred ("lazy"): the
// dedup map is only built once a second group with the same labels arrives.
type lazyGroup struct {
	sampleGroup pprof.SampleGroup
	// The map is only initialized when the group is being modified. Key is the
	// string representation (unsafe) of the sample stack trace and its potential
	// span ID.
	sampleMap map[sampleKey]*profilev1.Sample
	labels    phlaremodel.Labels
}

// addSampleGroup merges sg into the group. The first call simply adopts
// sg; subsequent calls deduplicate samples by stack trace and span ID,
// summing the values of samples that collide.
func (g *lazyGroup) addSampleGroup(stringTable []string, sg pprof.SampleGroup) {
	if len(g.sampleGroup.Samples) == 0 {
		g.sampleGroup = sg
		return
	}

	// Build the dedup index lazily, on the first actual merge.
	if g.sampleMap == nil {
		g.sampleMap = make(map[sampleKey]*profilev1.Sample)
		for _, existing := range g.sampleGroup.Samples {
			g.sampleMap[sampleKeyFromSample(stringTable, existing)] = existing
		}
	}

	for _, incoming := range sg.Samples {
		key := sampleKeyFromSample(stringTable, incoming)
		existing, found := g.sampleMap[key]
		if !found {
			// New stack trace / span: keep the sample as-is.
			g.sampleGroup.Samples = append(g.sampleGroup.Samples, incoming)
			g.sampleMap[key] = incoming
			continue
		}
		// Duplicate: accumulate values into the existing sample.
		for i := range incoming.Value {
			existing.Value[i] += incoming.Value[i]
		}
	}
}

// groupsWithFingerprints indexes lazyGroups by the hash (fingerprint) of
// their label set. order records fingerprints in first-seen order so that
// iteration over the map can be made deterministic.
type groupsWithFingerprints struct {
	m     map[uint64][]lazyGroup // fingerprint -> groups; the slice resolves hash collisions
	order []uint64               // fingerprints in insertion order
}

// newGroupsWithFingerprints returns an empty, ready-to-use collection.
func newGroupsWithFingerprints() *groupsWithFingerprints {
	g := new(groupsWithFingerprints)
	g.m = make(map[uint64][]lazyGroup)
	return g
}

// add merges the sample group into the entry whose labels equal lbls,
// creating a new entry (and recording the fingerprint order) otherwise.
// Groups are bucketed by label fingerprint; a full label comparison
// resolves hash collisions.
func (g *groupsWithFingerprints) add(stringTable []string, lbls phlaremodel.Labels, group pprof.SampleGroup) {
	fp := lbls.Hash()
	idxs, ok := g.m[fp]
	if ok {
		// Fingerprint matches, check if the labels are the same.
		// Iterate by index: ranging by value would copy the lazyGroup,
		// so addSampleGroup's mutations (slice append, lazy sampleMap
		// creation) would be applied to the copy and silently lost.
		for i := range idxs {
			if phlaremodel.CompareLabelPairs(idxs[i].labels, lbls) == 0 {
				// Append samples to the existing group.
				idxs[i].addSampleGroup(stringTable, group)
				return
			}
		}
	} else {
		// First time this fingerprint is seen: preserve insertion order.
		g.order = append(g.order, fp)
	}

	// No matching label set yet: start a new group under this fingerprint.
	g.m[fp] = append(g.m[fp], lazyGroup{
		sampleGroup: group,
		labels:      lbls,
	})
}

// extractSampleSeries splits each incoming profile into per-series profiles
// according to the sample labels embedded in the pprof payload, applying the
// tenant relabeling rules along the way.
//
// It returns the resulting series, plus the number of bytes and whole
// profiles dropped by relabeling (for discard accounting).
func extractSampleSeries(req *distributormodel.PushRequest, tenantID string, usageGroups *validation.UsageGroupConfig, relabelRules []*relabel.Config) (result []*distributormodel.ProfileSeries, bytesRelabelDropped, profilesRelabelDropped float64) {
	var (
		lblbuilder = phlaremodel.NewLabelsBuilder(phlaremodel.EmptyLabels())
	)

	profileSeries := make([]*distributormodel.ProfileSeries, 0, len(req.Series))
	for _, series := range req.Series {
		// s collects the samples of this series that need no splitting.
		s := &distributormodel.ProfileSeries{
			Labels:  series.Labels,
			Samples: make([]*distributormodel.ProfileSample, 0, len(series.Samples)),
		}
		// Shadows the parameter: resolve the usage groups once per series.
		usageGroups := usageGroups.GetUsageGroups(tenantID, series.Labels)

		for _, raw := range series.Samples {
			// Profile-ID labels carry span IDs; rename before grouping so
			// grouping ignores the span ID label consistently.
			pprof.RenameLabel(raw.Profile.Profile, pprof.ProfileIDLabelName, pprof.SpanIDLabelName)
			groups := pprof.GroupSamplesWithoutLabels(raw.Profile.Profile, pprof.SpanIDLabelName)

			if len(groups) == 0 || (len(groups) == 1 && len(groups[0].Labels) == 0) {
				// No sample labels in the profile.

				// relabel the labels of the series
				lblbuilder.Reset(series.Labels)
				if len(relabelRules) > 0 {
					keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
					if !keep {
						bytesRelabelDropped += float64(raw.Profile.SizeVT())
						profilesRelabelDropped++ // in this case we dropped a whole profile
						usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(raw.Profile.SizeVT()))
						continue
					}
				}

				// Copy over the labels from the builder
				s.Labels = lblbuilder.Labels()

				// We do not modify the request.
				s.Samples = append(s.Samples, raw)

				continue
			}

			// iterate through groups relabel them and find relevant overlapping labelsets
			groupsKept := newGroupsWithFingerprints()
			for _, group := range groups {
				lblbuilder.Reset(series.Labels)
				addSampleLabelsToLabelsBuilder(lblbuilder, raw.Profile.Profile, group.Labels)
				if len(relabelRules) > 0 {
					keep := relabel.ProcessBuilder(lblbuilder, relabelRules...)
					if !keep {
						// Only this group is dropped, not the whole profile.
						droppedBytes := sampleSize(raw.Profile.Profile.StringTable, group.Samples)
						bytesRelabelDropped += float64(droppedBytes)
						usageGroups.CountDiscardedBytes(string(validation.DroppedByRelabelRules), droppedBytes)
						continue
					}
				}

				// add the group to the list
				groupsKept.add(raw.Profile.StringTable, lblbuilder.Labels(), group)
			}

			if len(groupsKept.m) == 0 {
				// no groups kept, count the whole profile as dropped
				profilesRelabelDropped++
				continue
			}

			// Export each surviving group as its own single-sample-set
			// profile, in the deterministic order recorded by groupsKept.
			e := pprof.NewSampleExporter(raw.Profile.Profile)
			for _, idx := range groupsKept.order {
				for _, group := range groupsKept.m[idx] {
					// exportSamples creates a new profile with the samples provided.
					// The samples are obtained via GroupSamples call, which means
					// the underlying capacity is referenced by the source profile.
					// Therefore, the slice has to be copied and samples zeroed to
					// avoid ownership issues.
					profile := exportSamples(e, group.sampleGroup.Samples)
					profileSeries = append(profileSeries, &distributormodel.ProfileSeries{
						Labels:  group.labels,
						Samples: []*distributormodel.ProfileSample{{Profile: profile}},
					})
				}
			}
		}
		if len(s.Samples) > 0 {
			profileSeries = append(profileSeries, s)
		}
	}
	return profileSeries, bytesRelabelDropped, profilesRelabelDropped
}

func (d *Distributor) limitMaxSessionsPerSeries(maxSessionsPerSeries int, labels phlaremodel.Labels) phlaremodel.Labels {
if maxSessionsPerSeries == 0 {
return labels.Delete(phlaremodel.LabelNameSessionID)
Expand Down Expand Up @@ -927,32 +731,6 @@ func (d *Distributor) rateLimit(tenantID string, req *distributormodel.PushReque
return nil
}

// addSampleLabelsToLabelsBuilder adds sample labels that do not yet exist
// in the builder, so labels already present take precedence.
func addSampleLabelsToLabelsBuilder(b *phlaremodel.LabelsBuilder, p *profilev1.Profile, pl []*profilev1.Label) {
	for _, label := range pl {
		if label.Str <= 0 {
			// Not a string-valued label: nothing to add.
			continue
		}
		name := p.StringTable[label.Key]
		if b.Get(name) != "" {
			// The existing label wins over the sample label.
			continue
		}
		b.Set(name, p.StringTable[label.Str])
	}
}

// exportSamples builds a new profile containing the given samples via the
// exporter. The input slice is copied and then zeroed, because the samples
// reference memory owned by the source profile and must not be shared.
func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
	out := pprof.NewProfile()
	detached := make([]*profilev1.Sample, len(samples))
	copy(detached, samples)
	slices.Clear(samples)
	e.ExportSamples(out.Profile, detached)
	return out
}

type profileTracker struct {
profile *distributormodel.ProfileSeries
minSuccess int
Expand Down Expand Up @@ -1029,3 +807,66 @@ func injectMappingVersions(series []*distributormodel.ProfileSeries) error {
}
return nil
}

// extractSampleSeries splits each incoming profile into per-series profiles
// according to the sample labels embedded in the pprof payload, applying the
// tenant relabeling rules via pprofsplit.
//
// It returns the resulting series, plus the number of bytes and whole
// profiles dropped by relabeling (for discard accounting).
func extractSampleSeries(
	req *distributormodel.PushRequest,
	tenantID string,
	usage *validation.UsageGroupConfig,
	rules []*relabel.Config,
) (
	result []*distributormodel.ProfileSeries,
	bytesRelabelDropped float64,
	profilesRelabelDropped float64,
) {
	for _, series := range req.Series {
		// Usage groups depend only on the series labels: resolve them once
		// per series instead of once per sample.
		ug := usage.GetUsageGroups(tenantID, series.Labels)
		for _, p := range series.Samples {
			v := &sampleSeriesVisitor{profile: p.Profile.Profile}
			pprofsplit.VisitSampleSeries(
				p.Profile.Profile,
				series.Labels,
				rules,
				v,
			)
			result = append(result, v.series...)
			bytesRelabelDropped += float64(v.discardedBytes)
			profilesRelabelDropped += float64(v.discardedProfiles)
			ug.CountDiscardedBytes(string(validation.DroppedByRelabelRules), int64(v.discardedBytes))
		}
	}

	return result, bytesRelabelDropped, profilesRelabelDropped
}

// sampleSeriesVisitor implements the pprofsplit visitor callbacks: it
// collects the per-label-set sample series extracted from a single source
// profile and tallies the data discarded by relabeling.
type sampleSeriesVisitor struct {
	profile *profilev1.Profile    // source profile the samples come from
	exp     *pprof.SampleExporter // created lazily on the first visited series
	series  []*distributormodel.ProfileSeries

	discardedBytes    int
	discardedProfiles int
}

// VisitSampleSeries exports the given samples into a new profile attributed
// to the given label set and records the resulting series.
func (v *sampleSeriesVisitor) VisitSampleSeries(labels []*typesv1.LabelPair, samples []*profilev1.Sample) {
	if v.exp == nil {
		// Create the exporter lazily, on the first series visited.
		v.exp = pprof.NewSampleExporter(v.profile)
	}
	s := &distributormodel.ProfileSeries{
		Labels:  labels,
		Samples: []*distributormodel.ProfileSample{{Profile: exportSamples(v.exp, samples)}},
	}
	v.series = append(v.series, s)
}

// Discarded accumulates the counts of profiles and bytes dropped during
// the visit (e.g. by relabeling).
func (v *sampleSeriesVisitor) Discarded(profiles, bytes int) {
	v.discardedBytes += bytes
	v.discardedProfiles += profiles
}

// exportSamples builds a new profile containing the given samples via the
// exporter. The input slice is copied and then zeroed, because the samples
// reference memory owned by the source profile and must not be shared.
func exportSamples(e *pprof.SampleExporter, samples []*profilev1.Sample) *pprof.Profile {
	out := pprof.NewProfile()
	detached := make([]*profilev1.Sample, len(samples))
	copy(detached, samples)
	clear(samples)
	e.ExportSamples(out.Profile, detached)
	return out
}
24 changes: 13 additions & 11 deletions pkg/experiment/ingester/memdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@ import (
"bytes"
"context"
"fmt"
"math"
"sync"

"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
phlarelabels "github.com/grafana/pyroscope/pkg/phlaredb/labels"
"github.com/grafana/pyroscope/pkg/phlaredb/labels"
schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1"
"github.com/grafana/pyroscope/pkg/phlaredb/symdb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"math"
"sync"
)

type FlushedHead struct {
Expand Down Expand Up @@ -65,14 +67,14 @@ func (h *Head) Ingest(p *profilev1.Profile, id uuid.UUID, externalLabels []*type
return
}

// delta not supported
// Delta not supported.
externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameDelta)

enforceLabelOrder := phlaremodel.Labels(externalLabels).Get(phlaremodel.LabelNameOrder) == phlaremodel.LabelOrderEnforced
// Label order is enforced to ensure that __profile_type__ and __service_name__ always
// come first in the label set. This is important for spatial locality: profiles are
// stored in the label series order.
externalLabels = phlaremodel.Labels(externalLabels).Delete(phlaremodel.LabelNameOrder)

lbls, seriesFingerprints := phlarelabels.CreateProfileLabels(enforceLabelOrder, p, externalLabels...)

lbls, seriesFingerprints := labels.CreateProfileLabels(true, p, externalLabels...)
metricName := phlaremodel.Labels(externalLabels).Get(model.MetricNameLabel)

var profileIngested bool
Expand Down
Loading

0 comments on commit fcc3dfd

Please sign in to comment.