From 0d07bec2a92949fd41b2049595fedb269a910190 Mon Sep 17 00:00:00 2001
From: Ethan Mosbaugh
Date: Tue, 15 Oct 2024 13:33:54 -0700
Subject: [PATCH] fix: clear stack watchers map when lost leader lease

Signed-off-by: Ethan Mosbaugh
---
 pkg/applier/manager.go                        |   4 +
 pkg/applier/manager_test.go                   | 238 ++++++++++++++++++
 .../manager_test/stack1/configmap.yaml        |   9 +
 .../testdata/manager_test/stack1/pod.yaml     |  11 +
 .../testdata/manager_test/stack2/deploy.yaml  |  25 ++
 5 files changed, 287 insertions(+)
 create mode 100644 pkg/applier/manager_test.go
 create mode 100644 pkg/applier/testdata/manager_test/stack1/configmap.yaml
 create mode 100644 pkg/applier/testdata/manager_test/stack1/pod.yaml
 create mode 100644 pkg/applier/testdata/manager_test/stack2/deploy.yaml

diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go
index 4774b75fdf44..78a3f14e1c9c 100644
--- a/pkg/applier/manager.go
+++ b/pkg/applier/manager.go
@@ -145,6 +145,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
                 m.removeStack(ctx, event.Name)
             }
         case <-ctx.Done():
+            // When the parent context is canceled, the stacks goroutine will stop.
+            // We need to clear the stacks map so that they can be added back if the lease is
+            // re-acquired.
+            m.stacks = make(map[string]stack)
             log.Info("manifest watcher done")
             return nil
         }
diff --git a/pkg/applier/manager_test.go b/pkg/applier/manager_test.go
new file mode 100644
index 000000000000..9222163f9894
--- /dev/null
+++ b/pkg/applier/manager_test.go
@@ -0,0 +1,238 @@
+package applier
+
+import (
+    "context"
+    "embed"
+    "os"
+    "path/filepath"
+    "sync"
+    "testing"
+    "time"
+
+    kubeutil "github.com/k0sproject/k0s/internal/testutil"
+    "github.com/k0sproject/k0s/pkg/config"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/runtime/schema"
+    "k8s.io/apimachinery/pkg/util/wait"
+    yaml "sigs.k8s.io/yaml/goyaml.v2"
+)
+
+//go:embed testdata/manager_test/*
+var managerTestData embed.FS
+
+func TestManager(t *testing.T) {
+    ctx := context.Background()
+
+    dir := t.TempDir()
+
+    cfg := &config.CfgVars{
+        ManifestsDir: dir,
+    }
+
+    fakes := kubeutil.NewFakeClientFactory()
+
+    le := new(mockLeaderElector)
+
+    manager := &Manager{
+        K0sVars:           cfg,
+        KubeClientFactory: fakes,
+        LeaderElector:     le,
+    }
+
+    writeStack(t, dir, "testdata/manager_test/stack1")
+
+    err := manager.Init(ctx)
+    require.NoError(t, err)
+
+    err = manager.Start(ctx)
+    require.NoError(t, err)
+
+    le.activate()
+
+    // validate stack that already exists is applied
+
+    cmgv, _ := schema.ParseResourceArg("configmaps.v1.")
+    podgv, _ := schema.ParseResourceArg("pods.v1.")
+
+    waitForResource(t, fakes, *cmgv, "kube-system", "applier-test")
+    waitForResource(t, fakes, *podgv, "kube-system", "applier-test")
+
+    r, err := getResource(fakes, *cmgv, "kube-system", "applier-test")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+    r, err = getResource(fakes, *podgv, "kube-system", "applier-test")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "Pod", r.GetKind())
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+
+    // update the stack and verify the changes are applied
+
+    writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom1", "test")
+
+    t.Log("waiting for pod to be updated")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if err != nil {
+            return false, nil
+        }
+        return r.GetLabels()["custom1"] == "test", nil
+    })
+
+    // lose and re-acquire leadership
+    le.deactivate()
+    le.activate()
+
+    // validate a new stack that is added is applied
+
+    writeStack(t, dir, "testdata/manager_test/stack2")
+
+    deployGV, _ := schema.ParseResourceArg("deployments.v1.apps")
+
+    waitForResource(t, fakes, *deployGV, "kube-system", "nginx")
+
+    r, err = getResource(fakes, *deployGV, "kube-system", "nginx")
+    if assert.NoError(t, err) {
+        assert.Equal(t, "Deployment", r.GetKind())
+        assert.Equal(t, "applier", r.GetLabels()["component"])
+    }
+
+    // update the stack after re-acquiring the lease and verify the changes are applied
+
+    writeLabel(t, filepath.Join(dir, "stack1/pod.yaml"), "custom2", "test")
+
+    t.Log("waiting for pod to be updated")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        r, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if err != nil {
+            return false, nil
+        }
+        return r.GetLabels()["custom2"] == "test", nil
+    })
+
+    // delete the stack and verify the resources are deleted
+
+    err = os.RemoveAll(filepath.Join(dir, "stack1"))
+    require.NoError(t, err)
+
+    t.Log("waiting for pod to be deleted")
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        _, err := getResource(fakes, *podgv, "kube-system", "applier-test")
+        if errors.IsNotFound(err) {
+            return true, nil
+        }
+        return false, nil
+    })
+}
+
+func writeLabel(t *testing.T, file string, key string, value string) {
+    t.Helper()
+    contents, err := os.ReadFile(file)
+    require.NoError(t, err)
+    unst := map[interface{}]interface{}{}
+    err = yaml.Unmarshal(contents, &unst)
+    require.NoError(t, err)
+    unst["metadata"].(map[interface{}]interface{})["labels"].(map[interface{}]interface{})[key] = value
+    data, err := yaml.Marshal(unst)
+    require.NoError(t, err)
+    err = os.WriteFile(file, data, 0400)
+    require.NoError(t, err)
+}
+
+func waitForResource(t *testing.T, fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) {
+    t.Logf("waiting for resource %s/%s", gv.Resource, name)
+    waitFor(t, 100*time.Millisecond, 5*time.Second, func(ctx context.Context) (bool, error) {
+        _, err := getResource(fakes, gv, namespace, name)
+        if errors.IsNotFound(err) {
+            return false, nil
+        } else if err != nil {
+            return false, err
+        }
+        return true, nil
+    })
+}
+
+func getResource(fakes *kubeutil.FakeClientFactory, gv schema.GroupVersionResource, namespace string, name string) (*unstructured.Unstructured, error) {
+    return fakes.DynamicClient.Resource(gv).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{})
+}
+
+func waitFor(t *testing.T, interval, timeout time.Duration, fn wait.ConditionWithContextFunc) {
+    t.Helper()
+    err := wait.PollUntilContextTimeout(context.Background(), interval, timeout, true, fn)
+    require.NoError(t, err)
+}
+
+func writeStack(t *testing.T, dst string, src string) {
+    dstStackDir := filepath.Join(dst, filepath.Base(src))
+    err := os.MkdirAll(dstStackDir, 0755)
+    require.NoError(t, err)
+    entries, err := managerTestData.ReadDir(src)
+    require.NoError(t, err)
+    for _, entry := range entries {
+        src := filepath.Join(src, entry.Name())
+        data, err := managerTestData.ReadFile(src)
+        require.NoError(t, err)
+        dst := filepath.Join(dstStackDir, entry.Name())
+        t.Logf("writing file %s", dst)
+        err = os.WriteFile(dst, data, 0644)
+        require.NoError(t, err)
+    }
+}
+
+type mockLeaderElector struct {
+    mu       sync.Mutex
+    leader   bool
+    acquired []func()
+    lost     []func()
+}
+
+func (e *mockLeaderElector) activate() {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if !e.leader {
+        e.leader = true
+        for _, fn := range e.acquired {
+            fn()
+        }
+    }
+}
+
+func (e *mockLeaderElector) deactivate() {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    if e.leader {
+        e.leader = false
+        for _, fn := range e.lost {
+            fn()
+        }
+    }
+}
+
+func (e *mockLeaderElector) IsLeader() bool {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    return e.leader
+}
+
+func (e *mockLeaderElector) AddAcquiredLeaseCallback(fn func()) {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    e.acquired = append(e.acquired, fn)
+    if e.leader {
+        fn()
+    }
+}
+
+func (e *mockLeaderElector) AddLostLeaseCallback(fn func()) {
+    e.mu.Lock()
+    defer e.mu.Unlock()
+    e.lost = append(e.lost, fn)
+    if e.leader {
+        fn()
+    }
+}
diff --git a/pkg/applier/testdata/manager_test/stack1/configmap.yaml b/pkg/applier/testdata/manager_test/stack1/configmap.yaml
new file mode 100644
index 000000000000..e8662b63e3c7
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack1/configmap.yaml
@@ -0,0 +1,9 @@
+kind: ConfigMap
+apiVersion: v1
+metadata:
+  name: applier-test
+  namespace: kube-system
+  labels:
+    component: applier
+data:
+  foo: bar
diff --git a/pkg/applier/testdata/manager_test/stack1/pod.yaml b/pkg/applier/testdata/manager_test/stack1/pod.yaml
new file mode 100644
index 000000000000..08957e15baf2
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack1/pod.yaml
@@ -0,0 +1,11 @@
+kind: Pod
+apiVersion: v1
+metadata:
+  name: applier-test
+  namespace: kube-system
+  labels:
+    component: applier
+spec:
+  containers:
+  - name: nginx
+    image: nginx:1.15
diff --git a/pkg/applier/testdata/manager_test/stack2/deploy.yaml b/pkg/applier/testdata/manager_test/stack2/deploy.yaml
new file mode 100644
index 000000000000..48eabe53140f
--- /dev/null
+++ b/pkg/applier/testdata/manager_test/stack2/deploy.yaml
@@ -0,0 +1,25 @@
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: nginx
+  namespace: kube-system
+  labels:
+    component: applier
+spec:
+  selector:
+    matchLabels:
+      app: nginx
+  template:
+    metadata:
+      labels:
+        app: nginx
+    spec:
+      containers:
+      - name: nginx
+        image: docker.io/nginx:1-alpine
+        resources:
+          limits:
+            memory: "64Mi"
+            cpu: "100m"
+        ports:
+        - containerPort: 80