Skip to content

Commit

Permalink
Merge pull request #14 from coreweave/vb/smart-metric-collection-to-a…
Browse files Browse the repository at this point in the history
…void-dups

mds: preprocess metrics to avoid dups
  • Loading branch information
neurodrone authored Apr 16, 2024
2 parents 34c91fb + cd7d399 commit d331104
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions ceph/mds.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package ceph

import (
"bytes"
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"os/exec"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -262,6 +266,27 @@ type mdsStatus struct {
Uptime float64 `json:"uptime"`
}

// mdsLabels holds one combination of label values for a blocked-ops metric.
// The struct is serialized with Hash so that identical combinations can be
// grouped under a single map key, and restored with UnHash when the grouped
// metric is emitted.
type mdsLabels struct {
	FSName    string
	MDSName   string
	State     string
	OpType    string
	FSOpType  string
	FlagPoint string
	Inode     string
}

// Hash returns the gob encoding of ml as a string. The result is binary data
// intended for use as a map key and as input to UnHash; it is not printable
// text.
func (ml mdsLabels) Hash() string {
	var b bytes.Buffer
	if err := gob.NewEncoder(&b).Encode(ml); err != nil {
		// Encoding a struct of plain strings into an in-memory buffer is
		// not expected to fail. If it ever does, returning a truncated key
		// would silently merge distinct label sets, so fail loudly instead.
		panic(fmt.Errorf("gob-encoding mdsLabels: %w", err))
	}
	return b.String()
}

// UnHash replaces *ml with the labels encoded in hash, which must be a value
// previously returned by Hash. On error *ml is left unchanged.
func (ml *mdsLabels) UnHash(hash string) error {
	// Decode into a fresh value rather than into *ml: gob omits zero-valued
	// fields from the stream, so decoding directly into a receiver that
	// already holds data would keep its stale fields, and a failed decode
	// could leave it partially overwritten.
	var decoded mdsLabels
	if err := gob.NewDecoder(strings.NewReader(hash)).Decode(&decoded); err != nil {
		return err
	}
	*ml = decoded
	return nil
}

type mdsSlowOp struct {
Ops []struct {
// Custom fields for easy parsing by caller.
Expand Down Expand Up @@ -361,50 +386,56 @@ func (m *MDSCollector) collectMDSSlowOps() {
return
}

var metricMap sync.Map

for _, op := range mso.Ops {
var ml mdsLabels

if op.TypeData.OpType == "client_request" {
opd, err := extractOpFromDescription(op.Description)
if err != nil {
m.logger.WithField("mds", mdsName).WithError(err).Error("failed parsing blocked ops description")
continue
}

select {
case m.ch <- prometheus.MustNewConstMetric(
m.MDSBlockedOps,
prometheus.CounterValue,
1,
mss.FsName,
mdsName,
mss.State,
op.TypeData.OpType,
opd.fsOpType,
op.TypeData.FlagPoint,
opd.inode,
):
default:
}

continue
ml.FSOpType = opd.fsOpType
ml.Inode = opd.inode
}

ml.FSName = mss.FsName
ml.MDSName = mdsName
ml.State = mss.State
ml.OpType = op.TypeData.OpType
ml.FlagPoint = op.TypeData.FlagPoint

cnt, _ := metricMap.LoadOrStore(ml.Hash(), new(int32))
v := cnt.(*int32)
atomic.AddInt32(v, 1)
}

metricMap.Range(func(key, value any) bool {
var ml mdsLabels
ml.UnHash(fmt.Sprint(key))
v := value.(*int32)

select {
case m.ch <- prometheus.MustNewConstMetric(
m.MDSBlockedOps,
prometheus.CounterValue,
1,
mss.FsName,
mdsName,
mss.State,
op.TypeData.OpType,
"",
op.TypeData.FlagPoint,
"",
float64(*v),
ml.FSName,
ml.MDSName,
ml.State,
ml.OpType,
ml.FSOpType,
ml.FlagPoint,
ml.Inode,
):
default:
}
}

return true
})
}
}

Expand Down

0 comments on commit d331104

Please sign in to comment.