diff --git a/Makefile b/Makefile index 6b52cf1e8..250fa007c 100644 --- a/Makefile +++ b/Makefile @@ -80,6 +80,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config. cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG} $(KUSTOMIZE) build config/default | kubectl apply -f - + kubectl wait --for=condition=Ready pods -l "control-plane=controller-manager" -n emqx-operator-system undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config. $(KUSTOMIZE) build config/default | kubectl delete -f - diff --git a/controllers/apps/v2beta1/emqx_controller.go b/controllers/apps/v2beta1/emqx_controller.go index 25b0e0c28..5f9cc4f33 100644 --- a/controllers/apps/v2beta1/emqx_controller.go +++ b/controllers/apps/v2beta1/emqx_controller.go @@ -118,9 +118,9 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl. &addSvc{r}, &addCore{r}, &addRepl{r}, - &updateStatus{r}, &updatePodConditions{r}, &syncPods{r}, + &updateStatus{r}, } { subResult := subReconciler.reconcile(ctx, instance, requester) if !subResult.result.IsZero() { diff --git a/controllers/apps/v2beta1/sync_pods.go b/controllers/apps/v2beta1/sync_pods.go index ed7e4dee2..eb778f71a 100644 --- a/controllers/apps/v2beta1/sync_pods.go +++ b/controllers/apps/v2beta1/sync_pods.go @@ -12,8 +12,10 @@ import ( innerReq "github.com/emqx/emqx-operator/internal/requester" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client" ) type syncPods struct { @@ -30,18 +32,18 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r targetedEMQXNodesName := []string{} if appsv2beta1.IsExistReplicant(instance) { - if currentRs != nil { + if updateRs != nil { for _, node := range instance.Status.ReplicantNodes { - if node.ControllerUID == currentRs.UID { + if node.ControllerUID == updateRs.UID { targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node) } } } } else { - if currentSts != nil { + if updateSts != nil { for _, node := range instance.Status.CoreNodes { - if node.ControllerUID == currentSts.UID { + if node.ControllerUID == updateSts.UID { targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node) } } @@ -63,10 +65,19 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r return subResult{err: emperror.Wrap(err, "failed update pod deletion cost")} } - currentRs.Spec.Replicas = pointer.Int32(instance.Status.ReplicantNodesStatus.CurrentReplicas - 1) - if err := s.Client.Update(ctx, currentRs); err != nil { - return subResult{err: emperror.Wrap(err, "failed to scale down old replicaSet")} + pod := &corev1.Pod{} + if err := s.Client.Get(ctx, client.ObjectKeyFromObject(shouldDeletePod), pod); err != nil { + if !k8sErrors.IsNotFound(err) { + return subResult{err: emperror.Wrap(err, "failed to get should delete pod")} + } + } + if _, ok := pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]; ok && pod.DeletionTimestamp == nil { + currentRs.Spec.Replicas = pointer.Int32(instance.Status.ReplicantNodesStatus.CurrentReplicas - 1) + if err := s.Client.Update(ctx, currentRs); err != nil { + return subResult{err: emperror.Wrap(err, "failed to scale down old replicaSet")} + } } + } return subResult{} } @@ -94,33 +105,62 @@ func (s *syncPods) canBeScaleDownRs( oldRs *appsv1.ReplicaSet, targetedEMQXNodesName []string, ) (*corev1.Pod, error) { + var shouldDeletePod *corev1.Pod + var shouldDeletePodInfo *appsv2beta1.EMQXNode + var err error + if !checkInitialDelaySecondsReady(instance) { return nil, nil } oldRsPods := getRsPodMap(ctx, s.Client, instance)[oldRs.UID] - sort.Sort(PodsByNameOlder(oldRsPods)) if len(oldRsPods) == 0 { return nil, nil } - shouldDeletePod := oldRsPods[0].DeepCopy() - shouldDeletePodInfo, err := getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s", shouldDeletePod.Status.PodIP)) - if err != nil { - return nil, emperror.Wrap(err, "failed to get node info by API") - } + if len(instance.Status.NodeEvacuationsStatus) > 0 { + if instance.Status.NodeEvacuationsStatus[0].State != "prohibiting" { + return nil, nil + } + emqxNode := instance.Status.NodeEvacuationsStatus[0].Node + FindPod: + for _, node := range instance.Status.ReplicantNodes { + if node.Node == emqxNode { + shouldDeletePodUID := node.PodUID + for _, pod := range oldRsPods { + if pod.UID == shouldDeletePodUID { + shouldDeletePod = pod.DeepCopy() + shouldDeletePodInfo = &node + break FindPod + } + } + } + } + } else { + sort.Sort(PodsByNameOlder(oldRsPods)) + shouldDeletePod = oldRsPods[0].DeepCopy() + for _, pod := range oldRsPods { + if _, ok := pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]; ok { + shouldDeletePod = pod.DeepCopy() + break + } + } - if shouldDeletePodInfo.NodeStatus == "stopped" { - return shouldDeletePod, nil + shouldDeletePodInfo, err = getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s", shouldDeletePod.Status.PodIP)) + if err != nil { + return nil, emperror.Wrap(err, "failed to get node info by API") + } + + if shouldDeletePodInfo.NodeStatus == "stopped" { + return shouldDeletePod, nil + } } if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 { - if len(instance.Status.NodeEvacuationsStatus) == 0 { - if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { - return nil, emperror.Wrap(err, "failed to start node evacuation") - } - s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) + if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { + return nil, emperror.Wrap(err, "failed to start node evacuation") } + s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) return nil, nil } @@ -138,6 +178,10 @@ func (s *syncPods) canBeScaleDownSts( oldSts *appsv1.StatefulSet, targetedEMQXNodesName []string, ) (bool, error) { + var shouldDeletePod *corev1.Pod + var shouldDeletePodInfo *appsv2beta1.EMQXNode + var err error + if appsv2beta1.IsExistReplicant(instance) { if instance.Status.ReplicantNodesStatus.CurrentRevision != instance.Status.ReplicantNodesStatus.UpdateRevision { return false, nil @@ -148,13 +192,19 @@ func (s *syncPods) canBeScaleDownSts( return false, nil } - shouldDeletePod := &corev1.Pod{} + if len(instance.Status.NodeEvacuationsStatus) > 0 { + if instance.Status.NodeEvacuationsStatus[0].State != "prohibiting" { + return false, nil + } + } + + shouldDeletePod = &corev1.Pod{} _ = s.Client.Get(ctx, types.NamespacedName{ Namespace: instance.Namespace, Name: fmt.Sprintf("%s-%d", oldSts.Name, *oldSts.Spec.Replicas-1), }, shouldDeletePod) - shouldDeletePodInfo, err := getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", shouldDeletePod.Name, oldSts.Spec.ServiceName, oldSts.Namespace)) + shouldDeletePodInfo, err = getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", shouldDeletePod.Name, oldSts.Spec.ServiceName, oldSts.Namespace)) if err != nil { return false, emperror.Wrap(err, "failed to get node info by API") } @@ -164,12 +214,10 @@ func (s *syncPods) canBeScaleDownSts( } if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 { - if len(instance.Status.NodeEvacuationsStatus) == 0 { - if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { - return false, emperror.Wrap(err, "failed to start node evacuation") - } - s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) + if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { + return false, emperror.Wrap(err, "failed to start node evacuation") } + s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) return false, nil } // Open Source or Enterprise with no session diff --git a/controllers/apps/v2beta1/sync_pods_suite_test.go b/controllers/apps/v2beta1/sync_pods_suite_test.go index 28b68cbb8..d742ca5dc 100644 --- a/controllers/apps/v2beta1/sync_pods_suite_test.go +++ b/controllers/apps/v2beta1/sync_pods_suite_test.go @@ -455,6 +455,17 @@ var _ = Describe("check can be scale down", func() { Expect(canBeScaledDown).Should(BeNil()) }) + It("emqx is in node evacuations", func() { + instance.Status.NodeEvacuationsStatus = []appsv2beta1.NodeEvacuationStatus{ + { + State: "fake", + }, + } + canBeScaledDown, err := s.canBeScaleDownRs(ctx, instance, fakeR, oldRs, []string{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(canBeScaledDown).Should(BeNil()) + }) + It("emqx is enterprise, and node session more than 0", func() { fakeR.ReqFunc = func(method string, url url.URL, body []byte, header http.Header) (resp *http.Response, respBody []byte, err error) { resp = &http.Response{