Skip to content

Commit

Permalink
fix: update jfr parser (#2340)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Aug 31, 2023
1 parent 485f165 commit b7369b0
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 103 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gorilla/mux v1.8.0
github.com/grafana/agent v0.35.4
github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c
github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8
github.com/grafana/pyroscope-go v1.0.0
github.com/grafana/pyroscope/api v0.0.0-00010101000000-000000000000
github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db
Expand All @@ -49,7 +50,6 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/common v0.44.0
github.com/prometheus/prometheus v1.99.0
github.com/pyroscope-io/jfr-parser v0.7.1
github.com/samber/lo v1.38.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,8 @@ github.com/grafana/agent v0.35.4 h1:5MBSSZIB8qwDJRfeMndSTJpeHAFnz68O2DeTT/AZN2A=
github.com/grafana/agent v0.35.4/go.mod h1:/NHq7TBP8AcX5ucJCgtbM7s5CyTnn7JDAsXKXvPsetQ=
github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c h1:UFt2isLqVSMfT2ajK6ew1Vg+3iePcNngqxnMpv3Rj5w=
github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM=
github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8 h1:Cod+QZWJXGLoCfKfuH66J3iSQ/WWw+R03R6QCwm8IB8=
github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8/go.mod h1:M5u1ux34Qo47ZBWksbMYVk40s7dvU3WMVYpxweEu4R0=
github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ=
github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/grafana/objstore v0.0.0-20230727114110-2394c72a3ec7 h1:UrHG0C+TawTkMxaONRYUdtaM2fzBsvzkdbrTt2cXCQM=
Expand Down Expand Up @@ -1068,8 +1070,6 @@ github.com/prometheus/prometheus v0.45.0 h1:O/uG+Nw4kNxx/jDPxmjsSDd+9Ohql6E7ZSY1
github.com/prometheus/prometheus v0.45.0/go.mod h1:jC5hyO8ItJBnDWGecbEucMyXjzxGv9cxsxsjS9u5s1w=
github.com/pyroscope-io/godeltaprof v0.1.2 h1:MdlEmYELd5w+lvIzmZvXGNMVzW2Qc9jDMuJaPOR75g4=
github.com/pyroscope-io/godeltaprof v0.1.2/go.mod h1:psMITXp90+8pFenXkKIpNhrfmI9saQnPbba27VIaiQE=
github.com/pyroscope-io/jfr-parser v0.7.1 h1:DzfaSi+wzasIcCMl+dhNnyzX53eUUsYpSEcYr/IG8Rg=
github.com/pyroscope-io/jfr-parser v0.7.1/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
Expand Down
2 changes: 2 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1946,6 +1946,8 @@ github.com/pyroscope-io/goldga v0.4.2-0.20220218190441-817afcc3a7f1 h1:T1fDdt3E3
github.com/pyroscope-io/goldga v0.4.2-0.20220218190441-817afcc3a7f1/go.mod h1:PbX5bxlj/WxyKIEAxYgNMNWUpXP4rt9GqtjfvTf8m+I=
github.com/pyroscope-io/jfr-parser v0.6.0 h1:4cQqs+9edZMbZ1ogJ0XDGtgM+PoII4kybJOunVMeD9I=
github.com/pyroscope-io/jfr-parser v0.6.0/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
github.com/pyroscope-io/jfr-parser v0.7.1 h1:DzfaSi+wzasIcCMl+dhNnyzX53eUUsYpSEcYr/IG8Rg=
github.com/pyroscope-io/jfr-parser v0.7.1/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ=
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be/go.mod h1:MIDFMn7db1kT65GmV94GzpX9Qdi7N/pQlwb+AN8wh+Q=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
Expand Down
164 changes: 84 additions & 80 deletions pkg/og/convert/jfr/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"io"
"regexp"

"github.com/grafana/jfr-parser/parser"
"github.com/grafana/jfr-parser/parser/types"
"github.com/grafana/pyroscope/pkg/og/storage"
"github.com/grafana/pyroscope/pkg/og/storage/metadata"
"github.com/grafana/pyroscope/pkg/og/storage/segment"
"github.com/grafana/pyroscope/pkg/og/storage/tree"
"github.com/hashicorp/go-multierror"
"github.com/pyroscope-io/jfr-parser/parser"
)

const (
Expand All @@ -27,40 +28,72 @@ const (
sampleTypeLiveObject
)

func ParseJFR(ctx context.Context, s storage.Putter, body io.Reader, pi *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) {
chunks, err := parser.ParseWithOptions(body, &parser.ChunkParseOptions{
CPoolProcessor: processSymbols,
UnsafeByteToString: true,
})
if err != nil {
return fmt.Errorf("unable to parse JFR format: %w", err)
}
for _, c := range chunks {
if pErr := parse(ctx, c, s, pi, jfrLabels); pErr != nil {
err = multierror.Append(err, pErr)
func ParseJFR(ctx context.Context, s storage.Putter, body []byte, pi *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("jfr parser panic: %v", r)
}
}()
p := parser.NewParser(body, parser.Options{
SymbolProcessor: processSymbols,
})
if pErr := parse(ctx, p, s, pi, jfrLabels); pErr != nil {
return err
}
return err
return nil
}

// revive:disable-next-line:cognitive-complexity necessary complexity
func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) {
func parse(ctx context.Context, c *parser.Parser, s storage.Putter, piOriginal *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) {
var event string

frames := func(ref types.StackTraceRef) [][]byte {
st := c.GetStacktrace(ref)
if st == nil {
return nil
}
frames := make([][]byte, 0, len(st.Frames))
for i := len(st.Frames) - 1; i >= 0; i-- {
f := st.Frames[i]
m := c.GetMethod(f.Method)
if m != nil {
if m.Scratch == nil {
cls := c.GetClass(m.Type)
if cls == nil {

} else {
clsName := c.GetSymbolString(cls.Name)
methodName := c.GetSymbolString(m.Name)
m.Scratch = []byte(clsName + "." + methodName)
}

}
frames = append(frames, m.Scratch)
}
}
return frames
}
cache := make(tree.LabelsCache)
type labelsWithHash struct {
Labels tree.Labels
Hash uint64
}
contexts := make(map[int64]labelsWithHash)
for c.Next() {
e := c.Event
contexts := make(map[uint64]labelsWithHash)
for {
typ, err := c.ParseEvent()
if err != nil {
if err == io.EOF {
break
}
return fmt.Errorf("jfr parser ParseEvent error: %w", err)
}

getLabels := func(contextID int64) labelsWithHash {
getLabels := func(contextID uint64) labelsWithHash {
res, ok := contexts[contextID]
if ok {
return res
}
ls := getContextLabels(contextID, jfrLabels)
ls := getContextLabels(int64(contextID), jfrLabels)
res = labelsWithHash{
Labels: ls,
Hash: ls.Hash(),
Expand All @@ -69,62 +102,54 @@ func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *st
return res
}

switch e.(type) {
case *parser.ExecutionSample:
es := e.(*parser.ExecutionSample)
if fs := frames(es.StackTrace); fs != nil {
lwh := getLabels(es.ContextId)
if es.State.Name == "STATE_RUNNABLE" {
switch typ {
case c.TypeMap.T_EXECUTION_SAMPLE:
if fs := frames(c.ExecutionSample.StackTrace); fs != nil {
lwh := getLabels(c.ExecutionSample.ContextId)
ts := c.GetThreadState(c.ExecutionSample.State)
if ts != nil && ts.Name == "STATE_RUNNABLE" {
cache.GetOrCreateTreeByHash(sampleTypeCPU, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
}
cache.GetOrCreateTreeByHash(sampleTypeWall, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
}
case *parser.ObjectAllocationInNewTLAB:
oa := e.(*parser.ObjectAllocationInNewTLAB)
if fs := frames(oa.StackTrace); fs != nil {
lwh := getLabels(oa.ContextId)
case c.TypeMap.T_ALLOC_IN_NEW_TLAB:
if fs := frames(c.ObjectAllocationInNewTLAB.StackTrace); fs != nil {
lwh := getLabels(c.ObjectAllocationInNewTLAB.ContextId)
cache.GetOrCreateTreeByHash(sampleTypeInTLABObjects, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
cache.GetOrCreateTreeByHash(sampleTypeInTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(oa.TLABSize))
cache.GetOrCreateTreeByHash(sampleTypeInTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ObjectAllocationInNewTLAB.TlabSize))
}
case *parser.ObjectAllocationOutsideTLAB:
oa := e.(*parser.ObjectAllocationOutsideTLAB)
if fs := frames(oa.StackTrace); fs != nil {
lwh := getLabels(oa.ContextId)
case c.TypeMap.T_ALLOC_OUTSIDE_TLAB:
if fs := frames(c.ObjectAllocationOutsideTLAB.StackTrace); fs != nil {
lwh := getLabels(c.ObjectAllocationOutsideTLAB.ContextId)
cache.GetOrCreateTreeByHash(sampleTypeOutTLABObjects, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
cache.GetOrCreateTreeByHash(sampleTypeOutTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(oa.AllocationSize))
cache.GetOrCreateTreeByHash(sampleTypeOutTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ObjectAllocationOutsideTLAB.AllocationSize))
}
case *parser.JavaMonitorEnter:
jme := e.(*parser.JavaMonitorEnter)
if fs := frames(jme.StackTrace); fs != nil {
lwh := getLabels(jme.ContextId)
case c.TypeMap.T_MONITOR_ENTER:
if fs := frames(c.JavaMonitorEnter.StackTrace); fs != nil {
lwh := getLabels(c.JavaMonitorEnter.ContextId)
cache.GetOrCreateTreeByHash(sampleTypeLockSamples, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(jme.Duration))
cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.JavaMonitorEnter.Duration))
}
case *parser.ThreadPark:
tp := e.(*parser.ThreadPark)
if fs := frames(tp.StackTrace); fs != nil {
lwh := getLabels(tp.ContextId)
case c.TypeMap.T_THREAD_PARK:
if fs := frames(c.ThreadPark.StackTrace); fs != nil {
lwh := getLabels(c.ThreadPark.ContextId)

cache.GetOrCreateTreeByHash(sampleTypeLockSamples, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(tp.Duration))
cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ThreadPark.Duration))
}
case *parser.LiveObject:
lo := e.(*parser.LiveObject)
if fs := frames(lo.StackTrace); fs != nil {
case c.TypeMap.T_LIVE_OBJECT:
if fs := frames(c.LiveObject.StackTrace); fs != nil {
lwh := getLabels(0)
cache.GetOrCreateTreeByHash(sampleTypeLiveObject, lwh.Labels, lwh.Hash).InsertStack(fs, 1)
}
case *parser.ActiveSetting:
if as, ok := e.(*parser.ActiveSetting); ok {
if as.Name == "event" {
event = as.Value
}
case c.TypeMap.T_ACTIVE_SETTING:

if c.ActiveSetting.Name == "event" {
event = c.ActiveSetting.Value
}

}
}
if c.Err() != nil {
return fmt.Errorf("unable to parse JFR chunk to PutInput: %w", c.Err())
}

for sampleType, entries := range cache {
for _, e := range entries {
Expand Down Expand Up @@ -271,24 +296,6 @@ func labelIndex(s *LabelsSnapshot, labels tree.Labels, key string) int {
return -1
}

func frames(st *parser.StackTrace) [][]byte {
if st == nil {
return nil
}
frames := make([][]byte, 0, len(st.Frames))
for i := len(st.Frames) - 1; i >= 0; i-- {
f := st.Frames[i]
// TODO(abeaumont): Add support for line numbers.
if f.Method != nil && f.Method.Type != nil && f.Method.Type.Name != nil && f.Method.Name != nil {
if f.Method.Scratch == nil {
f.Method.Scratch = []byte(f.Method.Type.Name.String + "." + f.Method.Name.String)
}
frames = append(frames, f.Method.Scratch)
}
}
return frames
}

// jdk/internal/reflect/GeneratedMethodAccessor31
var generatedMethodAccessor = regexp.MustCompile("^(jdk/internal/reflect/GeneratedMethodAccessor)(\\d+)$")

Expand Down Expand Up @@ -316,11 +323,8 @@ func mergeJVMGeneratedClasses(frame string) string {
return frame
}

func processSymbols(meta *parser.ClassMetadata, cpool *parser.CPool) {
if meta.Name == "jdk.types.Symbol" {
for _, v := range cpool.Pool {
sym := v.(*parser.Symbol)
sym.String = mergeJVMGeneratedClasses(sym.String)
}
func processSymbols(ref *types.SymbolList) {
for i := range ref.Symbol { //todo regex replace inplace
ref.Symbol[i].String = mergeJVMGeneratedClasses(ref.Symbol[i].String)
}
}
48 changes: 32 additions & 16 deletions pkg/og/convert/jfr/parser_suite_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package jfr

import (
"bytes"
"context"
"strings"
"testing"
Expand Down Expand Up @@ -59,7 +58,7 @@ func TestParseCompareExpectedData(t *testing.T) {
err = proto.Unmarshal(labelsBytes, labels)
require.NoError(t, err)
}
err = ParseJFR(context.TODO(), putter, bytes.NewBuffer(jfr), pi, labels)
err = ParseJFR(context.TODO(), putter, jfr, pi, labels)
require.NoError(t, err)
jsonFile := strings.TrimSuffix(td.jfr, ".jfr.gz") + ".json.gz"
//err = putter.DumpJson(jsonFile)
Expand All @@ -70,20 +69,37 @@ func TestParseCompareExpectedData(t *testing.T) {
}

func BenchmarkParser(b *testing.B) {
jfr, err := bench.ReadGzipFile("testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz")
require.NoError(b, err)
k, err := segment.ParseKey("kafka.app")
require.NoError(b, err)
pi := &storage.PutInput{
StartTime: time.UnixMilli(1000),
EndTime: time.UnixMilli(2000),
Key: k,
SpyName: "java",
SampleRate: 100,
tests := []string{
"testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu__1.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu__2.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu__3.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__0.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__1.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__2.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__3.jfr.gz",
"testdata/cortex-dev-01__kafka-0__cpu_lock0_alloc0__0.jfr.gz",
}
putter := &bench.MockPutter{Keep: false}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = ParseJFR(context.TODO(), putter, bytes.NewBuffer(jfr), pi, nil)

for _, testdata := range tests {
f := testdata
b.Run(testdata, func(b *testing.B) {
jfr, err := bench.ReadGzipFile(f)
require.NoError(b, err)
k, err := segment.ParseKey("kafka.app")
require.NoError(b, err)
pi := &storage.PutInput{
StartTime: time.UnixMilli(1000),
EndTime: time.UnixMilli(2000),
Key: k,
SpyName: "java",
SampleRate: 100,
}
putter := &bench.MockPutter{Keep: false}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err = ParseJFR(context.TODO(), putter, jfr, pi, nil)
}
})
}
}
8 changes: 4 additions & 4 deletions pkg/og/convert/jfr/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *RawProfile) Parse(ctx context.Context, putter storage.Putter, _ storage
}

labels := new(LabelsSnapshot)
var r io.Reader = bytes.NewReader(p.RawData)
var r = p.RawData
var err error
if strings.Contains(p.FormDataContentType, "multipart/form-data") {
if r, labels, err = loadJFRFromForm(r, p.FormDataContentType); err != nil {
Expand All @@ -53,13 +53,13 @@ func (p *RawProfile) ContentType() string {
return p.FormDataContentType
}

func loadJFRFromForm(r io.Reader, contentType string) (io.Reader, *LabelsSnapshot, error) {
func loadJFRFromForm(r []byte, contentType string) ([]byte, *LabelsSnapshot, error) {
boundary, err := form.ParseBoundary(contentType)
if err != nil {
return nil, nil, err
}

f, err := multipart.NewReader(r, boundary).ReadForm(32 << 20)
f, err := multipart.NewReader(bytes.NewBuffer(r), boundary).ReadForm(32 << 20)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -95,7 +95,7 @@ func loadJFRFromForm(r io.Reader, contentType string) (io.Reader, *LabelsSnapsho
}
}

return bytes.NewReader(jfrField), &labels, nil
return jfrField, &labels, nil
}

func decompress(bs []byte) ([]byte, error) {
Expand Down

0 comments on commit b7369b0

Please sign in to comment.