Skip to content

Commit

Permalink
fix: parse jfr to pprof, without intermediate tree (#2355)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Sep 7, 2023
1 parent 9c8c6c1 commit 5dee87a
Show file tree
Hide file tree
Showing 35 changed files with 944 additions and 592 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.12
github.com/k0kubun/pp/v3 v3.2.0
github.com/klauspost/compress v1.16.7
Expand Down Expand Up @@ -144,6 +143,7 @@ require (
github.com/hashicorp/go-hclog v1.5.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
Expand Down
24 changes: 24 additions & 0 deletions pkg/distributor/model/push.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package model

import (
v1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/pprof"
)

type PushRequest struct {
RawProfileSize int
RawProfileType string // should be set if not pprof, eg jfr

Series []*ProfileSeries
}

type ProfileSample struct {
Profile *pprof.Profile
RawProfile []byte // may be nil if the Profile is composed not from pprof ( e.g. jfr)
ID string
}

type ProfileSeries struct {
Labels []*v1.LabelPair
Samples []*ProfileSample
}
66 changes: 61 additions & 5 deletions pkg/ingester/pyroscope/ingest_adapter.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package pyroscope

import (
"bytes"
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/grafana/pyroscope/pkg/distributor/model"

"github.com/bufbuild/connect-go"
"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/protobuf/proto"

"github.com/grafana/pyroscope/pkg/og/ingestion"
"github.com/grafana/pyroscope/pkg/og/storage"
"github.com/grafana/pyroscope/pkg/og/storage/tree"

pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/og/ingestion"
"github.com/grafana/pyroscope/pkg/og/storage"
"github.com/grafana/pyroscope/pkg/og/storage/tree"
)

type PushService interface {
Expand All @@ -38,7 +40,12 @@ type pyroscopeIngesterAdapter struct {
}

func (p *pyroscopeIngesterAdapter) Ingest(ctx context.Context, in *ingestion.IngestInput) error {
return in.Profile.Parse(ctx, p, p, in.Metadata)
pprofable, ok := in.Profile.(ingestion.ParseableToPprof)
if ok {
return p.parseToPprof(ctx, in, pprofable)
} else {
return in.Profile.Parse(ctx, p, p, in.Metadata)
}
}

const (
Expand Down Expand Up @@ -152,6 +159,55 @@ func (p *pyroscopeIngesterAdapter) Evaluate(input *storage.PutInput) (storage.Sa
return nil, false // noop
}

func (p *pyroscopeIngesterAdapter) parseToPprof(ctx context.Context, in *ingestion.IngestInput, pprofable ingestion.ParseableToPprof) error {
plainReq, err := pprofable.ParseToPprof(ctx, in.Metadata)
if err != nil {
return fmt.Errorf("parsing IngestInput-pprof failed %w", err)
}
grpcReq, err := p.convertToGRPC(plainReq)
if err != nil {
return fmt.Errorf("converting IngestInput-pprof failed %w", err)
}
_, err = p.svc.Push(ctx, grpcReq)
if err != nil {
return fmt.Errorf("pushing IngestInput-pprof failed %w", err)
}
return nil
}

func (p *pyroscopeIngesterAdapter) convertToGRPC(profiles *model.PushRequest) (*connect.Request[pushv1.PushRequest], error) {
defer func() {
for _, series := range profiles.Series {
for _, sample := range series.Samples {
sample.Profile.Close()
}
}
}()
req := &pushv1.PushRequest{
Series: make([]*pushv1.RawProfileSeries, len(profiles.Series)),
}
for i, series := range profiles.Series {
grpcSeries := &pushv1.RawProfileSeries{
Labels: make([]*typesv1.LabelPair, len(series.Labels)),
Samples: make([]*pushv1.RawSample, len(series.Samples)),
}
copy(grpcSeries.Labels, series.Labels)

for j, sample := range series.Samples {
buf := bytes.NewBuffer(nil)
_, err := sample.Profile.WriteTo(buf)
if err != nil {
return nil, fmt.Errorf("failed to serialize pprof to bytes for distributor push %w", err)
}
grpcSeries.Samples[j] = &pushv1.RawSample{
RawProfile: buf.Bytes(),
}
}
req.Series[i] = grpcSeries
}
return connect.NewRequest(req), nil
}

func convertMetadata(pi *storage.PutInput) (metricName, stType, stUnit, app string, err error) {
app = pi.Key.AppName()
parts := strings.Split(app, ".")
Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/pyroscope/ingest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (h ingestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var connectErr *connect.Error
if ok := errors.As(err, &connectErr); ok {
w.WriteHeader(int(connectgrpc.CodeToHTTP(connectErr.Code())))
_, _ = w.Write([]byte(connectErr.Message()))
return
}

Expand Down
Loading

0 comments on commit 5dee87a

Please sign in to comment.