Skip to content

Commit

Permalink
implement preserveResourcesOnDeletion to support migration rollback
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Alavi <[email protected]>
  • Loading branch information
a7i committed Sep 23, 2024
1 parent 721107d commit 90270f8
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 41 deletions.
14 changes: 11 additions & 3 deletions pkg/controllers/binding/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func ensureWork(
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspension *policyv1alpha1.Suspension
var preserveResourcesOnDeletion *bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
Expand All @@ -57,6 +58,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
Expand All @@ -65,6 +67,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
Expand Down Expand Up @@ -133,9 +136,14 @@ func ensureWork(
Annotations: annotations,
}

suspendDispatching := shouldSuspendDispatching(suspension, targetCluster)

if err = helper.CreateOrUpdateWork(ctx, c, workMeta, clonedWorkload, &suspendDispatching); err != nil {
if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
}
}
Expand Down
33 changes: 24 additions & 9 deletions pkg/controllers/execution/execution_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
Expand Down Expand Up @@ -102,15 +104,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
}

if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
if err := c.handleWorkDelete(ctx, work, cluster); err != nil {
return controllerruntime.Result{}, err
}

return c.removeFinalizer(ctx, work)
Expand Down Expand Up @@ -161,6 +156,26 @@ func (c *Controller) syncWork(ctx context.Context, clusterName string, work *wor
return controllerruntime.Result{}, nil
}

func (c *Controller) handleWorkDelete(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
if ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false) {
klog.V(4).InfoS("Preserving resource on deletion from work(%s/%s) on cluster(%s)", work.Namespace, work.Name, cluster.Name)
return nil
}

// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, cluster.Name, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return nil
}

// tryDeleteWorkload tries to delete resources in the given member cluster.
func (c *Controller) tryDeleteWorkload(ctx context.Context, clusterName string, work *workv1alpha1.Work) error {
for _, manifest := range work.Spec.Workload.Manifests {
Expand Down
50 changes: 42 additions & 8 deletions pkg/controllers/execution/execution_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,19 @@ import (
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/gclient"
utilhelper "github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)

const (
podNamespace = "default"
podName = "test"
clusterName = "cluster"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
Expand All @@ -54,6 +62,7 @@ func TestExecutionController_Reconcile(t *testing.T) {
expectCondition *metav1.Condition
expectEventMessage string
existErr bool
resourceExists *bool
}{
{
name: "work dispatching is suspended, no error, no apply",
Expand Down Expand Up @@ -112,6 +121,19 @@ func TestExecutionController_Reconcile(t *testing.T) {
work.Spec.SuspendDispatching = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=true, deletion timestamp set, does not delete resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(true),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(true)
}),
},
}

for _, tt := range tests {
Expand Down Expand Up @@ -143,14 +165,26 @@ func TestExecutionController_Reconcile(t *testing.T) {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}

if tt.resourceExists != nil {
obj, err := utilhelper.GetObjectFromCache(c.RESTMapper, c.InformerManager, keys.FederatedKey{Cluster: clusterName, ClusterWideKey: keys.ClusterWideKey{
Version: "v1", Kind: "Pod", Namespace: podNamespace, Name: podName,
}})
if *tt.resourceExists {
assert.NoErrorf(t, err, "unable to query pod (%s/%s)", podNamespace, podName)
} else {
assert.Nil(t, t, obj)
}
}
})
}
}

func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod("default", "test")
client := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Controller {
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod(podNamespace, podName)
pod.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue})
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
restMapper.Add(corev1.SchemeGroupVersion.WithKind(pod.Kind), meta.RESTScopeNamespace)
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
Expand All @@ -159,16 +193,16 @@ func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder)
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
return Controller{
Client: client,
Client: fakeClient,
InformerManager: informerManager,
EventRecorder: eventRecorder,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(client, restMapper, util.NewClusterDynamicClientSetForAgent, nil),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, util.NewClusterDynamicClientSetForAgent, nil),
}
}

func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
pod := testhelper.NewPod("default", "test")
pod := testhelper.NewPod(podNamespace, podName)
bytes, _ := json.Marshal(pod)
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
if applyFunc != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(ctx context.Context, quota *policyv1alpha1.F
},
}

err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)
err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/mcs/service_export_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/multiclusterservice/mcs_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/namespace/namespace_sync_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *Controller) buildWorks(ctx context.Context, namespace *corev1.Namespace
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced); err != nil {
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/unifiedauth/unified_auth_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (c *Controller) buildWorks(ctx context.Context, cluster *clusterv1alpha1.Cl
},
}

if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj); err != nil {
return err
}

Expand Down
29 changes: 17 additions & 12 deletions pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
excludeClusterPolicy(bindingCopy.Labels)
return nil
})
Expand Down Expand Up @@ -596,6 +597,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
Expand Down Expand Up @@ -642,6 +644,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
Expand Down Expand Up @@ -763,12 +766,13 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
Finalizers: []string{util.BindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
Expand Down Expand Up @@ -808,12 +812,13 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
Expand Down
5 changes: 3 additions & 2 deletions pkg/util/helper/work.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
)

// CreateOrUpdateWork creates a Work object if not exist, or updates if it already exists.
func CreateOrUpdateWork(ctx context.Context, client client.Client, workMeta metav1.ObjectMeta, resource *unstructured.Unstructured, suspendDispatching *bool) error {
func CreateOrUpdateWork(ctx context.Context, client client.Client, workMeta metav1.ObjectMeta, resource *unstructured.Unstructured, options ...WorkOption) error {
if workMeta.Labels[util.PropagationInstruction] != util.PropagationInstructionSuppressed {
resource = resource.DeepCopy()
// set labels
Expand All @@ -62,7 +62,6 @@ func CreateOrUpdateWork(ctx context.Context, client client.Client, workMeta meta
work := &workv1alpha1.Work{
ObjectMeta: workMeta,
Spec: workv1alpha1.WorkSpec{
SuspendDispatching: suspendDispatching,
Workload: workv1alpha1.WorkloadTemplate{
Manifests: []workv1alpha1.Manifest{
{
Expand All @@ -75,6 +74,8 @@ func CreateOrUpdateWork(ctx context.Context, client client.Client, workMeta meta
},
}

applyWorkOptions(work, options)

runtimeObject := work.DeepCopy()
var operationResult controllerutil.OperationResult
err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
Expand Down
43 changes: 43 additions & 0 deletions pkg/util/helper/workoption.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Copyright 2024 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package helper

import workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"

// WorkOption is a function that applies changes to a Work object.
// It is used to configure Work fields for clients of CreateOrUpdateWork.
type WorkOption func(work *workv1alpha1.Work)

// WithSuspendDispatching sets the SuspendDispatching field of the Work Spec.
func WithSuspendDispatching(suspendDispatching bool) WorkOption {
return func(work *workv1alpha1.Work) {
work.Spec.SuspendDispatching = &suspendDispatching
}
}

// WithPreserveResourcesOnDeletion sets the PreserveResourcesOnDeletion field of the Work Spec.
func WithPreserveResourcesOnDeletion(preserveResourcesOnDeletion bool) WorkOption {
return func(work *workv1alpha1.Work) {
work.Spec.PreserveResourcesOnDeletion = &preserveResourcesOnDeletion
}
}

func applyWorkOptions(work *workv1alpha1.Work, options []WorkOption) {
for _, option := range options {
option(work)
}
}
Loading

0 comments on commit 90270f8

Please sign in to comment.