Skip to content

Commit

Permalink
feat(v2beta1): add new field of revisionHistoryLimit
Browse files Browse the repository at this point in the history
Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Aug 31, 2023
1 parent 6e00d27 commit af3199b
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 45 deletions.
7 changes: 7 additions & 0 deletions apis/apps/v2beta1/emqx_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,20 @@ type EMQXSpec struct {
//+kubebuilder:default:="cluster.local"
ClusterDomain string `json:"clusterDomain,omitempty"`

// The number of old ReplicaSets, old StatefulSet and old PersistentVolumeClaim to retain to allow rollback.
// This is a pointer to distinguish between explicit zero and not specified.
// Defaults to 3.
// +kubebuilder:default:=3
RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`

// UpdateStrategy is the object that describes the EMQX blue-green update strategy
//+kubebuilder:default={type:Recreate,initialDelaySeconds:10,evacuationStrategy:{waitTakeover:10,connEvictRate:1000,sessEvictRate:1000}}
UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"`

// CoreTemplate is the object that describes the EMQX core node that will be created
//+kubebuilder:default={spec:{replicas:2}}
CoreTemplate EMQXCoreTemplate `json:"coreTemplate,omitempty"`

// ReplicantTemplate is the object that describes the EMQX replicant node that will be created
ReplicantTemplate *EMQXReplicantTemplate `json:"replicantTemplate,omitempty"`

Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions config/crd/bases/apps.emqx.io_emqxes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12746,6 +12746,10 @@ spec:
type: array
type: object
type: object
revisionHistoryLimit:
default: 3
format: int32
type: integer
serviceAccountName:
type: string
updateStrategy:
Expand Down
1 change: 1 addition & 0 deletions config/samples/emqx/v2beta1/emqx-full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ metadata:
spec:
image: "emqx:5.1"
imagePullPolicy: Always
revisionHistoryLimit: 3
config:
data: |
dashboard.listeners.http.bind = 18083
Expand Down
5 changes: 3 additions & 2 deletions controllers/apps/v2beta1/add_emqx_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (a *addCore) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "patch", string(patchResult.Patch))
logger.Info("got different statefulSet for EMQX core nodes, will update statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preSts); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update statefulSet")}
Expand Down Expand Up @@ -146,7 +147,7 @@ func (a *addCore) getNewStatefulSet(ctx context.Context, instance *appsv2beta1.E
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "patch", string(patchResult.Patch))
logger.Info("got different pod template for EMQX core nodes, will create new statefulSet", "statefulSet", klog.KObj(preSts), "patch", string(patchResult.Patch))
return preSts, nil
}

Expand Down
6 changes: 4 additions & 2 deletions controllers/apps/v2beta1/add_emqx_repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -69,7 +70,7 @@ func (a *addRepl) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, _ i
)
if !patchResult.IsEmpty() {
logger := log.FromContext(ctx)
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "patch", string(patchResult.Patch))
logger.Info("got different replicaSet for EMQX replicant nodes, will update replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

if err := a.Handler.Update(preRs); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update replicaSet")}
Expand Down Expand Up @@ -150,8 +151,9 @@ func (a *addRepl) getNewReplicaSet(ctx context.Context, instance *appsv2beta1.EM
preRs.Spec.Selector = updateRs.DeepCopy().Spec.Selector
return preRs, nil
}

logger := log.FromContext(ctx)
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "patch", string(patchResult.Patch))
logger.Info("got different pod template for EMQX replicant nodes, will create new replicaSet", "replicaSet", klog.KObj(preRs), "patch", string(patchResult.Patch))

return preRs, nil
}
Expand Down
1 change: 1 addition & 0 deletions controllers/apps/v2beta1/emqx_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (r *EMQXReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
&updatePodConditions{r},
&updateStatus{r},
&syncPods{r},
&syncSets{r},
} {
subResult := subReconciler.reconcile(ctx, instance, requester)
if !subResult.result.IsZero() {
Expand Down
76 changes: 76 additions & 0 deletions controllers/apps/v2beta1/sync_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package v2beta1

import (
"context"

appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1"
innerReq "github.com/emqx/emqx-operator/internal/requester"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

type syncSets struct {
*EMQXReconciler
}

func (s *syncSets) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
if !instance.Status.IsConditionTrue(appsv2beta1.Ready) {
return subResult{}
}
logger := log.FromContext(ctx)

_, _, oldRsList := getReplicaSetList(ctx, s.Client, instance)
rsDiff := int32(len(oldRsList)) - *instance.Spec.RevisionHistoryLimit
if rsDiff > 0 {
for i := 0; i < int(rsDiff); i++ {
rs := oldRsList[i].DeepCopy()
// Avoid delete replica set with non-zero replica counts
if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
continue

Check warning on line 32 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L32

Added line #L32 was not covered by tests
}
logger.Info("trying to cleanup replica set for EMQX", "replicaSet", klog.KObj(rs), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, rs); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}

Check warning on line 37 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L36-L37

Added lines #L36 - L37 were not covered by tests
}
}

_, _, oldStsList := getStateFulSetList(ctx, s.Client, instance)
stsDiff := int32(len(oldStsList)) - *instance.Spec.RevisionHistoryLimit
if stsDiff > 0 {
for i := 0; i < int(rsDiff); i++ {
sts := oldStsList[i].DeepCopy()
// Avoid delete stateful set with non-zero replica counts
if sts.Status.Replicas != 0 || *(sts.Spec.Replicas) != 0 || sts.Generation > sts.Status.ObservedGeneration || sts.DeletionTimestamp != nil {
continue

Check warning on line 48 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L48

Added line #L48 was not covered by tests
}
logger.Info("trying to cleanup stateful set for EMQX", "statefulSet", klog.KObj(sts), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, sts); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}

Check warning on line 53 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L52-L53

Added lines #L52 - L53 were not covered by tests

// Delete PVCs
pvcList := &corev1.PersistentVolumeClaimList{}
_ = s.Client.List(ctx, pvcList,
client.InNamespace(instance.Namespace),
client.MatchingLabels(sts.Spec.Selector.MatchLabels),
)

for _, p := range pvcList.Items {
pvc := p.DeepCopy()
if pvc.DeletionTimestamp != nil {
continue

Check warning on line 65 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L65

Added line #L65 was not covered by tests
}
logger.Info("trying to cleanup pvc for EMQX", "pvc", klog.KObj(pvc), "EMQX", klog.KObj(instance))
if err := s.Client.Delete(ctx, pvc); err != nil && !k8sErrors.IsNotFound(err) {
return subResult{err: err}
}

Check warning on line 70 in controllers/apps/v2beta1/sync_sets.go

View check run for this annotation

Codecov / codecov/patch

controllers/apps/v2beta1/sync_sets.go#L69-L70

Added lines #L69 - L70 were not covered by tests
}
}
}

return subResult{}
}
203 changes: 203 additions & 0 deletions controllers/apps/v2beta1/sync_sets_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package v2beta1

import (
"context"
"fmt"
"time"

appsv2beta1 "github.com/emqx/emqx-operator/apis/apps/v2beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Check sync rs", func() {
var s *syncSets

var instance *appsv2beta1.EMQX = new(appsv2beta1.EMQX)
var ns *corev1.Namespace = &corev1.Namespace{}

BeforeEach(func() {
s = &syncSets{emqxReconciler}
ns = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "controller-v2beta1-sync-sets-test-" + rand.String(5),
Labels: map[string]string{
"test": "e2e",
},
},
}
instance = emqx.DeepCopy()
instance.Namespace = ns.Name
instance.Spec.RevisionHistoryLimit = pointer.Int32(3)
instance.Status = appsv2beta1.EMQXStatus{
Conditions: []metav1.Condition{
{
Type: appsv2beta1.Ready,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Time{Time: time.Now().AddDate(0, 0, -1)},
},
},
}

Expect(k8sClient.Create(context.Background(), ns)).To(Succeed())
for i := 0; i < 5; i++ {
name := fmt.Sprintf("%s-%d", instance.Name, i)

rs := &appsv1.ReplicaSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: instance.Namespace,
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: appsv1.ReplicaSetSpec{
Replicas: pointer.Int32Ptr(0),
Selector: &metav1.LabelSelector{
MatchLabels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultReplicantLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "emqx", Image: "emqx"},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), rs.DeepCopy())).Should(Succeed())
rs.Status.Replicas = 0
rs.Status.ObservedGeneration = 1
Expect(k8sClient.Status().Patch(context.Background(), rs.DeepCopy(), client.Merge)).Should(Succeed())

sts := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: instance.Namespace,
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: appsv1.StatefulSetSpec{
Replicas: pointer.Int32Ptr(0),
Selector: &metav1.LabelSelector{
MatchLabels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: appsv2beta1.CloneAndAddLabel(
appsv2beta1.DefaultCoreLabels(instance),
appsv2beta1.LabelsPodTemplateHashKey,
fmt.Sprintf("fake-%d", i),
),
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{Name: "emqx", Image: "emqx"},
},
},
},
},
}
Expect(k8sClient.Create(context.Background(), sts.DeepCopy())).Should(Succeed())
sts.Status.Replicas = 0
sts.Status.ObservedGeneration = 1
Expect(k8sClient.Status().Patch(context.Background(), sts.DeepCopy(), client.Merge)).Should(Succeed())

pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: sts.Name,
Namespace: sts.Namespace,
Labels: sts.Labels,
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{corev1.ReadWriteOnce},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse("1Gi"),
},
},
},
}
Expect(k8sClient.Create(context.Background(), pvc.DeepCopy())).Should(Succeed())
}
})

It("should delete rs sts and pvc", func() {
Expect(s.reconcile(context.Background(), instance, nil)).Should(Equal(subResult{}))

Eventually(func() int {
list := &appsv1.ReplicaSetList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultReplicantLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))

Eventually(func() int {
list := &appsv1.StatefulSetList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultCoreLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))

Eventually(func() int {
list := &corev1.PersistentVolumeClaimList{}
_ = k8sClient.List(context.Background(), list,
client.InNamespace(instance.Namespace),
client.MatchingLabels(appsv2beta1.DefaultCoreLabels(instance)),
)
count := 0
for _, i := range list.Items {
item := i.DeepCopy()
if item.DeletionTimestamp == nil {
count++
}
}
return count
}).WithTimeout(timeout).WithPolling(interval).Should(BeEquivalentTo(*instance.Spec.RevisionHistoryLimit))
})
})
Loading

0 comments on commit af3199b

Please sign in to comment.