diff --git a/apis/apps/v2beta1/const.go b/apis/apps/v2beta1/const.go index 55e353782..48cfaa424 100644 --- a/apis/apps/v2beta1/const.go +++ b/apis/apps/v2beta1/const.go @@ -14,6 +14,11 @@ const ( LabelsPodTemplateHashKey string = "apps.emqx.io/pod-template-hash" ) +const ( + // annotations + AnnotationsLastEMQXConfigKey string = "apps.emqx.io/last-emqx-configuration" +) + const ( // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate PodOnServing corev1.PodConditionType = "apps.emqx.io/on-serving" diff --git a/controllers/apps/v2beta1/sync_emqx_config.go b/controllers/apps/v2beta1/sync_emqx_config.go index 70ed7f3ac..7c27cd0e2 100644 --- a/controllers/apps/v2beta1/sync_emqx_config.go +++ b/controllers/apps/v2beta1/sync_emqx_config.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "reflect" "strings" emperror "emperror.dev/errors" @@ -11,7 +12,6 @@ import ( innerReq "github.com/emqx/emqx-operator/internal/requester" "github.com/rory-z/go-hocon" corev1 "k8s.io/api/core/v1" - k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -21,32 +21,19 @@ type syncConfig struct { } func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult { - defaultListenerConfig := "" - defaultListenerConfig += fmt.Sprintln("listeners.tcp.default.bind = 1883") - defaultListenerConfig += fmt.Sprintln("listeners.ssl.default.bind = 8883") - defaultListenerConfig += fmt.Sprintln("listeners.ws.default.bind = 8083") - defaultListenerConfig += fmt.Sprintln("listeners.wss.default.bind = 8084") - - hoconConfig, _ := hocon.ParseString(defaultListenerConfig + instance.Spec.Config.Data) - configMap := generateConfigMap(instance, hoconConfig.String()) + hoconConfig := mergeDefaultConfig(instance.Spec.Config.Data) + confStr := hoconConfig.String() - storageConfigMap := &corev1.ConfigMap{} - if err := s.Client.Get(ctx, client.ObjectKeyFromObject(configMap), storageConfigMap); err != nil { - if k8sErrors.IsNotFound(err) { - if err := s.Handler.Create(configMap); err != nil { - return subResult{err: emperror.Wrap(err, "failed to create configMap")} - } - return subResult{} + lastConfigStr, ok := instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] + if !ok { + if err := s.update(ctx, instance, confStr); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update emqx config")} } - return subResult{err: emperror.Wrap(err, "failed to get configMap")} + return subResult{} } - patchResult, _ := s.Patcher.Calculate( - storageConfigMap.DeepCopy(), - configMap.DeepCopy(), - ) - - if !patchResult.IsEmpty() && r != nil { + lastHoconConfig, _ := hocon.ParseString(lastConfigStr) + if !reflect.DeepEqual(hoconConfig, lastHoconConfig) { _, coreReady := instance.Status.GetCondition(appsv2beta1.CoreNodesReady) if coreReady == nil || !instance.Status.IsConditionTrue(appsv2beta1.CoreNodesReady) { return subResult{} @@ -62,14 +49,44 @@ func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, return subResult{err: emperror.Wrap(err, "failed to put emqx config")} } - if err := s.Client.Update(ctx, configMap); err != nil { - return subResult{err: emperror.Wrap(err, "failed to update configMap")} + if err := s.update(ctx, instance, confStr); err != nil { + return subResult{err: emperror.Wrap(err, "failed to update emqx config")} } + + return subResult{} } return subResult{} } +func (s *syncConfig) update(ctx context.Context, instance *appsv2beta1.EMQX, confStr string) error { + configMap := generateConfigMap(instance, confStr) + if err := s.Handler.CreateOrUpdateList(instance, s.Scheme, []client.Object{configMap}); err != nil { + return emperror.Wrap(err, "failed to create or update configMap") + } + + if instance.Annotations == nil { + instance.Annotations = map[string]string{} + } + instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] = confStr + if err := s.Client.Update(ctx, instance); err != nil { + return emperror.Wrap(err, "failed to update emqx instance annotation") + } + + return nil +} + +func mergeDefaultConfig(config string) *hocon.Config { + defaultListenerConfig := "" + defaultListenerConfig += fmt.Sprintln("listeners.tcp.default.bind = 1883") + defaultListenerConfig += fmt.Sprintln("listeners.ssl.default.bind = 8883") + defaultListenerConfig += fmt.Sprintln("listeners.ws.default.bind = 8083") + defaultListenerConfig += fmt.Sprintln("listeners.wss.default.bind = 8084") + + hoconConfig, _ := hocon.ParseString(defaultListenerConfig + config) + return hoconConfig +} + func generateConfigMap(instance *appsv2beta1.EMQX, data string) *corev1.ConfigMap { return &corev1.ConfigMap{ TypeMeta: metav1.TypeMeta{