fix: clear stack watchers map when lost leader lease #5123

Open · wants to merge 2 commits into main
94 changes: 73 additions & 21 deletions pkg/applier/manager.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"path"
"sync"
"time"

"github.com/k0sproject/k0s/internal/pkg/dir"
@@ -39,11 +40,13 @@ type Manager struct {
KubeClientFactory kubeutil.ClientFactoryInterface

// client kubernetes.Interface
applier Applier
bundlePath string
cancelWatcher context.CancelFunc
log *logrus.Entry
stacks map[string]stack
applier Applier
bundlePath string
stacks map[string]stack
log *logrus.Entry
startChan chan struct{}
mux sync.Mutex
watcherCancelFn context.CancelFunc

LeaderElector leaderelector.Interface
}
@@ -67,35 +70,80 @@ func (m *Manager) Init(ctx context.Context) error {

m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)

m.LeaderElector.AddAcquiredLeaseCallback(func() {
watcherCtx, cancel := context.WithCancel(ctx)
m.cancelWatcher = cancel
go func() {
_ = m.runWatchers(watcherCtx)
}()
})
m.LeaderElector.AddLostLeaseCallback(func() {
if m.cancelWatcher != nil {
m.cancelWatcher()
}
})

return err
return nil
}

// Start starts the Manager
func (m *Manager) Start(_ context.Context) error {
m.log.Debug("Starting")
@emosbaugh (Contributor, Author) commented on Oct 16, 2024:

This was largely lifted from the ExtensionsController.

m.startChan = make(chan struct{}, 1)

m.LeaderElector.AddLostLeaseCallback(m.leaseLost)

m.LeaderElector.AddAcquiredLeaseCallback(m.leaseAcquired)

// It's possible that we were already the leader by the time the callbacks were
// registered. In that case no callback fires, so check for leadership and
// notify the channel manually.
if m.LeaderElector.IsLeader() {
m.leaseAcquired()
}

go m.watchStartChan()
return nil
}

func (m *Manager) watchStartChan() {
m.log.Debug("Watching start channel")
for range m.startChan {
m.log.Info("Acquired leader lease")
m.mux.Lock()
ctx, cancel := context.WithCancel(context.Background())
// If there is a previous cancel func, call it
if m.watcherCancelFn != nil {
m.watcherCancelFn()
}
m.watcherCancelFn = cancel
m.mux.Unlock()
_ = m.runWatchers(ctx)
}
m.log.Info("Start channel closed, stopping applier-manager")
}

// Stop stops the Manager
func (m *Manager) Stop() error {
if m.cancelWatcher != nil {
m.cancelWatcher()
m.log.Info("Stopping applier-manager")
// We have no guarantees on concurrency here, so use a mutex
m.mux.Lock()
watcherCancelFn := m.watcherCancelFn
m.mux.Unlock()
if watcherCancelFn != nil {
watcherCancelFn()
}
close(m.startChan)
m.log.Debug("Stopped applier-manager")
return nil
}

func (m *Manager) leaseLost() {
m.mux.Lock()
defer m.mux.Unlock()
m.log.Warn("Lost leader lease, stopping applier-manager")

watcherCancelFn := m.watcherCancelFn
if watcherCancelFn != nil {
watcherCancelFn()
}
}

func (m *Manager) leaseAcquired() {
m.log.Info("Acquired leader lease")
select {
case m.startChan <- struct{}{}:
default:
}
}

func (m *Manager) runWatchers(ctx context.Context) error {
log := logrus.WithField("component", constant.ApplierManagerComponentName)

@@ -145,6 +193,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
m.removeStack(ctx, event.Name)
}
case <-ctx.Done():
// When the parent context is canceled, the stacks goroutine will stop.
// We need to clear the stacks map so that the stacks can be re-applied if the
// lease is re-acquired.
m.stacks = make(map[string]stack)
log.Info("manifest watcher done")
return nil
}
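Taken together, the manager.go changes move the watcher lifecycle out of Init: lease acquisition is signalled through a buffered start channel, the watcher context is replaced under a mutex, and the stacks map is reset when that context is cancelled so every stack is re-applied on the next lease. Below is a minimal, self-contained sketch of that pattern (as the review comment notes, it mirrors the ExtensionsController); the leaderElector interface, manager struct, and runWatchers body are simplified stand-ins, not the actual k0s types.

// Package leaderwatch sketches the lease-driven watcher lifecycle described above.
package leaderwatch

import (
	"context"
	"sync"
)

// leaderElector is a simplified stand-in for k0s's leaderelector.Interface.
type leaderElector interface {
	IsLeader() bool
	AddAcquiredLeaseCallback(func())
	AddLostLeaseCallback(func())
}

type manager struct {
	le        leaderElector
	mux       sync.Mutex
	cancelFn  context.CancelFunc
	startChan chan struct{}
	stacks    map[string]struct{} // stand-in for the Manager's stacks map
}

// Start registers the lease callbacks and launches the start-channel loop.
func (m *manager) Start() {
	m.startChan = make(chan struct{}, 1)
	m.stacks = make(map[string]struct{})
	m.le.AddLostLeaseCallback(m.leaseLost)
	m.le.AddAcquiredLeaseCallback(m.leaseAcquired)
	// The lease may already be held before the callbacks were registered,
	// in which case no callback fires, so check and notify manually.
	if m.le.IsLeader() {
		m.leaseAcquired()
	}
	go m.watchStartChan()
}

// Stop cancels any running watcher and closes the start channel,
// which ends the watchStartChan loop.
func (m *manager) Stop() {
	m.mux.Lock()
	cancel := m.cancelFn
	m.mux.Unlock()
	if cancel != nil {
		cancel()
	}
	close(m.startChan)
}

// leaseAcquired performs a non-blocking send: one pending notification is enough.
func (m *manager) leaseAcquired() {
	select {
	case m.startChan <- struct{}{}:
	default:
	}
}

// leaseLost cancels the running watcher; runWatchers then resets the stacks map.
func (m *manager) leaseLost() {
	m.mux.Lock()
	defer m.mux.Unlock()
	if m.cancelFn != nil {
		m.cancelFn()
	}
}

// watchStartChan starts (or restarts) the watchers each time the lease is acquired.
func (m *manager) watchStartChan() {
	for range m.startChan {
		m.mux.Lock()
		if m.cancelFn != nil {
			m.cancelFn() // cancel a previous run, if any
		}
		ctx, cancel := context.WithCancel(context.Background())
		m.cancelFn = cancel
		m.mux.Unlock()
		m.runWatchers(ctx)
	}
}

// runWatchers stands in for the manifest watcher loop. On cancellation it clears
// the stacks map, so a later lease re-acquisition re-applies every stack, which
// is the behavior this PR adds to the real Manager.
func (m *manager) runWatchers(ctx context.Context) {
	<-ctx.Done()
	m.stacks = make(map[string]struct{})
}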
254 changes: 254 additions & 0 deletions pkg/applier/manager_test.go
@@ -0,0 +1,254 @@
/*
Copyright 2024 k0s authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package applier

import (
"context"
"embed"
"os"
"path"
"path/filepath"
"sync"
"testing"
"time"

kubeutil "github.com/k0sproject/k0s/internal/testutil"
"github.com/k0sproject/k0s/pkg/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
yaml "sigs.k8s.io/yaml/goyaml.v2"
)

//go:embed testdata/manager_test/*
var managerTestData embed.FS

func TestManager(t *testing.T) {
ctx := context.Background()

dir := t.TempDir()

cfg := &config.CfgVars{
ManifestsDir: dir,
}

fakes := kubeutil.NewFakeClientFactory()

le := new(mockLeaderElector)

manager := &Manager{
K0sVars: cfg,
KubeClientFactory: fakes,
LeaderElector: le,
}

writeStack(t, dir, "testdata/manager_test/stack1")

err := manager.Init(ctx)
require.NoError(t, err)

err = manager.Start(ctx)
require.NoError(t, err)

le.activate()

// validate stack that already exists is applied

cmgv, _ := schema.ParseResourceArg("configmaps.v1.")
podgv, _ := schema.ParseResourceArg("pods.v1.")

waitForResource(t, fakes, *cmgv, "kube-system", "applier-test")
waitForResource(t, fakes, *podgv, "kube-system", "applier-test")

r, err := getResource(fakes, *cmgv, "kube-system", "applier-test")
if assert.NoError(t, err) {
assert.Equal(t, "applier", r.GetLabels()["component"])
}
r, err = getResource(fakes, *podgv, "kube-system", "applier-test")
if assert.NoError(t, err) {
assert.Equal(t, "Pod", r.GetKind())
assert.Equal(t, "applier", r.GetLabels()["component"])
}

// update the stack and verify the changes are applied

writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom1", "test")

t.Log("waiting for pod to be updated")
waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
if err != nil {
return false, nil
}
return r.GetLabels()["custom1"] == "test", nil
})

// lose and re-acquire leadership
le.deactivate()
le.activate()

// validate a new stack that is added is applied

writeStack(t, dir, "testdata/manager_test/stack2")

deployGV, _ := schema.ParseResourceArg("deployments.v1.apps")

waitForResource(t, fakes, *deployGV, "kube-system", "nginx")

r, err = getResource(fakes, *deployGV, "kube-system", "nginx")
if assert.NoError(t, err) {
assert.Equal(t, "Deployment", r.GetKind())
assert.Equal(t, "applier", r.GetLabels()["component"])
}

// update the stack after the lease is re-acquired and verify the changes are applied

writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom2", "test")

t.Log("waiting for pod to be updated")
waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
if err != nil {
return false, nil
}
return r.GetLabels()["custom2"] == "test", nil
})

// delete the stack and verify the resources are deleted

err = os.RemoveAll(filepath.Join(dir, "stack1"))
require.NoError(t, err)

t.Log("waiting for pod to be deleted")
waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
_, err := getResource(fakes, *podgv, "kube-system", "applier-test")
if errors.IsNotFound(err) {
return true, nil
}
return false, nil
})
}

func writeLabel(t *testing.T, file string, key string, value string) {
t.Helper()
contents, err := os.ReadFile(file)
require.NoError(t, err)
unst := map[interface{}]interface{}{}
err = yaml.Unmarshal(contents, &unst)
require.NoError(t, err)
unst["metadata"].(map[interface{}]interface{})["labels"].(map[interface{}]interface{})[key] = value
data, err := yaml.Marshal(unst)
require.NoError(t, err)
err = os.WriteFile(file, data, 0400)
require.NoError(t, err)
}

func waitForResource(t *testing.T, fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) {
t.Logf("waiting for resource %s/%s", gv.Resource, name)
waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
_, err := getResource(fakes, gv, namespace, name)
if errors.IsNotFound(err) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
})
}

func getResource(fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) (*unstructured.Unstructured, error) {
return fakes.DynamicClient.Resource(gv).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
}

func waitFor(t *testing.T, interval, timeout time.Duration, fn wait.ConditionWithContextFunc) {
t.Helper()
err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, fn)
require.NoError(t, err)
}

func writeStack(t *testing.T, dst string, src string) {
dstStackDir := filepath.Join(dst, path.Base(src))
err := os.MkdirAll(dstStackDir, 0755)
require.NoError(t, err)
entries, err := managerTestData.ReadDir(src)
require.NoError(t, err)
for _, entry := range entries {
data, err := managerTestData.ReadFile(path.Join(src, entry.Name()))
require.NoError(t, err)
dst := filepath.Join(dstStackDir, entry.Name())
t.Logf("writing file %s", dst)
err = os.WriteFile(dst, data, 0644)
require.NoError(t, err)
}
}

type mockLeaderElector struct {
mu sync.Mutex
leader bool
acquired []func()
lost []func()
}

func (e *mockLeaderElector) activate() {
e.mu.Lock()
defer e.mu.Unlock()
if !e.leader {
e.leader = true
for _, fn := range e.acquired {
fn()
}
}
}

func (e *mockLeaderElector) deactivate() {
e.mu.Lock()
defer e.mu.Unlock()
if e.leader {
e.leader = false
for _, fn := range e.lost {
fn()
}
}
}

func (e *mockLeaderElector) IsLeader() bool {
e.mu.Lock()
defer e.mu.Unlock()
return e.leader
}

func (e *mockLeaderElector) AddAcquiredLeaseCallback(fn func()) {
e.mu.Lock()
defer e.mu.Unlock()
e.acquired = append(e.acquired, fn)
if e.leader {
fn()
}
}

func (e *mockLeaderElector) AddLostLeaseCallback(fn func()) {
e.mu.Lock()
defer e.mu.Unlock()
e.lost = append(e.lost, fn)
if e.leader {
fn()
}
}
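A side note on the resource handles used in TestManager above: schema.ParseResourceArg takes a string of the form resource.version.group, so the trailing dot in "configmaps.v1." and "pods.v1." selects the empty core API group, while "deployments.v1.apps" selects the apps group. A tiny standalone illustration, not part of the PR:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// "resource.version.group"; the trailing dot means the empty (core) group.
	cmgv, _ := schema.ParseResourceArg("configmaps.v1.")
	fmt.Println(cmgv.Group == "", cmgv.Version, cmgv.Resource) // true v1 configmaps

	// Group-qualified resources name the group after the second dot.
	deployGV, _ := schema.ParseResourceArg("deployments.v1.apps")
	fmt.Println(deployGV.Group, deployGV.Version, deployGV.Resource) // apps v1 deployments
}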
9 changes: 9 additions & 0 deletions pkg/applier/testdata/manager_test/stack1/configmap.yaml
@@ -0,0 +1,9 @@
kind: ConfigMap
apiVersion: v1
metadata:
  name: applier-test
  namespace: kube-system
  labels:
    component: applier
data:
  foo: bar