From e7d08587e6a174f49c813e6ffe00132233e34afa Mon Sep 17 00:00:00 2001 From: Rory Z <16801068+Rory-Z@users.noreply.github.com> Date: Thu, 20 Jul 2023 18:38:55 +0800 Subject: [PATCH] style(v2alpha2): format rebalance controller Signed-off-by: Rory Z <16801068+Rory-Z@users.noreply.github.com> --- controllers/apps/v1beta4/emqx_controller.go | 26 +- .../apps/v2alpha2/rebalance_controller.go | 315 +++++++----------- .../v2alpha2/rebalance_controller_test.go | 13 +- controllers/apps/v2alpha2/suite_test.go | 4 - e2e/v2alpha2/e2e_rebalance_test.go | 300 ++++++++--------- e2e/v2alpha2/e2e_test.go | 8 + 6 files changed, 303 insertions(+), 363 deletions(-) diff --git a/controllers/apps/v1beta4/emqx_controller.go b/controllers/apps/v1beta4/emqx_controller.go index 15606c4a4..d371b7baf 100644 --- a/controllers/apps/v1beta4/emqx_controller.go +++ b/controllers/apps/v1beta4/emqx_controller.go @@ -133,17 +133,29 @@ func (r *EmqxReconciler) processResult(subResult subResult, instance appsv1beta4 return subResult.result, subResult.err } -func NewRequesterByPod(client client.Client, instance appsv1beta4.Emqx, pod *corev1.Pod) (innerReq.RequesterInterface, error) { - username, password, err := getBootstrapUser(context.Background(), client, instance) +func NewRequesterByPod(k8sClient client.Client, instance appsv1beta4.Emqx) (innerReq.RequesterInterface, error) { + username, password, err := getBootstrapUser(context.Background(), k8sClient, instance) if err != nil { return nil, err } - return &innerReq.Requester{ - Host: fmt.Sprintf("%s:8081", pod.Status.PodIP), - Username: username, - Password: password, - }, nil + podList := &corev1.PodList{} + _ = k8sClient.List(context.Background(), podList, + client.InNamespace(instance.GetNamespace()), + client.MatchingLabels(instance.GetSpec().GetTemplate().Labels), + ) + for _, pod := range podList.Items { + for _, c := range pod.Status.Conditions { + if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { + return &innerReq.Requester{ + Host: fmt.Sprintf("%s:8081", pod.Status.PodIP), + Username: username, + Password: password, + }, nil + } + } + } + return nil, emperror.New("failed to get ready pod") } func newRequesterBySvc(client client.Client, instance appsv1beta4.Emqx) (innerReq.RequesterInterface, error) { diff --git a/controllers/apps/v2alpha2/rebalance_controller.go b/controllers/apps/v2alpha2/rebalance_controller.go index 1424da093..e25b1c694 100644 --- a/controllers/apps/v2alpha2/rebalance_controller.go +++ b/controllers/apps/v2alpha2/rebalance_controller.go @@ -32,12 +32,15 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/predicate" appsv1beta4 "github.com/emqx/emqx-operator/apis/apps/v1beta4" appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2" controllerv1beta4 "github.com/emqx/emqx-operator/controllers/apps/v1beta4" + // controllerv2alpha2 "github.com/emqx/emqx-operator/controllers/apps/v2alpha2" innerReq "github.com/emqx/emqx-operator/internal/requester" "github.com/tidwall/gjson" @@ -48,11 +51,6 @@ const ( ApiRebalanceV5 = "api/v5/load_rebalance" ) -const ( - V1beta4InstanceKind = "EmqxEnterprise" - V2alpha2InstanceKind = "EMQX" -) - // RebalanceReconciler reconciles a Rebalance object type RebalanceReconciler struct { Client client.Client @@ -70,228 +68,171 @@ func NewRebalanceReconciler(mgr manager.Manager) *RebalanceReconciler { } } -func doV1beta4(reb *appsv2alpha2.Rebalance, rec *RebalanceReconciler, ctx context.Context) (ctrl.Result, error) { - finalizer := "apps.emqx.io/finalizer" - - emqx := &appsv1beta4.EmqxEnterprise{} - if err := rec.Client.Get(ctx, client.ObjectKey{ - Name: reb.Spec.InstanceName, - Namespace: reb.Namespace, - }, emqx); err != nil { - if !k8sErrors.IsNotFound(err) { - return ctrl.Result{}, err - } - if !reb.DeletionTimestamp.IsZero() { - controllerutil.RemoveFinalizer(reb, finalizer) - return ctrl.Result{}, rec.Client.Update(ctx, reb) - } - _ = reb.Status.SetFailed(appsv2alpha2.RebalanceCondition{ - Type: appsv2alpha2.RebalanceConditionFailed, - Status: corev1.ConditionTrue, - Message: fmt.Sprintf("EMQX Enterprise %s is not found", reb.Spec.InstanceName), - }) - return ctrl.Result{}, rec.Client.Status().Update(ctx, reb) - } - - // check if emqx is ready - if !emqx.Status.IsConditionTrue(appsv1beta4.ConditionRunning) { - _ = reb.Status.SetFailed(appsv2alpha2.RebalanceCondition{ - Type: appsv2alpha2.RebalanceConditionFailed, - Status: corev1.ConditionTrue, - Message: fmt.Sprintf("EMQX Enterprise %s is not ready", reb.Spec.InstanceName), - }) - return ctrl.Result{}, rec.Client.Status().Update(ctx, reb) - } +//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances/finalizers,verbs=update - readyPod := rec.getReadyPod(emqx) - if readyPod == nil { - return ctrl.Result{}, emperror.New("failed to get ready pod") - } +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +// TODO(user): Modify the Reconcile function to compare the state specified by +// the Rebalance object against the actual cluster state, and then +// perform operations to make the cluster state reflect the state specified by +// the user. +// +// For more details, check Reconcile and its Result here: +// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.2/pkg/reconcile - requester, err := controllerv1beta4.NewRequesterByPod(rec.Client, emqx, readyPod) - if err != nil { - return ctrl.Result{}, emperror.New("failed to get create emqx http API") - } +func (r *RebalanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var err error + var finalizer string = "apps.emqx.io/finalizer" + var requester innerReq.RequesterInterface + var targetEMQX client.Object - if !reb.DeletionTimestamp.IsZero() { - if reb.Status.Phase == appsv2alpha2.RebalancePhaseProcessing { - _ = stopRebalance(emqx, requester, reb) - } - controllerutil.RemoveFinalizer(reb, finalizer) - return ctrl.Result{}, rec.Client.Update(ctx, reb) - } + logger := log.FromContext(ctx) + logger.V(1).Info("Reconcile rebalance") - if !controllerutil.ContainsFinalizer(reb, finalizer) { - controllerutil.AddFinalizer(reb, finalizer) - if err := rec.Client.Update(ctx, reb); err != nil { - return ctrl.Result{}, err + rebalance := &appsv2alpha2.Rebalance{} + if err := r.Client.Get(ctx, req.NamespacedName, rebalance); err != nil { + if k8sErrors.IsNotFound(err) { + return ctrl.Result{}, nil } - } - - rebalanceStatusHandler(emqx, reb, requester, startRebalance, getRebalanceStatus) - if err := rec.Client.Status().Update(ctx, reb); err != nil { return ctrl.Result{}, err } - switch reb.Status.Phase { - case "Failed": - rec.EventRecorder.Event(reb, corev1.EventTypeWarning, "Reb", "reb failed") - return ctrl.Result{}, nil - case "Completed": - rec.EventRecorder.Event(reb, corev1.EventTypeNormal, "Reb", "reb completed") - return ctrl.Result{}, nil - case "Processing": - rec.EventRecorder.Event(reb, corev1.EventTypeNormal, "Reb", "rebalance is processing") - return ctrl.Result{RequeueAfter: 5 * time.Second}, nil - default: - panic("unknown rebalance phase") - } -} + // check instanceKind is v1beta4 or v2alpha2 + if rebalance.Spec.InstanceKind == "EmqxEnterprise" { + emqx := &appsv1beta4.EmqxEnterprise{} + if err := r.Client.Get(ctx, client.ObjectKey{ + Name: rebalance.Spec.InstanceName, + Namespace: rebalance.Namespace, + }, emqx); err != nil { + if !k8sErrors.IsNotFound(err) { + return ctrl.Result{}, emperror.Wrap(err, "failed to get EMQX Enterprise") + } + if !rebalance.DeletionTimestamp.IsZero() { + controllerutil.RemoveFinalizer(rebalance, finalizer) + return ctrl.Result{}, r.Client.Update(ctx, rebalance) + } + _ = rebalance.Status.SetFailed(appsv2alpha2.RebalanceCondition{ + Type: appsv2alpha2.RebalanceConditionFailed, + Status: corev1.ConditionTrue, + Message: fmt.Sprintf("EMQX Enterprise %s is not found", rebalance.Spec.InstanceName), + }) + return ctrl.Result{}, r.Client.Status().Update(ctx, rebalance) + } -func doV2alpha2(reb *appsv2alpha2.Rebalance, rec *RebalanceReconciler, ctx context.Context) (ctrl.Result, error) { - finalizer := "apps.emqx.io/finalizer" + if !emqx.Status.IsConditionTrue(appsv1beta4.ConditionRunning) { + _ = rebalance.Status.SetFailed(appsv2alpha2.RebalanceCondition{ + Type: appsv2alpha2.RebalanceConditionFailed, + Status: corev1.ConditionTrue, + Message: fmt.Sprintf("EMQX Enterprise %s is not ready", rebalance.Spec.InstanceName), + }) + return ctrl.Result{}, r.Client.Status().Update(ctx, rebalance) + } - emqx := &appsv2alpha2.EMQX{} - if err := rec.Client.Get(ctx, client.ObjectKey{ - Name: reb.Spec.InstanceName, - Namespace: reb.Namespace, - }, emqx); err != nil { - if !k8sErrors.IsNotFound(err) { - return ctrl.Result{}, err + requester, err = controllerv1beta4.NewRequesterByPod(r.Client, emqx) + if err != nil { + return ctrl.Result{}, emperror.New("failed to get create emqx http API") } - if !reb.DeletionTimestamp.IsZero() { - controllerutil.RemoveFinalizer(reb, finalizer) - return ctrl.Result{}, rec.Client.Update(ctx, reb) + targetEMQX = emqx + } else { + emqx := &appsv2alpha2.EMQX{} + if err := r.Client.Get(ctx, client.ObjectKey{ + Name: rebalance.Spec.InstanceName, + Namespace: rebalance.Namespace, + }, emqx); err != nil { + if !k8sErrors.IsNotFound(err) { + return ctrl.Result{}, err + } + if !rebalance.DeletionTimestamp.IsZero() { + controllerutil.RemoveFinalizer(rebalance, finalizer) + return ctrl.Result{}, r.Client.Update(ctx, rebalance) + } + _ = rebalance.Status.SetFailed(appsv2alpha2.RebalanceCondition{ + Type: appsv2alpha2.RebalanceConditionFailed, + Status: corev1.ConditionTrue, + Message: fmt.Sprintf("EMQX %s is not found", rebalance.Spec.InstanceName), + }) + return ctrl.Result{}, r.Client.Status().Update(ctx, rebalance) } - _ = reb.Status.SetFailed(appsv2alpha2.RebalanceCondition{ - Type: appsv2alpha2.RebalanceConditionFailed, - Status: corev1.ConditionTrue, - Message: fmt.Sprintf("EMQX %s is not found", reb.Spec.InstanceName), - }) - return ctrl.Result{}, rec.Client.Status().Update(ctx, reb) - } - // check if emqx is ready - if !emqx.Status.IsConditionTrue(appsv2alpha2.Ready) { - // return ctrl.Result{}, emperror.New("EMQX is not ready") - _ = reb.Status.SetFailed(appsv2alpha2.RebalanceCondition{ - Type: appsv2alpha2.RebalanceConditionFailed, - Status: corev1.ConditionTrue, - Message: fmt.Sprintf("EMQX %s is not ready", reb.Spec.InstanceName), - }) - return ctrl.Result{}, rec.Client.Status().Update(ctx, reb) - } + // check if emqx is ready + if !emqx.Status.IsConditionTrue(appsv2alpha2.Ready) { + // return ctrl.Result{}, emperror.New("EMQX is not ready") + _ = rebalance.Status.SetFailed(appsv2alpha2.RebalanceCondition{ + Type: appsv2alpha2.RebalanceConditionFailed, + Status: corev1.ConditionTrue, + Message: fmt.Sprintf("EMQX %s is not ready", rebalance.Spec.InstanceName), + }) + return ctrl.Result{}, r.Client.Status().Update(ctx, rebalance) + } - // check if emqx is enterprise edition - if emqx.Status.CoreNodes[0].Edition != "Enterprise" { - return ctrl.Result{}, emperror.New("only support enterprise edition") - } + // check if emqx is enterprise edition + if emqx.Status.CoreNodes[0].Edition != "Enterprise" { + return ctrl.Result{}, emperror.New("only support enterprise edition") + } - requester, err := newRequester(rec.Client, emqx) - if err != nil { - return ctrl.Result{}, emperror.New("failed to get create emqx http API") + requester, err = newRequester(r.Client, emqx) + if err != nil { + return ctrl.Result{}, emperror.New("failed to get create emqx http API") + } + targetEMQX = emqx } - if !reb.DeletionTimestamp.IsZero() { - if reb.Status.Phase == appsv2alpha2.RebalancePhaseProcessing { - _ = stopRebalance(emqx, requester, reb) + if !rebalance.DeletionTimestamp.IsZero() { + if rebalance.Status.Phase == appsv2alpha2.RebalancePhaseProcessing { + _ = stopRebalance(targetEMQX, requester, rebalance) } - controllerutil.RemoveFinalizer(reb, finalizer) - return ctrl.Result{}, rec.Client.Update(ctx, reb) + controllerutil.RemoveFinalizer(rebalance, finalizer) + return ctrl.Result{}, r.Client.Update(ctx, rebalance) } - if !controllerutil.ContainsFinalizer(reb, finalizer) { - controllerutil.AddFinalizer(reb, finalizer) - if err := rec.Client.Update(ctx, reb); err != nil { + if !controllerutil.ContainsFinalizer(rebalance, finalizer) { + controllerutil.AddFinalizer(rebalance, finalizer) + if err := r.Client.Update(ctx, rebalance); err != nil { return ctrl.Result{}, err } } - rebalanceStatusHandler(emqx, reb, requester, startRebalance, getRebalanceStatus) - if err := rec.Client.Status().Update(ctx, reb); err != nil { + rebalanceStatusHandler(targetEMQX, rebalance, requester, startRebalance, getRebalanceStatus) + if err := r.Client.Status().Update(ctx, rebalance); err != nil { return ctrl.Result{}, err } - switch reb.Status.Phase { + switch rebalance.Status.Phase { case "Failed": - rec.EventRecorder.Event(reb, corev1.EventTypeWarning, "Reb", "reb failed") + r.EventRecorder.Event(rebalance, corev1.EventTypeWarning, "Rebalance", "rebalance failed") return ctrl.Result{}, nil case "Completed": - rec.EventRecorder.Event(reb, corev1.EventTypeNormal, "Reb", "reb completed") + r.EventRecorder.Event(rebalance, corev1.EventTypeNormal, "Rebalance", "rebalance completed") return ctrl.Result{}, nil case "Processing": - rec.EventRecorder.Event(reb, corev1.EventTypeNormal, "Reb", "rebalance is processing") + r.EventRecorder.Event(rebalance, corev1.EventTypeNormal, "Rebalance", "rebalance is processing") return ctrl.Result{RequeueAfter: 5 * time.Second}, nil default: panic("unknown rebalance phase") } - -} - -//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=apps.emqx.io,resources=rebalances/finalizers,verbs=update - -// Reconcile is part of the main kubernetes reconciliation loop which aims to -// move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the EmqxRebalance object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. -// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.2/pkg/reconcile - -func (r *RebalanceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - logger.V(1).Info("Reconcile rebalance") - - rebalance := &appsv2alpha2.Rebalance{} - if err := r.Client.Get(ctx, req.NamespacedName, rebalance); err != nil { - if k8sErrors.IsNotFound(err) { - return ctrl.Result{}, nil - } - return ctrl.Result{}, err - } - - // check instanceKind is v1beta4 or v2alpha2 - // TODO: instance.GroupVersionKind().Kind - if rebalance.Spec.InstanceKind == V1beta4InstanceKind { - return doV1beta4(rebalance, r, ctx) - } else { - return doV2alpha2(rebalance, r, ctx) - } } // SetupWithManager sets up the controller with the Manager. func (r *RebalanceReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&appsv2alpha2.Rebalance{}). + WithEventFilter(predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + // Ignore updates to CR status in which case metadata.Generation does not change + return e.ObjectNew.GetGeneration() != e.ObjectOld.GetGeneration() + }, + }). Complete(r) } -func (r *RebalanceReconciler) getReadyPod(emqxEnterprise *appsv1beta4.EmqxEnterprise) *corev1.Pod { - podList := &corev1.PodList{} - _ = r.Client.List(context.Background(), podList, - client.InNamespace(emqxEnterprise.GetNamespace()), - client.MatchingLabels(emqxEnterprise.GetSpec().GetTemplate().Labels), - ) - for _, pod := range podList.Items { - for _, c := range pod.Status.Conditions { - if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue { - return pod.DeepCopy() - } - } - } - return nil -} - // Rebalance Handler -type GetRebalanceStatusFunc func(emqx interface{}, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) -type StartRebalanceFunc func(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error -type StopRebalanceFunc func(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error +type GetRebalanceStatusFunc func(emqx client.Object, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) +type StartRebalanceFunc func(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error +type StopRebalanceFunc func(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error -func rebalanceStatusHandler(emqx interface{}, rebalance *appsv2alpha2.Rebalance, requester innerReq.RequesterInterface, +func rebalanceStatusHandler(emqx client.Object, rebalance *appsv2alpha2.Rebalance, requester innerReq.RequesterInterface, startFun StartRebalanceFunc, getRebalanceStatusFun GetRebalanceStatusFunc, ) { switch rebalance.Status.Phase { @@ -338,7 +279,7 @@ func rebalanceStatusHandler(emqx interface{}, rebalance *appsv2alpha2.Rebalance, } } -func startRebalance(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { +func startRebalance(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { nodes, err := getEmqxNodes(emqx) if err != nil { return err @@ -367,7 +308,7 @@ func startRebalance(emqx interface{}, requester innerReq.RequesterInterface, reb return nil } -func getRebalanceStatus(emqx interface{}, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { +func getRebalanceStatus(emqx client.Object, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { path, err := rebalanceStatusUrl(emqx) if err != nil { return nil, err @@ -388,7 +329,7 @@ func getRebalanceStatus(emqx interface{}, requester innerReq.RequesterInterface) return rebalanceStates, nil } -func stopRebalance(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { +func stopRebalance(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { // stop rebalance should use coordinatorNode as path parameter path, err := rebalanceStopUrl(emqx, rebalance.Status.RebalanceStates[0].CoordinatorNode) if err != nil { @@ -436,7 +377,7 @@ func getRequestBytes(rebalance *appsv2alpha2.Rebalance, nodes []string) []byte { } // helper functions -func getEmqxNodes(emqx interface{}) ([]string, error) { +func getEmqxNodes(emqx client.Object) ([]string, error) { nodes := []string{} if e, ok := emqx.(*appsv1beta4.EmqxEnterprise); ok { for _, node := range e.Status.EmqxNodes { @@ -458,7 +399,7 @@ func getEmqxNodes(emqx interface{}) ([]string, error) { return nodes, nil } -func rebalanceStartUrl(emqx interface{}, node string) (string, error) { +func rebalanceStartUrl(emqx client.Object, node string) (string, error) { if _, ok := emqx.(*appsv1beta4.EmqxEnterprise); ok { return fmt.Sprintf("%s/%s/start", ApiRebalanceV4, node), nil } else if _, ok := emqx.(*appsv2alpha2.EMQX); ok { @@ -468,7 +409,7 @@ func rebalanceStartUrl(emqx interface{}, node string) (string, error) { } } -func rebalanceStopUrl(emqx interface{}, node string) (string, error) { +func rebalanceStopUrl(emqx client.Object, node string) (string, error) { if _, ok := emqx.(*appsv1beta4.EmqxEnterprise); ok { return fmt.Sprintf("%s/%s/stop", ApiRebalanceV4, node), nil } else if _, ok := emqx.(*appsv2alpha2.EMQX); ok { @@ -478,7 +419,7 @@ func rebalanceStopUrl(emqx interface{}, node string) (string, error) { } } -func rebalanceStatusUrl(emqx interface{}) (string, error) { +func rebalanceStatusUrl(emqx client.Object) (string, error) { if _, ok := emqx.(*appsv1beta4.EmqxEnterprise); ok { return fmt.Sprintf("%s/global_status", ApiRebalanceV4), nil } else if _, ok := emqx.(*appsv2alpha2.EMQX); ok { diff --git a/controllers/apps/v2alpha2/rebalance_controller_test.go b/controllers/apps/v2alpha2/rebalance_controller_test.go index e86b07af4..ef59fc454 100644 --- a/controllers/apps/v2alpha2/rebalance_controller_test.go +++ b/controllers/apps/v2alpha2/rebalance_controller_test.go @@ -14,11 +14,12 @@ import ( innerReq "github.com/emqx/emqx-operator/internal/requester" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" ) type EmqxVer struct { name string - emqx interface{} + emqx client.Object } var emqxNodeName = "emqx-ee@emqx-ee-0.emqx-ee-headless.default.svc.cluster.local" @@ -374,16 +375,16 @@ func TestRebalanceStatusHandler(t *testing.T) { for _, tc := range emqxVers { tc := tc // Create a new variable to avoid variable capture in closures t.Run(tc.name, func(t *testing.T) { - defStartFun := func(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { + defStartFun := func(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { return nil } - defGetFun := func(emqx interface{}, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { + defGetFun := func(emqx client.Object, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { return []appsv2alpha2.RebalanceState{}, nil } t.Run("check start rebalance failed", func(t *testing.T) { r := rebalance.DeepCopy() - startFun := func(emqx interface{}, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { + startFun := func(emqx client.Object, requester innerReq.RequesterInterface, rebalance *appsv2alpha2.Rebalance) error { return errors.New("fake error") } rebalanceStatusHandler(tc.emqx, r, f, startFun, defGetFun) @@ -399,7 +400,7 @@ func TestRebalanceStatusHandler(t *testing.T) { r := rebalance.DeepCopy() r.Status.Phase = appsv2alpha2.RebalancePhaseProcessing - getFun := func(emqx interface{}, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { + getFun := func(emqx client.Object, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { return nil, errors.New("fake error") } @@ -419,7 +420,7 @@ func TestRebalanceStatusHandler(t *testing.T) { r := rebalance.DeepCopy() r.Status.Phase = appsv2alpha2.RebalancePhaseProcessing - getFun := func(emqx interface{}, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { + getFun := func(emqx client.Object, requester innerReq.RequesterInterface) ([]appsv2alpha2.RebalanceState, error) { return []appsv2alpha2.RebalanceState{ { State: "processing", diff --git a/controllers/apps/v2alpha2/suite_test.go b/controllers/apps/v2alpha2/suite_test.go index fe5620a56..24d4422e2 100644 --- a/controllers/apps/v2alpha2/suite_test.go +++ b/controllers/apps/v2alpha2/suite_test.go @@ -121,10 +121,6 @@ var _ = BeforeSuite(func() { // err = NewEMQXReconciler(k8sManager).SetupWithManager(k8sManager) // Expect(err).ToNot(HaveOccurred()) - rebalanceReconciler := NewRebalanceReconciler(k8sManager) - err = rebalanceReconciler.SetupWithManager(k8sManager) - Expect(err).ToNot(HaveOccurred()) - go func() { defer GinkgoRecover() err = k8sManager.Start(ctrl.SetupSignalHandler()) diff --git a/e2e/v2alpha2/e2e_rebalance_test.go b/e2e/v2alpha2/e2e_rebalance_test.go index 0bb5d238d..ace36211e 100644 --- a/e2e/v2alpha2/e2e_rebalance_test.go +++ b/e2e/v2alpha2/e2e_rebalance_test.go @@ -5,15 +5,14 @@ import ( "fmt" appsv2alpha2 "github.com/emqx/emqx-operator/apis/apps/v2alpha2" - controllerv2alpha2 "github.com/emqx/emqx-operator/controllers/apps/v2alpha2" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + // "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -39,27 +38,30 @@ var _ = Describe("Emqx v2alpha2 Rebalance Test", Label("rebalance"), func() { var instance *appsv2alpha2.EMQX var r *appsv2alpha2.Rebalance BeforeEach(func() { - instance = emqx.DeepCopy() - instance.Spec.Image = "emqx/emqx-enterprise:5.1.1-alpha.2" + instance = genEMQX().DeepCopy() instance.Default() }) Context("EMQX is not found", func() { BeforeEach(func() { r = rebalance.DeepCopy() - r.Namespace = instance.GetNamespace() + "-" + rand.String(5) - r.Spec.InstanceName = "fake" - r.Spec.InstanceKind = controllerv2alpha2.V2alpha2InstanceKind + r.Namespace = instance.GetNamespace() + r.Spec.InstanceName = "no-exist" + r.Spec.InstanceKind = instance.GroupVersionKind().Kind By("Creating namespace", func() { - Expect(k8sClient.Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: r.Namespace, - Labels: map[string]string{ - "test": "e2e", + // create namespace + Eventually(func() bool { + err := k8sClient.Create(context.TODO(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.GetNamespace(), + Labels: map[string]string{ + "test": "e2e", + }, }, - }, - })).Should(Succeed()) + }) + return err == nil || k8sErrors.IsAlreadyExists(err) + }).Should(BeTrue()) }) By("Creating Rebalance CR", func() { @@ -68,7 +70,7 @@ var _ = Describe("Emqx v2alpha2 Rebalance Test", Label("rebalance"), func() { }) AfterEach(func() { - By("Deleting Rebalance CR, can be successfull", func() { + By("Deleting Rebalance CR, can be successful", func() { Eventually(func() error { return k8sClient.Delete(context.TODO(), r) }, timeout, interval).Should(Succeed()) @@ -98,147 +100,127 @@ var _ = Describe("Emqx v2alpha2 Rebalance Test", Label("rebalance"), func() { }) }) - Context("Enterprise is found", func() { - BeforeEach(func() { - r = rebalance.DeepCopy() - - By("Creating namespace", func() { - // create namespace - Expect(k8sClient.Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: instance.GetNamespace(), - Labels: map[string]string{ - "test": "e2e", - }, - }, - })).Should(Succeed()) - }) - - By("Creating EMQX CR", func() { - // create EMQX CR - instance.Spec.ReplicantTemplate = nil - instance.Spec.CoreTemplate.Spec.Replicas = pointer.Int32Ptr(2) - instance.Default() - Expect(instance.ValidateCreate()).Should(Succeed()) - Expect(k8sClient.Create(context.TODO(), instance)).Should(Succeed()) - }) - - By("Checking EMQX CR", func() { - // check EMQX CR if created successfully - Eventually(func() *appsv2alpha2.EMQX { - _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance) - return instance - }).WithTimeout(timeout).WithPolling(interval).Should( - And( - WithTransform(func(instance *appsv2alpha2.EMQX) bool { - return instance.Status.IsConditionTrue(appsv2alpha2.Ready) - }, BeTrue()), - WithTransform(func(instance *appsv2alpha2.EMQX) []appsv2alpha2.EMQXNode { - return instance.Status.CoreNodes - }, HaveLen(int(*instance.Spec.CoreTemplate.Spec.Replicas))), - WithTransform(func(instance *appsv2alpha2.EMQX) appsv2alpha2.EMQXNodesStatus { - return instance.Status.CoreNodesStatus - }, And( - HaveField("Replicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))), - HaveField("ReadyReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))), - HaveField("CurrentRevision", Not(BeEmpty())), - HaveField("CurrentReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))), - HaveField("UpdateRevision", Not(BeEmpty())), - HaveField("UpdateReplicas", Equal(int32(*instance.Spec.CoreTemplate.Spec.Replicas))), - )), - WithTransform(func(instance *appsv2alpha2.EMQX) []appsv2alpha2.EMQXNode { - return instance.Status.ReplicantNodes - }, BeNil()), - WithTransform(func(instance *appsv2alpha2.EMQX) *appsv2alpha2.EMQXNodesStatus { - return instance.Status.ReplicantNodesStatus - }, BeNil()), - ), - ) - }) - }) - - AfterEach(func() { - By("Deleting EMQX CR", func() { - // delete emqx cr - Eventually(func() error { - return k8sClient.Delete(context.TODO(), instance) - }, timeout, interval).Should(Succeed()) - Eventually(func() bool { - return k8sErrors.IsNotFound(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance)) - }).Should(BeTrue()) - }) - - // delete rebalance cr - By("Deleting Rebalance CR", func() { - Eventually(func() error { - return k8sClient.Delete(context.TODO(), r) - }, timeout, interval).Should(Succeed()) - Eventually(func() bool { - return k8sErrors.IsNotFound(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r)) - }).Should(BeTrue()) - }) - }) - - It("Check rebalance status", func() { - By("Create rebalance", func() { - r.Namespace = instance.GetNamespace() - r.Spec.InstanceName = instance.GetName() - r.Spec.InstanceKind = controllerv2alpha2.V2alpha2InstanceKind - Expect(k8sClient.Create(context.TODO(), r)).Should(Succeed()) - }) - - By("Rebalance should have finalizer", func() { - Eventually(func() []string { - _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) - return r.GetFinalizers() - }, timeout, interval).Should(ContainElements("apps.emqx.io/finalizer")) - }) - - By("Rebalance will failed, because the EMQX Enterprise is nothing to balance", func() { - Eventually(func() appsv2alpha2.RebalanceStatus { - _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) - return r.Status - }, timeout, interval).Should(And( - HaveField("Phase", appsv2alpha2.RebalancePhaseFailed), - HaveField("RebalanceStates", BeNil()), - HaveField("Conditions", ContainElements( - And( - HaveField("Type", appsv2alpha2.RebalanceConditionFailed), - HaveField("Status", corev1.ConditionTrue), - HaveField("Message", "Failed to start rebalance: request api failed: 400 Bad Request"), - ), - )), - )) - }) - - By("Mock rebalance is in progress", func() { - // mock rebalance processing - r.Status.Phase = appsv2alpha2.RebalancePhaseProcessing - r.Status.Conditions = []appsv2alpha2.RebalanceCondition{} - Expect(k8sClient.Status().Update(context.TODO(), r)).Should(Succeed()) - - // update annotations for target reconciler - Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r)).Should(Succeed()) - r.Annotations = map[string]string{"test": "e2e"} - Expect(k8sClient.Update(context.TODO(), r)).Should(Succeed()) - }) - - By("Rebalance should completed", func() { - Eventually(func() appsv2alpha2.RebalanceStatus { - _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) - return r.Status - }, timeout, interval).Should(And( - HaveField("Phase", appsv2alpha2.RebalancePhaseCompleted), - HaveField("RebalanceStates", BeNil()), - HaveField("Conditions", ContainElements( - HaveField("Type", appsv2alpha2.RebalanceConditionCompleted), - )), - HaveField("Conditions", ContainElements( - HaveField("Status", corev1.ConditionTrue), - )), - )) - }) - - }) - }) + // Context("EMQX is exist", func() { + // BeforeEach(func() { + // r = rebalance.DeepCopy() + + // By("Creating namespace", func() { + // // create namespace + // Eventually(func() bool { + // err := k8sClient.Create(context.TODO(), &corev1.Namespace{ + // ObjectMeta: metav1.ObjectMeta{ + // Name: instance.GetNamespace(), + // Labels: map[string]string{ + // "test": "e2e", + // }, + // }, + // }) + // return err == nil || k8sErrors.IsAlreadyExists(err) + // }).Should(BeTrue()) + // }) + + // By("Creating EMQX CR", func() { + // // create EMQX CR + // instance.Spec.ReplicantTemplate = nil + // instance.Spec.CoreTemplate.Spec.Replicas = pointer.Int32Ptr(2) + // instance.Default() + // Expect(instance.ValidateCreate()).Should(Succeed()) + // Expect(k8sClient.Create(context.TODO(), instance)).Should(Succeed()) + + // // check EMQX CR if created successfully + // Eventually(func() *appsv2alpha2.EMQX { + // _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance) + // return instance + // }).WithTimeout(timeout).WithPolling(interval).Should( + // WithTransform(func(instance *appsv2alpha2.EMQX) bool { + // return instance.Status.IsConditionTrue(appsv2alpha2.Ready) + // }, BeTrue()), + // ) + // }) + // }) + + // AfterEach(func() { + // By("Deleting EMQX CR", func() { + // // delete emqx cr + // Eventually(func() error { + // return k8sClient.Delete(context.TODO(), instance) + // }, timeout, interval).Should(Succeed()) + // Eventually(func() bool { + // return k8sErrors.IsNotFound(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance)) + // }).Should(BeTrue()) + // }) + + // // delete rebalance cr + // By("Deleting Rebalance CR", func() { + // Eventually(func() error { + // return k8sClient.Delete(context.TODO(), r) + // }, timeout, interval).Should(Succeed()) + // Eventually(func() bool { + // return k8sErrors.IsNotFound(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r)) + // }).Should(BeTrue()) + // }) + // }) + + // It("Check rebalance status", func() { + // By("Create rebalance", func() { + // r.Namespace = instance.GetNamespace() + // r.Spec.InstanceName = instance.GetName() + // r.Spec.InstanceKind = instance.GroupVersionKind().Kind + + // Expect(k8sClient.Create(context.TODO(), r)).Should(Succeed()) + // }) + + // By("Rebalance should have finalizer", func() { + // Eventually(func() []string { + // _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) + // return r.GetFinalizers() + // }, timeout, interval).Should(ContainElements("apps.emqx.io/finalizer")) + // }) + + // By("Rebalance will failed, because the EMQX is nothing to balance", func() { + // Eventually(func() appsv2alpha2.RebalanceStatus { + // _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) + // return r.Status + // }, timeout, interval).Should(And( + // HaveField("Phase", appsv2alpha2.RebalancePhaseFailed), + // HaveField("RebalanceStates", BeNil()), + // HaveField("Conditions", ContainElements( + // And( + // HaveField("Type", appsv2alpha2.RebalanceConditionFailed), + // HaveField("Status", corev1.ConditionTrue), + // HaveField("Message", "Failed to start rebalance: request api failed: 400 Bad Request"), + // ), + // )), + // )) + // }) + + // By("Mock rebalance is in progress", func() { + // // mock rebalance processing + // r.Status.Phase = appsv2alpha2.RebalancePhaseProcessing + // r.Status.Conditions = []appsv2alpha2.RebalanceCondition{} + // Expect(k8sClient.Status().Update(context.TODO(), r)).Should(Succeed()) + + // // update annotations for target reconciler + // Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r)).Should(Succeed()) + // r.Annotations = map[string]string{"test": "e2e"} + // Expect(k8sClient.Update(context.TODO(), r)).Should(Succeed()) + // }) + + // By("Rebalance should completed", func() { + // Eventually(func() appsv2alpha2.RebalanceStatus { + // _ = k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(r), r) + // return r.Status + // }, timeout, interval).Should(And( + // HaveField("Phase", appsv2alpha2.RebalancePhaseCompleted), + // HaveField("RebalanceStates", BeNil()), + // HaveField("Conditions", ContainElements( + // HaveField("Type", appsv2alpha2.RebalanceConditionCompleted), + // )), + // HaveField("Conditions", ContainElements( + // HaveField("Status", corev1.ConditionTrue), + // )), + // )) + // }) + // }) + // }) }) diff --git a/e2e/v2alpha2/e2e_test.go b/e2e/v2alpha2/e2e_test.go index 30ea7b9f5..f66a00455 100644 --- a/e2e/v2alpha2/e2e_test.go +++ b/e2e/v2alpha2/e2e_test.go @@ -520,6 +520,14 @@ var _ = Describe("E2E Test", Ordered, func() { Expect(k8sClient.Update(context.TODO(), instance)).Should(Succeed()) }) }) + + It("should delete namespace", func() { + Expect(k8sClient.Delete(context.Background(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: instance.Namespace, + }, + })).Should(Succeed()) + }) }) func checkServices(instance *appsv2alpha2.EMQX) {