fix: Ensure all patch calls can conflict when resource version doesn't match (#1658)
jonathan-innis committed Sep 17, 2024
1 parent 275fa2f commit 71f7aef
Showing 18 changed files with 120 additions and 43 deletions.
28 changes: 18 additions & 10 deletions pkg/controllers/disruption/controller.go
@@ -19,15 +19,15 @@ package disruption
import (
"bytes"
"context"
"errors"
stderrors "errors"
"fmt"
"strings"
"sync"
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/samber/lo"
"go.uber.org/multierr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -36,6 +36,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
@@ -126,20 +128,26 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
return reconcile.Result{RequeueAfter: time.Second}, nil
}

// Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process
// while it progresses in memory. If Karpenter restarts during a disruption action, some nodes can be left tainted.
// Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process while it progresses in memory.
// If Karpenter restarts or fails with an error during a disruption action, some nodes can be left tainted.
// Idempotently remove this taint from candidates that are not in the orchestration queue before continuing.
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
return !c.queue.HasAny(s.ProviderID())
})...); err != nil {
return reconcile.Result{}, fmt.Errorf("removing taint from nodes, %w", err)
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("removing taint %s from nodes, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
}

// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
c.recordRun(fmt.Sprintf("%T", m))
success, err := c.disrupt(ctx, m)
if err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("disrupting via reason=%q, %w", strings.ToLower(string(m.Reason())), err)
}
if success {
@@ -201,7 +209,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
})
// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err)
}

var nodeClaimNames []string
@@ -210,7 +218,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return multierr.Append(fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
return fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err)
}
}

@@ -229,10 +237,10 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)

if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
}

// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
@@ -292,6 +300,6 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
}
}
if buf.Len() > 0 {
log.FromContext(ctx).Error(errors.New(buf.String()), "detected disruption budget errors")
log.FromContext(ctx).Error(stderrors.New(buf.String()), "detected disruption budget errors")
}
}
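The recurring pattern this commit introduces is visible in the hunk above: optimistic-lock patches paired with conflict-tolerant reconciles, where a 409 Conflict means "re-read and retry" rather than "fail". A minimal sketch of that shape, with a hypothetical doWork standing in for RequireNoScheduleTaint and the disruption methods:

package example

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// reconcileOnce sketches the conflict-handling shape used in the controller above.
// doWork stands in for RequireNoScheduleTaint / the disruption methods.
func reconcileOnce(ctx context.Context, doWork func(context.Context) error) (reconcile.Result, error) {
	if err := doWork(ctx); err != nil {
		// The patch carried the resourceVersion we read; a 409 Conflict just means
		// the object moved on, so requeue and rebuild the decision from fresh state.
		if errors.IsConflict(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		return reconcile.Result{}, fmt.Errorf("reconciling, %w", err)
	}
	return reconcile.Result{}, nil
}
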
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/queue.go
@@ -181,7 +181,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
if err := q.waitOrTerminate(ctx, cmd); err != nil {
// If recoverable, re-queue and try again.
if !IsUnrecoverableError(err) {
// store the error that is causing us to fail so we can bubble it up later if this times out.
// store the error that is causing us to fail, so we can bubble it up later if this times out.
cmd.lastError = err
// mark this item as done processing. This is necessary so that the RLI is able to add the item back in.
q.RateLimitingInterface.Done(cmd)
21 changes: 15 additions & 6 deletions pkg/controllers/node/termination/controller.go
@@ -39,6 +39,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/karpenter/pkg/utils/pretty"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
@@ -91,7 +93,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}

if err := c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
}

@@ -100,10 +102,13 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
return reconcile.Result{}, err
}

if err := c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1.DisruptedTaintKey, err)
if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
}
if err := c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
if !terminator.IsNodeDrainError(err) {
return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
}
@@ -114,7 +119,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
// if the Node Ready condition is true
// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
if _, err := c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
if cloudprovider.IsNodeClaimNotFoundError(err) {
return reconcile.Result{}, c.removeFinalizer(ctx, node)
}
@@ -231,8 +236,12 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, n) {
// We use client.StrategicMergeFrom here since the node object supports it and
// a strategic merge patch represents the finalizer list as a keyed "set" so removing
// an item from the list doesn't replace the full list
// https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
return client.IgnoreNotFound(fmt.Errorf("patching node, %w", err))
return client.IgnoreNotFound(fmt.Errorf("removing finalizer, %w", err))
}

metrics.NodesTerminatedTotal.With(prometheus.Labels{
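The new comment explains why this particular write keeps client.StrategicMergeFrom instead of switching to the optimistic-lock merge patch used elsewhere in the commit: for built-in types like Node, a strategic merge patch treats the finalizer list as a keyed set, so only Karpenter's entry is removed. A sketch of that flow under those assumptions (the helper name is illustrative; the error wrapping mirrors the diff):

package example

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// removeNodeFinalizer sketches the removeFinalizer flow shown in the diff.
func removeNodeFinalizer(ctx context.Context, kubeClient client.Client, n *corev1.Node) error {
	stored := n.DeepCopy()
	controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
	if equality.Semantic.DeepEqual(stored, n) {
		return nil // finalizer was already gone
	}
	// A strategic merge patch keys metadata.finalizers as a set, so this request
	// drops one entry rather than overwriting whatever list the server holds now.
	if err := kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
		return client.IgnoreNotFound(fmt.Errorf("removing finalizer, %w", err))
	}
	return nil
}
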
5 changes: 4 additions & 1 deletion pkg/controllers/node/termination/terminator/terminator.go
@@ -73,7 +73,10 @@ func (t *Terminator) Taint(ctx context.Context, node *corev1.Node, taint corev1.
corev1.LabelNodeExcludeBalancers: "karpenter",
})
if !equality.Semantic.DeepEqual(node, stored) {
if err := t.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the taint list
if err := t.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return err
}
taintValues := []any{
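client.MergeFromWithOptimisticLock{} is the mechanism behind the commit title: the generated JSON merge patch also carries the resourceVersion that was read, so the API server answers 409 Conflict if the Node changed in the meantime instead of letting a stale taint list overwrite concurrent changes (a JSON merge patch replaces arrays wholesale). A rough illustration of the patch body; the resourceVersion shown in the comment is hypothetical:

package example

import (
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// taintPatchData shows what the optimistic-lock merge patch contains.
func taintPatchData(node *corev1.Node) ([]byte, error) {
	stored := node.DeepCopy()
	node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint)
	patch := client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})
	// The bytes sent to the API server look roughly like
	//   {"metadata":{"resourceVersion":"1234"},"spec":{"taints":[ ...full new list... ]}}
	// The resourceVersion precondition is what turns a silent overwrite of the
	// taint list into a 409 Conflict when the node was updated by someone else.
	return patch.Data(node)
}
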
4 changes: 2 additions & 2 deletions pkg/controllers/nodeclaim/consistency/controller.go
@@ -100,10 +100,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
return reconcile.Result{}, err
}
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err = c.kubeClient.Status().Update(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
if err = c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
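Replacing Status().Update() with Status().Patch() plus the optimistic lock keeps the same conflict semantics (a stale resourceVersion is rejected with 409) while sending only the changed status fields rather than the whole object. A compact sketch of that write path, with a placeholder mutate callback standing in for the consistency checks:

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// patchStatus sketches the snapshot -> mutate -> patch-status flow used above.
func patchStatus(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim, mutate func(*v1.NodeClaim)) (reconcile.Result, error) {
	stored := nodeClaim.DeepCopy() // base object for the merge patch
	mutate(nodeClaim)              // e.g. set a status condition
	if equality.Semantic.DeepEqual(stored, nodeClaim) {
		return reconcile.Result{}, nil // nothing changed, skip the API call
	}
	err := kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{}))
	if errors.IsConflict(err) {
		return reconcile.Result{Requeue: true}, nil // stale read, retry with fresh state
	}
	return reconcile.Result{}, client.IgnoreNotFound(err)
}
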
4 changes: 2 additions & 2 deletions pkg/controllers/nodeclaim/disruption/controller.go
@@ -92,10 +92,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
results = append(results, res)
}
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err := c.kubeClient.Status().Update(ctx, nodeClaim); err != nil {
if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
9 changes: 8 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -85,7 +86,13 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (re
stored := nodeClaim.DeepCopy()
controllerutil.AddFinalizer(nodeClaim, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(nodeClaim, stored) {
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the finalizer list
if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -74,7 +74,7 @@ func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim)
stored := node.DeepCopy()
node.Labels = lo.Assign(node.Labels, map[string]string{v1.NodeInitializedLabelKey: "true"})
if !equality.Semantic.DeepEqual(stored, node) {
if err = i.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
if err = i.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, err
}
}
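This hunk moves in the opposite direction, from a strategic merge patch to a plain client.MergeFrom, which is enough for this write: a JSON merge patch merges map keys individually and only replaces arrays wholesale, and the only change here is one label. A small sketch of the patch body that results (the JSON in the comment is illustrative):

package example

import (
	"github.com/samber/lo"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// initializedLabelPatch shows the merge patch produced for the label add.
func initializedLabelPatch(node *corev1.Node) ([]byte, error) {
	stored := node.DeepCopy()
	node.Labels = lo.Assign(node.Labels, map[string]string{v1.NodeInitializedLabelKey: "true"})
	// Labels are a map, so the JSON merge patch carries only the added key,
	// roughly {"metadata":{"labels":{"karpenter.sh/initialized":"true"}}},
	// and leaves every other label on the server untouched.
	return client.MergeFrom(stored).Data(node)
}
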
5 changes: 4 additions & 1 deletion pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -102,7 +102,10 @@ func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, no
v1.NodeRegisteredLabelKey: "true",
})
if !equality.Semantic.DeepEqual(stored, node) {
if err := r.kubeClient.Update(ctx, node); err != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the taint list
if err := r.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
return fmt.Errorf("syncing node, %w", err)
}
}
2 changes: 1 addition & 1 deletion pkg/controllers/nodeclaim/podevents/controller.go
@@ -83,7 +83,7 @@ func (c *Controller) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.
// otherwise, set the pod event time to now
nc.Status.LastPodEventTime.Time = c.clock.Now()
if !equality.Semantic.DeepEqual(stored, nc) {
if err := c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
if err = c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
}
9 changes: 5 additions & 4 deletions pkg/controllers/nodeclaim/termination/controller.go
@@ -79,7 +79,6 @@ func (c *Controller) Reconcile(ctx context.Context, n *v1.NodeClaim) (reconcile.
//nolint:gocyclo
func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef("", nodeClaim.Status.NodeName), "provider-id", nodeClaim.Status.ProviderID))
stored := nodeClaim.DeepCopy()
if !controllerutil.ContainsFinalizer(nodeClaim, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}
@@ -125,12 +124,14 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (rec
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
}).Observe(time.Since(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).LastTransitionTime.Time).Seconds())
}
stored := nodeClaim.DeepCopy() // The NodeClaim may have been modified in the EnsureTerminated function
controllerutil.RemoveFinalizer(nodeClaim, v1.TerminationFinalizer)
if !equality.Semantic.DeepEqual(stored, nodeClaim) {
// We call Update() here rather than Patch() because patching a list with a JSON merge patch
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the finalizer list
// https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
if err = c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
@@ -170,7 +171,7 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.C
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})

if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
return client.IgnoreNotFound(fmt.Errorf("patching nodeclaim, %w", err))
return client.IgnoreNotFound(err)
}
log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim")
c.recorder.Publish(terminatorevents.NodeClaimTerminationGracePeriodExpiring(nodeClaim, terminationTime))
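The relocated stored := nodeClaim.DeepCopy() matters for the new patch call: the merge patch is computed as the difference between stored and the current object, so the snapshot must be taken after EnsureTerminated may have mutated the NodeClaim, or those unrelated changes would be folded into the finalizer-removal patch. A sketch of the ordering, with ensureTerminated as a stand-in for the real call:

package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/equality"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// finalizeSketch shows why the DeepCopy base is taken late in the function.
func finalizeSketch(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim, ensureTerminated func(*v1.NodeClaim)) error {
	ensureTerminated(nodeClaim) // may mutate the in-memory NodeClaim

	// Snapshot *after* the earlier mutations so the patch below only carries the
	// finalizer removal, not whatever ensureTerminated happened to change.
	stored := nodeClaim.DeepCopy()
	controllerutil.RemoveFinalizer(nodeClaim, v1.TerminationFinalizer)
	if equality.Semantic.DeepEqual(stored, nodeClaim) {
		return nil
	}
	return client.IgnoreNotFound(kubeClient.Patch(ctx, nodeClaim,
		client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})))
}
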
28 changes: 28 additions & 0 deletions pkg/controllers/nodeclaim/termination/suite_test.go
@@ -22,12 +22,15 @@ import (
"testing"
"time"

"github.com/awslabs/operatorpkg/object"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"
@@ -146,6 +149,31 @@ var _ = Describe("Termination", func() {
_, err = cloudProvider.Get(ctx, nodeClaim.Status.ProviderID)
Expect(cloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue())
})
It("should delete the NodeClaim when the spec resource.Quantity values will change during deserialization", func() {
nodeClaim.SetGroupVersionKind(object.GVK(nodeClaim)) // This is needed so that the GVK is set on the unstructured object
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(nodeClaim)
Expect(err).ToNot(HaveOccurred())
// Set a value in resources that will get converted to a value with a suffix e.g. 50k
Expect(unstructured.SetNestedStringMap(u, map[string]string{"memory": "50000"}, "spec", "resources", "requests")).To(Succeed())

obj := &unstructured.Unstructured{}
Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(u, obj)).To(Succeed())

ExpectApplied(ctx, env.Client, nodePool, obj)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim)

// Expect the node and the nodeClaim to both be gone
Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed())
result := ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // triggers the nodeclaim deletion

Expect(result.RequeueAfter).To(BeEquivalentTo(5 * time.Second))
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue()).To(BeTrue())

ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // this will call cloudProvider Get to check if the instance is still around
ExpectNotFound(ctx, env.Client, nodeClaim)
})
It("should requeue reconciliation if cloudProvider Get returns an error other than NodeClaimNotFoundError", func() {
ExpectApplied(ctx, env.Client, nodePool, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim)
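The new test builds a NodeClaim whose resource quantities change form on round-trip: resource.Quantity canonicalizes when serialized, so a spec written with memory: "50000" reads back as "50k" and the object in hand no longer compares DeepEqual to what the API server returns. A tiny standalone illustration of that canonicalization (not part of the test suite):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	q := resource.MustParse("50000")
	// Prints "50k": the canonical serialization differs from the input string,
	// which is why an object round-tripped through the API server can stop
	// comparing DeepEqual to the in-memory version it was built from.
	fmt.Println(q.String())
}
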
5 changes: 4 additions & 1 deletion pkg/controllers/nodepool/readiness/controller.go
@@ -71,7 +71,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco
c.setReadyCondition(nodePool, nodeClass)
}
if !equality.Semantic.DeepEqual(stored, nodePool) {
if err = c.kubeClient.Status().Update(ctx, nodePool); client.IgnoreNotFound(err) != nil {
// We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
// can cause races due to the fact that it fully replaces the list on a change
// Here, we are updating the status condition list
if err = c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}