add e2e for migration rollback feature #5609

Open · wants to merge 2 commits into master
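In short, this change threads the PreserveResourcesOnDeletion field from the policy spec through ResourceBinding/ClusterResourceBinding into each Work, switches helper.CreateOrUpdateWork from a *bool argument to functional options, and teaches the execution controller to skip deleting propagated resources from a member cluster when the flag is set, supporting the migration rollback scenario the e2e targets.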
14 changes: 11 additions & 3 deletions pkg/controllers/binding/common.go
@@ -48,6 +48,7 @@ func ensureWork(
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspension *policyv1alpha1.Suspension
var preserveResourcesOnDeletion *bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
@@ -57,6 +58,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
@@ -65,6 +67,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
@@ -133,9 +136,14 @@ func ensureWork(
Annotations: annotations,
}

suspendDispatching := shouldSuspendDispatching(suspension, targetCluster)

if err = helper.CreateOrUpdateWork(ctx, c, workMeta, clonedWorkload, &suspendDispatching); err != nil {
if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
}
}
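The old call above passed a *bool for dispatch suspension directly; the new call uses functional options (helper.WithSuspendDispatching, helper.WithPreserveResourcesOnDeletion). The option helpers themselves are not part of this diff; a minimal sketch of how such options could be shaped, assuming they simply mutate the Work spec before it is created or updated, might look like this:

// Hypothetical sketch, not the actual pkg/util/helper source.
package helper

import (
	"k8s.io/utils/ptr"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)

// WorkOption mutates a Work before it is persisted by CreateOrUpdateWork.
type WorkOption func(work *workv1alpha1.Work)

// WithSuspendDispatching controls whether the Work's manifests are dispatched to the member cluster.
func WithSuspendDispatching(suspend bool) WorkOption {
	return func(work *workv1alpha1.Work) {
		work.Spec.SuspendDispatching = ptr.To(suspend)
	}
}

// WithPreserveResourcesOnDeletion controls whether propagated resources are kept
// on the member cluster after the Work is deleted.
func WithPreserveResourcesOnDeletion(preserve bool) WorkOption {
	return func(work *workv1alpha1.Work) {
		work.Spec.PreserveResourcesOnDeletion = ptr.To(preserve)
	}
}

// applyWorkOptions applies each non-nil option to the Work in order.
func applyWorkOptions(work *workv1alpha1.Work, options ...WorkOption) {
	for _, option := range options {
		if option != nil {
			option(work)
		}
	}
}

With that shape, callers that need no options, as in the controllers further down in this diff, simply drop the trailing argument.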
37 changes: 26 additions & 11 deletions pkg/controllers/execution/execution_controller.go
@@ -30,13 +30,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
@@ -102,15 +104,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
}

if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
if err := c.handleWorkDelete(ctx, work, cluster); err != nil {
return controllerruntime.Result{}, err
}

return c.removeFinalizer(ctx, work)
@@ -151,16 +146,36 @@ func (c *Controller) syncWork(ctx context.Context, clusterName string, work *wor
metrics.ObserveSyncWorkloadLatency(err, start)
if err != nil {
msg := fmt.Sprintf("Failed to sync work(%s/%s) to cluster(%s), err: %v", work.Namespace, work.Name, clusterName, err)
klog.Errorf(msg)
klog.Error(msg)
c.EventRecorder.Event(work, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, msg)
return controllerruntime.Result{}, err
}
msg := fmt.Sprintf("Sync work(%s/%s) to cluster(%s) successful.", work.Namespace, work.Name, clusterName)
klog.V(4).Infof(msg)
klog.V(4).Info(msg)
c.EventRecorder.Event(work, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, msg)
return controllerruntime.Result{}, nil
}

func (c *Controller) handleWorkDelete(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
if ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false) {
klog.V(4).Infof("Preserving resource on deletion from work(%s/%s) on cluster(%s)", work.Namespace, work.Name, cluster.Name)
return nil
}

// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, cluster.Name, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return nil
}

// tryDeleteWorkload tries to delete resources in the given member cluster.
func (c *Controller) tryDeleteWorkload(ctx context.Context, clusterName string, work *workv1alpha1.Work) error {
for _, manifest := range work.Spec.Workload.Manifests {
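Note that handleWorkDelete treats an unset PreserveResourcesOnDeletion the same as false via ptr.Deref, which is exactly what the three new test cases below exercise. A tiny standalone illustration of that defaulting (hypothetical snippet, not part of the diff):

package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

func main() {
	var unset *bool
	// ptr.Deref returns the pointed-to value, or the given default when the pointer is nil.
	fmt.Println(ptr.Deref(unset, false))         // false -> resources are deleted
	fmt.Println(ptr.Deref(ptr.To(false), false)) // false -> resources are deleted
	fmt.Println(ptr.Deref(ptr.To(true), false))  // true  -> resources are preserved
}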
85 changes: 77 additions & 8 deletions pkg/controllers/execution/execution_controller_test.go
@@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -33,6 +34,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
@@ -45,6 +47,12 @@ import (
testhelper "github.com/karmada-io/karmada/test/helper"
)

const (
podNamespace = "default"
podName = "test"
clusterName = "cluster"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
@@ -54,6 +62,7 @@ func TestExecutionController_Reconcile(t *testing.T) {
expectCondition *metav1.Condition
expectEventMessage string
existErr bool
resourceExists *bool
}{
{
name: "work dispatching is suspended, no error, no apply",
Expand Down Expand Up @@ -112,10 +121,52 @@ func TestExecutionController_Reconcile(t *testing.T) {
work.Spec.SuspendDispatching = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=true, deletion timestamp set, does not delete resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(true),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=false, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(false)
}),
},
{
name: "PreserveResourcesOnDeletion unset, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
}),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Cleanup(func() {
genericmanager.GetInstance().Stop(clusterName)
})

req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
@@ -143,32 +194,50 @@ func TestExecutionController_Reconcile(t *testing.T) {
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}

if tt.resourceExists != nil {
resourceInterface := c.InformerManager.GetSingleClusterManager(clusterName).GetClient().
Resource(corev1.SchemeGroupVersion.WithResource("pods")).Namespace(podNamespace)
_, err = resourceInterface.Get(context.TODO(), podName, metav1.GetOptions{})
if *tt.resourceExists {
assert.NoErrorf(t, err, "unable to query pod (%s/%s)", podNamespace, podName)
} else {
assert.True(t, apierrors.IsNotFound(err), "pod (%s/%s) was not deleted", podNamespace, podName)
}
}
})
}
}

func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod("default", "test")
client := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Controller {
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod(podNamespace, podName)
pod.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue})
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
restMapper.Add(corev1.SchemeGroupVersion.WithKind(pod.Kind), meta.RESTScopeNamespace)
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).WithRESTMapper(restMapper).Build()
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
informerManager := genericmanager.GetInstance()
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
}, nil
}
return Controller{
Client: client,
Client: fakeClient,
InformerManager: informerManager,
EventRecorder: eventRecorder,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(client, restMapper, util.NewClusterDynamicClientSetForAgent, nil),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, nil),
}
}

func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
pod := testhelper.NewPod("default", "test")
pod := testhelper.NewPod(podNamespace, podName)
bytes, _ := json.Marshal(pod)
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
if applyFunc != nil {
@@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(ctx context.Context, quota *policyv1alpha1.F
},
}

err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)
err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
2 changes: 1 addition & 1 deletion pkg/controllers/mcs/service_export_controller.go
@@ -494,7 +494,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
return err
}

@@ -381,7 +381,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
@@ -393,7 +393,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
2 changes: 1 addition & 1 deletion pkg/controllers/multiclusterservice/mcs_controller.go
@@ -309,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err
2 changes: 1 addition & 1 deletion pkg/controllers/namespace/namespace_sync_controller.go
@@ -157,7 +157,7 @@ func (c *Controller) buildWorks(ctx context.Context, namespace *corev1.Namespace
Annotations: annotations,
}

if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, clonedNamespaced); err != nil {
ch <- fmt.Errorf("sync namespace(%s) to cluster(%s) failed due to: %v", clonedNamespaced.GetName(), cluster.GetName(), err)
return
}
2 changes: 1 addition & 1 deletion pkg/controllers/unifiedauth/unified_auth_controller.go
@@ -237,7 +237,7 @@ func (c *Controller) buildWorks(ctx context.Context, cluster *clusterv1alpha1.Cl
},
}

if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, obj); err != nil {
return err
}

29 changes: 17 additions & 12 deletions pkg/detector/detector.go
@@ -501,6 +501,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
excludeClusterPolicy(bindingCopy.Labels)
return nil
})
@@ -596,6 +597,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
@@ -642,6 +644,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
bindingCopy.Spec.Failover = binding.Spec.Failover
bindingCopy.Spec.ConflictResolution = binding.Spec.ConflictResolution
bindingCopy.Spec.Suspension = binding.Spec.Suspension
bindingCopy.Spec.PreserveResourcesOnDeletion = binding.Spec.PreserveResourcesOnDeletion
return nil
})
return err
@@ -763,12 +766,13 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
Finalizers: []string{util.BindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
@@ -808,12 +812,13 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PropagateDeps: policySpec.PropagateDeps,
SchedulerName: policySpec.SchedulerName,
Placement: &policySpec.Placement,
Failover: policySpec.Failover,
ConflictResolution: policySpec.ConflictResolution,
Suspension: policySpec.Suspension,
PreserveResourcesOnDeletion: policySpec.PreserveResourcesOnDeletion,
Resource: workv1alpha2.ObjectReference{
APIVersion: object.GetAPIVersion(),
Kind: object.GetKind(),
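The detector changes above are what carry the flag from the user-facing policy into the binding spec. For illustration, a policy that opts in might be built like this in Go (hypothetical object, not taken from the e2e code in this PR; field names follow the policyv1alpha1 types referenced above and may differ in detail):

package main

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"

	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
)

func main() {
	// A PropagationPolicy that keeps propagated resources on member clusters
	// when the resulting Work objects are deleted (e.g. during migration rollback).
	policy := &policyv1alpha1.PropagationPolicy{
		ObjectMeta: metav1.ObjectMeta{Name: "nginx-propagation", Namespace: "default"},
		Spec: policyv1alpha1.PropagationSpec{
			ResourceSelectors: []policyv1alpha1.ResourceSelector{
				{APIVersion: "apps/v1", Kind: "Deployment", Name: "nginx"},
			},
			Placement: policyv1alpha1.Placement{
				ClusterAffinity: &policyv1alpha1.ClusterAffinity{
					ClusterNames: []string{"member1"},
				},
			},
			PreserveResourcesOnDeletion: ptr.To(true),
		},
	}
	// The detector copies Spec.PreserveResourcesOnDeletion into the ResourceBinding
	// spec (see ApplyPolicy/BuildResourceBinding above), and the binding controller
	// forwards it into each Work via helper.WithPreserveResourcesOnDeletion.
	_ = policy
}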