Skip to content

Commit

Permalink
fix: fix emqx cluster can not ready (#1038)
Browse files Browse the repository at this point in the history
fix #1036
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z authored Apr 10, 2024
1 parent ca67358 commit c8fdfc7
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 41 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ jobs:
repository: ghcr.io/${{ github.repository }}
tag: ${{ github.ref_name }}
- name: Deploy emqx
timeout-minutes: 10
timeout-minutes: 5
uses: ./.github/actions/deploy-emqx
with:
kind: ${{ matrix.emqx[0] }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
timeout-minutes: 5
run: kubectl wait --for=condition=Ready pods -l "control-plane=controller-manager" -n emqx-operator-system
- name: Deployment emqx
timeout-minutes: 10
timeout-minutes: 5
uses: ./.github/actions/deploy-emqx
with:
kind: ${{ matrix.emqx[0] }}
Expand Down
61 changes: 61 additions & 0 deletions controllers/apps/v2beta1/add_headless_svc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package v2beta1

import (
"context"

emperror "emperror.dev/errors"
appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1"
innerReq "github.com/emqx/emqx-operator/internal/requester"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type addHeadlessSvc struct {
*EMQXReconciler
}

func (a *addHeadlessSvc) reconcile(ctx context.Context, logger logr.Logger, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult {
if err := a.CreateOrUpdateList(ctx, a.Scheme, logger, instance, []client.Object{generateHeadlessService(instance)}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update services")}
}
return subResult{}
}

func generateHeadlessService(instance *appsv2beta1.EMQX) *corev1.Service {
headlessSvc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: instance.Namespace,
Name: instance.HeadlessServiceNamespacedName().Name,
Labels: appsv2beta1.CloneAndMergeMap(appsv2beta1.DefaultLabels(instance), instance.Labels),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: corev1.ClusterIPNone,
SessionAffinity: corev1.ServiceAffinityNone,
PublishNotReadyAddresses: true,
Selector: appsv2beta1.DefaultCoreLabels(instance),
Ports: []corev1.ServicePort{
{
Name: "erlang-dist",
Port: 4370,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(4370),
},
{
Name: "gen-rpc",
Port: 5369,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(5369),
},
},
},
}
return headlessSvc
}
38 changes: 1 addition & 37 deletions controllers/apps/v2beta1/add_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (a *addSvc) reconcile(ctx context.Context, logger logr.Logger, instance *ap
return subResult{err: emperror.Wrap(err, "failed to get emqx configs by api")}
}

resources := []client.Object{generateHeadlessService(instance)}
resources := []client.Object{}
if dashboard := generateDashboardService(instance, configStr); dashboard != nil {
resources = append(resources, dashboard)
}
Expand Down Expand Up @@ -61,42 +61,6 @@ func (a *addSvc) getEMQXConfigsByAPI(r innerReq.RequesterInterface) (string, err
return string(body), nil
}

func generateHeadlessService(instance *appsv2beta1.EMQX) *corev1.Service {
headlessSvc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: instance.Namespace,
Name: instance.HeadlessServiceNamespacedName().Name,
Labels: appsv2beta1.CloneAndMergeMap(appsv2beta1.DefaultLabels(instance), instance.Labels),
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
ClusterIP: corev1.ClusterIPNone,
SessionAffinity: corev1.ServiceAffinityNone,
PublishNotReadyAddresses: true,
Selector: appsv2beta1.DefaultCoreLabels(instance),
Ports: []corev1.ServicePort{
{
Name: "erlang-dist",
Port: 4370,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(4370),
},
{
Name: "gen-rpc",
Port: 5369,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(5369),
},
},
},
}
return headlessSvc
}

func generateDashboardService(instance *appsv2beta1.EMQX, configStr string) *corev1.Service {
svc := &corev1.Service{}
if instance.Spec.DashboardServiceTemplate != nil {
Expand Down
1 change: 1 addition & 0 deletions controllers/apps/v2beta1/emqx_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
&addBootstrap{r},
&updatePodConditions{r},
&updateStatus{r},
&addHeadlessSvc{r},
&addCore{r},
&addRepl{r},
&addPdb{r},
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/status_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (s *coreNodesProgressingStatus) nextStatus(ctx context.Context) {
emqx := s.emqxStatusMachine.GetEMQX()

updateSts, _, _ := getStateFulSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == emqx.Status.CoreNodesStatus.Replicas {
if updateSts != nil && updateSts.Status.ReadyReplicas != 0 && updateSts.Status.ReadyReplicas == *emqx.Spec.CoreTemplate.Spec.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.CoreNodesReady,
Status: metav1.ConditionTrue,
Expand Down Expand Up @@ -191,7 +191,7 @@ func (s *replicantNodesProgressingStatus) nextStatus(ctx context.Context) {
}

updateRs, _, _ := getReplicaSetList(ctx, s.emqxStatusMachine.client, emqx)
if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == emqx.Status.ReplicantNodesStatus.Replicas {
if updateRs != nil && updateRs.Status.ReadyReplicas != 0 && updateRs.Status.ReadyReplicas == *emqx.Spec.ReplicantTemplate.Spec.Replicas {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2beta1.ReplicantNodesReady,
Status: metav1.ConditionTrue,
Expand Down

0 comments on commit c8fdfc7

Please sign in to comment.