Skip to content

Commit

Permalink
fix(v2beta1): fix replicant node blue-green update error
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jul 29, 2023
1 parent 0f85a47 commit 722a3c0
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 28 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ uninstall: manifests kustomize ## Uninstall CRDs from the K8s cluster specified
deploy: manifests kustomize ## Deploy controller to the K8s cluster specified in ~/.kube/config.
cd config/manager && $(KUSTOMIZE) edit set image controller=${IMG}
$(KUSTOMIZE) build config/default | kubectl apply -f -
kubectl wait --for=condition=Ready pods -l "control-plane=controller-manager" -n emqx-operator-system

undeploy: ## Undeploy controller from the K8s cluster specified in ~/.kube/config.
$(KUSTOMIZE) build config/default | kubectl delete -f -
Expand Down
2 changes: 1 addition & 1 deletion controllers/apps/v2beta1/emqx_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
&addSvc{r},
&addCore{r},
&addRepl{r},
&updateStatus{r},
&updatePodConditions{r},
&syncPods{r},
&updateStatus{r},
} {
subResult := subReconciler.reconcile(ctx, instance, requester)
if !subResult.result.IsZero() {
Expand Down
102 changes: 75 additions & 27 deletions controllers/apps/v2beta1/sync_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
innerReq "github.com/emqx/emqx-operator/internal/requester"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type syncPods struct {
Expand All @@ -30,18 +32,18 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r

targetedEMQXNodesName := []string{}
if appsv2beta1.IsExistReplicant(instance) {
if currentRs != nil {
if updateRs != nil {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
if node.ControllerUID == updateRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
}

} else {
if currentSts != nil {
if updateSts != nil {
for _, node := range instance.Status.CoreNodes {
if node.ControllerUID == currentSts.UID {
if node.ControllerUID == updateSts.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
Expand All @@ -63,10 +65,19 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r
return subResult{err: emperror.Wrap(err, "failed update pod deletion cost")}
}

currentRs.Spec.Replicas = pointer.Int32(instance.Status.ReplicantNodesStatus.CurrentReplicas - 1)
if err := s.Client.Update(ctx, currentRs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to scale down old replicaSet")}
pod := &corev1.Pod{}
if err := s.Client.Get(ctx, client.ObjectKeyFromObject(shouldDeletePod), pod); err != nil {
if !k8sErrors.IsNotFound(err) {
return subResult{err: emperror.Wrap(err, "failed to get should delete pod")}
}
}
if _, ok := pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]; ok && pod.DeletionTimestamp == nil {
currentRs.Spec.Replicas = pointer.Int32(instance.Status.ReplicantNodesStatus.CurrentReplicas - 1)
if err := s.Client.Update(ctx, currentRs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to scale down old replicaSet")}
}
}

}
return subResult{}
}
Expand Down Expand Up @@ -94,33 +105,62 @@ func (s *syncPods) canBeScaleDownRs(
oldRs *appsv1.ReplicaSet,
targetedEMQXNodesName []string,
) (*corev1.Pod, error) {
var shouldDeletePod *corev1.Pod
var shouldDeletePodInfo *appsv2beta1.EMQXNode
var err error

if !checkInitialDelaySecondsReady(instance) {
return nil, nil
}

oldRsPods := getRsPodMap(ctx, s.Client, instance)[oldRs.UID]
sort.Sort(PodsByNameOlder(oldRsPods))
if len(oldRsPods) == 0 {
return nil, nil
}

shouldDeletePod := oldRsPods[0].DeepCopy()
shouldDeletePodInfo, err := getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s", shouldDeletePod.Status.PodIP))
if err != nil {
return nil, emperror.Wrap(err, "failed to get node info by API")
}
if len(instance.Status.NodeEvacuationsStatus) > 0 {
if instance.Status.NodeEvacuationsStatus[0].State != "prohibiting" {
return nil, nil
}
emqxNode := instance.Status.NodeEvacuationsStatus[0].Node
FindPod:
for _, node := range instance.Status.ReplicantNodes {
if node.Node == emqxNode {
shouldDeletePodUID := node.PodUID
for _, pod := range oldRsPods {
if pod.UID == shouldDeletePodUID {
shouldDeletePod = pod.DeepCopy()
shouldDeletePodInfo = &node
break FindPod
}
}
}
}
} else {
sort.Sort(PodsByNameOlder(oldRsPods))
shouldDeletePod = oldRsPods[0].DeepCopy()
for _, pod := range oldRsPods {
if _, ok := pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]; ok {
shouldDeletePod = pod.DeepCopy()
break
}
}

if shouldDeletePodInfo.NodeStatus == "stopped" {
return shouldDeletePod, nil
shouldDeletePodInfo, err = getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s", shouldDeletePod.Status.PodIP))
if err != nil {
return nil, emperror.Wrap(err, "failed to get node info by API")
}

if shouldDeletePodInfo.NodeStatus == "stopped" {
return shouldDeletePod, nil
}
}

if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return nil, nil
}

Expand All @@ -138,6 +178,10 @@ func (s *syncPods) canBeScaleDownSts(
oldSts *appsv1.StatefulSet,
targetedEMQXNodesName []string,
) (bool, error) {
var shouldDeletePod *corev1.Pod
var shouldDeletePodInfo *appsv2beta1.EMQXNode
var err error

if appsv2beta1.IsExistReplicant(instance) {
if instance.Status.ReplicantNodesStatus.CurrentRevision != instance.Status.ReplicantNodesStatus.UpdateRevision {
return false, nil
Expand All @@ -148,13 +192,19 @@ func (s *syncPods) canBeScaleDownSts(
return false, nil
}

shouldDeletePod := &corev1.Pod{}
if len(instance.Status.NodeEvacuationsStatus) > 0 {
if instance.Status.NodeEvacuationsStatus[0].State != "prohibiting" {
return false, nil
}
}

shouldDeletePod = &corev1.Pod{}
_ = s.Client.Get(ctx, types.NamespacedName{
Namespace: instance.Namespace,
Name: fmt.Sprintf("%s-%d", oldSts.Name, *oldSts.Spec.Replicas-1),
}, shouldDeletePod)

shouldDeletePodInfo, err := getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", shouldDeletePod.Name, oldSts.Spec.ServiceName, oldSts.Namespace))
shouldDeletePodInfo, err = getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s.%s.%s.svc.cluster.local", shouldDeletePod.Name, oldSts.Spec.ServiceName, oldSts.Namespace))
if err != nil {
return false, emperror.Wrap(err, "failed to get node info by API")
}
Expand All @@ -164,12 +214,10 @@ func (s *syncPods) canBeScaleDownSts(
}

if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return false, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return false, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return false, nil
}
// Open Source or Enterprise with no session
Expand Down
11 changes: 11 additions & 0 deletions controllers/apps/v2beta1/sync_pods_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,17 @@ var _ = Describe("check can be scale down", func() {
Expect(canBeScaledDown).Should(BeNil())
})

It("emqx is in node evacuations", func() {
instance.Status.NodeEvacuationsStatus = []appsv2beta1.NodeEvacuationStatus{
{
State: "fake",
},
}
canBeScaledDown, err := s.canBeScaleDownRs(ctx, instance, fakeR, oldRs, []string{})
Expect(err).ShouldNot(HaveOccurred())
Expect(canBeScaledDown).Should(BeNil())
})

It("emqx is enterprise, and node session more than 0", func() {
fakeR.ReqFunc = func(method string, url url.URL, body []byte, header http.Header) (resp *http.Response, respBody []byte, err error) {
resp = &http.Response{
Expand Down

0 comments on commit 722a3c0

Please sign in to comment.