Skip to content

Commit

Permalink
Merge pull request #467 from jiangliu/metrics
Browse files Browse the repository at this point in the history
metrics: prepare to support multiple instances of Manager
  • Loading branch information
imeoer authored Jun 7, 2023
2 parents 3224e84 + 66dba0d commit 88ccb52
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 131 deletions.
144 changes: 79 additions & 65 deletions pkg/metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type ServerOpt func(*Server) error

type Server struct {
rootDir string
pm *manager.Manager
managers []*manager.Manager
snCollectors []*collector.SnapshotterMetricsCollector
fsCollector *collector.FsMetricsVecCollector
inflightCollector *collector.InflightMetricsVecCollector
snCollector *collector.SnapshotterMetricsCollector
}

func WithRootDir(rootDir string) ServerOpt {
Expand All @@ -44,7 +44,7 @@ func WithRootDir(rootDir string) ServerOpt {

func WithProcessManager(pm *manager.Manager) ServerOpt {
return func(s *Server) error {
s.pm = pm
s.managers = append(s.managers, pm)
return nil
}
}
Expand All @@ -60,87 +60,105 @@ func NewServer(ctx context.Context, opts ...ServerOpt) (*Server, error) {
s.fsCollector = collector.NewFsMetricsVecCollector()
// TODO(tangbin): make hung IO interval configurable
s.inflightCollector = collector.NewInflightMetricsVecCollector(defaultHungIOInterval)
snCollector, err := collector.NewSnapshotterMetricsCollector(ctx, s.pm.CacheDir(), os.Getpid())
if err != nil {
return nil, errors.Wrap(err, "new snapshotter metrics collector failed")
for _, pm := range s.managers {
snCollector, err := collector.NewSnapshotterMetricsCollector(ctx, pm.CacheDir(), os.Getpid())
if err != nil {
return nil, errors.Wrap(err, "new snapshotter metrics collector failed")
}
s.snCollectors = append(s.snCollectors, snCollector)
}
s.snCollector = snCollector

return &s, nil
}

// CollectDaemonResourceMetrics walks every registered manager and, for each
// of its daemons, records the daemon's resident-set-size (RSS) memory usage
// (in kilobytes) through the shared DaemonResourceCollector.
//
// The collector struct is reused across iterations: DaemonID and Value are
// overwritten per daemon before each Collect() call.
func (s *Server) CollectDaemonResourceMetrics(ctx context.Context) {
	var daemonResource collector.DaemonResourceCollector
	for _, pm := range s.managers {
		// Collect daemon resource usage metrics.
		daemons := pm.ListDaemons()
		for _, d := range daemons {
			memRSS, err := tool.GetProcessMemoryRSSKiloBytes(d.Pid())
			if err != nil {
				// NOTE(review): on error memRSS is still recorded below,
				// presumably as its zero value — confirm this is intended
				// rather than skipping the sample.
				log.L.Warnf("Failed to get daemon %s RSS memory", d.ID())
			}

			daemonResource.DaemonID = d.ID()
			daemonResource.Value = memRSS
			daemonResource.Collect()
		}
	}
}

func (s *Server) CollectFsMetrics(ctx context.Context) {
// Collect FS metrics from daemons.
daemons := s.pm.ListDaemons()
var fsMetricsVec []collector.FsMetricsCollector
for _, d := range daemons {
// Skip daemons that are not serving
if d.State() != types.DaemonStateRunning {

for _, pm := range s.managers {
// Collect FS metrics from fusedev daemons.
if pm.FsDriver != config.FsDriverFusedev {
continue
}

for _, i := range d.Instances.List() {
var sid string

if i.GetMountpoint() == d.HostMountpoint() {
sid = ""
} else {
sid = i.SnapshotID
}

fsMetrics, err := d.GetFsMetrics(sid)
if err != nil {
log.G(ctx).Errorf("failed to get fs metric: %v", err)
daemons := pm.ListDaemons()
for _, d := range daemons {
// Skip daemons that are not serving
if d.State() != types.DaemonStateRunning {
continue
}

fsMetricsVec = append(fsMetricsVec, collector.FsMetricsCollector{
Metrics: fsMetrics,
ImageRef: i.ImageID,
})
for _, i := range d.Instances.List() {
var sid string

if d.IsSharedDaemon() {
sid = i.SnapshotID
} else {
sid = ""
}

fsMetrics, err := d.GetFsMetrics(sid)
if err != nil {
log.G(ctx).Errorf("failed to get fs metric: %v", err)
continue
}

fsMetricsVec = append(fsMetricsVec, collector.FsMetricsCollector{
Metrics: fsMetrics,
ImageRef: i.ImageID,
})
}
}
}

if fsMetricsVec != nil {
s.fsCollector.MetricsVec = fsMetricsVec
s.fsCollector.Collect()
}
}

// CollectDaemonResourceMetrics records the resident-set-size (RSS) memory
// usage, in kilobytes, of every daemon owned by the server's single process
// manager via the shared DaemonResourceCollector.
//
// The collector struct is reused: DaemonID and Value are overwritten per
// daemon before each Collect() call.
func (s *Server) CollectDaemonResourceMetrics(ctx context.Context) {
	// Collect daemon resource usage metrics.
	daemons := s.pm.ListDaemons()
	var daemonResource collector.DaemonResourceCollector
	for _, d := range daemons {

		memRSS, err := tool.GetProcessMemoryRSSKiloBytes(d.Pid())
		if err != nil {
			// NOTE(review): on error memRSS is still recorded below,
			// presumably as its zero value — confirm this is intended.
			log.L.Warnf("Failed to get daemon %s RSS memory", d.ID())
		}

		daemonResource.DaemonID = d.ID()
		daemonResource.Value = memRSS
		daemonResource.Collect()
	}
}

func (s *Server) CollectInflightMetrics(ctx context.Context) {
// Collect inflight metrics from daemons.
daemons := s.pm.ListDaemons()
inflightMetricsVec := make([]*types.InflightMetrics, 0, 16)
for _, d := range daemons {

// Only count for daemon that is serving
if d.State() != types.DaemonStateRunning {
for _, pm := range s.managers {
// Collect inflight metrics from fusedev daemons.
if pm.FsDriver != config.FsDriverFusedev {
continue
}

inflightMetrics, err := d.GetInflightMetrics()
if err != nil {
log.G(ctx).Errorf("failed to get inflight metric: %v", err)
continue
daemons := pm.ListDaemons()
for _, d := range daemons {

// Only count for daemon that is serving
if d.State() != types.DaemonStateRunning {
continue
}

inflightMetrics, err := d.GetInflightMetrics()
if err != nil {
log.G(ctx).Errorf("failed to get inflight metric: %v", err)
continue
}
inflightMetricsVec = append(inflightMetricsVec, inflightMetrics)
}
inflightMetricsVec = append(inflightMetricsVec, inflightMetrics)
}

if inflightMetricsVec != nil {
s.inflightCollector.MetricsVec = inflightMetricsVec
s.inflightCollector.Collect()
Expand All @@ -161,18 +179,14 @@ outer:
for {
select {
case <-timer.C:
// Collect FS metrics.
if config.GetFsDriver() != config.FsDriverFscache {
s.CollectFsMetrics(ctx)
}
s.CollectFsMetrics(ctx)
s.CollectDaemonResourceMetrics(ctx)
// Collect snapshotter metrics.
s.snCollector.Collect()
case <-InflightTimer.C:
// Collect inflight metrics.
if config.GetFsDriver() != config.FsDriverFscache {
s.CollectInflightMetrics(ctx)
for _, snCollector := range s.snCollectors {
snCollector.Collect()
}
case <-InflightTimer.C:
s.CollectInflightMetrics(ctx)
case <-ctx.Done():
log.G(ctx).Infof("cancel metrics collecting")
break outer
Expand Down
128 changes: 66 additions & 62 deletions pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ const defaultErrorCode string = "Unknown"
// 3. Rolling update
// 4. Daemons failures record as metrics
type Controller struct {
fs *filesystem.Filesystem
manager *manager.Manager
fs *filesystem.Filesystem
managers []*manager.Manager
// httpSever *http.Server
addr *net.UnixAddr
router *mux.Router
Expand Down Expand Up @@ -120,7 +120,7 @@ type rafsInstanceInfo struct {
ImageID string `json:"image_id"`
}

func NewSystemController(fs *filesystem.Filesystem, manager *manager.Manager, sock string) (*Controller, error) {
func NewSystemController(fs *filesystem.Filesystem, managers []*manager.Manager, sock string) (*Controller, error) {
if err := os.MkdirAll(filepath.Dir(sock), os.ModePerm); err != nil {
return nil, err
}
Expand All @@ -137,10 +137,10 @@ func NewSystemController(fs *filesystem.Filesystem, manager *manager.Manager, so
}

sc := Controller{
fs: fs,
manager: manager,
addr: addr,
router: mux.NewRouter(),
fs: fs,
managers: managers,
addr: addr,
router: mux.NewRouter(),
}

sc.registerRouter()
Expand Down Expand Up @@ -171,46 +171,48 @@ func (sc *Controller) registerRouter() {

func (sc *Controller) describeDaemons() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
daemons := sc.manager.ListDaemons()

info := make([]daemonInfo, 0, 10)

for _, d := range daemons {
instances := make(map[string]rafsInstanceInfo)
for _, i := range d.Instances.List() {
instances[i.SnapshotID] = rafsInstanceInfo{
SnapshotID: i.SnapshotID,
SnapshotDir: i.SnapshotDir,
Mountpoint: i.GetMountpoint(),
ImageID: i.ImageID,
for _, manager := range sc.managers {
daemons := manager.ListDaemons()

for _, d := range daemons {
instances := make(map[string]rafsInstanceInfo)
for _, i := range d.Instances.List() {
instances[i.SnapshotID] = rafsInstanceInfo{
SnapshotID: i.SnapshotID,
SnapshotDir: i.SnapshotDir,
Mountpoint: i.GetMountpoint(),
ImageID: i.ImageID,
}
}
}

memRSS, err := metrics.GetProcessMemoryRSSKiloBytes(d.Pid())
if err != nil {
log.L.Warnf("Failed to get daemon %s RSS memory", d.ID())
}
memRSS, err := metrics.GetProcessMemoryRSSKiloBytes(d.Pid())
if err != nil {
log.L.Warnf("Failed to get daemon %s RSS memory", d.ID())
}

var readData float32
fsMetrics, err := d.GetFsMetrics("")
if err != nil {
log.L.Warnf("Failed to get file system metrics")
} else {
readData = float32(fsMetrics.DataRead) / 1024
}
var readData float32
fsMetrics, err := d.GetFsMetrics("")
if err != nil {
log.L.Warnf("Failed to get file system metrics")
} else {
readData = float32(fsMetrics.DataRead) / 1024
}

i := daemonInfo{
ID: d.ID(),
Pid: d.Pid(),
HostMountpoint: d.HostMountpoint(),
Reference: int(d.GetRef()),
Instances: instances,
StartupCPUUtilization: d.StartupCPUUtilization,
MemoryRSS: memRSS,
ReadData: readData,
}
i := daemonInfo{
ID: d.ID(),
Pid: d.Pid(),
HostMountpoint: d.HostMountpoint(),
Reference: int(d.GetRef()),
Instances: instances,
StartupCPUUtilization: d.StartupCPUUtilization,
MemoryRSS: memRSS,
ReadData: readData,
}

info = append(info, i)
info = append(info, i)
}
}

jsonResponse(w, &info)
Expand Down Expand Up @@ -245,11 +247,6 @@ func (sc *Controller) getDaemonRecords() func(w http.ResponseWriter, r *http.Req
// 6. Delete the old nydusd executive
func (sc *Controller) upgradeDaemons() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
sc.manager.Lock()
defer sc.manager.Unlock()

daemons := sc.manager.ListDaemons()

var c upgradeRequest
var err error
var statusCode int
Expand All @@ -268,35 +265,42 @@ func (sc *Controller) upgradeDaemons() func(w http.ResponseWriter, r *http.Reque
return
}

// TODO: Keep the nydusd executable path in Daemon state and persist it, since nydusd
// can run on both versions.
// Create a dedicated directory storing nydusd of various versions?
// TODO: daemon client has a method to query daemon version and information.
for _, d := range daemons {
err = sc.upgradeNydusDaemon(d, c)
for _, manager := range sc.managers {
manager.Lock()
defer manager.Unlock()

daemons := manager.ListDaemons()

// TODO: Keep the nydusd executable path in Daemon state and persist it, since nydusd
// can run on both versions.
// Create a dedicated directory storing nydusd of various versions?
// TODO: daemon client has a method to query daemon version and information.
for _, d := range daemons {
err = sc.upgradeNydusDaemon(d, c, manager)
if err != nil {
log.L.Errorf("Upgrade daemon %s failed, %s", d.ID(), err)
statusCode = http.StatusInternalServerError
return
}
}

// TODO: why renaming?
err = os.Rename(c.NydusdPath, manager.NydusdBinaryPath)
if err != nil {
log.L.Errorf("Upgrade daemon %s failed, %s", d.ID(), err)
log.L.Errorf("Rename nydusd binary from %s to %s failed, %v",
c.NydusdPath, manager.NydusdBinaryPath, err)
statusCode = http.StatusInternalServerError
return
}
}

err = os.Rename(c.NydusdPath, sc.manager.NydusdBinaryPath)
if err != nil {
log.L.Errorf("Rename nydusd binary from %s to %s failed, %v",
c.NydusdPath, sc.manager.NydusdBinaryPath, err)
statusCode = http.StatusInternalServerError
return
}
}
}

// Provide minimal parameters since most of it can be recovered by nydusd states.
// Create a new daemon in Manger to take over the service.
func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) error {
func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest, manager *manager.Manager) error {
log.L.Infof("Upgrading nydusd %s, request %v", d.ID(), c)

manager := sc.manager
fs := sc.fs

var new daemon.Daemon
Expand Down
Loading

0 comments on commit 88ccb52

Please sign in to comment.