diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go index 78a3f14e1c9c..e0f430d924f1 100644 --- a/pkg/applier/manager.go +++ b/pkg/applier/manager.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "sync" "time" "github.com/k0sproject/k0s/internal/pkg/dir" @@ -39,11 +40,13 @@ type Manager struct { KubeClientFactory kubeutil.ClientFactoryInterface // client kubernetes.Interface - applier Applier - bundlePath string - cancelWatcher context.CancelFunc - log *logrus.Entry - stacks map[string]stack + applier Applier + bundlePath string + stacks map[string]stack + log *logrus.Entry + startChan chan struct{} + mux sync.Mutex + watcherCancelFn context.CancelFunc LeaderElector leaderelector.Interface } @@ -67,35 +70,80 @@ func (m *Manager) Init(ctx context.Context) error { m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory) - m.LeaderElector.AddAcquiredLeaseCallback(func() { - watcherCtx, cancel := context.WithCancel(ctx) - m.cancelWatcher = cancel - go func() { - _ = m.runWatchers(watcherCtx) - }() - }) - m.LeaderElector.AddLostLeaseCallback(func() { - if m.cancelWatcher != nil { - m.cancelWatcher() - } - }) - - return err + return nil } // Run runs the Manager func (m *Manager) Start(_ context.Context) error { + m.log.Debug("Starting") + m.startChan = make(chan struct{}, 1) + + m.LeaderElector.AddLostLeaseCallback(m.leaseLost) + + m.LeaderElector.AddAcquiredLeaseCallback(m.leaseAcquired) + + // It's possible that by the time we added the callback, we are already the leader, + // If this is true the callback will not be called, so we need to check if we are + // the leader and notify the channel manually + if m.LeaderElector.IsLeader() { + m.leaseAcquired() + } + + go m.watchStartChan() return nil } +func (m *Manager) watchStartChan() { + m.log.Debug("Watching start channel") + for range m.startChan { + m.log.Info("Acquired leader lease") + m.mux.Lock() + ctx, cancel := context.WithCancel(context.Background()) + // If there is a previous cancel func, call it + if m.watcherCancelFn != nil { + m.watcherCancelFn() + } + m.watcherCancelFn = cancel + m.mux.Unlock() + _ = m.runWatchers(ctx) + } + m.log.Info("Start channel closed, stopping applier-manager") +} + // Stop stops the Manager func (m *Manager) Stop() error { - if m.cancelWatcher != nil { - m.cancelWatcher() + m.log.Info("Stopping applier-manager") + // We have no guarantees on concurrency here, so use mutex + m.mux.Lock() + watcherCancelFn := m.watcherCancelFn + m.mux.Unlock() + if watcherCancelFn != nil { + watcherCancelFn() } + close(m.startChan) + m.log.Debug("Stopped applier-manager") return nil } +func (m *Manager) leaseLost() { + m.mux.Lock() + defer m.mux.Unlock() + m.log.Warn("Lost leader lease, stopping applier-manager") + + watcherCancelFn := m.watcherCancelFn + if watcherCancelFn != nil { + watcherCancelFn() + } +} + +func (m *Manager) leaseAcquired() { + m.log.Info("Acquired leader lease") + select { + case m.startChan <- struct{}{}: + default: + } +} + func (m *Manager) runWatchers(ctx context.Context) error { log := logrus.WithField("component", constant.ApplierManagerComponentName)