diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go
index 45bc78f0c0..b809ccad73 100644
--- a/pkg/controllers/disruption/controller.go
+++ b/pkg/controllers/disruption/controller.go
@@ -19,7 +19,7 @@ package disruption
 import (
     "bytes"
     "context"
-    "errors"
+    stderrors "errors"
     "fmt"
     "strings"
     "sync"
@@ -27,7 +27,7 @@ import (
 
     "github.com/awslabs/operatorpkg/singleton"
     "github.com/samber/lo"
-    "go.uber.org/multierr"
+    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/util/uuid"
     "k8s.io/utils/clock"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -36,6 +36,8 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/manager"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
+    "sigs.k8s.io/karpenter/pkg/utils/pretty"
+
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
     "sigs.k8s.io/karpenter/pkg/cloudprovider"
     "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
@@ -126,13 +128,16 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
         return reconcile.Result{RequeueAfter: time.Second}, nil
     }
 
-    // Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process
-    // while it progresses in memory. If Karpenter restarts during a disruption action, some nodes can be left tainted.
+    // Karpenter taints nodes with a karpenter.sh/disruption taint as part of the disruption process while it progresses in memory.
+    // If Karpenter restarts or fails with an error during a disruption action, some nodes can be left tainted.
     // Idempotently remove this taint from candidates that are not in the orchestration queue before continuing.
     if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, false, lo.Filter(c.cluster.Nodes(), func(s *state.StateNode, _ int) bool {
         return !c.queue.HasAny(s.ProviderID())
     })...); err != nil {
-        return reconcile.Result{}, fmt.Errorf("removing taint from nodes, %w", err)
+        if errors.IsConflict(err) {
+            return reconcile.Result{Requeue: true}, nil
+        }
+        return reconcile.Result{}, fmt.Errorf("removing taint %s from nodes, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
     }
 
     // Attempt different disruption methods. We'll only let one method perform an action
@@ -140,6 +145,9 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
         c.recordRun(fmt.Sprintf("%T", m))
         success, err := c.disrupt(ctx, m)
         if err != nil {
+            if errors.IsConflict(err) {
+                return reconcile.Result{Requeue: true}, nil
+            }
             return reconcile.Result{}, fmt.Errorf("disrupting via reason=%q, %w", strings.ToLower(string(m.Reason())), err)
         }
         if success {
@@ -201,7 +209,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
     })
     // Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
     if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
-        return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+        return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err)
     }
 
     var nodeClaimNames []string
@@ -210,7 +218,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
         if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
             // If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
             // we don't want to disrupt workloads with no way to provision new nodes for them.
-            return multierr.Append(fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+            return fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err)
         }
     }
 
@@ -229,10 +237,10 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
     // We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
     c.cluster.MarkForDeletion(providerIDs...)
 
-    if err := c.queue.Add(orchestration.NewCommand(nodeClaimNames,
+    if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
         lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
         c.cluster.UnmarkForDeletion(providerIDs...)
-        return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, multierr.Append(err, state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)))
+        return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
     }
 
     // An action is only performed and pods/nodes are only disrupted after a successful add to the queue
@@ -292,6 +300,6 @@ func (c *Controller) logInvalidBudgets(ctx context.Context) {
         }
     }
     if buf.Len() > 0 {
-        log.FromContext(ctx).Error(errors.New(buf.String()), "detected disruption budget errors")
+        log.FromContext(ctx).Error(stderrors.New(buf.String()), "detected disruption budget errors")
     }
 }
diff --git a/pkg/controllers/disruption/orchestration/queue.go b/pkg/controllers/disruption/orchestration/queue.go
index 7da499e0d8..e9a24ff4b7 100644
--- a/pkg/controllers/disruption/orchestration/queue.go
+++ b/pkg/controllers/disruption/orchestration/queue.go
@@ -181,7 +181,7 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
     if err := q.waitOrTerminate(ctx, cmd); err != nil {
         // If recoverable, re-queue and try again.
         if !IsUnrecoverableError(err) {
-            // store the error that is causing us to fail so we can bubble it up later if this times out.
+            // store the error that is causing us to fail, so we can bubble it up later if this times out.
             cmd.lastError = err
             // mark this item as done processing. This is necessary so that the RLI is able to add the item back in.
             q.RateLimitingInterface.Done(cmd)
diff --git a/pkg/controllers/node/termination/controller.go b/pkg/controllers/node/termination/controller.go
index d1df6118df..b8e49a3931 100644
--- a/pkg/controllers/node/termination/controller.go
+++ b/pkg/controllers/node/termination/controller.go
@@ -39,6 +39,8 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/manager"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 
+    "sigs.k8s.io/karpenter/pkg/utils/pretty"
+
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
     "sigs.k8s.io/karpenter/pkg/cloudprovider"
     "sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
@@ -91,7 +93,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
         return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
     }
 
-    if err := c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
+    if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
         return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
     }
 
@@ -100,10 +102,13 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
         return reconcile.Result{}, err
     }
 
-    if err := c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
-        return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", v1.DisruptedTaintKey, err)
+    if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
+        if errors.IsConflict(err) {
+            return reconcile.Result{Requeue: true}, nil
+        }
+        return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
     }
-    if err := c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
+    if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
         if !terminator.IsNodeDrainError(err) {
             return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
         }
@@ -114,7 +119,7 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
     // if the Node Ready condition is true
    // Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
     if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
-        if _, err := c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
+        if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
             if cloudprovider.IsNodeClaimNotFoundError(err) {
                 return reconcile.Result{}, c.removeFinalizer(ctx, node)
             }
@@ -231,8 +236,12 @@ func (c *Controller) removeFinalizer(ctx context.Context, n *corev1.Node) error {
     stored := n.DeepCopy()
     controllerutil.RemoveFinalizer(n, v1.TerminationFinalizer)
     if !equality.Semantic.DeepEqual(stored, n) {
+        // We use client.StrategicMergeFrom here since the node object supports it and
+        // a strategic merge patch represents the finalizer list as a keyed "set" so removing
+        // an item from the list doesn't replace the full list
+        // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
         if err := c.kubeClient.Patch(ctx, n, client.StrategicMergeFrom(stored)); err != nil {
-            return client.IgnoreNotFound(fmt.Errorf("patching node, %w", err))
+            return client.IgnoreNotFound(fmt.Errorf("removing finalizer, %w", err))
         }
 
         metrics.NodesTerminatedTotal.With(prometheus.Labels{
diff --git a/pkg/controllers/node/termination/terminator/terminator.go b/pkg/controllers/node/termination/terminator/terminator.go
index 8ed53ced43..3310f50542 100644
--- a/pkg/controllers/node/termination/terminator/terminator.go
+++ b/pkg/controllers/node/termination/terminator/terminator.go
@@ -73,7 +73,10 @@ func (t *Terminator) Taint(ctx context.Context, node *corev1.Node, taint corev1.Taint) error {
         corev1.LabelNodeExcludeBalancers: "karpenter",
     })
     if !equality.Semantic.DeepEqual(node, stored) {
-        if err := t.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
+        // can cause races due to the fact that it fully replaces the list on a change
+        // Here, we are updating the taint list
+        if err := t.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
             return err
         }
         taintValues := []any{
diff --git a/pkg/controllers/nodeclaim/consistency/controller.go b/pkg/controllers/nodeclaim/consistency/controller.go
index d1a53dd447..832d10809d 100644
--- a/pkg/controllers/nodeclaim/consistency/controller.go
+++ b/pkg/controllers/nodeclaim/consistency/controller.go
@@ -100,10 +100,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
         return reconcile.Result{}, err
     }
     if !equality.Semantic.DeepEqual(stored, nodeClaim) {
-        // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
         // can cause races due to the fact that it fully replaces the list on a change
         // Here, we are updating the status condition list
-        if err = c.kubeClient.Status().Update(ctx, nodeClaim); client.IgnoreNotFound(err) != nil {
+        if err = c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil {
             if errors.IsConflict(err) {
                 return reconcile.Result{Requeue: true}, nil
             }
diff --git a/pkg/controllers/nodeclaim/disruption/controller.go b/pkg/controllers/nodeclaim/disruption/controller.go
index 1d0f986813..e5cef47117 100644
--- a/pkg/controllers/nodeclaim/disruption/controller.go
+++ b/pkg/controllers/nodeclaim/disruption/controller.go
@@ -92,10 +92,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
         results = append(results, res)
     }
     if !equality.Semantic.DeepEqual(stored, nodeClaim) {
-        // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
         // can cause races due to the fact that it fully replaces the list on a change
         // Here, we are updating the status condition list
-        if err := c.kubeClient.Status().Update(ctx, nodeClaim); err != nil {
+        if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
             if errors.IsConflict(err) {
                 return reconcile.Result{Requeue: true}, nil
             }
diff --git a/pkg/controllers/nodeclaim/lifecycle/controller.go b/pkg/controllers/nodeclaim/lifecycle/controller.go
index 63d44f84ad..8a2e739b4b 100644
--- a/pkg/controllers/nodeclaim/lifecycle/controller.go
+++ b/pkg/controllers/nodeclaim/lifecycle/controller.go
@@ -25,6 +25,7 @@ import (
     "golang.org/x/time/rate"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/equality"
+    "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/client-go/util/workqueue"
     "k8s.io/utils/clock"
     controllerruntime "sigs.k8s.io/controller-runtime"
@@ -85,7 +86,13 @@ func (c *Controller) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
     stored := nodeClaim.DeepCopy()
     controllerutil.AddFinalizer(nodeClaim, v1.TerminationFinalizer)
     if !equality.Semantic.DeepEqual(nodeClaim, stored) {
-        if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
+        // can cause races due to the fact that it fully replaces the list on a change
+        // Here, we are updating the finalizer list
+        if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+            if errors.IsConflict(err) {
+                return reconcile.Result{Requeue: true}, nil
+            }
             return reconcile.Result{}, client.IgnoreNotFound(err)
         }
     }
diff --git a/pkg/controllers/nodeclaim/lifecycle/initialization.go b/pkg/controllers/nodeclaim/lifecycle/initialization.go
index 0c3979be86..8fbc2b8564 100644
--- a/pkg/controllers/nodeclaim/lifecycle/initialization.go
+++ b/pkg/controllers/nodeclaim/lifecycle/initialization.go
@@ -74,7 +74,7 @@ func (i *Initialization) Reconcile(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
     stored := node.DeepCopy()
     node.Labels = lo.Assign(node.Labels, map[string]string{v1.NodeInitializedLabelKey: "true"})
     if !equality.Semantic.DeepEqual(stored, node) {
-        if err = i.kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
+        if err = i.kubeClient.Patch(ctx, node, client.MergeFrom(stored)); err != nil {
             return reconcile.Result{}, err
         }
     }
diff --git a/pkg/controllers/nodeclaim/lifecycle/registration.go b/pkg/controllers/nodeclaim/lifecycle/registration.go
index 0fb7ba6b28..77e0e22160 100644
--- a/pkg/controllers/nodeclaim/lifecycle/registration.go
+++ b/pkg/controllers/nodeclaim/lifecycle/registration.go
@@ -102,7 +102,10 @@ func (r *Registration) syncNode(ctx context.Context, nodeClaim *v1.NodeClaim, node *corev1.Node) error {
         v1.NodeRegisteredLabelKey: "true",
     })
     if !equality.Semantic.DeepEqual(stored, node) {
-        if err := r.kubeClient.Update(ctx, node); err != nil {
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
+        // can cause races due to the fact that it fully replaces the list on a change
+        // Here, we are updating the taint list
+        if err := r.kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
             return fmt.Errorf("syncing node, %w", err)
         }
     }
diff --git a/pkg/controllers/nodeclaim/podevents/controller.go b/pkg/controllers/nodeclaim/podevents/controller.go
index 133e513bd0..8313e08c40 100644
--- a/pkg/controllers/nodeclaim/podevents/controller.go
+++ b/pkg/controllers/nodeclaim/podevents/controller.go
@@ -83,7 +83,7 @@ func (c *Controller) Reconcile(ctx context.Context, pod *corev1.Pod) (reconcile.Result, error) {
     // otherwise, set the pod event time to now
     nc.Status.LastPodEventTime.Time = c.clock.Now()
     if !equality.Semantic.DeepEqual(stored, nc) {
-        if err := c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
+        if err = c.kubeClient.Status().Patch(ctx, nc, client.MergeFrom(stored)); err != nil {
             return reconcile.Result{}, client.IgnoreNotFound(err)
         }
     }
diff --git a/pkg/controllers/nodeclaim/termination/controller.go b/pkg/controllers/nodeclaim/termination/controller.go
index 829f8e3a9e..0b032c2249 100644
--- a/pkg/controllers/nodeclaim/termination/controller.go
+++ b/pkg/controllers/nodeclaim/termination/controller.go
@@ -79,7 +79,6 @@ func (c *Controller) Reconcile(ctx context.Context, n *v1.NodeClaim) (reconcile.Result, error) {
 //nolint:gocyclo
 func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
     ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef("", nodeClaim.Status.NodeName), "provider-id", nodeClaim.Status.ProviderID))
-    stored := nodeClaim.DeepCopy()
     if !controllerutil.ContainsFinalizer(nodeClaim, v1.TerminationFinalizer) {
         return reconcile.Result{}, nil
     }
@@ -125,12 +124,14 @@ func (c *Controller) finalize(ctx context.Context, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
             metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
         }).Observe(time.Since(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).LastTransitionTime.Time).Seconds())
     }
+    stored := nodeClaim.DeepCopy() // The NodeClaim may have been modified in the EnsureTerminated function
     controllerutil.RemoveFinalizer(nodeClaim, v1.TerminationFinalizer)
     if !equality.Semantic.DeepEqual(stored, nodeClaim) {
-        // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
         // can cause races due to the fact that it fully replaces the list on a change
+        // Here, we are updating the finalizer list
         // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
-        if err = c.kubeClient.Update(ctx, nodeClaim); err != nil {
+        if err = c.kubeClient.Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
             if errors.IsConflict(err) {
                 return reconcile.Result{Requeue: true}, nil
             }
@@ -170,7 +171,7 @@ func (c *Controller) annotateTerminationGracePeriodTerminationTime(ctx context.Context, nodeClaim *v1.NodeClaim, terminationTime string) error {
     nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})
 
     if err := c.kubeClient.Patch(ctx, nodeClaim, client.MergeFrom(stored)); err != nil {
-        return client.IgnoreNotFound(fmt.Errorf("patching nodeclaim, %w", err))
+        return client.IgnoreNotFound(err)
     }
     log.FromContext(ctx).WithValues(v1.NodeClaimTerminationTimestampAnnotationKey, terminationTime).Info("annotated nodeclaim")
     c.recorder.Publish(terminatorevents.NodeClaimTerminationGracePeriodExpiring(nodeClaim, terminationTime))
diff --git a/pkg/controllers/nodeclaim/termination/suite_test.go b/pkg/controllers/nodeclaim/termination/suite_test.go
index 07c859a46e..2e8e2bb13b 100644
--- a/pkg/controllers/nodeclaim/termination/suite_test.go
+++ b/pkg/controllers/nodeclaim/termination/suite_test.go
@@ -22,12 +22,15 @@ import (
     "testing"
     "time"
 
+    "github.com/awslabs/operatorpkg/object"
     . "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" "github.com/samber/lo" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/record" clock "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/cache" @@ -146,6 +149,31 @@ var _ = Describe("Termination", func() { _, err = cloudProvider.Get(ctx, nodeClaim.Status.ProviderID) Expect(cloudprovider.IsNodeClaimNotFoundError(err)).To(BeTrue()) }) + It("should delete the NodeClaim when the spec resource.Quantity values will change during deserialization", func() { + nodeClaim.SetGroupVersionKind(object.GVK(nodeClaim)) // This is needed so that the GVK is set on the unstructured object + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(nodeClaim) + Expect(err).ToNot(HaveOccurred()) + // Set a value in resources that will get to converted to a value with a suffix e.g. 50k + Expect(unstructured.SetNestedStringMap(u, map[string]string{"memory": "50000"}, "spec", "resources", "requests")).To(Succeed()) + + obj := &unstructured.Unstructured{} + Expect(runtime.DefaultUnstructuredConverter.FromUnstructured(u, obj)).To(Succeed()) + + ExpectApplied(ctx, env.Client, nodePool, obj) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim) + + // Expect the node and the nodeClaim to both be gone + Expect(env.Client.Delete(ctx, nodeClaim)).To(Succeed()) + result := ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // triggers the nodeclaim deletion + + Expect(result.RequeueAfter).To(BeEquivalentTo(5 * time.Second)) + nodeClaim = ExpectExists(ctx, env.Client, nodeClaim) + Expect(nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue()).To(BeTrue()) + + ExpectObjectReconciled(ctx, env.Client, nodeClaimTerminationController, nodeClaim) // this will call cloudProvider Get to check if the instance is still around + ExpectNotFound(ctx, env.Client, nodeClaim) + }) It("should requeue reconciliation if cloudProvider Get returns an error other than NodeClaimNotFoundError", func() { ExpectApplied(ctx, env.Client, nodePool, nodeClaim) ExpectObjectReconciled(ctx, env.Client, nodeClaimLifecycleController, nodeClaim) diff --git a/pkg/controllers/nodepool/readiness/controller.go b/pkg/controllers/nodepool/readiness/controller.go index 985a0192df..1488599a6e 100644 --- a/pkg/controllers/nodepool/readiness/controller.go +++ b/pkg/controllers/nodepool/readiness/controller.go @@ -71,7 +71,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reco c.setReadyCondition(nodePool, nodeClass) } if !equality.Semantic.DeepEqual(stored, nodePool) { - if err = c.kubeClient.Status().Update(ctx, nodePool); client.IgnoreNotFound(err) != nil { + // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch + // can cause races due to the fact that it fully replaces the list on a change + // Here, we are updating the status condition list + if err = c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(err) != nil { if errors.IsConflict(err) { return reconcile.Result{Requeue: true}, nil } diff --git a/pkg/controllers/nodepool/validation/controller.go b/pkg/controllers/nodepool/validation/controller.go index 96fa53cb90..986952ca50 100644 --- 
--- a/pkg/controllers/nodepool/validation/controller.go
+++ b/pkg/controllers/nodepool/validation/controller.go
@@ -53,10 +53,10 @@ func (c *Controller) Reconcile(ctx context.Context, nodePool *v1.NodePool) (reconcile.Result, error) {
         nodePool.StatusConditions().SetTrue(v1.ConditionTypeValidationSucceeded)
     }
     if !equality.Semantic.DeepEqual(stored, nodePool) {
-        // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+        // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
         // can cause races due to the fact that it fully replaces the list on a change
-        // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
-        if e := c.kubeClient.Status().Update(ctx, nodePool); client.IgnoreNotFound(e) != nil {
+        // Here, we are updating the status condition list
+        if e := c.kubeClient.Status().Patch(ctx, nodePool, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); client.IgnoreNotFound(e) != nil {
             if errors.IsConflict(e) {
                 return reconcile.Result{Requeue: true}, nil
             }
diff --git a/pkg/controllers/state/statenode.go b/pkg/controllers/state/statenode.go
index b70e127b30..d241c51af9 100644
--- a/pkg/controllers/state/statenode.go
+++ b/pkg/controllers/state/statenode.go
@@ -507,7 +507,10 @@ func RequireNoScheduleTaint(ctx context.Context, kubeClient client.Client, addTaint bool, nodes ...*StateNode) error {
             node.Spec.Taints = append(node.Spec.Taints, v1.DisruptedNoScheduleTaint)
         }
         if !equality.Semantic.DeepEqual(stored, node) {
-            if err := kubeClient.Patch(ctx, node, client.StrategicMergeFrom(stored)); err != nil {
+            // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
+            // can cause races due to the fact that it fully replaces the list on a change
+            // Here, we are updating the taint list
+            if err := kubeClient.Patch(ctx, node, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                 multiErr = multierr.Append(multiErr, fmt.Errorf("patching node %s, %w", node.Name, err))
             }
         }
diff --git a/pkg/scheduling/taints.go b/pkg/scheduling/taints.go
index 102d9ce627..b6bd0d6ab9 100644
--- a/pkg/scheduling/taints.go
+++ b/pkg/scheduling/taints.go
@@ -24,6 +24,8 @@ import (
     corev1 "k8s.io/api/core/v1"
     cloudproviderapi "k8s.io/cloud-provider/api"
 
+    "sigs.k8s.io/karpenter/pkg/utils/pretty"
+
     v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
 )
 
@@ -49,7 +51,7 @@ func (ts Taints) Tolerates(pod *corev1.Pod) (errs error) {
             tolerates = tolerates || t.ToleratesTaint(&taint)
         }
         if !tolerates {
-            errs = multierr.Append(errs, fmt.Errorf("did not tolerate %s=%s:%s", taint.Key, taint.Value, taint.Effect))
+            errs = multierr.Append(errs, fmt.Errorf("did not tolerate %s", pretty.Taint(taint)))
         }
     }
     return errs
diff --git a/pkg/utils/pretty/pretty.go b/pkg/utils/pretty/pretty.go
index d419a71fe9..fbc7b3bf94 100644
--- a/pkg/utils/pretty/pretty.go
+++ b/pkg/utils/pretty/pretty.go
@@ -24,6 +24,7 @@ import (
 
     "golang.org/x/exp/constraints"
     "golang.org/x/exp/slices"
+    v1 "k8s.io/api/core/v1"
 )
 
 func Concise(o interface{}) string {
@@ -77,3 +78,10 @@ func Map[K constraints.Ordered, V any](values map[K]V, maxItems int) string {
     }
     return buf.String()
 }
+
+func Taint(t v1.Taint) string {
+    if t.Value == "" {
+        return fmt.Sprintf("%s:%s", t.Key, t.Effect)
+    }
+    return fmt.Sprintf("%s=%s:%s", t.Key, t.Value, t.Effect)
+}
diff --git a/pkg/utils/termination/termination.go b/pkg/utils/termination/termination.go
index 72de929d21..084339b47b 100644
--- a/pkg/utils/termination/termination.go
+++ b/pkg/utils/termination/termination.go
@@ -34,13 +34,14 @@ func EnsureTerminated(ctx context.Context, c client.Client, nodeClaim *v1.NodeClaim, cloudProvider cloudprovider.CloudProvider) (terminated bool, err error) {
     // Check if the status condition on nodeClaim is Terminating
     if !nodeClaim.StatusConditions().Get(v1.ConditionTypeInstanceTerminating).IsTrue() {
         // If not then call Delete on cloudProvider to trigger termination and always requeue reconciliation
-        if err := cloudProvider.Delete(ctx, nodeClaim); err != nil {
+        if err = cloudProvider.Delete(ctx, nodeClaim); err != nil {
             if cloudprovider.IsNodeClaimNotFoundError(err) {
+                stored := nodeClaim.DeepCopy()
                 nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInstanceTerminating)
-                // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+                // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
                 // can cause races due to the fact that it fully replaces the list on a change
-                // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
-                if err = c.Status().Update(ctx, nodeClaim); err != nil {
+                // Here, we are updating the status condition list
+                if err = c.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
                     return false, err
                 }
                 // Instance is terminated
@@ -49,11 +50,12 @@ func EnsureTerminated(ctx context.Context, c client.Client, nodeClaim *v1.NodeClaim, cloudProvider cloudprovider.CloudProvider) (terminated bool, err error) {
         return false, fmt.Errorf("terminating cloudprovider instance, %w", err)
     }
 
+    stored := nodeClaim.DeepCopy()
     nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInstanceTerminating)
-    // We call Update() here rather than Patch() because patching a list with a JSON merge patch
+    // We use client.MergeFromWithOptimisticLock because patching a list with a JSON merge patch
     // can cause races due to the fact that it fully replaces the list on a change
-    // https://github.com/kubernetes/kubernetes/issues/111643#issuecomment-2016489732
-    if err := c.Status().Update(ctx, nodeClaim); err != nil {
+    // Here, we are updating the status condition list
+    if err = c.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
         return false, err
     }
     return false, nil
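
Most hunks in this patch share one pattern: replace Update() or client.StrategicMergeFrom with a merge patch that carries an optimistic lock, and treat the resulting 409 Conflict as a signal to requeue rather than an error. The sketch below shows that pattern in isolation; the helper name patchStatusWithOptimisticLock is hypothetical and only illustrates the calls this change relies on (client.MergeFromWithOptions, client.MergeFromWithOptimisticLock, errors.IsConflict), it is not an API added by the patch.

```go
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// patchStatusWithOptimisticLock is a hypothetical helper mirroring the pattern used above:
// snapshot the object, mutate a list-valued field (here, status conditions), then issue a
// merge patch that includes the stored resourceVersion so concurrent writers conflict
// instead of silently replacing each other's list entries.
func patchStatusWithOptimisticLock(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim) (reconcile.Result, error) {
	stored := nodeClaim.DeepCopy()
	nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeInstanceTerminating)
	if equality.Semantic.DeepEqual(stored, nodeClaim) {
		return reconcile.Result{}, nil
	}
	// MergeFromWithOptimisticLock adds resourceVersion to the patch, so the API server
	// rejects it with a 409 Conflict if the object changed since `stored` was read.
	if err := kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) {
			// On conflict, requeue and retry against a fresh copy rather than surfacing an error.
			return reconcile.Result{Requeue: true}, nil
		}
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}
	return reconcile.Result{}, nil
}
```

The design choice this illustrates: a plain JSON merge patch replaces a whole list (taints, finalizers, status conditions) on any change, so two controllers racing on the same node or nodeclaim can drop each other's entries; the optimistic lock turns that silent overwrite into an explicit conflict that the reconcilers above handle by requeueing.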