Skip to content

Commit

Permalink
优化调用栈的数据结构
Browse files Browse the repository at this point in the history
  • Loading branch information
blusewang committed Sep 5, 2022
1 parent f19a0ad commit 9b05dd7
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 40 deletions.
17 changes: 8 additions & 9 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ package mqms
import (
"encoding/json"
"github.com/google/uuid"
"runtime/debug"
"time"
)

// Event 事件
type Event struct {
TransactionID uuid.UUID `json:"transaction_id"` // TransactionID 业务ID,是所有事件的根
CallerTrace json.RawMessage `json:"caller_trace"` // 调用者追踪栈
CallerTrace []string `json:"caller_trace"` // 调用者追踪栈
ID uuid.UUID `json:"id"` // ID 事件ID
ParentID *uuid.UUID `json:"parent_id"` // ParentID 来源事件ID
Delay time.Duration `json:"delay"` // 延迟
Expand All @@ -32,8 +31,8 @@ type Trace struct {
Status TraceStatus `json:"status"`
BeginAt time.Time `json:"begin_at"`
EndAt *time.Time `json:"end_at"`
Error string `json:"error"`
Stack string `json:"stack"`
Error *string `json:"error"`
Stack []string `json:"stack"`
}

// IClientHandler 客户端代理协议
Expand All @@ -47,7 +46,7 @@ type IClientHandler interface {
// Log 引擎日志
Log(l string)
// Fail 失败通知
Fail(evtID uuid.UUID, evtRaw json.RawMessage, err error, stack string)
Fail(evtID uuid.UUID, evtRaw json.RawMessage, err error, stack []string)
}

// IClient 客户端协议
Expand Down Expand Up @@ -82,7 +81,7 @@ func (c *Client) Emit(path string, body interface{}) {
})
if err := c.handler.Pub(raw, 0); err != nil {
c.handler.Log(normalLogFormat("事件发布错误:%v", err.Error()))
c.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
c.handler.Fail(evt.ID, raw, err, stack())
}
return
}
Expand All @@ -106,12 +105,12 @@ func (c *Client) EmitDefer(path string, body interface{}, duration time.Duration
if duration > time.Minute {
if err := c.handler.Save(evt.ID, raw, duration); err != nil {
c.handler.Log(normalLogFormat("事件存储错误:%v", err.Error()))
c.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
c.handler.Fail(evt.ID, raw, err, stack())
}
} else {
if err := c.handler.Pub(raw, duration); err != nil {
c.handler.Log(normalLogFormat("事件发布错误:%v", err.Error()))
c.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
c.handler.Fail(evt.ID, raw, err, stack())
}
}
return
Expand All @@ -131,7 +130,7 @@ func (c *Client) EmitEvent(evtRaw json.RawMessage) {
})
if err := c.handler.Pub(evtRaw, 0); err != nil {
c.handler.Log(normalLogFormat("事件发布错误:%v", err.Error()))
c.handler.Fail(evt.ID, evtRaw, err, string(debug.Stack()))
c.handler.Fail(evt.ID, evtRaw, err, stack())
}
return
}
Expand Down
15 changes: 15 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"encoding/json"
"log"
"runtime"
"runtime/debug"
"testing"
"time"
)
Expand All @@ -27,3 +29,16 @@ func TestCtx(t *testing.T) {
c := context.TODO()
c.Value("sdf")
}

func TestStack(t *testing.T) {
raw := debug.Stack()
log.Println(string(raw))
for i := 0; ; i++ {
_, f, n, ok := runtime.Caller(i)
if !ok {
break
}
log.Println(f, n)
}

}
17 changes: 8 additions & 9 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/google/uuid"
"math"
"net/http"
"runtime/debug"
"time"
)

Expand All @@ -24,16 +23,16 @@ type Context struct {
Http http.Client
index int
handlers []HandlerFunc
err string
stack string
err *string
stack []string
}

func (c *Context) reset() {
c.ctx = context.TODO()
c.index = -1
c.handlers = make([]HandlerFunc, 0)
c.err = ""
c.stack = ""
c.err = nil
c.stack = nil
}

func (c *Context) Bind(o interface{}) (err error) {
Expand All @@ -58,8 +57,8 @@ func (c *Context) Abort() {
// Error 错误处理
func (c *Context) Error(es error) error {
if es != nil {
c.err = es.Error()
c.stack = string(debug.Stack())
c.err = stringPtr(es.Error())
c.stack = stack()
}
return es
}
Expand Down Expand Up @@ -108,12 +107,12 @@ func (c *Context) EmitDefer(path string, body interface{}, duration time.Duratio
if duration > time.Minute {
if err := c.engine.handler.Save(evt.ID, raw, duration); err != nil {
c.engine.handler.Log(normalLogFormat("事件存储错误:%v", err.Error()))
c.engine.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
c.engine.handler.Fail(evt.ID, raw, err, stack())
}
} else {
if err := c.engine.handler.Pub(raw, duration); err != nil {
c.engine.handler.Log(normalLogFormat("事件发布错误:%v", err.Error()))
c.engine.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
c.engine.handler.Fail(evt.ID, raw, err, stack())
}
}
}
14 changes: 7 additions & 7 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"encoding/json"
"fmt"
"github.com/google/uuid"
"runtime/debug"
"sync"
"time"
)
Expand Down Expand Up @@ -47,14 +46,15 @@ func (s *Engine) Shutdown() {

// Emit 发事件,在新协程中直接执行
func (s *Engine) Emit(path string, body interface{}) {
st := stack()
go func() {
var evt Event
evt.TransactionID = uuid.New()
evt.ID = uuid.New()
evt.Path = path
evt.CreateAt = time.Now()
evt.Body, _ = json.Marshal(body)
evt.CallerTrace = stack()
evt.CallerTrace = st
raw, _ := json.Marshal(evt)
defer s.handler.Trace(Trace{
Status: TraceStatusEmit,
Expand Down Expand Up @@ -88,12 +88,12 @@ func (s *Engine) EmitDefer(path string, body interface{}, duration time.Duration
if duration > time.Minute {
if err := s.handler.Save(evt.ID, raw, duration); err != nil {
s.handler.Log(normalLogFormat("事件存储错误:%v", err.Error()))
s.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
s.handler.Fail(evt.ID, raw, err, stack())
}
} else {
if err := s.handler.Pub(raw, duration); err != nil {
s.handler.Log(normalLogFormat("事件发布错误:%v", err.Error()))
s.handler.Fail(evt.ID, raw, err, string(debug.Stack()))
s.handler.Fail(evt.ID, raw, err, stack())
}
}
return
Expand Down Expand Up @@ -163,8 +163,8 @@ func (s *Engine) Handle(raw json.RawMessage) {
defer func() {
if e2 := recover(); e2 != nil {
trace.Status = TraceStatusError
trace.Error = e2.(error).Error()
trace.Stack = string(debug.Stack())
trace.Error = stringPtr(e2.(error).Error())
trace.Stack = stack()
s.handler.Fail(trace.Event.ID, raw, e2.(error), trace.Stack)
s.handler.Trace(trace)
s.handler.Log(fmt.Sprintf("[panic] [%v] --> %v : %v\n", trace.Event.Path, nameOfFunction(c.handlers[c.index]), e2))
Expand All @@ -191,7 +191,7 @@ func (s *Engine) Handle(raw json.RawMessage) {
s.pool.Put(c)
} else {
trace.Status = TraceStatusError
trace.Error = "没有可命中的服务"
trace.Error = stringPtr("没有可命中的服务")
s.handler.Trace(trace)
s.handler.Log(fmt.Sprintf("[error] [%v] --> nil : %v\n", trace.Event.Path, trace.Error))
}
Expand Down
64 changes: 64 additions & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2022 YBCZ, Inc. All rights reserved.
//
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file in the root of the source
// tree.

package mqms

import (
"encoding/json"
"github.com/google/uuid"
"log"
"testing"
"time"
)

type testHandler struct {
}

func (t testHandler) Pub(evtRaw json.RawMessage, duration time.Duration) (err error) {
log.Println(string(evtRaw), duration)
return
}

func (t testHandler) Save(evtID uuid.UUID, evtRaw json.RawMessage, duration time.Duration) (err error) {
log.Println(evtID, string(evtRaw), duration)
return
}

func (t testHandler) Trace(trace Trace) {
raw, _ := json.Marshal(trace)
log.Println(string(raw))
}

func (t testHandler) Log(l string) {
log.Println(l)
}

func (t testHandler) Fail(evtID uuid.UUID, evtRaw json.RawMessage, err error, stack []string) {
log.Println(evtID, string(evtRaw), err, stack)
}

func (t testHandler) DoRead() (list []Row, err error) {
return
}

func (t testHandler) HttpTrace(ht HttpTrace) {
raw, _ := json.Marshal(ht)
log.Println(string(raw))
}

var _ IEngineHandler = &testHandler{}

func TestMqmsTrace(t *testing.T) {
log.SetFlags(log.Ltime | log.Lshortfile)
var h = new(testHandler)
engine := New(h)
engine.Item("test", func(c *Context) (err error) {
//err = c.Error(errors.New("test error"))
return
})
engine.Emit("test", "F")
time.Sleep(time.Hour)
}
29 changes: 14 additions & 15 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,37 @@
package mqms

import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"runtime"
"runtime/debug"
"time"
)

func nameOfFunction(f interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}

func stack() json.RawMessage {
arr := bytes.Split(debug.Stack(), []byte("\n"))
if len(arr) < 8 {
return json.RawMessage("[]")
}
func stack() []string {
var lines []string
for i := 8; i < len(arr); i++ {
if bytes.HasPrefix(arr[i], []byte("\t")) {
lines = append(lines, string(bytes.Split(arr[i][1:], []byte(" "))[0]))
}
if len(lines) >= 4 {
for i := 2; ; i++ {
_, f, n, ok := runtime.Caller(i)
if !ok {
break
}
lines = append(lines, fmt.Sprintf("%s:%d", f, n))
}
var raw, _ = json.Marshal(lines)
return raw
return lines
}

func normalLogFormat(format string, a ...any) string {
a = append([]any{time.Now().Format("15:04:05")}, a...)
return fmt.Sprintf("[MQMS] %v "+format+"\n", a...)
}

func stringPtr(s string) *string {
if s == "" {
return nil
} else {
return &s
}
}

0 comments on commit 9b05dd7

Please sign in to comment.