Skip to content

Commit

Permalink
chore: pod get container port from configMap
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Aug 16, 2023
1 parent a9c0bcd commit 5b8e0a1
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 74 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ jobs:
file: ${{ matrix.emqx[2] }}
- if: failure()
run: kubectl logs -l "control-plane=controller-manager" -n emqx-operator-system -c manager --tail=1000
- if: failure()
run: kubectl describe ${{ matrix.emqx[0] }} ${{ matrix.emqx[1] }}
- if: failure()
run: kubectl get ${{ matrix.emqx[0] }} ${{ matrix.emqx[1] }} -o json
- if: failure()
run: kubectl get events --sort-by='.lastTimestamp'
- if: failure()
run: kubectl logs -l "apps.emqx.io/managed-by=emqx-operator" -c emqx
- if: failure()
Expand Down
78 changes: 47 additions & 31 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -25,7 +26,10 @@ type addCore struct {
}

func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ innerReq.RequesterInterface) subResult {
preSts := a.getNewStatefulSet(ctx, instance)
preSts, err := a.getNewStatefulSet(ctx, instance)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get new statefulSet")}
}

Check warning on line 32 in controllers/apps/v2beta1/add_emqx_core.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_core.go#L31-L32

Added lines #L31 - L32 were not covered by tests
if preSts.UID == "" {
_ = ctrl.SetControllerReference(instance, preSts, a.Scheme)
if err := a.Handler.Create(preSts); err != nil {
Expand Down Expand Up @@ -81,53 +85,72 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
return subResult{}
}

func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.StatefulSet, error) {
configMap := &corev1.ConfigMap{}
if err := a.Client.Get(ctx, types.NamespacedName{
Name: instance.ConfigsNamespacedName().Name,
Namespace: instance.Namespace,
}, configMap); err != nil {
return nil, emperror.Wrap(err, "failed to get configMap")
}

Check warning on line 95 in controllers/apps/v2beta1/add_emqx_core.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_core.go#L94-L95

Added lines #L94 - L95 were not covered by tests

var containerPort corev1.ContainerPort
if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: 18083,
}
} else {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: svcPort.Port,
}
}

Check warning on line 110 in controllers/apps/v2beta1/add_emqx_core.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_core.go#L105-L110

Added lines #L105 - L110 were not covered by tests

preSts := generateStatefulSet(instance)
podTemplateSpecHash := computeHash(preSts.Spec.Template.DeepCopy(), instance.Status.CoreNodesStatus.CollisionCount)
preSts.Name = preSts.Name + "-" + podTemplateSpecHash
preSts.Labels = appsv2beta1.CloneAndAddLabel(preSts.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preSts.Spec.Template.Labels = appsv2beta1.CloneAndAddLabel(preSts.Spec.Template.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preSts.Spec.Selector = appsv2beta1.CloneSelectorAndAddLabel(preSts.Spec.Selector, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preSts.Spec.Template.Labels = appsv2beta1.CloneAndAddLabel(preSts.Spec.Template.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preSts.Spec.Template.Spec.Containers[0].Ports = appsv2beta1.MergeContainerPorts(
preSts.Spec.Template.Spec.Containers[0].Ports,
[]corev1.ContainerPort{
containerPort,
},
)
preSts.Spec.Template.Spec.Containers[0].Env = append([]corev1.EnvVar{
{Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))},
}, preSts.Spec.Template.Spec.Containers[0].Env...)

updateSts, _, _ := getStateFulSetList(ctx, a.Client, instance)
if updateSts == nil {
return preSts
return preSts, nil
}

patchResult, _ := a.Patcher.Calculate(
patchResult, err := a.Patcher.Calculate(
updateSts.DeepCopy(),
preSts.DeepCopy(),
justCheckPodTemplate(),
)
if err != nil {
return nil, emperror.Wrap(err, "failed to calculate patch")
}

Check warning on line 140 in controllers/apps/v2beta1/add_emqx_core.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_core.go#L139-L140

Added lines #L139 - L140 were not covered by tests
if patchResult.IsEmpty() {
preSts.ObjectMeta = updateSts.DeepCopy().ObjectMeta
preSts.Spec.Template.ObjectMeta = updateSts.DeepCopy().Spec.Template.ObjectMeta
preSts.Spec.Selector = updateSts.DeepCopy().Spec.Selector
return preSts
return preSts, nil
}

logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX core nodes, will create new statefulSet", "patch", string(patchResult.Patch))
return preSts
return preSts, nil
}

func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
var containerPort corev1.ContainerPort
svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)
if err != nil {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: 18083,
}
} else {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: svcPort.Port,
}
}

sts := &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
APIVersion: "apps/v1",
Expand Down Expand Up @@ -171,10 +194,7 @@ func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
ImagePullPolicy: corev1.PullPolicy(instance.Spec.ImagePullPolicy),
Command: instance.Spec.CoreTemplate.Spec.Command,
Args: instance.Spec.CoreTemplate.Spec.Args,
Ports: appsv2beta1.MergeContainerPorts(
instance.Spec.CoreTemplate.Spec.Ports,
[]corev1.ContainerPort{containerPort},
),
Ports: instance.Spec.CoreTemplate.Spec.Ports,
Env: append([]corev1.EnvVar{
{
Name: "POD_NAME",
Expand All @@ -184,10 +204,6 @@ func generateStatefulSet(instance *appsv2beta1.EMQX) *appsv1.StatefulSet {
},
},
},
{
Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND",
Value: strconv.Itoa(int(containerPort.ContainerPort)),
},
{
Name: "EMQX_CLUSTER__DISCOVERY_STRATEGY",
Value: "dns",
Expand Down
4 changes: 4 additions & 0 deletions controllers/apps/v2beta1/add_emqx_core_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ var _ = Describe("Check add core controller", Ordered, Label("core"), func() {
Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed())
})

It("create configMap", func() {
Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
})

It("should create statefulSet", func() {
Eventually(a.reconcile(ctx, instance, nil)).WithTimeout(timeout).WithPolling(interval).Should(Equal(subResult{}))
Eventually(func() []appsv1.StatefulSet {
Expand Down
9 changes: 1 addition & 8 deletions controllers/apps/v2beta1/add_emqx_core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,7 @@ func TestGenerateStatefulSet(t *testing.T) {
assert.Equal(t, emqx.Spec.ImagePullPolicy, got.Spec.Template.Spec.Containers[0].ImagePullPolicy)
assert.Equal(t, emqx.Spec.CoreTemplate.Spec.Command, got.Spec.Template.Spec.Containers[0].Command)
assert.Equal(t, emqx.Spec.CoreTemplate.Spec.Args, got.Spec.Template.Spec.Containers[0].Args)
assert.ElementsMatch(t, []corev1.ContainerPort{
{Name: "fake", HostPort: 0, ContainerPort: 0, Protocol: "", HostIP: ""},
{Name: "dashboard", HostPort: 0, ContainerPort: 18083, Protocol: "TCP", HostIP: ""},
}, got.Spec.Template.Spec.Containers[0].Ports)
assert.Equal(t, emqx.Spec.CoreTemplate.Spec.Ports, got.Spec.Template.Spec.Containers[0].Ports)
assert.ElementsMatch(t, []corev1.EnvVar{
{
Name: "POD_NAME",
Expand All @@ -160,10 +157,6 @@ func TestGenerateStatefulSet(t *testing.T) {
},
},
},
{
Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND",
Value: "18083",
},
{
Name: "EMQX_CLUSTER__DISCOVERY_STRATEGY",
Value: "dns",
Expand Down
78 changes: 47 additions & 31 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -31,7 +32,10 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
return subResult{}
}

preRs := a.getNewReplicaSet(ctx, instance)
preRs, err := a.getNewReplicaSet(ctx, instance)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get new replicaSet")}
}

Check warning on line 38 in controllers/apps/v2beta1/add_emqx_repl.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_repl.go#L37-L38

Added lines #L37 - L38 were not covered by tests
if preRs.UID == "" {
_ = ctrl.SetControllerReference(instance, preRs, a.Scheme)
if err := a.Handler.Create(preRs); err != nil {
Expand Down Expand Up @@ -87,53 +91,72 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
return subResult{}
}

func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EMQX) (*appsv1.ReplicaSet, error) {
configMap := &corev1.ConfigMap{}
if err := a.Client.Get(ctx, types.NamespacedName{
Name: instance.ConfigsNamespacedName().Name,
Namespace: instance.Namespace,
}, configMap); err != nil {
return nil, emperror.Wrap(err, "failed to get configMap")
}

Check warning on line 101 in controllers/apps/v2beta1/add_emqx_repl.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_repl.go#L100-L101

Added lines #L100 - L101 were not covered by tests

var containerPort corev1.ContainerPort
if svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data); err != nil {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: 18083,
}
} else {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: svcPort.Port,
}
}

Check warning on line 116 in controllers/apps/v2beta1/add_emqx_repl.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_repl.go#L111-L116

Added lines #L111 - L116 were not covered by tests

preRs := generateReplicaSet(instance)
podTemplateSpecHash := computeHash(preRs.Spec.Template.DeepCopy(), instance.Status.ReplicantNodesStatus.CollisionCount)
preRs.Name = preRs.Name + "-" + podTemplateSpecHash
preRs.Labels = appsv2beta1.CloneAndAddLabel(preRs.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preRs.Spec.Template.Labels = appsv2beta1.CloneAndAddLabel(preRs.Spec.Template.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preRs.Spec.Selector = appsv2beta1.CloneSelectorAndAddLabel(preRs.Spec.Selector, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preRs.Spec.Template.Labels = appsv2beta1.CloneAndAddLabel(preRs.Spec.Template.Labels, appsv2beta1.LabelsPodTemplateHashKey, podTemplateSpecHash)
preRs.Spec.Template.Spec.Containers[0].Ports = appsv2beta1.MergeContainerPorts(
preRs.Spec.Template.Spec.Containers[0].Ports,
[]corev1.ContainerPort{
containerPort,
},
)
preRs.Spec.Template.Spec.Containers[0].Env = append([]corev1.EnvVar{
{Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND", Value: strconv.Itoa(int(containerPort.ContainerPort))},
}, preRs.Spec.Template.Spec.Containers[0].Env...)

updateRs, _, _ := getReplicaSetList(ctx, a.Client, instance)
if updateRs == nil {
return preRs
return preRs, nil
}

patchResult, _ := a.Patcher.Calculate(
patchResult, err := a.Patcher.Calculate(
updateRs.DeepCopy(),
preRs.DeepCopy(),
justCheckPodTemplate(),
)
if err != nil {
return nil, emperror.Wrap(err, "failed to calculate patch result")
}

Check warning on line 146 in controllers/apps/v2beta1/add_emqx_repl.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/add_emqx_repl.go#L145-L146

Added lines #L145 - L146 were not covered by tests
if patchResult.IsEmpty() {
preRs.ObjectMeta = updateRs.DeepCopy().ObjectMeta
preRs.Spec.Template.ObjectMeta = updateRs.DeepCopy().Spec.Template.ObjectMeta
preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector
return preRs
return preRs, nil
}
logger := log.FromContext(ctx)
logger.V(1).Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "patch", string(patchResult.Patch))

return preRs
return preRs, nil
}

func generateReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
var containerPort corev1.ContainerPort
svcPort, err := appsv2beta1.GetDashboardServicePort(instance.Spec.Config.Data)
if err != nil {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: 18083,
}
} else {
containerPort = corev1.ContainerPort{
Name: "dashboard",
Protocol: corev1.ProtocolTCP,
ContainerPort: svcPort.Port,
}
}

return &appsv1.ReplicaSet{
TypeMeta: metav1.TypeMeta{
Kind: "ReplicaSet",
Expand Down Expand Up @@ -175,15 +198,8 @@ func generateReplicaSet(instance *appsv2beta1.EMQX) *appsv1.ReplicaSet {
ImagePullPolicy: instance.Spec.ImagePullPolicy,
Command: instance.Spec.ReplicantTemplate.Spec.Command,
Args: instance.Spec.ReplicantTemplate.Spec.Args,
Ports: appsv2beta1.MergeContainerPorts(
instance.Spec.ReplicantTemplate.Spec.Ports,
[]corev1.ContainerPort{containerPort},
),
Ports: instance.Spec.ReplicantTemplate.Spec.Ports,
Env: append([]corev1.EnvVar{
{
Name: "EMQX_DASHBOARD__LISTENERS__HTTP__BIND",
Value: strconv.Itoa(int(containerPort.ContainerPort)),
},
{
Name: "EMQX_CLUSTER__DISCOVERY_STRATEGY",
Value: "dns",
Expand Down
4 changes: 4 additions & 0 deletions controllers/apps/v2beta1/add_emqx_repl_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ var _ = Describe("Check add repl controller", Ordered, Label("repl"), func() {
Expect(k8sClient.Create(context.TODO(), ns)).Should(Succeed())
})

It("create configMap", func() {
Expect((&syncConfig{emqxReconciler}).reconcile(ctx, instance, nil)).Should(Equal(subResult{}))
})

Context("replicant template is nil", func() {
JustBeforeEach(func() {
instance.Spec.ReplicantTemplate = nil
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2beta1/add_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func generateDashboardService(instance *appsv2beta1.EMQX, configStr string) *cor
}

func generateListenerService(instance *appsv2beta1.EMQX, configStr string) *corev1.Service {
ports, err := appsv2beta1.GetListenersServicePorts(configStr)
if err != nil {
ports, _ := appsv2beta1.GetListenersServicePorts(configStr)
if len(ports) == 0 {
ports = append(ports, []corev1.ServicePort{
{
Name: "tcp-default",
Expand Down

0 comments on commit 5b8e0a1

Please sign in to comment.