Skip to content

Commit

Permalink
feat: restart pods when update configmap and secret
Browse files Browse the repository at this point in the history
Signed-off-by: zhanghongtong <[email protected]>
  • Loading branch information
Rory-Z committed Jan 14, 2022
1 parent 4e3c969 commit f43c8dc
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 34 deletions.
14 changes: 10 additions & 4 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,18 @@ jobs:
runs-on: ubuntu-latest
steps:
- run: minikube start
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: '1.17.3'
- uses: actions/checkout@v2
- uses: actions/cache@v2
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('go.sum') }}
restore-keys: |
${{ runner.os }}-go-
- name: install kubebuilder
run: |
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
Expand Down Expand Up @@ -61,12 +69,10 @@ jobs:
--create-namespace \
--version v1.6.1 \
--set installCRDs=true
- uses: actions/checkout@v2
with:
fetch-depth: 0
- uses: actions/setup-go@v2
with:
go-version: '1.17.3'
- uses: actions/checkout@v2
- name: Build image
env:
IMG: "emqx/emqx-operator-controller:${{ github.sha }}"
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.16 as builder
FROM golang:1.17.3 as builder

WORKDIR /workspace
# Copy the Go Modules manifests
Expand Down
2 changes: 1 addition & 1 deletion config/manager/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ kind: Kustomization
images:
- name: controller
newName: emqx/emqx-operator-controller
newTag: 7549fa9
newTag: 86b1b70
2 changes: 1 addition & 1 deletion config/samples/operator/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7902,7 +7902,7 @@ spec:
- --leader-elect
command:
- /manager
image: emqx/emqx-operator-controller:7549fa9
image: emqx/emqx-operator-controller:86b1b70
livenessProbe:
httpGet:
path: /healthz
Expand Down
32 changes: 31 additions & 1 deletion controllers_suite/acl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -52,6 +53,20 @@ var _ = Describe("", func() {
"acl.conf": emqx.GetACL()["conf"],
}))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: emqx.GetName(),
Namespace: emqx.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("ACL/ResourceVersion", cm.ResourceVersion),
)
}
})

Expand All @@ -64,8 +79,8 @@ var _ = Describe("", func() {
client.RawPatch(types.MergePatchType, patch),
)).Should(Succeed())

cm := &corev1.ConfigMap{}
Eventually(func() map[string]string {
cm := &corev1.ConfigMap{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Expand All @@ -80,6 +95,21 @@ var _ = Describe("", func() {
"acl.conf": "{deny, all, pubsub, [\"#\"]}.\n",
},
))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: emqx.GetName(),
Namespace: emqx.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("ACL/ResourceVersion", cm.ResourceVersion),
)
}
// TODO: check acl status by emqx api
// TODO: test acl by mqtt pubsub
Expand Down
65 changes: 63 additions & 2 deletions controllers_suite/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -50,6 +51,21 @@ var _ = Describe("", func() {
Expect(cm.Data).Should(Equal(map[string]string{
"loaded_modules": broker.GetLoadedModules()["conf"],
}))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: broker.GetName(),
Namespace: broker.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedModules/ResourceVersion", cm.ResourceVersion),
)
})

It("Update emqx broker loaded modules", func() {
Expand All @@ -62,8 +78,8 @@ var _ = Describe("", func() {
client.RawPatch(types.MergePatchType, patch),
)).Should(Succeed())

cm := &corev1.ConfigMap{}
Eventually(func() map[string]string {
cm := &corev1.ConfigMap{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Expand All @@ -76,6 +92,21 @@ var _ = Describe("", func() {
}, timeout, interval).Should(Equal(
map[string]string{"loaded_modules": "{emqx_mod_presence, false}.\n"},
))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: broker.GetName(),
Namespace: broker.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedModules/ResourceVersion", cm.ResourceVersion),
)
// TODO: check modules status by emqx api
})

Expand All @@ -97,6 +128,21 @@ var _ = Describe("", func() {
Expect(cm.Data).Should(Equal(map[string]string{
"loaded_modules": enterprise.GetLoadedModules()["conf"],
}))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: enterprise.GetName(),
Namespace: enterprise.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedModules/ResourceVersion", cm.ResourceVersion),
)
})

It("Update emqx enterprise loaded modules", func() {
Expand All @@ -109,8 +155,8 @@ var _ = Describe("", func() {
client.RawPatch(types.MergePatchType, patch),
)).Should(Succeed())

cm := &corev1.ConfigMap{}
Eventually(func() map[string]string {
cm := &corev1.ConfigMap{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Expand All @@ -125,6 +171,21 @@ var _ = Describe("", func() {
"loaded_modules": "[{\"name\":\"internal_acl\",\"configs\":{\"acl_rule_file\":\"etc/acl.conf\"}}]",
},
))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: enterprise.GetName(),
Namespace: enterprise.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedModules/ResourceVersion", cm.ResourceVersion),
)
// TODO: check modules status by emqx api
})
})
Expand Down
34 changes: 33 additions & 1 deletion controllers_suite/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -50,6 +51,21 @@ var _ = Describe("", func() {
Expect(cm.Data).Should(Equal(map[string]string{
"loaded_plugins": emqx.GetLoadedPlugins()["conf"],
}))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: emqx.GetName(),
Namespace: emqx.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedPlugins/ResourceVersion", cm.ResourceVersion),
)
}
})

Expand All @@ -62,8 +78,8 @@ var _ = Describe("", func() {
client.RawPatch(types.MergePatchType, patch),
)).Should(Succeed())

cm := &corev1.ConfigMap{}
Eventually(func() map[string]string {
cm := &corev1.ConfigMap{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Expand All @@ -78,6 +94,22 @@ var _ = Describe("", func() {
"loaded_plugins": "{emqx_management, true}.\n{emqx_rule_engine, true}.\n",
},
))

Eventually(func() map[string]string {
sts := &appsv1.StatefulSet{}
_ = k8sClient.Get(
context.Background(),
types.NamespacedName{
Name: emqx.GetName(),
Namespace: emqx.GetNamespace(),
},
sts,
)
return sts.Spec.Template.Annotations
}, timeout, interval).Should(
HaveKeyWithValue("LoadedPlugins/ResourceVersion", cm.ResourceVersion),
)

}
// TODO: check plugins status by emqx api
})
Expand Down
1 change: 0 additions & 1 deletion controllers_suite/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ var _ = Describe("", func() {
}, timeout, interval).Should(Equal(*emqx.GetReplicas()))

Expect(sts.Spec.Template.Labels).Should(Equal(emqx.GetLabels()))
Expect(sts.Spec.Template.Annotations).Should(Equal(emqx.GetAnnotations()))
Expect(sts.Spec.Template.Spec.Affinity).Should(Equal(emqx.GetAffinity()))
Expect(sts.Spec.Template.Spec.Containers[0].ImagePullPolicy).Should(Equal(corev1.PullIfNotPresent))
Expect(sts.Spec.Template.Spec.Containers[0].Resources).Should(Equal(emqx.GetResource()))
Expand Down
File renamed without changes.
50 changes: 28 additions & 22 deletions pkg/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,25 @@ func (client *Client) EnsureEmqxConfigMapForLoadedPlugins(emqx v1beta1.Emqx, lab
}

func (client *Client) EnsureEmqxStatefulSet(emqx v1beta1.Emqx, labels map[string]string, ownerRefs []metav1.OwnerReference) error {
annotation := emqx.GetAnnotations()
if annotation == nil {
annotation = make(map[string]string)
}
if license, err := client.Secret.Get(emqx.GetNamespace(), emqx.GetSecretName()); err == nil {
annotation["License/ResourceVersion"] = license.ResourceVersion
}
if acl, err := client.ConfigMap.Get(emqx.GetNamespace(), emqx.GetACL()["name"]); err == nil {
annotation["ACL/ResourceVersion"] = acl.ResourceVersion
}
if plugins, err := client.ConfigMap.Get(emqx.GetNamespace(), emqx.GetLoadedPlugins()["name"]); err == nil {
annotation["LoadedPlugins/ResourceVersion"] = plugins.ResourceVersion
}
if modules, err := client.ConfigMap.Get(emqx.GetNamespace(), emqx.GetLoadedModules()["name"]); err == nil {
annotation["LoadedModules/ResourceVersion"] = modules.ResourceVersion
}

emqx.SetAnnotations(annotation)

new := NewEmqxStatefulSet(emqx, labels, ownerRefs)
old, err := client.StatefulSet.Get(emqx.GetNamespace(), emqx.GetName())
if err != nil {
Expand All @@ -202,28 +221,15 @@ func (client *Client) EnsureEmqxStatefulSet(emqx v1beta1.Emqx, labels map[string
return err
}

if broker, ok := emqx.(*v1beta1.EmqxBroker); ok {
if oldBroker, err := client.EmqxBroker.Get(
emqx.GetNamespace(),
emqx.GetName(),
); err == nil {
if *old.Spec.Replicas != *broker.Spec.Replicas || !reflect.DeepEqual(oldBroker.Spec, broker.Spec) {
new.ResourceVersion = old.ResourceVersion
return client.StatefulSet.Update(new)
}
}
}

if enterprise, ok := emqx.(*v1beta1.EmqxEnterprise); ok {
if oldEnterprise, err := client.EmqxEnterprise.Get(
emqx.GetNamespace(),
emqx.GetName(),
); err == nil {
if *old.Spec.Replicas != *enterprise.Spec.Replicas || !reflect.DeepEqual(oldEnterprise.Spec, enterprise.Spec) {
new.ResourceVersion = old.ResourceVersion
return client.StatefulSet.Update(new)
}
}
// Updates to statefulset spec for fields other than 'replicas', 'template', 'updateStrategy' and 'minReadySeconds' are forbidden
if !reflect.DeepEqual(old.Spec.Replicas, new.Spec.Replicas) ||
!reflect.DeepEqual(old.Spec.Template, new.Spec.Template) ||
!reflect.DeepEqual(old.Spec.UpdateStrategy, new.Spec.UpdateStrategy) {
new.ResourceVersion = old.ResourceVersion
old.Spec.Replicas = new.Spec.Replicas
old.Spec.Template = new.Spec.Template
old.Spec.UpdateStrategy = new.Spec.UpdateStrategy
return client.StatefulSet.Update(old)
}

return nil
Expand Down

0 comments on commit f43c8dc

Please sign in to comment.