Skip to content

Commit

Permalink
fix statefulSetUpdateNeeded, minor enhancements
Browse files Browse the repository at this point in the history
Signed-off-by: Nick Revin <[email protected]>
  • Loading branch information
nrvnrvn committed Aug 22, 2019
1 parent 2cc1ef9 commit ff8f1b2
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 65 deletions.
1 change: 1 addition & 0 deletions cmd/manager/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//vendor/sigs.k8s.io/controller-runtime/pkg/manager:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/runtime/log:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/runtime/signals:go_default_library",
"//version:go_default_library",
],
)

Expand Down
2 changes: 2 additions & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (

"github.com/amaizfinance/redis-operator/pkg/apis"
"github.com/amaizfinance/redis-operator/pkg/controller"
"github.com/amaizfinance/redis-operator/version"
)

// Change below variables to serve metrics on different host or port.
Expand All @@ -52,6 +53,7 @@ var (
var log = logf.Log.WithName("cmd")

func printVersion() {
log.Info(fmt.Sprintf("Redis-Operator Version: %s", version.Version))
log.Info(fmt.Sprintf("Go Version: %s", runtime.Version()))
log.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
log.Info(fmt.Sprintf("Version of operator-sdk: %v", sdkVersion.Version))
Expand Down
6 changes: 4 additions & 2 deletions deploy/ZZ_Deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ spec:
serviceAccountName: redis-operator
containers:
- name: redis-operator
image: amaiz/redis-operator:0.1.0
imagePullPolicy: Always
image: amaiz/redis-operator:v0.1.0
args:
- --zap-time-encoding
- iso8601
env:
- name: WATCH_NAMESPACE # left empty to watch all namespaces
- name: OPERATOR_NAME
Expand Down
1 change: 1 addition & 0 deletions hack/print-status.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ set -o nounset
set -o pipefail

cat <<EOF
Version ${DRONE_TAG:-latest}
ImageTag ${DRONE_TAG:-latest}
EOF
65 changes: 35 additions & 30 deletions pkg/controller/redis/object_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ const (
dataMountPath = "/data"
workingDir = dataMountPath

// environment variables
rediscliAuthEnvName = "REDISCLI_AUTH"

// argon2id parameters.
// Recommended parameters are time = 1, Memory = 65536.
// Below parameters are equivalent(time-wise) to time = 4, Memory = 65536.
Expand Down Expand Up @@ -153,7 +156,7 @@ func generateSecret(r *k8sv1alpha1.Redis, password string) *corev1.Secret {
_, _ = fmt.Fprintf(&b, authConfTemplate, password)

return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.GetNamespace(), Labels: r.GetLabels()},
Data: map[string][]byte{secretFileName: []byte(b.String())},
}
}
Expand All @@ -162,7 +165,7 @@ func generateConfigMap(r *k8sv1alpha1.Redis, master redis.Address) *corev1.Confi
var b strings.Builder
defer b.Reset()
// explicitly set the working directory
_, _ = fmt.Fprintf(&b, "# Generated by redis-operator for redis.k8s.amaiz.com/%s\ndir %s\n", r.Name, workingDir)
_, _ = fmt.Fprintf(&b, "# Generated by redis-operator for redis.k8s.amaiz.com/%s\ndir %s\n", r.GetName(), workingDir)

if r.Spec.Password.SecretKeyRef != nil {
_, _ = fmt.Fprintf(&b, "include %s\n", secretMountPath)
Expand All @@ -179,25 +182,25 @@ func generateConfigMap(r *k8sv1alpha1.Redis, master redis.Address) *corev1.Confi
}

return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.GetNamespace(), Labels: r.GetLabels()},
Data: map[string]string{configFileName: b.String()}}
}

func generateService(r *k8sv1alpha1.Redis, serviceType int) *corev1.Service {
var name, clusterIP string
var selector map[string]string
labels := make(map[string]string)
for k, v := range r.Labels {
for k, v := range r.GetLabels() {
labels[k] = v
}

switch serviceType {
case serviceTypeAll:
name = generateName(r)
selector = r.Labels
selector = r.GetLabels()
case serviceTypeHeadless:
name = fmt.Sprintf("%s-%s", generateName(r), headlessServiceTypeLabel)
selector = r.Labels
selector = r.GetLabels()
labels[headlessServiceTypeLabelKey] = headlessServiceTypeLabel
clusterIP = corev1.ClusterIPNone
case serviceTypeMaster:
Expand All @@ -223,7 +226,7 @@ func generateService(r *k8sv1alpha1.Redis, serviceType int) *corev1.Service {
}

return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: r.Namespace, Labels: labels},
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: r.GetNamespace(), Labels: labels},
Spec: corev1.ServiceSpec{
Ports: ports,
Selector: selector,
Expand All @@ -235,10 +238,10 @@ func generateService(r *k8sv1alpha1.Redis, serviceType int) *corev1.Service {

func generatePodDisruptionBudget(r *k8sv1alpha1.Redis) *policyv1beta1.PodDisruptionBudget {
return &policyv1beta1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels},
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.GetNamespace(), Labels: r.GetLabels()},
Spec: policyv1beta1.PodDisruptionBudgetSpec{
MinAvailable: &[]intstr.IntOrString{intstr.FromInt(redis.MinimumFailoverSize)}[0],
Selector: &metav1.LabelSelector{MatchLabels: r.Labels},
Selector: &metav1.LabelSelector{MatchLabels: r.GetLabels()},
},
}
}
Expand Down Expand Up @@ -300,7 +303,7 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) *appsv1.Stateful
})

containers[0].Env = []corev1.EnvVar{{
Name: "REDISCLI_AUTH",
Name: rediscliAuthEnvName,
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: r.Spec.Password.SecretKeyRef,
},
Expand Down Expand Up @@ -362,13 +365,13 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) *appsv1.Stateful
}

s := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.Namespace, Labels: r.Labels, Annotations: make(map[string]string)},
ObjectMeta: metav1.ObjectMeta{Name: generateName(r), Namespace: r.GetNamespace(), Labels: r.GetLabels(), Annotations: make(map[string]string)},
Spec: appsv1.StatefulSetSpec{
Replicas: r.Spec.Replicas,
Selector: &metav1.LabelSelector{MatchLabels: r.Labels},
Selector: &metav1.LabelSelector{MatchLabels: r.GetLabels()},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: r.Labels,
Labels: r.GetLabels(),
Annotations: r.Spec.Annotations,
},
Spec: corev1.PodSpec{
Expand Down Expand Up @@ -399,8 +402,8 @@ func generateStatefulSet(r *k8sv1alpha1.Redis, password string) *appsv1.Stateful

// state checkers
func secretUpdateNeeded(got, want *corev1.Secret) (needed bool) {
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
if !mapsEqual(got.GetLabels(), want.GetLabels()) {
got.SetLabels(want.GetLabels())
needed = true
}
if !reflect.DeepEqual(got.Data, want.Data) {
Expand All @@ -411,8 +414,8 @@ func secretUpdateNeeded(got, want *corev1.Secret) (needed bool) {
}

func configMapUpdateNeeded(got, want *corev1.ConfigMap) (needed bool) {
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
if !mapsEqual(got.GetLabels(), want.GetLabels()) {
got.SetLabels(want.GetLabels())
needed = true
}
if !strings.Contains(got.Data[configFileName], want.Data[configFileName]) {
Expand All @@ -423,8 +426,8 @@ func configMapUpdateNeeded(got, want *corev1.ConfigMap) (needed bool) {
}

func serviceUpdateNeeded(got, want *corev1.Service) (needed bool) {
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
if !mapsEqual(got.GetLabels(), want.GetLabels()) {
got.SetLabels(want.GetLabels())
needed = true
}
if !mapsEqual(got.Spec.Selector, want.Spec.Selector) {
Expand All @@ -442,39 +445,41 @@ func podDisruptionBudgetUpdateNeeded(got, want *policyv1beta1.PodDisruptionBudge
// updating PDB spec is forbidden
// TODO: keep an eye on https://github.com/kubernetes/kubernetes/issues/45398
// bring back PDB spec comparison once the minimum supported k8s version is 1.15
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
if !mapsEqual(got.GetLabels(), want.GetLabels()) {
got.SetLabels(want.GetLabels())
return true
}
return
}

func statefulSetUpdateNeeded(got, want *appsv1.StatefulSet) (needed bool) {
if !mapsEqual(got.Labels, want.Labels) {
got.Labels = want.Labels
if *got.Spec.Replicas != *want.Spec.Replicas {
got.Spec.Replicas = want.Spec.Replicas
needed = true
}
if !mapsEqual(got.Annotations, want.Annotations) {
got.Annotations = want.Annotations

if !deepContains(got.Spec.Template, want.Spec.Template) || (got.Annotations[hashAnnotationKey] != want.Annotations[hashAnnotationKey]) {
got.Spec.Template = want.Spec.Template
needed = true
}

if *got.Spec.Replicas != *want.Spec.Replicas {
got.Spec.Replicas = want.Spec.Replicas
if !mapsEqual(got.GetLabels(), want.GetLabels()) {
got.SetLabels(want.GetLabels())
needed = true
}

if !deepContains(got.Spec.Template, want.Spec.Template) || (got.Annotations[hashAnnotationKey] != want.Annotations[hashAnnotationKey]) {
got.Spec.Template = want.Spec.Template
if !mapsEqual(got.Annotations, want.Annotations) {
got.SetAnnotations(want.Annotations)
needed = true
}

return
}

// generateName returns generic name for all owned resources.
// It should be used as a prefix for all resources requiring more specific naming scheme.
func generateName(r *k8sv1alpha1.Redis) string {
return fmt.Sprintf(namePrefixTemplate, r.Name)
return fmt.Sprintf(namePrefixTemplate, r.GetName())
}

// mapsEqual compares two plain map[string]string values
Expand Down
59 changes: 35 additions & 24 deletions pkg/controller/redis/redis_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ func (reconciler *ReconcileRedis) Reconcile(request reconcile.Request) (reconcil
loggerDebug := logger.V(1).Info
loggerDebug("Reconciling Redis")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Fetch the Redis instance
fetchedRedis := new(k8sv1alpha1.Redis)
if err := reconciler.client.Get(context.TODO(), request.NamespacedName, fetchedRedis); err != nil {
if err := reconciler.client.Get(ctx, request.NamespacedName, fetchedRedis); err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
Expand All @@ -140,16 +143,15 @@ func (reconciler *ReconcileRedis) Reconcile(request reconcile.Request) (reconcil
if redisObject.Labels == nil {
redisObject.Labels = make(map[string]string)
}
redisObject.Labels[redisName] = redisObject.Name
redisObject.Labels[redisName] = redisObject.GetName()

// read password from Secret
if redisObject.Spec.Password.SecretKeyRef != nil {
passwordSecret := new(corev1.Secret)
if err := reconciler.client.Get(
context.TODO(),
types.NamespacedName{Namespace: request.Namespace, Name: redisObject.Spec.Password.SecretKeyRef.Name},
passwordSecret,
); err != nil {
if err := reconciler.client.Get(ctx, types.NamespacedName{
Namespace: request.Namespace,
Name: redisObject.Spec.Password.SecretKeyRef.Name,
}, passwordSecret); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to fetch password: %s", err)
}

Expand Down Expand Up @@ -186,43 +188,44 @@ func (reconciler *ReconcileRedis) Reconcile(request reconcile.Request) (reconcil
continue
}

if result, err := reconciler.createOrUpdate(object, redisObject, options); err != nil {
if result, err := reconciler.createOrUpdate(ctx, object, redisObject, options); err != nil {
return reconcile.Result{}, err
} else if result.Requeue {
logger.Info(fmt.Sprintf("Applied %T", object))
logger.Info(fmt.Sprintf("Applied %#v", object))
return result, nil
}
}

// all the kubernetes resources are OK.
// Redis failover state should be checked and reconfigured if needed.
podList := new(corev1.PodList)
if err := reconciler.client.List(
context.TODO(),
&client.ListOptions{Namespace: request.Namespace, LabelSelector: labels.SelectorFromSet(redisObject.Labels)},
podList,
); err != nil {
if err := reconciler.client.List(ctx, &client.ListOptions{
LabelSelector: labels.SelectorFromSet(redisObject.Labels),
Namespace: request.Namespace,
}, podList); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list Pods: %s", err)
}

var addrs []redis.Address
var addresses []redis.Address

podIter:
// filter out pods without assigned IP addresses and not having all containers ready
for i := range podList.Items {
if podList.Items[i].Status.Phase != corev1.PodRunning || podList.Items[i].Status.PodIP == "" {
continue
}

for _, status := range podList.Items[i].Status.ContainerStatuses {
if !status.Ready {
continue podIter
}
}
addrs = append(addrs, redis.Address{Host: podList.Items[i].Status.PodIP, Port: strconv.Itoa(redis.Port)})

addresses = append(addresses, redis.Address{Host: podList.Items[i].Status.PodIP, Port: strconv.Itoa(redis.Port)})
}

// Run Redis Replication Reconfiguration
replication, err := redis.New(options.password, addrs...)
replication, err := redis.New(options.password, addresses...)
if err != nil {
// This is considered part of normal operation - return and requeue
logger.Info("Error creating Redis replication, requeue", "error", err)
Expand Down Expand Up @@ -281,7 +284,7 @@ podIter:
}
pod.Labels[roleLabelKey] = replicaLabel
}
if err := reconciler.client.Update(context.TODO(), &pod); err != nil {
if err := reconciler.client.Update(ctx, &pod); err != nil {
errChan <- err
}
}(podList.Items[i], master.Host, &wg)
Expand All @@ -306,7 +309,7 @@ podIter:

// update configmap with the current master's IP address
options.master = master
if result, err := reconciler.createOrUpdate(new(corev1.ConfigMap), redisObject, options); err != nil {
if result, err := reconciler.createOrUpdate(ctx, new(corev1.ConfigMap), redisObject, options); err != nil {
return result, err
} else if result.Requeue {
logger.Info("Updated ConfigMap")
Expand All @@ -321,7 +324,7 @@ podIter:

fetchedRedis.Status.Replicas = replication.Size()
fetchedRedis.Status.Master = masterPodName
if err := reconciler.client.Status().Update(context.TODO(), fetchedRedis); err != nil {
if err := reconciler.client.Status().Update(ctx, fetchedRedis); err != nil {
if errors.IsConflict(err) {
loggerDebug("Conflict updating Redis status, requeue")
return reconcile.Result{Requeue: true}, nil
Expand All @@ -336,17 +339,25 @@ podIter:
// passing an empty instance implementing runtime.Object will generate the appropriate ``expected'' object,
// create an object if it does not exist, compare the existing object with the generated one and update if needed.
// the Result.Requeue will be true if the object was successfully created or updated or in case there was a conflict updating the object.
func (reconciler *ReconcileRedis) createOrUpdate(object runtime.Object, redis *k8sv1alpha1.Redis, options objectGeneratorOptions) (result reconcile.Result, err error) {
func (reconciler *ReconcileRedis) createOrUpdate(
ctx context.Context,
object runtime.Object,
redis *k8sv1alpha1.Redis,
options objectGeneratorOptions,
) (result reconcile.Result, err error) {
generatedObject := generateObject(redis, object, options)
objectMeta := generatedObject.(metav1.Object)

if err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: objectMeta.GetName(), Namespace: redis.Namespace}, object); err != nil {
if err = reconciler.client.Get(ctx, types.NamespacedName{
Namespace: redis.GetNamespace(),
Name: objectMeta.GetName(),
}, object); err != nil {
if errors.IsNotFound(err) {
// Set Redis instance as the owner and controller
if err = controllerutil.SetControllerReference(redis, objectMeta, reconciler.scheme); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set owner for Object: %s", err)
}
if err = reconciler.client.Create(context.TODO(), generatedObject); err != nil && !errors.IsAlreadyExists(err) {
if err = reconciler.client.Create(ctx, generatedObject); err != nil && !errors.IsAlreadyExists(err) {
return reconcile.Result{}, fmt.Errorf("failed to create Object: %s", err)
}
return reconcile.Result{Requeue: true}, nil
Expand All @@ -358,7 +369,7 @@ func (reconciler *ReconcileRedis) createOrUpdate(object runtime.Object, redis *k
return
}

if err = reconciler.client.Update(context.TODO(), object); err != nil {
if err = reconciler.client.Update(ctx, object); err != nil {
if errors.IsConflict(err) {
// conflicts can be common, consider it part of normal operation
return reconcile.Result{Requeue: true}, nil
Expand Down
Loading

0 comments on commit ff8f1b2

Please sign in to comment.