diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index b939620572..7e73c5bfa6 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -211,23 +211,7 @@ func (d *Daemon) SharedMount(rafs *Rafs) error { return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID) } - defer func() { - su := d.Supervisor - if su != nil { - // TODO: This should be optional by checking snapshotter's configuration. - // FIXME: Is it possible the states are overwritten during two API mounts. - // FIXME: What if nydusd does not support sending states. - err = su.FetchDaemonStates(func() error { - if err := d.SendStates(); err != nil { - return errors.Wrapf(err, "send daemon %s states", d.ID()) - } - return nil - }) - if err != nil { - log.L.Warnf("Daemon %s does not support sending states, %v", d.ID(), err) - } - } - }() + defer d.SendStates() if d.States.FsDriver == config.FsDriverFscache { if err := d.sharedErofsMount(rafs); err != nil { @@ -266,6 +250,8 @@ func (d *Daemon) SharedUmount(rafs *Rafs) error { return errors.Wrapf(err, "umount instance %s", rafs.SnapshotID) } + defer d.SendStates() + if d.States.FsDriver == config.FsDriverFscache { if err := d.sharedErofsUmount(rafs); err != nil { return errors.Wrapf(err, "failed to erofs mount") @@ -357,7 +343,25 @@ func (d *Daemon) sharedErofsUmount(rafs *Rafs) error { return nil } -func (d *Daemon) SendStates() error { +func (d *Daemon) SendStates() { + su := d.Supervisor + if su != nil { + // TODO: This should be optional by checking snapshotter's configuration. + // FIXME: Is it possible the states are overwritten during two API mounts. + // FIXME: What if nydusd does not support sending states. + err := su.FetchDaemonStates(func() error { + if err := d.doSendStates(); err != nil { + return errors.Wrapf(err, "send daemon %s states", d.ID()) + } + return nil + }) + if err != nil { + log.L.Warnf("Daemon %s does not support sending states, %v", d.ID(), err) + } + } +} + +func (d *Daemon) doSendStates() error { c, err := d.GetClient() if err != nil { return errors.Wrapf(err, "send states %s", d.ID()) diff --git a/pkg/filesystem/fs.go b/pkg/filesystem/fs.go index b7e68e1c02..118f97c9c9 100644 --- a/pkg/filesystem/fs.go +++ b/pkg/filesystem/fs.go @@ -55,7 +55,7 @@ type Filesystem struct { rootMountpoint string } -func (fs *Filesystem) tryRetainSharedDaemon(d *daemon.Daemon) { +func (fs *Filesystem) TryRetainSharedDaemon(d *daemon.Daemon) { // FsDriver can be changed between two startups. if d.HostMountpoint() == fs.rootMountpoint || config.GetFsDriver() == config.FsDriverFscache { fs.sharedDaemon = d @@ -110,13 +110,13 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) { // Found shared daemon // Fscache userspace daemon has no host mountpoint. - fs.tryRetainSharedDaemon(d) + fs.TryRetainSharedDaemon(d) } for _, d := range liveDaemons { // Found shared daemon - fs.tryRetainSharedDaemon(d) + fs.TryRetainSharedDaemon(d) } return &fs, nil diff --git a/pkg/manager/daemon_adaptor.go b/pkg/manager/daemon_adaptor.go index a60cb78e92..63882e1d42 100644 --- a/pkg/manager/daemon_adaptor.go +++ b/pkg/manager/daemon_adaptor.go @@ -99,21 +99,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error { collector.NewDaemonInfoCollector(&d.Version, 1).Collect() d.Unlock() - if d.Supervisor == nil { - return - } - - su := d.Supervisor - err = su.FetchDaemonStates(func() error { - if err := d.SendStates(); err != nil { - return errors.Wrapf(err, "send daemon %s states", d.ID()) - } - return nil - }) - if err != nil { - log.L.Errorf("send states") - return - } + d.SendStates() }() return nil diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 58f7dd73e1..07adae9a82 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -391,6 +391,10 @@ func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error { m.mu.Lock() defer m.mu.Unlock() + return m.UpdateDaemonNoLock(daemon) +} + +func (m *Manager) UpdateDaemonNoLock(daemon *daemon.Daemon) error { if old := m.daemonStates.GetByDaemonID(daemon.ID(), nil); old == nil { return errdefs.ErrNotFound } @@ -562,19 +566,7 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s } // Snapshotter's lost the daemons' states after exit, refetch them. - su := d.Supervisor - if su != nil { - err = su.FetchDaemonStates(func() error { - if err := d.SendStates(); err != nil { - return errors.Wrapf(err, "send daemon %s states", d.ID()) - } - return nil - }) - if err != nil { - log.L.Errorf("Send daemon %s states", d.ID()) - return - } - } + d.SendStates() }() return nil diff --git a/pkg/system/system.go b/pkg/system/system.go index b4bf6ec06f..03cdd219cd 100644 --- a/pkg/system/system.go +++ b/pkg/system/system.go @@ -17,6 +17,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -25,6 +26,7 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/daemon" "github.com/containerd/nydus-snapshotter/pkg/daemon/types" "github.com/containerd/nydus-snapshotter/pkg/errdefs" + "github.com/containerd/nydus-snapshotter/pkg/filesystem" "github.com/containerd/nydus-snapshotter/pkg/manager" metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool" ) @@ -50,6 +52,7 @@ const defaultErrorCode string = "Unknown" // 3. Rolling update // 4. Daemons failures record as metrics type Controller struct { + fs *filesystem.Filesystem manager *manager.Manager // httpSever *http.Server addr *net.UnixAddr @@ -117,7 +120,7 @@ type rafsInstanceInfo struct { ImageID string `json:"image_id"` } -func NewSystemController(manager *manager.Manager, sock string) (*Controller, error) { +func NewSystemController(fs *filesystem.Filesystem, manager *manager.Manager, sock string) (*Controller, error) { if err := os.MkdirAll(filepath.Dir(sock), os.ModePerm); err != nil { return nil, err } @@ -134,6 +137,7 @@ func NewSystemController(manager *manager.Manager, sock string) (*Controller, er } sc := Controller{ + fs: fs, manager: manager, addr: addr, router: mux.NewRouter(), @@ -293,9 +297,11 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err log.L.Infof("Upgrading nydusd %s, request %v", d.ID(), c) manager := sc.manager + fs := sc.fs var new daemon.Daemon new.States = d.States + new.Supervisor = d.Supervisor new.CloneInstances(d) s := path.Base(d.GetAPISock()) @@ -312,6 +318,11 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err return err } + su := manager.SupervisorSet.GetSupervisor(d.ID()) + if err := su.SendStatesTimeout(time.Second * 10); err != nil { + return errors.Wrap(err, "Send states") + } + if err := cmd.Start(); err != nil { return errors.Wrap(err, "start process") } @@ -337,6 +348,8 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err return errors.Wrap(err, "old daemon exits") } + fs.TryRetainSharedDaemon(&new) + if err := new.Start(); err != nil { return errors.Wrap(err, "start file system service") } @@ -347,10 +360,12 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err log.L.Infof("Started service of upgraded daemon on socket %s", new.GetAPISock()) - if err := manager.UpdateDaemon(&new); err != nil { + if err := manager.UpdateDaemonNoLock(&new); err != nil { return err } + log.L.Infof("Upgraded daemon success on socket %s", new.GetAPISock()) + return nil } diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 488ca6d68d..d7545835f1 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -39,10 +39,10 @@ import ( "github.com/containerd/nydus-snapshotter/pkg/metrics" "github.com/containerd/nydus-snapshotter/pkg/metrics/collector" "github.com/containerd/nydus-snapshotter/pkg/pprof" + "github.com/containerd/nydus-snapshotter/pkg/system" "github.com/containerd/nydus-snapshotter/pkg/resolve" "github.com/containerd/nydus-snapshotter/pkg/store" - "github.com/containerd/nydus-snapshotter/pkg/system" "github.com/containerd/nydus-snapshotter/pkg/filesystem" "github.com/containerd/nydus-snapshotter/pkg/label" @@ -122,24 +122,6 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho }() } - if config.IsSystemControllerEnabled() { - systemController, err := system.NewSystemController(manager, config.SystemControllerAddress()) - if err != nil { - return nil, errors.Wrap(err, "create system controller") - } - go func() { - if err := systemController.Run(); err != nil { - log.L.WithError(err).Error("Failed to start system controller") - } - }() - pprofAddress := config.SystemControllerPprofAddress() - if pprofAddress != "" { - if err := pprof.NewPprofHTTPListener(pprofAddress); err != nil { - return nil, errors.Wrap(err, "Failed to start pprof HTTP server") - } - } - } - opts := []filesystem.NewFSOpt{ filesystem.WithManager(manager), filesystem.WithNydusImageBinaryPath(cfg.DaemonConfig.NydusdPath), @@ -169,6 +151,24 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho return nil, errors.Wrap(err, "failed to initialize nydus filesystem") } + if config.IsSystemControllerEnabled() { + systemController, err := system.NewSystemController(nydusFs, manager, config.SystemControllerAddress()) + if err != nil { + return nil, errors.Wrap(err, "create system controller") + } + go func() { + if err := systemController.Run(); err != nil { + log.L.WithError(err).Error("Failed to start system controller") + } + }() + pprofAddress := config.SystemControllerPprofAddress() + if pprofAddress != "" { + if err := pprof.NewPprofHTTPListener(pprofAddress); err != nil { + return nil, errors.Wrap(err, "Failed to start pprof HTTP server") + } + } + } + // With fuse driver enabled and a fuse daemon configuration with "localfs" // storage backend, it indicates that a Blobs Manager is needed to download // blobs from registry alone with no help of nydusd or containerd.