Skip to content

Commit

Permalink
fix: race condition in applier-manager
Browse files Browse the repository at this point in the history
Signed-off-by: Ethan Mosbaugh <[email protected]>
  • Loading branch information
emosbaugh committed Oct 16, 2024
1 parent 75f6190 commit 499fe61
Showing 1 changed file with 69 additions and 21 deletions.
90 changes: 69 additions & 21 deletions pkg/applier/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"sync"
"time"

"github.com/k0sproject/k0s/internal/pkg/dir"
Expand All @@ -39,11 +40,13 @@ type Manager struct {
KubeClientFactory kubeutil.ClientFactoryInterface

// client kubernetes.Interface
applier Applier
bundlePath string
cancelWatcher context.CancelFunc
log *logrus.Entry
stacks map[string]stack
applier Applier
bundlePath string
stacks map[string]stack
log *logrus.Entry
startChan chan struct{}
mux sync.Mutex
watcherCancelFn context.CancelFunc

LeaderElector leaderelector.Interface
}
Expand All @@ -67,35 +70,80 @@ func (m *Manager) Init(ctx context.Context) error {

m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)

m.LeaderElector.AddAcquiredLeaseCallback(func() {
watcherCtx, cancel := context.WithCancel(ctx)
m.cancelWatcher = cancel
go func() {
_ = m.runWatchers(watcherCtx)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.cancelWatcher != nil {
m.cancelWatcher()
}
})

return err
return nil
}

// Run runs the Manager
func (m *Manager) Start(_ context.Context) error {
m.log.Debug("Starting")
m.startChan = make(chan struct{}, 1)

m.LeaderElector.AddLostLeaseCallback(m.leaseLost)

m.LeaderElector.AddAcquiredLeaseCallback(m.leaseAcquired)

// It's possible that by the time we added the callback, we are already the leader,
// If this is true the callback will not be called, so we need to check if we are
// the leader and notify the channel manually
if m.LeaderElector.IsLeader() {
m.leaseAcquired()
}

go m.watchStartChan()
return nil
}

func (m *Manager) watchStartChan() {
m.log.Debug("Watching start channel")
for range m.startChan {
m.log.Info("Acquired leader lease")
m.mux.Lock()
ctx, cancel := context.WithCancel(context.Background())
// If there is a previous cancel func, call it
if m.watcherCancelFn != nil {
m.watcherCancelFn()
}
m.watcherCancelFn = cancel
m.mux.Unlock()
_ = m.runWatchers(ctx)
}
m.log.Info("Start channel closed, stopping applier-manager")
}

// Stop stops the Manager
func (m *Manager) Stop() error {
if m.cancelWatcher != nil {
m.cancelWatcher()
m.log.Info("Stopping applier-manager")
// We have no guarantees on concurrency here, so use mutex
m.mux.Lock()
watcherCancelFn := m.watcherCancelFn
m.mux.Unlock()
if watcherCancelFn != nil {
watcherCancelFn()
}
close(m.startChan)
m.log.Debug("Stopped applier-manager")
return nil
}

func (m *Manager) leaseLost() {
m.mux.Lock()
defer m.mux.Unlock()
m.log.Warn("Lost leader lease, stopping applier-manager")

watcherCancelFn := m.watcherCancelFn
if watcherCancelFn != nil {
watcherCancelFn()
}
}

func (m *Manager) leaseAcquired() {
m.log.Info("Acquired leader lease")
select {
case m.startChan <- struct{}{}:
default:
}
}

func (m *Manager) runWatchers(ctx context.Context) error {
log := logrus.WithField("component", constant.ApplierManagerComponentName)

Expand Down

0 comments on commit 499fe61

Please sign in to comment.