Skip to content

Commit

Permalink
feat: support labels for self_metric (#1240)
Browse files Browse the repository at this point in the history
* Support labels for self_metric

* fix gofmt issue

* use const
  • Loading branch information
liuhaoyang committed Nov 14, 2023
1 parent 4e9bd0e commit 7e4ed46
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 42 deletions.
74 changes: 46 additions & 28 deletions pkg/helper/self_metric_imp.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ import (
"time"
)

const SelfMetricNameKey = "__name__"

var mu sync.Mutex

type StrMetric struct {
name string
value string
name string
value string
labels []*protocol.Log_Content
}

func (s *StrMetric) Name() string {
return s.name
return getNameWithLables(s.name, s.labels)
}

func (s *StrMetric) Set(v string) {
Expand All @@ -49,12 +52,14 @@ func (s *StrMetric) Get() string {
}

func (s *StrMetric) Serialize(log *protocol.Log) {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: s.Get()})
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: s.Get()}, &protocol.Log_Content{Key: SelfMetricNameKey, Value: s.name})
log.Contents = append(log.Contents, s.labels...)
}

type NormalMetric struct {
name string
value int64
name string
value int64
labels []*protocol.Log_Content
}

func (s *NormalMetric) Add(v int64) {
Expand All @@ -70,18 +75,20 @@ func (s *NormalMetric) Get() int64 {
}

func (s *NormalMetric) Name() string {
return s.name
return getNameWithLables(s.name, s.labels)
}

func (s *NormalMetric) Serialize(log *protocol.Log) {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatInt(s.Get(), 10)})
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatInt(s.Get(), 10)}, &protocol.Log_Content{Key: SelfMetricNameKey, Value: s.name})
log.Contents = append(log.Contents, s.labels...)
}

type AvgMetric struct {
name string
value int64
count int64
prevAvg float64
labels []*protocol.Log_Content
}

func (s *AvgMetric) Add(v int64) {
Expand Down Expand Up @@ -118,22 +125,24 @@ func (s *AvgMetric) GetAvg() float64 {
}

func (s *AvgMetric) Name() string {
return s.name
return getNameWithLables(s.name, s.labels)
}

func (s *AvgMetric) Serialize(log *protocol.Log) {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatFloat(s.GetAvg(), 'f', 4, 64)})
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatFloat(s.GetAvg(), 'f', 4, 64)}, &protocol.Log_Content{Key: SelfMetricNameKey, Value: s.name})
log.Contents = append(log.Contents, s.labels...)
}

type LatMetric struct {
name string
t time.Time
count int
latencySum time.Duration
labels []*protocol.Log_Content
}

func (s *LatMetric) Name() string {
return s.name
return getNameWithLables(s.name, s.labels)
}

func (s *LatMetric) Begin() {
Expand Down Expand Up @@ -169,45 +178,54 @@ func (s *LatMetric) Get() int64 {
}

func (s *LatMetric) Serialize(log *protocol.Log) {
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatFloat(float64(s.Get())/1000, 'f', 4, 64)})
log.Contents = append(log.Contents, &protocol.Log_Content{Key: s.name, Value: strconv.FormatFloat(float64(s.Get())/1000, 'f', 4, 64)}, &protocol.Log_Content{Key: SelfMetricNameKey, Value: s.name})
log.Contents = append(log.Contents, s.labels...)
}

func getNameWithLables(name string, labels []*protocol.Log_Content) string {
n := name
for _, lable := range labels {
n = n + "#" + lable.Key + "=" + lable.Value
}
return n
}

func NewCounterMetric(n string) pipeline.CounterMetric {
return &NormalMetric{name: n}
func NewCounterMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric {
return &NormalMetric{name: n, labels: lables}
}

func NewAverageMetric(n string) pipeline.CounterMetric {
return &AvgMetric{name: n}
func NewAverageMetric(n string, lables ...*protocol.Log_Content) pipeline.CounterMetric {
return &AvgMetric{name: n, labels: lables}
}

func NewStringMetric(n string) pipeline.StringMetric {
return &StrMetric{name: n}
func NewStringMetric(n string, lables ...*protocol.Log_Content) pipeline.StringMetric {
return &StrMetric{name: n, labels: lables}
}

func NewLatencyMetric(n string) pipeline.LatencyMetric {
return &LatMetric{name: n}
func NewLatencyMetric(n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric {
return &LatMetric{name: n, labels: lables}
}

func NewCounterMetricAndRegister(n string, c pipeline.Context) pipeline.CounterMetric {
metric := &NormalMetric{name: n}
func NewCounterMetricAndRegister(c pipeline.Context, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric {
metric := &NormalMetric{name: n, labels: lables}
c.RegisterCounterMetric(metric)
return metric
}

func NewAverageMetricAndRegister(n string, c pipeline.Context) pipeline.CounterMetric {
metric := &AvgMetric{name: n}
func NewAverageMetricAndRegister(c pipeline.Context, n string, lables ...*protocol.Log_Content) pipeline.CounterMetric {
metric := &AvgMetric{name: n, labels: lables}
c.RegisterCounterMetric(metric)
return metric
}

func NewStringMetricAndRegister(n string, c pipeline.Context) pipeline.StringMetric {
metric := &StrMetric{name: n}
func NewStringMetricAndRegister(c pipeline.Context, n string, lables ...*protocol.Log_Content) pipeline.StringMetric {
metric := &StrMetric{name: n, labels: lables}
c.RegisterStringMetric(metric)
return metric
}

func NewLatencyMetricAndRegister(n string, c pipeline.Context) pipeline.LatencyMetric {
metric := &LatMetric{name: n}
func NewLatencyMetricAndRegister(c pipeline.Context, n string, lables ...*protocol.Log_Content) pipeline.LatencyMetric {
metric := &LatMetric{name: n, labels: lables}
c.RegisterLatencyMetric(metric)
return metric
}
24 changes: 20 additions & 4 deletions pkg/helper/self_metric_imp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import (

func TestStrMetric_Name(t *testing.T) {
type fields struct {
name string
value string
name string
value string
labels []*protocol.Log_Content
}
tests := []struct {
name string
Expand All @@ -41,12 +42,27 @@ func TestStrMetric_Name(t *testing.T) {
},
want: "field",
},
{
name: "test_name",
fields: fields{
name: "field",
value: "v",
labels: []*protocol.Log_Content{
{
Key: "key",
Value: "value",
},
},
},
want: "field#key=value",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &StrMetric{
name: tt.fields.name,
value: tt.fields.value,
name: tt.fields.name,
value: tt.fields.value,
labels: tt.fields.labels,
}
if got := s.Name(); got != tt.want {
t.Errorf("StrMetric.Name() = %v, want %v", got, tt.want)
Expand Down
16 changes: 8 additions & 8 deletions plugins/input/canal/input_canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ func (sc *ServiceCanal) Init(context pipeline.Context) (int, error) {

sc.lastErrorChan = make(chan error, 1)

sc.rotateCounter = helper.NewCounterMetricAndRegister("binlog_rotate", context)
sc.syncCounter = helper.NewCounterMetricAndRegister("binlog_sync", context)
sc.ddlCounter = helper.NewCounterMetricAndRegister("binlog_ddl", context)
sc.rowCounter = helper.NewCounterMetricAndRegister("binlog_row", context)
sc.xgidCounter = helper.NewCounterMetricAndRegister("binlog_xgid", context)
sc.checkpointCounter = helper.NewCounterMetricAndRegister("binlog_checkpoint", context)
sc.lastBinLogMetric = helper.NewStringMetricAndRegister("binlog_filename", context)
sc.lastGTIDMetric = helper.NewStringMetricAndRegister("binlog_gtid", context)
sc.rotateCounter = helper.NewCounterMetricAndRegister(context, "binlog_rotate")
sc.syncCounter = helper.NewCounterMetricAndRegister(context, "binlog_sync")
sc.ddlCounter = helper.NewCounterMetricAndRegister(context, "binlog_ddl")
sc.rowCounter = helper.NewCounterMetricAndRegister(context, "binlog_row")
sc.xgidCounter = helper.NewCounterMetricAndRegister(context, "binlog_xgid")
sc.checkpointCounter = helper.NewCounterMetricAndRegister(context, "binlog_checkpoint")
sc.lastBinLogMetric = helper.NewStringMetricAndRegister(context, "binlog_filename")
sc.lastGTIDMetric = helper.NewStringMetricAndRegister(context, "binlog_gtid")

return 0, nil
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/processor/encrypt/processor_encrypt.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func (p *ProcessorEncrypt) Init(context pipeline.Context) error {
return err
}

p.encryptedCountMetric = helper.NewCounterMetricAndRegister("encrypted_count", p.context)
p.encryptedBytesMetric = helper.NewCounterMetricAndRegister("encrypted_bytes", p.context)
p.encryptedCountMetric = helper.NewCounterMetricAndRegister(p.context, "encrypted_count")
p.encryptedBytesMetric = helper.NewCounterMetricAndRegister(p.context, "encrypted_bytes")
return nil
}

Expand Down

0 comments on commit 7e4ed46

Please sign in to comment.