Skip to content

Commit

Permalink
Merge pull request #269 from changweige/release-0.4
Browse files Browse the repository at this point in the history
get rid of getting daemon state in single flight
  • Loading branch information
changweige authored Nov 30, 2022
2 parents dba3aaf + 222d577 commit 1e18acb
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 33 deletions.
53 changes: 23 additions & 30 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/containerd/nydus-snapshotter/pkg/utils/mount"
"github.com/containerd/nydus-snapshotter/pkg/utils/retry"
"github.com/pkg/errors"
"golang.org/x/sync/singleflight"
)

const (
Expand Down Expand Up @@ -73,8 +72,6 @@ type Daemon struct {
Supervisor *supervisor.Supervisor
Config daemonconfig.DaemonConfig

stateGetterGroup singleflight.Group

ref int32
// Cache the nydusd daemon state to avoid frequently querying nydusd by API.
State types.DaemonState
Expand Down Expand Up @@ -162,7 +159,9 @@ func (d *Daemon) GetState() (types.DaemonState, error) {

st := info.DaemonState()

d.Lock()
d.State = st
d.Unlock()

return st, nil
}
Expand All @@ -173,37 +172,31 @@ func (d *Daemon) GetState() (types.DaemonState, error) {
// 2. READY
// 3. RUNNING
func (d *Daemon) WaitUntilState(expected types.DaemonState) error {
stateGetter := func() (v any, err error) {
err = retry.Do(func() error {
if expected == d.State {
return nil
}

state, err := d.GetState()
if err != nil {
return errors.Wrapf(err, "wait until daemon is %s", expected)
}

if state != expected {
return errors.Errorf("daemon %s is not %s yet, current state %s",
d.ID(), expected, state)
}
collector.CollectDaemonEvent(d.ID(), string(expected))

return retry.Do(func() error {
d.Lock()
if expected == d.State {
d.Unlock()
return nil
},
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.LastErrorOnly(true),
retry.Delay(100*time.Millisecond),
)
}
d.Unlock()

return
}
state, err := d.GetState()
if err != nil {
return errors.Wrapf(err, "wait until daemon is %s", expected)
}

_, err, shared := d.stateGetterGroup.Do(d.ID(), stateGetter)
log.L.Debugf("Get daemon %s with shared result: %v ", d.ID(), shared)
if state != expected {
return errors.Errorf("daemon %s is not %s yet, current state %s",
d.ID(), expected, state)
}
collector.CollectDaemonEvent(d.ID(), string(expected))

return err
return nil
},
retry.Attempts(20), // totally wait for 2 seconds, should be enough
retry.LastErrorOnly(true),
retry.Delay(100*time.Millisecond),
)
}

func (d *Daemon) SharedMount(rafs *Rafs) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/daemon_adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
}

if err := d.WaitUntilState(types.DaemonStateRunning); err != nil {
log.L.Errorf("daemon %s is not managed to reach RUNNING state", d.ID())
log.L.WithError(err).Errorf("daemon %s is not managed to reach RUNNING state", d.ID())
return
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (m *Manager) doDaemonFailover(d *daemon.Daemon) {
}

if err := d.WaitUntilState(types.DaemonStateInit); err != nil {
log.L.Errorf("daemon din't reach state %s", types.DaemonStateInit)
log.L.WithError(err).Errorf("daemon didn't reach state %s,", types.DaemonStateInit)
return
}

Expand Down Expand Up @@ -311,8 +311,9 @@ func (m *Manager) handleDaemonDeathEvent() {
log.L.Warnf("Daemon %s was not found", ev.daemonID)
return
}

d.Lock()
d.State = types.DaemonStateUnknown
d.Unlock()
if m.RecoverPolicy == RecoverPolicyRestart {
log.L.Infof("Restart daemon %s", ev.daemonID)
go m.doDaemonRestart(d)
Expand Down

0 comments on commit 1e18acb

Please sign in to comment.