Skip to content

Commit

Permalink
Merge pull request #385 from mofishzz/hjn/live-upgrade
Browse files Browse the repository at this point in the history
Fix some small problems in the live upgrade procedure
  • Loading branch information
changweige authored Feb 27, 2023
2 parents 5338178 + c1971c3 commit 2fc62a6
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 70 deletions.
40 changes: 22 additions & 18 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,23 +211,7 @@ func (d *Daemon) SharedMount(rafs *Rafs) error {
return errors.Wrapf(err, "mount instance %s", rafs.SnapshotID)
}

defer func() {
su := d.Supervisor
if su != nil {
// TODO: This should be optional by checking snapshotter's configuration.
// FIXME: Is it possible the states are overwritten during two API mounts.
// FIXME: What if nydusd does not support sending states.
err = su.FetchDaemonStates(func() error {
if err := d.SendStates(); err != nil {
return errors.Wrapf(err, "send daemon %s states", d.ID())
}
return nil
})
if err != nil {
log.L.Warnf("Daemon %s does not support sending states, %v", d.ID(), err)
}
}
}()
defer d.SendStates()

if d.States.FsDriver == config.FsDriverFscache {
if err := d.sharedErofsMount(rafs); err != nil {
Expand Down Expand Up @@ -266,6 +250,8 @@ func (d *Daemon) SharedUmount(rafs *Rafs) error {
return errors.Wrapf(err, "umount instance %s", rafs.SnapshotID)
}

defer d.SendStates()

if d.States.FsDriver == config.FsDriverFscache {
if err := d.sharedErofsUmount(rafs); err != nil {
return errors.Wrapf(err, "failed to erofs mount")
Expand Down Expand Up @@ -357,7 +343,25 @@ func (d *Daemon) sharedErofsUmount(rafs *Rafs) error {
return nil
}

func (d *Daemon) SendStates() error {
// SendStates hands the daemon's supervisor, when one is attached, a callback
// that invokes doSendStates, via Supervisor.FetchDaemonStates.
//
// It is a no-op when the daemon has no supervisor. Any error reported by
// FetchDaemonStates is logged as a warning and never returned, so callers
// (including deferred calls) cannot observe a failure.
func (d *Daemon) SendStates() {
	supervisor := d.Supervisor
	if supervisor == nil {
		return
	}

	// TODO: This should be optional by checking snapshotter's configuration.
	// FIXME: Is it possible the states are overwritten during two API mounts.
	// FIXME: What if nydusd does not support sending states.
	sendStates := func() error {
		sendErr := d.doSendStates()
		if sendErr == nil {
			return nil
		}
		return errors.Wrapf(sendErr, "send daemon %s states", d.ID())
	}

	if err := supervisor.FetchDaemonStates(sendStates); err != nil {
		log.L.Warnf("Daemon %s does not support sending states, %v", d.ID(), err)
	}
}

func (d *Daemon) doSendStates() error {
c, err := d.GetClient()
if err != nil {
return errors.Wrapf(err, "send states %s", d.ID())
Expand Down
6 changes: 3 additions & 3 deletions pkg/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type Filesystem struct {
rootMountpoint string
}

func (fs *Filesystem) tryRetainSharedDaemon(d *daemon.Daemon) {
func (fs *Filesystem) TryRetainSharedDaemon(d *daemon.Daemon) {
// FsDriver can be changed between two startups.
if d.HostMountpoint() == fs.rootMountpoint || config.GetFsDriver() == config.FsDriverFscache {
fs.sharedDaemon = d
Expand Down Expand Up @@ -110,13 +110,13 @@ func NewFileSystem(ctx context.Context, opt ...NewFSOpt) (*Filesystem, error) {

// Found shared daemon
// Fscache userspace daemon has no host mountpoint.
fs.tryRetainSharedDaemon(d)
fs.TryRetainSharedDaemon(d)

}

for _, d := range liveDaemons {
// Found shared daemon
fs.tryRetainSharedDaemon(d)
fs.TryRetainSharedDaemon(d)
}

return &fs, nil
Expand Down
16 changes: 1 addition & 15 deletions pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,21 +99,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
collector.NewDaemonInfoCollector(&d.Version, 1).Collect()
d.Unlock()

if d.Supervisor == nil {
return
}

su := d.Supervisor
err = su.FetchDaemonStates(func() error {
if err := d.SendStates(); err != nil {
return errors.Wrapf(err, "send daemon %s states", d.ID())
}
return nil
})
if err != nil {
log.L.Errorf("send states")
return
}
d.SendStates()
}()

return nil
Expand Down
18 changes: 5 additions & 13 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ func (m *Manager) UpdateDaemon(daemon *daemon.Daemon) error {
m.mu.Lock()
defer m.mu.Unlock()

return m.UpdateDaemonNoLock(daemon)
}

func (m *Manager) UpdateDaemonNoLock(daemon *daemon.Daemon) error {
if old := m.daemonStates.GetByDaemonID(daemon.ID(), nil); old == nil {
return errdefs.ErrNotFound
}
Expand Down Expand Up @@ -562,19 +566,7 @@ func (m *Manager) Recover(ctx context.Context) (map[string]*daemon.Daemon, map[s
}

// Snapshotter's lost the daemons' states after exit, refetch them.
su := d.Supervisor
if su != nil {
err = su.FetchDaemonStates(func() error {
if err := d.SendStates(); err != nil {
return errors.Wrapf(err, "send daemon %s states", d.ID())
}
return nil
})
if err != nil {
log.L.Errorf("Send daemon %s states", d.ID())
return
}
}
d.SendStates()
}()

return nil
Expand Down
19 changes: 17 additions & 2 deletions pkg/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/pkg/errors"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/daemon"
"github.com/containerd/nydus-snapshotter/pkg/daemon/types"
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
"github.com/containerd/nydus-snapshotter/pkg/filesystem"
"github.com/containerd/nydus-snapshotter/pkg/manager"
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
)
Expand All @@ -50,6 +52,7 @@ const defaultErrorCode string = "Unknown"
// 3. Rolling update
// 4. Daemons failures record as metrics
type Controller struct {
fs *filesystem.Filesystem
manager *manager.Manager
// httpSever *http.Server
addr *net.UnixAddr
Expand Down Expand Up @@ -117,7 +120,7 @@ type rafsInstanceInfo struct {
ImageID string `json:"image_id"`
}

func NewSystemController(manager *manager.Manager, sock string) (*Controller, error) {
func NewSystemController(fs *filesystem.Filesystem, manager *manager.Manager, sock string) (*Controller, error) {
if err := os.MkdirAll(filepath.Dir(sock), os.ModePerm); err != nil {
return nil, err
}
Expand All @@ -134,6 +137,7 @@ func NewSystemController(manager *manager.Manager, sock string) (*Controller, er
}

sc := Controller{
fs: fs,
manager: manager,
addr: addr,
router: mux.NewRouter(),
Expand Down Expand Up @@ -293,9 +297,11 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err
log.L.Infof("Upgrading nydusd %s, request %v", d.ID(), c)

manager := sc.manager
fs := sc.fs

var new daemon.Daemon
new.States = d.States
new.Supervisor = d.Supervisor
new.CloneInstances(d)

s := path.Base(d.GetAPISock())
Expand All @@ -312,6 +318,11 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err
return err
}

su := manager.SupervisorSet.GetSupervisor(d.ID())
if err := su.SendStatesTimeout(time.Second * 10); err != nil {
return errors.Wrap(err, "Send states")
}

if err := cmd.Start(); err != nil {
return errors.Wrap(err, "start process")
}
Expand All @@ -337,6 +348,8 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err
return errors.Wrap(err, "old daemon exits")
}

fs.TryRetainSharedDaemon(&new)

if err := new.Start(); err != nil {
return errors.Wrap(err, "start file system service")
}
Expand All @@ -347,10 +360,12 @@ func (sc *Controller) upgradeNydusDaemon(d *daemon.Daemon, c upgradeRequest) err

log.L.Infof("Started service of upgraded daemon on socket %s", new.GetAPISock())

if err := manager.UpdateDaemon(&new); err != nil {
if err := manager.UpdateDaemonNoLock(&new); err != nil {
return err
}

log.L.Infof("Upgraded daemon success on socket %s", new.GetAPISock())

return nil
}

Expand Down
38 changes: 19 additions & 19 deletions snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/metrics"
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
"github.com/containerd/nydus-snapshotter/pkg/pprof"
"github.com/containerd/nydus-snapshotter/pkg/system"

"github.com/containerd/nydus-snapshotter/pkg/resolve"
"github.com/containerd/nydus-snapshotter/pkg/store"
"github.com/containerd/nydus-snapshotter/pkg/system"

"github.com/containerd/nydus-snapshotter/pkg/filesystem"
"github.com/containerd/nydus-snapshotter/pkg/label"
Expand Down Expand Up @@ -122,24 +122,6 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
}()
}

if config.IsSystemControllerEnabled() {
systemController, err := system.NewSystemController(manager, config.SystemControllerAddress())
if err != nil {
return nil, errors.Wrap(err, "create system controller")
}
go func() {
if err := systemController.Run(); err != nil {
log.L.WithError(err).Error("Failed to start system controller")
}
}()
pprofAddress := config.SystemControllerPprofAddress()
if pprofAddress != "" {
if err := pprof.NewPprofHTTPListener(pprofAddress); err != nil {
return nil, errors.Wrap(err, "Failed to start pprof HTTP server")
}
}
}

opts := []filesystem.NewFSOpt{
filesystem.WithManager(manager),
filesystem.WithNydusImageBinaryPath(cfg.DaemonConfig.NydusdPath),
Expand Down Expand Up @@ -169,6 +151,24 @@ func NewSnapshotter(ctx context.Context, cfg *config.SnapshotterConfig) (snapsho
return nil, errors.Wrap(err, "failed to initialize nydus filesystem")
}

if config.IsSystemControllerEnabled() {
systemController, err := system.NewSystemController(nydusFs, manager, config.SystemControllerAddress())
if err != nil {
return nil, errors.Wrap(err, "create system controller")
}
go func() {
if err := systemController.Run(); err != nil {
log.L.WithError(err).Error("Failed to start system controller")
}
}()
pprofAddress := config.SystemControllerPprofAddress()
if pprofAddress != "" {
if err := pprof.NewPprofHTTPListener(pprofAddress); err != nil {
return nil, errors.Wrap(err, "Failed to start pprof HTTP server")
}
}
}

// With fuse driver enabled and a fuse daemon configuration with "localfs"
// storage backend, it indicates that a Blobs Manager is needed to download
// blobs from registry alone with no help of nydusd or containerd.
Expand Down

0 comments on commit 2fc62a6

Please sign in to comment.