From b7369b0258dc5b770cae9fbdd4c3a749e54963af Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Thu, 31 Aug 2023 21:30:25 +0700 Subject: [PATCH] fix: update jfr parser (#2340) --- go.mod | 2 +- go.sum | 4 +- go.work.sum | 2 + pkg/og/convert/jfr/parser.go | 164 ++++++++++++------------ pkg/og/convert/jfr/parser_suite_test.go | 48 ++++--- pkg/og/convert/jfr/profile.go | 8 +- 6 files changed, 125 insertions(+), 103 deletions(-) diff --git a/go.mod b/go.mod index 05ebb8f494..8bb8feb772 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/grafana/agent v0.35.4 github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c + github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8 github.com/grafana/pyroscope-go v1.0.0 github.com/grafana/pyroscope/api v0.0.0-00010101000000-000000000000 github.com/grafana/regexp v0.0.0-20221123153739-15dc172cd2db @@ -49,7 +50,6 @@ require ( github.com/prometheus/client_golang v1.16.0 github.com/prometheus/common v0.44.0 github.com/prometheus/prometheus v1.99.0 - github.com/pyroscope-io/jfr-parser v0.7.1 github.com/samber/lo v1.38.1 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 diff --git a/go.sum b/go.sum index 04b0bb59aa..165be0985b 100644 --- a/go.sum +++ b/go.sum @@ -801,6 +801,8 @@ github.com/grafana/agent v0.35.4 h1:5MBSSZIB8qwDJRfeMndSTJpeHAFnz68O2DeTT/AZN2A= github.com/grafana/agent v0.35.4/go.mod h1:/NHq7TBP8AcX5ucJCgtbM7s5CyTnn7JDAsXKXvPsetQ= github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c h1:UFt2isLqVSMfT2ajK6ew1Vg+3iePcNngqxnMpv3Rj5w= github.com/grafana/dskit v0.0.0-20230817134647-90d7ee0bed7c/go.mod h1:3u7fr4hmOhuUL9Yc1QP/oa3za73kxvqJnRJH4BA5fOM= +github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8 h1:Cod+QZWJXGLoCfKfuH66J3iSQ/WWw+R03R6QCwm8IB8= +github.com/grafana/jfr-parser v0.7.2-0.20230831140626-08fa3a941bf8/go.mod h1:M5u1ux34Qo47ZBWksbMYVk40s7dvU3WMVYpxweEu4R0= github.com/grafana/memberlist 
v0.3.1-0.20220708130638-bd88e10a3d91 h1:/NipyHnOmvRsVzj81j2qE0VxsvsqhOB0f4vJIhk2qCQ= github.com/grafana/memberlist v0.3.1-0.20220708130638-bd88e10a3d91/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/objstore v0.0.0-20230727114110-2394c72a3ec7 h1:UrHG0C+TawTkMxaONRYUdtaM2fzBsvzkdbrTt2cXCQM= @@ -1068,8 +1070,6 @@ github.com/prometheus/prometheus v0.45.0 h1:O/uG+Nw4kNxx/jDPxmjsSDd+9Ohql6E7ZSY1 github.com/prometheus/prometheus v0.45.0/go.mod h1:jC5hyO8ItJBnDWGecbEucMyXjzxGv9cxsxsjS9u5s1w= github.com/pyroscope-io/godeltaprof v0.1.2 h1:MdlEmYELd5w+lvIzmZvXGNMVzW2Qc9jDMuJaPOR75g4= github.com/pyroscope-io/godeltaprof v0.1.2/go.mod h1:psMITXp90+8pFenXkKIpNhrfmI9saQnPbba27VIaiQE= -github.com/pyroscope-io/jfr-parser v0.7.1 h1:DzfaSi+wzasIcCMl+dhNnyzX53eUUsYpSEcYr/IG8Rg= -github.com/pyroscope-io/jfr-parser v0.7.1/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw= github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= diff --git a/go.work.sum b/go.work.sum index ef751bf8a7..cf7818760c 100644 --- a/go.work.sum +++ b/go.work.sum @@ -1946,6 +1946,8 @@ github.com/pyroscope-io/goldga v0.4.2-0.20220218190441-817afcc3a7f1 h1:T1fDdt3E3 github.com/pyroscope-io/goldga v0.4.2-0.20220218190441-817afcc3a7f1/go.mod h1:PbX5bxlj/WxyKIEAxYgNMNWUpXP4rt9GqtjfvTf8m+I= github.com/pyroscope-io/jfr-parser v0.6.0 h1:4cQqs+9edZMbZ1ogJ0XDGtgM+PoII4kybJOunVMeD9I= github.com/pyroscope-io/jfr-parser v0.6.0/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8= +github.com/pyroscope-io/jfr-parser v0.7.1 h1:DzfaSi+wzasIcCMl+dhNnyzX53eUUsYpSEcYr/IG8Rg= +github.com/pyroscope-io/jfr-parser v0.7.1/go.mod h1:ZMcbJjfDkOwElEK8CvUJbpetztRWRXszCmf5WU0erV8= github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be h1:ta7tUOvsPHVHGom5hKW5VXNc2xZIkfCKP8iaqOyYtUQ= 
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be/go.mod h1:MIDFMn7db1kT65GmV94GzpX9Qdi7N/pQlwb+AN8wh+Q= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= diff --git a/pkg/og/convert/jfr/parser.go b/pkg/og/convert/jfr/parser.go index e4f6c6d9ae..8cb5ece7da 100644 --- a/pkg/og/convert/jfr/parser.go +++ b/pkg/og/convert/jfr/parser.go @@ -6,12 +6,13 @@ import ( "io" "regexp" + "github.com/grafana/jfr-parser/parser" + "github.com/grafana/jfr-parser/parser/types" "github.com/grafana/pyroscope/pkg/og/storage" "github.com/grafana/pyroscope/pkg/og/storage/metadata" "github.com/grafana/pyroscope/pkg/og/storage/segment" "github.com/grafana/pyroscope/pkg/og/storage/tree" "github.com/hashicorp/go-multierror" - "github.com/pyroscope-io/jfr-parser/parser" ) const ( @@ -27,40 +28,72 @@ const ( sampleTypeLiveObject ) -func ParseJFR(ctx context.Context, s storage.Putter, body io.Reader, pi *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) { - chunks, err := parser.ParseWithOptions(body, &parser.ChunkParseOptions{ - CPoolProcessor: processSymbols, - UnsafeByteToString: true, - }) - if err != nil { - return fmt.Errorf("unable to parse JFR format: %w", err) - } - for _, c := range chunks { - if pErr := parse(ctx, c, s, pi, jfrLabels); pErr != nil { - err = multierror.Append(err, pErr) +func ParseJFR(ctx context.Context, s storage.Putter, body []byte, pi *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("jfr parser panic: %v", r) } + }() + p := parser.NewParser(body, parser.Options{ + SymbolProcessor: processSymbols, + }) + if pErr := parse(ctx, p, s, pi, jfrLabels); pErr != nil { + return pErr } - return err + return nil } // revive:disable-next-line:cognitive-complexity necessary complexity -func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *storage.PutInput, jfrLabels *LabelsSnapshot) (err
error) { +func parse(ctx context.Context, c *parser.Parser, s storage.Putter, piOriginal *storage.PutInput, jfrLabels *LabelsSnapshot) (err error) { var event string + + frames := func(ref types.StackTraceRef) [][]byte { + st := c.GetStacktrace(ref) + if st == nil { + return nil + } + frames := make([][]byte, 0, len(st.Frames)) + for i := len(st.Frames) - 1; i >= 0; i-- { + f := st.Frames[i] + m := c.GetMethod(f.Method) + if m != nil { + if m.Scratch == nil { + cls := c.GetClass(m.Type) + if cls == nil { + + } else { + clsName := c.GetSymbolString(cls.Name) + methodName := c.GetSymbolString(m.Name) + m.Scratch = []byte(clsName + "." + methodName) + } + + } + frames = append(frames, m.Scratch) + } + } + return frames + } cache := make(tree.LabelsCache) type labelsWithHash struct { Labels tree.Labels Hash uint64 } - contexts := make(map[int64]labelsWithHash) - for c.Next() { - e := c.Event + contexts := make(map[uint64]labelsWithHash) + for { + typ, err := c.ParseEvent() + if err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("jfr parser ParseEvent error: %w", err) + } - getLabels := func(contextID int64) labelsWithHash { + getLabels := func(contextID uint64) labelsWithHash { res, ok := contexts[contextID] if ok { return res } - ls := getContextLabels(contextID, jfrLabels) + ls := getContextLabels(int64(contextID), jfrLabels) res = labelsWithHash{ Labels: ls, Hash: ls.Hash(), @@ -69,62 +102,54 @@ func parse(ctx context.Context, c parser.Chunk, s storage.Putter, piOriginal *st return res } - switch e.(type) { - case *parser.ExecutionSample: - es := e.(*parser.ExecutionSample) - if fs := frames(es.StackTrace); fs != nil { - lwh := getLabels(es.ContextId) - if es.State.Name == "STATE_RUNNABLE" { + switch typ { + case c.TypeMap.T_EXECUTION_SAMPLE: + if fs := frames(c.ExecutionSample.StackTrace); fs != nil { + lwh := getLabels(c.ExecutionSample.ContextId) + ts := c.GetThreadState(c.ExecutionSample.State) + if ts != nil && ts.Name == "STATE_RUNNABLE" { 
cache.GetOrCreateTreeByHash(sampleTypeCPU, lwh.Labels, lwh.Hash).InsertStack(fs, 1) } cache.GetOrCreateTreeByHash(sampleTypeWall, lwh.Labels, lwh.Hash).InsertStack(fs, 1) } - case *parser.ObjectAllocationInNewTLAB: - oa := e.(*parser.ObjectAllocationInNewTLAB) - if fs := frames(oa.StackTrace); fs != nil { - lwh := getLabels(oa.ContextId) + case c.TypeMap.T_ALLOC_IN_NEW_TLAB: + if fs := frames(c.ObjectAllocationInNewTLAB.StackTrace); fs != nil { + lwh := getLabels(c.ObjectAllocationInNewTLAB.ContextId) cache.GetOrCreateTreeByHash(sampleTypeInTLABObjects, lwh.Labels, lwh.Hash).InsertStack(fs, 1) - cache.GetOrCreateTreeByHash(sampleTypeInTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(oa.TLABSize)) + cache.GetOrCreateTreeByHash(sampleTypeInTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ObjectAllocationInNewTLAB.TlabSize)) } - case *parser.ObjectAllocationOutsideTLAB: - oa := e.(*parser.ObjectAllocationOutsideTLAB) - if fs := frames(oa.StackTrace); fs != nil { - lwh := getLabels(oa.ContextId) + case c.TypeMap.T_ALLOC_OUTSIDE_TLAB: + if fs := frames(c.ObjectAllocationOutsideTLAB.StackTrace); fs != nil { + lwh := getLabels(c.ObjectAllocationOutsideTLAB.ContextId) cache.GetOrCreateTreeByHash(sampleTypeOutTLABObjects, lwh.Labels, lwh.Hash).InsertStack(fs, 1) - cache.GetOrCreateTreeByHash(sampleTypeOutTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(oa.AllocationSize)) + cache.GetOrCreateTreeByHash(sampleTypeOutTLABBytes, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ObjectAllocationOutsideTLAB.AllocationSize)) } - case *parser.JavaMonitorEnter: - jme := e.(*parser.JavaMonitorEnter) - if fs := frames(jme.StackTrace); fs != nil { - lwh := getLabels(jme.ContextId) + case c.TypeMap.T_MONITOR_ENTER: + if fs := frames(c.JavaMonitorEnter.StackTrace); fs != nil { + lwh := getLabels(c.JavaMonitorEnter.ContextId) cache.GetOrCreateTreeByHash(sampleTypeLockSamples, lwh.Labels, lwh.Hash).InsertStack(fs, 1) - cache.GetOrCreateTreeByHash(sampleTypeLockDuration, 
lwh.Labels, lwh.Hash).InsertStack(fs, uint64(jme.Duration)) + cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.JavaMonitorEnter.Duration)) } - case *parser.ThreadPark: - tp := e.(*parser.ThreadPark) - if fs := frames(tp.StackTrace); fs != nil { - lwh := getLabels(tp.ContextId) + case c.TypeMap.T_THREAD_PARK: + if fs := frames(c.ThreadPark.StackTrace); fs != nil { + lwh := getLabels(c.ThreadPark.ContextId) cache.GetOrCreateTreeByHash(sampleTypeLockSamples, lwh.Labels, lwh.Hash).InsertStack(fs, 1) - cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(tp.Duration)) + cache.GetOrCreateTreeByHash(sampleTypeLockDuration, lwh.Labels, lwh.Hash).InsertStack(fs, uint64(c.ThreadPark.Duration)) } - case *parser.LiveObject: - lo := e.(*parser.LiveObject) - if fs := frames(lo.StackTrace); fs != nil { + case c.TypeMap.T_LIVE_OBJECT: + if fs := frames(c.LiveObject.StackTrace); fs != nil { lwh := getLabels(0) cache.GetOrCreateTreeByHash(sampleTypeLiveObject, lwh.Labels, lwh.Hash).InsertStack(fs, 1) } - case *parser.ActiveSetting: - if as, ok := e.(*parser.ActiveSetting); ok { - if as.Name == "event" { - event = as.Value - } + case c.TypeMap.T_ACTIVE_SETTING: + + if c.ActiveSetting.Name == "event" { + event = c.ActiveSetting.Value } + } } - if c.Err() != nil { - return fmt.Errorf("unable to parse JFR chunk to PutInput: %w", c.Err()) - } for sampleType, entries := range cache { for _, e := range entries { @@ -271,24 +296,6 @@ func labelIndex(s *LabelsSnapshot, labels tree.Labels, key string) int { return -1 } -func frames(st *parser.StackTrace) [][]byte { - if st == nil { - return nil - } - frames := make([][]byte, 0, len(st.Frames)) - for i := len(st.Frames) - 1; i >= 0; i-- { - f := st.Frames[i] - // TODO(abeaumont): Add support for line numbers. 
- if f.Method != nil && f.Method.Type != nil && f.Method.Type.Name != nil && f.Method.Name != nil { - if f.Method.Scratch == nil { - f.Method.Scratch = []byte(f.Method.Type.Name.String + "." + f.Method.Name.String) - } - frames = append(frames, f.Method.Scratch) - } - } - return frames -} - // jdk/internal/reflect/GeneratedMethodAccessor31 var generatedMethodAccessor = regexp.MustCompile("^(jdk/internal/reflect/GeneratedMethodAccessor)(\\d+)$") @@ -316,11 +323,8 @@ func mergeJVMGeneratedClasses(frame string) string { return frame } -func processSymbols(meta *parser.ClassMetadata, cpool *parser.CPool) { - if meta.Name == "jdk.types.Symbol" { - for _, v := range cpool.Pool { - sym := v.(*parser.Symbol) - sym.String = mergeJVMGeneratedClasses(sym.String) - } +func processSymbols(ref *types.SymbolList) { + for i := range ref.Symbol { //todo regex replace inplace + ref.Symbol[i].String = mergeJVMGeneratedClasses(ref.Symbol[i].String) } } diff --git a/pkg/og/convert/jfr/parser_suite_test.go b/pkg/og/convert/jfr/parser_suite_test.go index 0b4ccae459..174f4089c1 100644 --- a/pkg/og/convert/jfr/parser_suite_test.go +++ b/pkg/og/convert/jfr/parser_suite_test.go @@ -1,7 +1,6 @@ package jfr import ( - "bytes" "context" "strings" "testing" @@ -59,7 +58,7 @@ func TestParseCompareExpectedData(t *testing.T) { err = proto.Unmarshal(labelsBytes, labels) require.NoError(t, err) } - err = ParseJFR(context.TODO(), putter, bytes.NewBuffer(jfr), pi, labels) + err = ParseJFR(context.TODO(), putter, jfr, pi, labels) require.NoError(t, err) jsonFile := strings.TrimSuffix(td.jfr, ".jfr.gz") + ".json.gz" //err = putter.DumpJson(jsonFile) @@ -70,20 +69,37 @@ func TestParseCompareExpectedData(t *testing.T) { } func BenchmarkParser(b *testing.B) { - jfr, err := bench.ReadGzipFile("testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz") - require.NoError(b, err) - k, err := segment.ParseKey("kafka.app") - require.NoError(b, err) - pi := &storage.PutInput{ - StartTime: time.UnixMilli(1000), - EndTime: 
time.UnixMilli(2000), - Key: k, - SpyName: "java", - SampleRate: 100, + tests := []string{ + "testdata/cortex-dev-01__kafka-0__cpu__0.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu__1.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu__2.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu__3.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__0.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__1.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__2.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu_lock_alloc__3.jfr.gz", + "testdata/cortex-dev-01__kafka-0__cpu_lock0_alloc0__0.jfr.gz", } - putter := &bench.MockPutter{Keep: false} - b.ResetTimer() - for i := 0; i < b.N; i++ { - err = ParseJFR(context.TODO(), putter, bytes.NewBuffer(jfr), pi, nil) + + for _, testdata := range tests { + f := testdata + b.Run(testdata, func(b *testing.B) { + jfr, err := bench.ReadGzipFile(f) + require.NoError(b, err) + k, err := segment.ParseKey("kafka.app") + require.NoError(b, err) + pi := &storage.PutInput{ + StartTime: time.UnixMilli(1000), + EndTime: time.UnixMilli(2000), + Key: k, + SpyName: "java", + SampleRate: 100, + } + putter := &bench.MockPutter{Keep: false} + b.ResetTimer() + for i := 0; i < b.N; i++ { + err = ParseJFR(context.TODO(), putter, jfr, pi, nil) + } + }) } } diff --git a/pkg/og/convert/jfr/profile.go b/pkg/og/convert/jfr/profile.go index 10eb519e06..6ecc5ad3bf 100644 --- a/pkg/og/convert/jfr/profile.go +++ b/pkg/og/convert/jfr/profile.go @@ -35,7 +35,7 @@ func (p *RawProfile) Parse(ctx context.Context, putter storage.Putter, _ storage } labels := new(LabelsSnapshot) - var r io.Reader = bytes.NewReader(p.RawData) + var r = p.RawData var err error if strings.Contains(p.FormDataContentType, "multipart/form-data") { if r, labels, err = loadJFRFromForm(r, p.FormDataContentType); err != nil { @@ -53,13 +53,13 @@ func (p *RawProfile) ContentType() string { return p.FormDataContentType } -func loadJFRFromForm(r io.Reader, contentType string) 
(io.Reader, *LabelsSnapshot, error) { +func loadJFRFromForm(r []byte, contentType string) ([]byte, *LabelsSnapshot, error) { boundary, err := form.ParseBoundary(contentType) if err != nil { return nil, nil, err } - f, err := multipart.NewReader(r, boundary).ReadForm(32 << 20) + f, err := multipart.NewReader(bytes.NewBuffer(r), boundary).ReadForm(32 << 20) if err != nil { return nil, nil, err } @@ -95,7 +95,7 @@ func loadJFRFromForm(r io.Reader, contentType string) (io.Reader, *LabelsSnapsho } } - return bytes.NewReader(jfrField), &labels, nil + return jfrField, &labels, nil } func decompress(bs []byte) ([]byte, error) {