Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix) registry pods do not come up again after node failure #3366

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions pkg/controller/registry/reconciler/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package reconciler

import (
"context"
"errors"
"fmt"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash"
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"

"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
Expand Down Expand Up @@ -327,27 +329,27 @@ func (c *ConfigMapRegistryReconciler) EnsureRegistryServer(logger *logrus.Entry,

//TODO: if any of these error out, we should write a status back (possibly set RegistryServiceStatus to nil so they get recreated)
if err := c.ensureServiceAccount(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
return pkgerrors.Wrapf(err, "error ensuring service account: %s", source.serviceAccountName())
}
if err := c.ensureRole(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring role: %s", source.roleName())
return pkgerrors.Wrapf(err, "error ensuring role: %s", source.roleName())
}
if err := c.ensureRoleBinding(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
return pkgerrors.Wrapf(err, "error ensuring rolebinding: %s", source.RoleBinding().GetName())
}
pod, err := source.Pod(image, defaultPodSecurityConfig)
if err != nil {
return err
}
if err := c.ensurePod(source, defaultPodSecurityConfig, overwritePod); err != nil {
return errors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
return pkgerrors.Wrapf(err, "error ensuring pod: %s", pod.GetName())
}
service, err := source.Service()
if err != nil {
return err
}
if err := c.ensureService(source, overwrite); err != nil {
return errors.Wrapf(err, "error ensuring service: %s", service.GetName())
return pkgerrors.Wrapf(err, "error ensuring service: %s", service.GetName())
}

if overwritePod {
Expand Down Expand Up @@ -420,15 +422,15 @@ func (c *ConfigMapRegistryReconciler) ensurePod(source configMapCatalogSourceDec
}
for _, p := range currentPods {
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Delete(context.TODO(), p.GetName(), *metav1.NewDeleteOptions(1)); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "error deleting old pod: %s", p.GetName())
return pkgerrors.Wrapf(err, "error deleting old pod: %s", p.GetName())
}
}
}
_, err = c.OpClient.KubernetesInterface().CoreV1().Pods(pod.GetNamespace()).Create(context.TODO(), pod, metav1.CreateOptions{})
if err == nil {
return nil
}
return errors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
return pkgerrors.Wrapf(err, "error creating new pod: %s", pod.GetGenerateName())
}

func (c *ConfigMapRegistryReconciler) ensureService(source configMapCatalogSourceDecorator, overwrite bool) error {
Expand Down Expand Up @@ -512,6 +514,34 @@ func (c *ConfigMapRegistryReconciler) CheckRegistryServer(logger *logrus.Entry,
return
}

healthy = true
return
podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, pods, source.GetNamespace())
if e != nil {
return false, fmt.Errorf("error deleting dead pods: %v", e)
}
return podsAreLive, nil
}

// detectAndDeleteDeadPods determines if there are registry pods that are in the deleted state
// but have not been removed by GC (eg the node goes down before GC can remove them), and attempts to
// force delete those pods with a zero-second grace period.
//
// Every pod in pods is inspected with isPodDead. Pods reported as dead are deleted from
// sourceNamespace; a NotFound response from the API server is treated as success because the pod
// is already gone. All pods are visited even if some deletions fail.
//
// It returns true if at least one live pod was found and every attempted deletion succeeded.
// If any deletion failed it returns false together with the joined errors, each of which
// identifies the pod that could not be deleted.
func detectAndDeleteDeadPods(logger *logrus.Entry, client operatorclient.ClientInterface, pods []*corev1.Pod, sourceNamespace string) (bool, error) {
	var forceDeletionErrs []error
	livePodFound := false
	for _, pod := range pods {
		podLogger := logger.WithFields(logrus.Fields{"pod.namespace": sourceNamespace, "pod.name": pod.GetName()})
		if !isPodDead(pod) {
			livePodFound = true
			podLogger.Debug("pod is alive")
			continue
		}
		podLogger.Info("force deleting dead pod")
		err := client.KubernetesInterface().CoreV1().Pods(sourceNamespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
			GracePeriodSeconds: ptr.To[int64](0),
		})
		if err != nil && !apierrors.IsNotFound(err) {
			// Record which pod failed; the joined error is otherwise just a list of
			// API errors with nothing tying them back to a pod.
			forceDeletionErrs = append(forceDeletionErrs, fmt.Errorf("force deleting pod %s/%s: %w", sourceNamespace, pod.GetName(), err))
		}
	}
	if len(forceDeletionErrs) > 0 {
		return false, errors.Join(forceDeletionErrs...)
	}
	return livePodFound, nil
}
52 changes: 52 additions & 0 deletions pkg/controller/registry/reconciler/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,55 @@ func TestConfigMapRegistryReconciler(t *testing.T) {
})
}
}

// TestConfigMapRegistryChecker runs CheckRegistryServer for a ConfigMap-backed
// catalog source against a fake cluster and compares the reported health and
// error with the expectation of each table entry.
func TestConfigMapRegistryChecker(t *testing.T) {
	configMap := validConfigMap()
	catalogSource := validConfigMapCatalogSource(configMap)

	type cluster struct {
		k8sObjs []runtime.Object
	}
	type in struct {
		cluster cluster
		catsrc  *v1alpha1.CatalogSource
	}
	type out struct {
		healthy bool
		err     error
	}
	type testCase struct {
		testName string
		in       in
		out      out
	}

	cases := []testCase{
		{
			// The registry objects exist, but the pod carries a deletion
			// timestamp, so the registry must not be reported healthy.
			testName: "ConfigMap/ExistingRegistry/DeadPod",
			in: in{
				cluster: cluster{
					k8sObjs: append(withPodDeletedButNotRemoved(objectsForCatalogSource(t, catalogSource)), configMap),
				},
				catsrc: catalogSource,
			},
			out: out{
				healthy: false,
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.testName, func(t *testing.T) {
			stopCh := make(chan struct{})
			defer close(stopCh)

			factory, _ := fakeReconcilerFactory(t, stopCh, withK8sObjs(tc.in.cluster.k8sObjs...))
			checker := factory.ReconcilerForSource(tc.in.catsrc)

			gotHealthy, gotErr := checker.CheckRegistryServer(logrus.NewEntry(logrus.New()), tc.in.catsrc)

			require.Equal(t, tc.out.err, gotErr)
			if tc.out.err == nil {
				// Health is only meaningful when no error was expected.
				require.Equal(t, tc.out.healthy, gotHealthy)
			}
		})
	}
}
33 changes: 7 additions & 26 deletions pkg/controller/registry/reconciler/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package reconciler

import (
"context"
"errors"
"fmt"
"slices"
"strings"
"time"

Expand All @@ -24,7 +22,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
)

const (
Expand Down Expand Up @@ -348,25 +345,6 @@ func isRegistryServiceStatusValid(source *grpcCatalogSourceDecorator) (bool, err
func (c *GrpcRegistryReconciler) ensurePod(logger *logrus.Entry, source grpcCatalogSourceDecorator, serviceAccount *corev1.ServiceAccount, defaultPodSecurityConfig v1alpha1.SecurityConfig, overwrite bool) error {
// currentPods refers to the current pod instances of the catalog source
currentPods := c.currentPods(logger, source)

var forceDeleteErrs []error
currentPods = slices.DeleteFunc(currentPods, func(pod *corev1.Pod) bool {
if !isPodDead(pod) {
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Debug("pod is alive")
return false
}
logger.WithFields(logrus.Fields{"pod.namespace": source.GetNamespace(), "pod.name": pod.GetName()}).Info("force deleting dead pod")
if err := c.OpClient.KubernetesInterface().CoreV1().Pods(source.GetNamespace()).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
GracePeriodSeconds: ptr.To[int64](0),
}); err != nil && !apierrors.IsNotFound(err) {
forceDeleteErrs = append(forceDeleteErrs, pkgerrors.Wrapf(err, "error deleting old pod: %s", pod.GetName()))
}
return true
})
if len(forceDeleteErrs) > 0 {
return errors.Join(forceDeleteErrs...)
}

if len(currentPods) > 0 {
if !overwrite {
return nil
Expand Down Expand Up @@ -628,16 +606,19 @@ func (c *GrpcRegistryReconciler) CheckRegistryServer(logger *logrus.Entry, catal
if err != nil {
return false, err
}
current, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
currentPods, err := c.currentPodsWithCorrectImageAndSpec(logger, source, serviceAccount, registryPodSecurityConfig)
if err != nil {
return false, err
}
if len(current) < 1 ||
if len(currentPods) < 1 ||
service == nil || c.currentServiceAccount(source) == nil {
return false, nil
}

return true, nil
podsAreLive, e := detectAndDeleteDeadPods(logger, c.OpClient, currentPods, source.GetNamespace())
if e != nil {
return false, fmt.Errorf("error deleting dead pods: %v", e)
}
return podsAreLive, nil
}

// promoteCatalog swaps the labels on the update pod so that the update pod is now reachable by the catalog service.
Expand Down
29 changes: 29 additions & 0 deletions pkg/controller/registry/reconciler/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ func grpcCatalogSourceWithName(name string) *v1alpha1.CatalogSource {
return catsrc
}

// withPodDeletedButNotRemoved returns deep copies of objs in which every
// *corev1.Pod has been given a deletion timestamp of time.Now() and an
// additional DisruptionTarget condition with reason "DeletionByTaintManager"
// and status True. Objects of any other type are copied unchanged. The input
// objects themselves are never modified.
func withPodDeletedButNotRemoved(objs []runtime.Object) []runtime.Object {
	out := make([]runtime.Object, 0, len(objs))
	for _, obj := range objs {
		o := obj.DeepCopyObject()
		// Assert on the copy, not on obj: mutating obj would alter the
		// caller's fixture and make the deep copy pointless.
		if pod, ok := o.(*corev1.Pod); ok {
			pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
			pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
				Type:   corev1.DisruptionTarget,
				Reason: "DeletionByTaintManager",
				Status: corev1.ConditionTrue,
			})
		}
		out = append(out, o)
	}
	return out
}
func TestGrpcRegistryReconciler(t *testing.T) {
now := func() metav1.Time { return metav1.Date(2018, time.January, 26, 20, 40, 0, 0, time.UTC) }
blockOwnerDeletion := true
Expand Down Expand Up @@ -558,6 +575,18 @@ func TestGrpcRegistryChecker(t *testing.T) {
healthy: false,
},
},
{
testName: "Grpc/ExistingRegistry/Image/DeadPod",
in: in{
cluster: cluster{
k8sObjs: withPodDeletedButNotRemoved(objectsForCatalogSource(t, validGrpcCatalogSource("test-img", ""))),
},
catsrc: validGrpcCatalogSource("test-img", ""),
},
out: out{
healthy: false,
},
},
{
testName: "Grpc/ExistingRegistry/Image/OldPod/NotHealthy",
in: in{
Expand Down