Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(v2alpha2): fix sync pod error #883

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions controllers/apps/v2alpha2/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
})

It("should create statefulSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
Expand Down Expand Up @@ -86,7 +86,7 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
})

It("should update statefulSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
Expand Down Expand Up @@ -117,7 +117,7 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
})

It("should create new statefulSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(ctx, list,
Expand Down
12 changes: 6 additions & 6 deletions controllers/apps/v2alpha2/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
})

It("should do nothing", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand All @@ -87,7 +87,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
})

It("should do nothing", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand All @@ -101,7 +101,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {

Context("replicant template is not nil, and core code is ready", func() {
It("should create replicaSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand Down Expand Up @@ -134,7 +134,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
})

It("should update replicaSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand Down Expand Up @@ -164,7 +164,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
})

It("should update replicaSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand Down Expand Up @@ -197,7 +197,7 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
})

It("should create new replicaSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.ReplicaSet {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(ctx, list,
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2alpha2/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func TestAPIs(t *testing.T) {
}

var _ = BeforeSuite(func() {
timeout = time.Second * 5
interval = time.Millisecond * 500
timeout = time.Second
interval = time.Millisecond * 250
ctx = context.TODO()
emqx.Default()

Expand Down
35 changes: 19 additions & 16 deletions controllers/apps/v2alpha2/sync_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,20 @@

targetedEMQXNodesName := []string{}
if appsv2alpha2.IsExistReplicant(instance) {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
if currentRs != nil {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
}

} else {
for _, node := range instance.Status.CoreNodes {
if node.ControllerUID == currentSts.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
if currentSts != nil {
for _, node := range instance.Status.CoreNodes {
if node.ControllerUID == currentSts.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
}
}
}
}
Expand Down Expand Up @@ -109,16 +114,14 @@
return shouldDeletePod, nil
}

if shouldDeletePodInfo.Edition == "Enterprise" {
if shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")
}
if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return nil, emperror.Wrap(err, "failed to start node evacuation")

Check warning on line 120 in controllers/apps/v2alpha2/sync_pods.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2alpha2/sync_pods.go#L120

Added line #L120 was not covered by tests
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return nil, nil
}
return nil, nil
}

// Open Source or Enterprise with no session
Expand Down Expand Up @@ -160,14 +163,14 @@
return true, nil
}

if shouldDeletePodInfo.Edition == "Enterprise" {
if shouldDeletePodInfo.Session > 0 && len(instance.Status.NodeEvacuationsStatus) == 0 {
if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 {
if len(instance.Status.NodeEvacuationsStatus) == 0 {
if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil {
return false, emperror.Wrap(err, "failed to start node evacuation")
}
s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node))
return false, nil
}
return false, nil
}
// Open Source or Enterprise with no session
if !checkWaitTakeoverReady(instance, getEventList(ctx, s.Clientset, oldSts)) {
Expand Down
6 changes: 3 additions & 3 deletions controllers/apps/v2alpha2/sync_pods_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ var _ = Describe("Check sync pods controller", Ordered, Label("node"), func() {
})

It("running update emqx node controller", func() {
Expect(s.reconcile(ctx, instance, fakeR)).Should(Equal(subResult{}))
Eventually(s.reconcile(ctx, instance, fakeR)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))

By("should add pod deletion cost annotation")
Eventually(func() map[string]string {
Expand All @@ -240,7 +240,7 @@ var _ = Describe("Check sync pods controller", Ordered, Label("node"), func() {
}).Should(Equal(int32(1)))

instance.Status.ReplicantNodesStatus.CurrentRevision = instance.Status.ReplicantNodesStatus.UpdateRevision
Expect(s.reconcile(ctx, instance, fakeR)).Should(Equal(subResult{}))
Eventually(s.reconcile(ctx, instance, fakeR)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
By("should scale down sts")
Eventually(func() int32 {
_ = k8sClient.Get(context.Background(), client.ObjectKeyFromObject(currentSts), currentSts)
Expand Down Expand Up @@ -331,7 +331,7 @@ var _ = Describe("check can be scale down", func() {
canBeScaledDown, err := s.canBeScaleDownSts(ctx, instance, nil, oldSts, []string{})
Expect(err).ShouldNot(HaveOccurred())
Expect(canBeScaledDown).Should(BeFalse())
Eventually(s.reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
Eventually(s.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
})

It("emqx is enterprise, and node session more than 0", func() {
Expand Down
4 changes: 2 additions & 2 deletions e2e/v2alpha2/e2e_rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var _ = Describe("EMQX 5 Rebalance Test", func() {
var r *appsv2alpha2.Rebalance
BeforeEach(func() {
instance = genEMQX().DeepCopy()
instance.Spec.Image = "emqx/emqx-enterprise:5.1.1-alpha.4"
instance.Spec.Image = "emqx/emqx-enterprise:5.1"
instance.Default()
})

Expand Down Expand Up @@ -257,7 +257,7 @@ var _ = Describe("EMQX 4 Rebalance Test", Label("rebalance"), func() {
EmqxContainer: appsv1beta4.EmqxContainer{
Image: appsv1beta4.EmqxImage{
Repository: "emqx/emqx-ee",
Version: "4.4.18",
Version: "4.4.19",
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions e2e/v2alpha2/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
It("change EMQX image", func() {
Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance)).Should(Succeed())
storage := instance.DeepCopy()
instance.Spec.Image = "emqx/emqx:5.1"
instance.Spec.Image = "emqx:5.1"
Expect(k8sClient.Update(context.TODO(), instance)).Should(Succeed())

Eventually(func() *appsv2alpha2.EMQX {
Expand Down Expand Up @@ -393,7 +393,7 @@ var _ = Describe("E2E Test", Label("base"), Ordered, func() {
It("change EMQX image", func() {
Expect(k8sClient.Get(context.TODO(), client.ObjectKeyFromObject(instance), instance)).Should(Succeed())
storage := instance.DeepCopy()
instance.Spec.Image = "emqx:5.1"
instance.Spec.Image = "emqx/emqx:5.1"
Expect(k8sClient.Update(context.TODO(), instance)).Should(Succeed())

Eventually(func() *appsv2alpha2.EMQX {
Expand Down
3 changes: 1 addition & 2 deletions e2e/v2alpha2/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,7 @@ func genEMQX() *appsv2alpha2.EMQX {
Namespace: "e2e-test-v2alpha2" + "-" + rand.String(5),
},
Spec: appsv2alpha2.EMQXSpec{
// Image: "emqx/emqx-enterprise:5.1.1-alpha.4",
Image: "emqx/emqx:latest",
Image: "emqx:latest",
ImagePullPolicy: corev1.PullAlways,
ClusterDomain: "cluster.local",
Config: appsv2alpha2.Config{
Expand Down