Skip to content

Commit

Permalink
feat(v2beta1): add new field of revisionHistoryLimit
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Sep 1, 2023
1 parent 6e00d27 commit fb44305
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 45 deletions.
7 changes: 7 additions & 0 deletions apis/apps/v2beta1/emqx_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,20 @@ type EMQXSpec struct {
//+kubebuilder:default:="cluster.local"
ClusterDomain string `json:"clusterDomain,omitempty"`

// The number of old ReplicaSets, old StatefulSet and old PersistentVolumeClaim to retain to allow rollback.
// This is a pointer to distinguish between explicit zero and not specified.
// Defaults to 3.
// +kubebuilder:default:=3
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

// UpdateStrategy is the object that describes the EMQX blue-green update strategy
//+kubebuilder:default={type:Recreate,initialDelaySeconds:10,evacuationStrategy:{waitTakeover:10,connEvictRate:1000,sessEvictRate:1000}}
UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"`

// CoreTemplate is the object that describes the EMQX core node that will be created
//+kubebuilder:default={spec:{replicas:2}}
CoreTemplate EMQXCoreTemplate `json:"coreTemplate,omitempty"`

// ReplicantTemplate is the object that describes the EMQX replicant node that will be created
ReplicantTemplate *EMQXReplicantTemplate `json:"replicantTemplate,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/apps.emqx.io_emqxes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12746,6 +12746,10 @@ spec:
type: array
type: object
type: object
revisionHistoryLimit:
default: 3
format: int32
type: integer
serviceAccountName:
type: string
updateStrategy:
Expand Down
1 change: 1 addition & 0 deletions config/samples/emqx/v2beta1/emqx-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ metadata:
spec:
image: "emqx:5.1"
imagePullPolicy: Always
revisionHistoryLimit: 3
config:
data: |
dashboard.listeners.http.bind = 18083
Expand Down
5 changes: 3 additions & 2 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "patch", string(patchResult.Patch))
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preSts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.E
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "patch", string(patchResult.Patch))
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))
return preSts, nil
}

Expand Down
6 changes: 4 additions & 2 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "patch", string(patchResult.Patch))
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preRs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
Expand Down Expand Up @@ -150,8 +151,9 @@ func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EM
preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector
return preRs, nil
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "patch", string(patchResult.Patch))
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

return preRs, nil
}
Expand Down
1 change: 1 addition & 0 deletions controllers/apps/v2beta1/emqx_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
&updatePodConditions{r},
&updateStatus{r},
&syncPods{r},
&syncSets{r},
} {
subResult := subReconciler.reconcile(ctx, instance, requester)
if !subResult.result.IsZero() {
Expand Down
76 changes: 76 additions & 0 deletions controllers/apps/v2beta1/sync_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package v2beta1

import (
"context"

appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1"
innerReq "github.com/emqx/emqx-operator/internal/requester"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type syncSets struct {
*EMQXReconciler
}

func (s *syncSets) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
if !instance.Status.IsConditionTrue(appsv2beta1.Ready) {
return subResult{}
}
logger := log.FromContext(ctx)

_, _, oldRsList := getReplicaSetList(ctx, s.Client, instance)
rsDiff := int32(len(oldRsList)) - *instance.Spec.RevisionHistoryLimit
if rsDiff > 0 {
for i := 0; i < int(rsDiff); i++ {
rs := oldRsList[i].DeepCopy()
// Avoid delete replica set with non-zero replica counts
if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
continue
}
logger.Info("trying to cleanup replica set for EMQX", "replicaSet", klog.KObj(rs), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, rs); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}
}
}

_, _, oldStsList := getStateFulSetList(ctx, s.Client, instance)
stsDiff := int32(len(oldStsList)) - *instance.Spec.RevisionHistoryLimit
if stsDiff > 0 {
for i := 0; i < int(rsDiff); i++ {
sts := oldStsList[i].DeepCopy()
// Avoid delete stateful set with non-zero replica counts
if sts.Status.Replicas != 0 || *(sts.Spec.Replicas) != 0 || sts.Generation > sts.Status.ObservedGeneration || sts.DeletionTimestamp != nil {
continue
}
logger.Info("trying to cleanup stateful set for EMQX", "statefulSet", klog.KObj(sts), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, sts); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}

// Delete PVCs
pvcList := &corev1.PersistentVolumeClaimList{}
_ = s.Client.List(ctx, pvcList,
client.InNamespace(instance.Namespace),
client.MatchingLabels(sts.Spec.Selector.MatchLabels),
)

for _, p := range pvcList.Items {
pvc := p.DeepCopy()
if pvc.DeletionTimestamp != nil {
continue
}
logger.Info("trying to cleanup pvc for EMQX", "pvc", klog.KObj(pvc), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, pvc); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}
}
}
}

return subResult{}
}
203 changes: 203 additions & 0 deletions controllers/apps/v2beta1/sync_sets_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package v2beta1

import (
"context"
"fmt"
"time"

appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Check sync rs", func() {
var s *syncSets

var instance *appsv2beta1.EMQX = new(appsv2beta1.EMQX)
var ns *corev1.Namespace = &corev1.Namespace{}

BeforeEach(func() {
s = &syncSets{emqxReconciler}
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-v2beta1-sync-sets-test-" + rand.String(5),
Labels: map[string]string{
"test": "e2e",
},
},
}
instance = emqx.DeepCopy()
instance.Namespace = ns.Name
instance.Spec.RevisionHistoryLimit = pointer.Int32(3)
instance.Status = appsv2beta1.EMQXStatus{
Conditions: []metav1.Condition{
{
Type: appsv2beta1.Ready,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
},
}

Expect(k8sClient.Create(context.Background(), ns)).To(Succeed())
for i := 0; i < 5; i++ {
name := fmt.Sprintf("%s-%d", instance.Name, i)

rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: instance.Namespace,
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: appsv1.ReplicaSetSpec{
Replicas: pointer.Int32Ptr(0),
Selector: &metav1.LabelSelector{
MatchLabels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "emqx", Image: "emqx"},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), rs.DeepCopy())).Should(Succeed())
rs.Status.Replicas = 0
rs.Status.ObservedGeneration = 1
Expect(k8sClient.Status().Patch(context.Background(), rs.DeepCopy(), client.Merge)).Should(Succeed())

sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: instance.Namespace,
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: appsv1.StatefulSetSpec{
Replicas: pointer.Int32Ptr(0),
Selector: &metav1.LabelSelector{
MatchLabels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "emqx", Image: "emqx"},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), sts.DeepCopy())).Should(Succeed())
sts.Status.Replicas = 0
sts.Status.ObservedGeneration = 1
Expect(k8sClient.Status().Patch(context.Background(), sts.DeepCopy(), client.Merge)).Should(Succeed())

pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: sts.Name,
Namespace: sts.Namespace,
Labels: sts.Labels,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
}
Expect(k8sClient.Create(context.Background(), pvc.DeepCopy())).Should(Succeed())
}
})

It("should delete rs sts and pvc", func() {
Expect(s.reconcile(context.Background(), instance, nil)).Should(Equal(subResult{}))

Eventually(func() int {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultReplicantLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))

Eventually(func() int {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultCoreLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))

Eventually(func() int {
list := &corev1.PersistentVolumeClaimList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultCoreLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))
})
})
Loading

0 comments on commit fb44305

Please sign in to comment.