Skip to content

Commit

Permalink
fix(v2alpha2): fix sync pod error
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jul 28, 2023
1 parent 8a54d5f commit 1c76b8a
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions controllers/apps/v2alpha2/sync_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, r

targetedEMQXNodesName := []string{}
if appsv2alpha2.IsExistReplicant(instance) {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
if currentRs != nil {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
}

} else {
for _, node := range instance.Status.CoreNodes {
if node.ControllerUID == currentSts.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
if currentSts != nil {
for _, node := range instance.Status.CoreNodes {
if node.ControllerUID == currentSts.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
}
}
Expand Down Expand Up @@ -109,16 +114,14 @@ func (s *syncPods) canBeScaleDownRs(
return shouldDeletePod, nil
}

if shouldDeletePodInfo.Edition == "Enterprise" {
if shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")
}
if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return nil, nil
}
return nil, nil
}

// Open Source or Enterprise with no session
Expand Down Expand Up @@ -160,14 +163,14 @@ func (s *syncPods) canBeScaleDownSts(
return true, nil
}

if shouldDeletePodInfo.Edition == "Enterprise" {
if shouldDeletePodInfo.Session > 0 && len(instance.Status.NodeEvacuationsStatus) == 0 {
if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return false, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return false, nil
}
return false, nil
}
// Open Source or Enterprise with no session
if !checkWaitTakeoverReady(instance, getEventList(ctx, s.Clientset, oldSts)) {
Expand Down

0 comments on commit 1c76b8a

Please sign in to comment.