Skip to content

Commit

Permalink
chore: use pod conditions to manage endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Rory-Z committed Jul 27, 2023
1 parent 56bde72 commit 90d1fd7
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 108 deletions.
13 changes: 13 additions & 0 deletions apis/apps/v2alpha2/emqx_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (r *EMQX) Default() {
r.defaultLabels()
r.defaultAnnotations()
r.defaultConfiguration()
r.defaultListenersServiceTemplate()
r.defaultDashboardServiceTemplate()
r.defaultContainerPort()
}
Expand Down Expand Up @@ -201,6 +202,18 @@ func (r *EMQX) defaultConfiguration() {
}
}

// defaultListenersServiceTemplate fills in defaults for the listeners service:
// the selector targets the replicant pods when at least one replicant is
// requested, otherwise the core pods, and the listener ports parsed from the
// EMQX bootstrap config are merged into any user-supplied ports.
func (r *EMQX) defaultListenersServiceTemplate() {
	selector := r.Spec.CoreTemplate.Labels
	if IsExistReplicant(r) {
		selector = r.Spec.ReplicantTemplate.Labels
	}
	r.Spec.ListenersServiceTemplate.Spec.Selector = selector

	// Best effort: on a config parse error the returned ports are empty and
	// the merge below is a no-op, leaving the user-supplied ports untouched.
	listenersPort, _ := GetListenersServicePorts(r.Spec.Config.Data)
	r.Spec.ListenersServiceTemplate.Spec.Ports = MergeServicePorts(
		r.Spec.ListenersServiceTemplate.Spec.Ports,
		listenersPort,
	)
}

func (r *EMQX) defaultDashboardServiceTemplate() {
r.Spec.DashboardServiceTemplate.Spec.Selector = r.Spec.CoreTemplate.Labels
dashboardPort, err := GetDashboardServicePort(r.Spec.Config.Data)
Expand Down
72 changes: 72 additions & 0 deletions apis/apps/v2alpha2/emqx_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -242,6 +243,77 @@ func TestDefaultConfiguration(t *testing.T) {
})
}

// TestDefaultListenersServiceTemplate verifies that defaulting the listeners
// service template (1) points the selector at the core pods, or at the
// replicant pods when at least one replicant is requested, and (2) merges the
// listener ports parsed from the EMQX config into any user-supplied ports.
//
// NOTE(review): renamed from TestDefaultListeneresServiceTemplate to fix the
// "Listeneres" misspelling; test functions are only invoked by `go test`, so
// the rename is safe.
func TestDefaultListenersServiceTemplate(t *testing.T) {
	t.Run("check selector", func(t *testing.T) {
		instance := &EMQX{
			Spec: EMQXSpec{
				CoreTemplate: EMQXCoreTemplate{
					ObjectMeta: metav1.ObjectMeta{
						Labels: map[string]string{
							DBRoleLabelKey: "core",
						},
					},
				},
				ReplicantTemplate: nil,
			},
		}
		// No replicant template at all: selector targets the core pods.
		instance.defaultListenersServiceTemplate()
		assert.Equal(t, "core", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey])

		// A replicant template scaled to zero still counts as "no
		// replicants": the selector stays on the core pods.
		instance.Spec.ReplicantTemplate = &EMQXReplicantTemplate{
			ObjectMeta: metav1.ObjectMeta{
				Labels: map[string]string{
					DBRoleLabelKey: "replicant",
				},
			},
			Spec: EMQXReplicantTemplateSpec{
				Replicas: pointer.Int32(0),
			},
		}
		instance.defaultListenersServiceTemplate()
		assert.Equal(t, "core", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey])

		// At least one replicant: selector switches to the replicant pods.
		instance.Spec.ReplicantTemplate.Spec.Replicas = pointer.Int32(1)
		instance.defaultListenersServiceTemplate()
		assert.Equal(t, "replicant", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey])
	})

	t.Run("check port", func(t *testing.T) {
		instance := &EMQX{
			Spec: EMQXSpec{
				Config: Config{
					Data: `listeners.tcp.default.bind = "0.0.0.0:1883"`,
				},
				ListenersServiceTemplate: corev1.Service{
					Spec: corev1.ServiceSpec{
						Ports: []corev1.ServicePort{
							{
								Name:       "foo",
								Protocol:   corev1.ProtocolTCP,
								Port:       11883,
								TargetPort: intstr.FromInt(11883),
							},
						},
					},
				},
			},
		}
		instance.defaultListenersServiceTemplate()
		// The user-supplied port survives the merge ...
		assert.Contains(t, instance.Spec.ListenersServiceTemplate.Spec.Ports, corev1.ServicePort{
			Name:       "foo",
			Protocol:   corev1.ProtocolTCP,
			Port:       11883,
			TargetPort: intstr.FromInt(11883),
		})
		// ... and the listener parsed from the config is added alongside it.
		assert.Contains(t, instance.Spec.ListenersServiceTemplate.Spec.Ports, corev1.ServicePort{
			Name:       "tcp-default",
			Protocol:   corev1.ProtocolTCP,
			Port:       1883,
			TargetPort: intstr.FromInt(1883),
		})
	})
}

func TestDefaultDashboardServiceTemplate(t *testing.T) {
t.Run("failed to get dashboard listeners", func(t *testing.T) {
instance := &EMQX{}
Expand Down
4 changes: 4 additions & 0 deletions apis/apps/v2alpha2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

// IsExistReplicant reports whether the EMQX instance is configured to run at
// least one replicant node (template present, replicas set, and positive).
func IsExistReplicant(instance *EMQX) bool {
	tmpl := instance.Spec.ReplicantTemplate
	if tmpl == nil || tmpl.Spec.Replicas == nil {
		return false
	}
	return *tmpl.Spec.Replicas > 0
}

// Clones the given selector and returns a new selector with the given key and value added.
// Returns the given selector, if labelKey is empty.
func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector {
Expand Down
86 changes: 1 addition & 85 deletions controllers/apps/v2alpha2/add_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand Down Expand Up @@ -40,12 +39,6 @@ func (a *addSvc) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ i
listeners := generateListenerService(instance, ports)
if listeners != nil {
resources = append(resources, listeners)
if instance.Status.IsConditionTrue(appsv2alpha2.CoreNodesReady) {
pods := a.getPodList(ctx, instance)
if len(pods) > 0 {
resources = append(resources, generateEndpoints(listeners, pods))
}
}
}

if err := a.CreateOrUpdateList(instance, a.Scheme, resources); err != nil {
Expand All @@ -55,41 +48,6 @@ func (a *addSvc) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ i
return subResult{}
}

// getPodList lists the pods of the current revision that should back the
// listeners service, keeping only those that are Ready and have a pod IP.
func (a *addSvc) getPodList(ctx context.Context, instance *appsv2alpha2.EMQX) []corev1.Pod {
	// Select pods of the current revision: replicant pods when replicants are
	// enabled, core pods otherwise.
	templateLabels := instance.Spec.CoreTemplate.Labels
	revision := instance.Status.CoreNodesStatus.CurrentRevision
	if isExistReplicant(instance) {
		templateLabels = instance.Spec.ReplicantTemplate.Labels
		revision = instance.Status.ReplicantNodesStatus.CurrentRevision
	}
	selector := appsv2alpha2.CloneAndAddLabel(
		templateLabels,
		appsv2alpha2.PodTemplateHashLabelKey,
		revision,
	)

	// Best effort: a failed List simply yields an empty pod list.
	podList := &corev1.PodList{}
	_ = a.Client.List(ctx, podList,
		client.InNamespace(instance.Namespace),
		client.MatchingLabels(selector),
	)

	ready := []corev1.Pod{}
	for _, pod := range podList.Items {
		for _, cond := range pod.Status.Conditions {
			// We also add a readiness gate to the pod, so if the pod is
			// Ready, the EMQX node is definitely in the cluster.
			// More info: https://git.k8s.io/enhancements/keps/sig-network/580-pod-readiness-gates
			if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue && pod.Status.PodIP != "" {
				ready = append(ready, pod)
			}
		}
	}
	return ready
}

func generateHeadlessService(instance *appsv2alpha2.EMQX) *corev1.Service {
headlessSvc := &corev1.Service{
TypeMeta: metav1.TypeMeta{
Expand All @@ -99,6 +57,7 @@ func generateHeadlessService(instance *appsv2alpha2.EMQX) *corev1.Service {
ObjectMeta: metav1.ObjectMeta{
Name: instance.HeadlessServiceNamespacedName().Name,
Namespace: instance.Namespace,
Labels: instance.Labels,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
Expand Down Expand Up @@ -143,10 +102,6 @@ func generateDashboardService(instance *appsv2alpha2.EMQX) *corev1.Service {

func generateListenerService(instance *appsv2alpha2.EMQX, ports []corev1.ServicePort) *corev1.Service {
listener := instance.Spec.ListenersServiceTemplate.DeepCopy()
// We don't need to set the selector for the service
// because the Operator will manager the endpoints
// please check https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors
listener.Spec.Selector = nil
listener.Spec.Ports = appsv2alpha2.MergeServicePorts(
listener.Spec.Ports,
ports,
Expand All @@ -168,42 +123,3 @@ func generateListenerService(instance *appsv2alpha2.EMQX, ports []corev1.Service
Spec: listener.Spec,
}
}

// generateEndpoints builds the Endpoints object backing the selector-less
// listeners service: one EndpointPort per service port and one
// EndpointAddress per ready pod, all in a single subset mirroring the
// service's namespace, name, labels and annotations.
func generateEndpoints(svc *corev1.Service, pods []corev1.Pod) *corev1.Endpoints {
	var subset corev1.EndpointSubset

	// Mirror every service port into the subset.
	for _, svcPort := range svc.Spec.Ports {
		subset.Ports = append(subset.Ports, corev1.EndpointPort{
			Name:     svcPort.Name,
			Port:     svcPort.Port,
			Protocol: svcPort.Protocol,
		})
	}

	// One address per pod, pointing back at the pod object.
	for _, p := range pods {
		pod := p.DeepCopy()
		subset.Addresses = append(subset.Addresses, corev1.EndpointAddress{
			IP:       pod.Status.PodIP,
			NodeName: pointer.String(pod.Spec.NodeName),
			TargetRef: &corev1.ObjectReference{
				Kind:      "Pod",
				Namespace: pod.Namespace,
				Name:      pod.Name,
				UID:       pod.UID,
			},
		})
	}

	return &corev1.Endpoints{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "v1",
			Kind:       "Endpoints",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   svc.Namespace,
			Name:        svc.Name,
			Annotations: svc.Annotations,
			Labels:      svc.Labels,
		},
		Subsets: []corev1.EndpointSubset{subset},
	}
}
8 changes: 4 additions & 4 deletions controllers/apps/v2alpha2/status_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type codeNodesReadyStatus struct {
func (s *codeNodesReadyStatus) nextStatus() {
emqx := s.emqxStatusMachine.GetEMQX()

if isExistReplicant(emqx) {
if appsv2alpha2.IsExistReplicant(emqx) {
emqx.Status.SetCondition(metav1.Condition{
Type: appsv2alpha2.ReplicantNodesProgressing,
Status: metav1.ConditionTrue,
Expand All @@ -179,7 +179,7 @@ type replicantNodesProgressingStatus struct {
func (s *replicantNodesProgressingStatus) nextStatus() {
emqx := s.emqxStatusMachine.GetEMQX()

if !isExistReplicant(emqx) {
if !appsv2alpha2.IsExistReplicant(emqx) {
s.emqxStatusMachine.initialized.nextStatus()
return
}
Expand All @@ -201,7 +201,7 @@ type replicantNodesReadyStatus struct {
}

func (s *replicantNodesReadyStatus) nextStatus() {
if !isExistReplicant(s.emqxStatusMachine.emqx) {
if !appsv2alpha2.IsExistReplicant(s.emqxStatusMachine.emqx) {
s.emqxStatusMachine.initialized.nextStatus()
return
}
Expand All @@ -227,7 +227,7 @@ func (s *availableStatus) nextStatus() {
return
}

if isExistReplicant(emqx) {
if appsv2alpha2.IsExistReplicant(emqx) {
if emqx.Status.ReplicantNodesStatus.ReadyReplicas != emqx.Status.ReplicantNodesStatus.Replicas ||
emqx.Status.ReplicantNodesStatus.UpdateRevision != emqx.Status.ReplicantNodesStatus.CurrentRevision {
return
Expand Down
4 changes: 2 additions & 2 deletions controllers/apps/v2alpha2/sync_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, r
updateRs, currentRs, _ := getReplicaSetList(ctx, s.Client, instance)

targetedEMQXNodesName := []string{}
if isExistReplicant(instance) {
if appsv2alpha2.IsExistReplicant(instance) {
for _, node := range instance.Status.ReplicantNodes {
if node.ControllerUID == currentRs.UID {
targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (s *syncPods) canBeScaleDownSts(
oldSts *appsv1.StatefulSet,
targetedEMQXNodesName []string,
) (bool, error) {
if isExistReplicant(instance) {
if appsv2alpha2.IsExistReplicant(instance) {
if instance.Status.ReplicantNodesStatus.CurrentRevision != instance.Status.ReplicantNodesStatus.UpdateRevision {
return false, nil
}
Expand Down
32 changes: 19 additions & 13 deletions controllers/apps/v2alpha2/update_pod_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,52 @@ type updatePodConditions struct {
}

func (u *updatePodConditions) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, r innerReq.RequesterInterface) subResult {
updateRs, _, _ := getReplicaSetList(ctx, u.Client, instance)
updateSts, _, _ := getStateFulSetList(ctx, u.Client, instance)

pods := &corev1.PodList{}
_ = u.Client.List(ctx, pods,
client.InNamespace(instance.Namespace),
client.MatchingLabels(instance.Labels),
)

for _, pod := range pods.Items {
hash := make(map[corev1.PodConditionType]int)

for i, condition := range pod.Status.Conditions {
hash[condition.Type] = i
}

if index, ok := hash[corev1.ContainersReady]; !ok || pod.Status.Conditions[index].Status != corev1.ConditionTrue {
for _, p := range pods.Items {
pod := p.DeepCopy()
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil {
continue
}

onServingCondition := corev1.PodCondition{
Type: appsv2alpha2.PodOnServing,
Status: u.checkInCluster(instance, r, pod.DeepCopy()),
Status: corev1.ConditionFalse,
LastProbeTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
}
if index, ok := hash[appsv2alpha2.PodOnServing]; ok {
onServingCondition.LastTransitionTime = pod.Status.Conditions[index].LastTransitionTime

if updateRs != nil && controllerRef.UID == updateRs.UID || updateSts != nil && controllerRef.UID == updateSts.UID {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue {
onServingCondition.Status = u.checkInCluster(instance, r, pod)
break
}
}

}

patchBytes, _ := json.Marshal(corev1.Pod{
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{onServingCondition},
},
})
_ = u.Client.Status().Patch(ctx, &pod, client.RawPatch(types.StrategicMergePatchType, patchBytes))
_ = u.Client.Status().Patch(ctx, pod.DeepCopy(), client.RawPatch(types.StrategicMergePatchType, patchBytes))
}
return subResult{}
}

func (u *updatePodConditions) checkInCluster(instance *appsv2alpha2.EMQX, r innerReq.RequesterInterface, pod *corev1.Pod) corev1.ConditionStatus {
nodes := instance.Status.CoreNodes
if isExistReplicant(instance) {
if appsv2alpha2.IsExistReplicant(instance) {
nodes = append(nodes, instance.Status.ReplicantNodes...)
}
for _, node := range nodes {
Expand Down
4 changes: 0 additions & 4 deletions controllers/apps/v2alpha2/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// isExistReplicant reports whether the EMQX instance is configured to run at
// least one replicant node (template present, replicas set, and positive).
func isExistReplicant(instance *appsv2alpha2.EMQX) bool {
	tmpl := instance.Spec.ReplicantTemplate
	if tmpl == nil || tmpl.Spec.Replicas == nil {
		return false
	}
	return *tmpl.Spec.Replicas > 0
}

func getRsPodMap(ctx context.Context, k8sClient client.Client, instance *appsv2alpha2.EMQX) map[types.UID][]*corev1.Pod {
podList := &corev1.PodList{}
_ = k8sClient.List(ctx, podList,
Expand Down

0 comments on commit 90d1fd7

Please sign in to comment.