Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCPBUGS-17157: operators/olm: record and expose informers #3005

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions pkg/controller/operators/olm/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ type Operator struct {
serviceAccountQuerier *scoped.UserDefinedServiceAccountQuerier
clientFactory clients.Factory
plugins []plugins.OperatorPlugin
informersByNamespace map[string]*plugins.Informers
}

func (a *Operator) Informers() map[string]*plugins.Informers {
return a.informersByNamespace
}

func NewOperator(ctx context.Context, options ...OperatorOption) (*Operator, error) {
Expand Down Expand Up @@ -159,9 +164,11 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
protectedCopiedCSVNamespaces: config.protectedCopiedCSVNamespaces,
}

informersByNamespace := map[string]*plugins.Informers{}
// Set up syncing for namespace-scoped resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
for _, namespace := range config.watchedNamespaces {
informersByNamespace[namespace] = &plugins.Informers{}
// Wire CSVs
csvInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
Expand All @@ -171,6 +178,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
options.LabelSelector = fmt.Sprintf("!%s", v1alpha1.CopiedLabelKey)
}),
).Operators().V1alpha1().ClusterServiceVersions()
informersByNamespace[namespace].CSVInformer = csvInformer
op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister())
csvQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv", namespace))
op.csvQueueSet.Set(namespace, csvQueue)
Expand Down Expand Up @@ -225,6 +233,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
},
).Informer()
op.copiedCSVLister = metadatalister.New(copiedCSVInformer.GetIndexer(), gvr)
informersByNamespace[namespace].CopiedCSVInformer = copiedCSVInformer
informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister

// Register separate queue for gcing copied csvs
copiedCSVGCQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/csv-gc", namespace))
Expand All @@ -247,6 +257,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
// Wire OperatorGroup reconciliation
extInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(op.client, config.resyncPeriod(), externalversions.WithNamespace(namespace))
operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups()
informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer
op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister())
ogQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s/og", namespace))
op.ogQueueSet.Set(namespace, ogQueue)
Expand All @@ -266,6 +277,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register OperatorCondition QueueInformer
opConditionInformer := extInformerFactory.Operators().V2().OperatorConditions()
informersByNamespace[namespace].OperatorConditionInformer = opConditionInformer
op.lister.OperatorsV2().RegisterOperatorConditionLister(namespace, opConditionInformer.Lister())
opConditionQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -281,6 +293,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

subInformer := extInformerFactory.Operators().V1alpha1().Subscriptions()
informersByNamespace[namespace].SubscriptionInformer = subInformer
op.lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, subInformer.Lister())
subQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -298,6 +311,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
// Wire Deployments
k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace))
depInformer := k8sInformerFactory.Apps().V1().Deployments()
informersByNamespace[namespace].DeploymentInformer = depInformer
op.lister.AppsV1().RegisterDeploymentLister(namespace, depInformer.Lister())
depQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -314,6 +328,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Set up RBAC informers
roleInformer := k8sInformerFactory.Rbac().V1().Roles()
informersByNamespace[namespace].RoleInformer = roleInformer
op.lister.RbacV1().RegisterRoleLister(namespace, roleInformer.Lister())
roleQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -329,6 +344,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

roleBindingInformer := k8sInformerFactory.Rbac().V1().RoleBindings()
informersByNamespace[namespace].RoleBindingInformer = roleBindingInformer
op.lister.RbacV1().RegisterRoleBindingLister(namespace, roleBindingInformer.Lister())
roleBindingQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -347,6 +363,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
secretInformer := informers.NewSharedInformerFactoryWithOptions(op.opClient.KubernetesInterface(), config.resyncPeriod(), informers.WithNamespace(namespace), informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromValidatedSet(map[string]string{install.OLMManagedLabelKey: install.OLMManagedLabelValue}).String()
})).Core().V1().Secrets()
informersByNamespace[namespace].SecretInformer = secretInformer
op.lister.CoreV1().RegisterSecretLister(namespace, secretInformer.Lister())
secretQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -363,6 +380,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register Service QueueInformer
serviceInformer := k8sInformerFactory.Core().V1().Services()
informersByNamespace[namespace].ServiceInformer = serviceInformer
op.lister.CoreV1().RegisterServiceLister(namespace, serviceInformer.Lister())
serviceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -379,6 +397,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register ServiceAccount QueueInformer
serviceAccountInformer := k8sInformerFactory.Core().V1().ServiceAccounts()
informersByNamespace[namespace].ServiceAccountInformer = serviceAccountInformer
op.lister.CoreV1().RegisterServiceAccountLister(metav1.NamespaceAll, serviceAccountInformer.Lister())
serviceAccountQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -429,13 +448,14 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
olmConfigInformer := externalversions.NewSharedInformerFactoryWithOptions(
op.client,
config.resyncPeriod(),
).Operators().V1().OLMConfigs().Informer()
).Operators().V1().OLMConfigs()
informersByNamespace[metav1.NamespaceAll].OLMConfigInformer = olmConfigInformer
olmConfigQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithInformer(olmConfigInformer),
queueinformer.WithInformer(olmConfigInformer.Informer()),
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(op.olmConfigQueue),
queueinformer.WithIndexer(olmConfigInformer.GetIndexer()),
queueinformer.WithIndexer(olmConfigInformer.Informer().GetIndexer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOLMConfig).ToSyncer()),
)
if err != nil {
Expand All @@ -447,6 +467,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

k8sInformerFactory := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod())
clusterRoleInformer := k8sInformerFactory.Rbac().V1().ClusterRoles()
informersByNamespace[metav1.NamespaceAll].ClusterRoleInformer = clusterRoleInformer
op.lister.RbacV1().RegisterClusterRoleLister(clusterRoleInformer.Lister())
clusterRoleQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -462,6 +483,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
}

clusterRoleBindingInformer := k8sInformerFactory.Rbac().V1().ClusterRoleBindings()
informersByNamespace[metav1.NamespaceAll].ClusterRoleBindingInformer = clusterRoleBindingInformer
op.lister.RbacV1().RegisterClusterRoleBindingLister(clusterRoleBindingInformer.Lister())
clusterRoleBindingQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -478,6 +500,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// register namespace queueinformer
namespaceInformer := k8sInformerFactory.Core().V1().Namespaces()
informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsQueueSet = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resolver")
namespaceInformer.Informer().AddEventHandler(
Expand All @@ -502,6 +525,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register APIService QueueInformer
apiServiceInformer := kagg.NewSharedInformerFactory(op.opClient.ApiregistrationV1Interface(), config.resyncPeriod()).Apiregistration().V1().APIServices()
informersByNamespace[metav1.NamespaceAll].APIServiceInformer = apiServiceInformer
op.lister.APIRegistrationV1().RegisterAPIServiceLister(apiServiceInformer.Lister())
apiServiceQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand All @@ -519,6 +543,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat

// Register CustomResourceDefinition QueueInformer
crdInformer := extinf.NewSharedInformerFactory(op.opClient.ApiextensionsInterface(), config.resyncPeriod()).Apiextensions().V1().CustomResourceDefinitions()
informersByNamespace[metav1.NamespaceAll].CRDInformer = crdInformer
op.lister.APIExtensionsV1().RegisterCustomResourceDefinitionLister(crdInformer.Lister())
crdQueueInformer, err := queueinformer.NewQueueInformer(
ctx,
Expand Down Expand Up @@ -572,6 +597,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat
op.resolver = &install.StrategyResolver{
OverridesBuilderFunc: overridesBuilderFunc.GetDeploymentInitializer,
}
op.informersByNamespace = informersByNamespace

// initialize plugins
for _, makePlugIn := range operatorPlugInFactoryFuncs {
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/operators/olm/plugins/operator_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,48 @@ import (
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned"
operatorsv1informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v1"
operatorsv1alpha1informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v1alpha1"
operatorsv2informers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions/operators/v2"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/operatorclient"
"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/queueinformer"
"github.com/sirupsen/logrus"
extensionsv1informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
appsv1informers "k8s.io/client-go/informers/apps/v1"
corev1informers "k8s.io/client-go/informers/core/v1"
rbacv1informers "k8s.io/client-go/informers/rbac/v1"
"k8s.io/client-go/metadata/metadatalister"
"k8s.io/client-go/tools/cache"
apiregistrationv1informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
)

// HostOperator is an extensible and observable operator that hosts the plug-in, i.e. which the plug-in is extending
type HostOperator interface {
queueinformer.ObservableOperator
queueinformer.ExtensibleOperator
Informers() map[string]*Informers
}

// Informers exposes informer caches that the host operator has already started, for re-use by plugins.
type Informers struct {
CSVInformer operatorsv1alpha1informers.ClusterServiceVersionInformer
CopiedCSVInformer cache.SharedIndexInformer
CopiedCSVLister metadatalister.Lister
OperatorGroupInformer operatorsv1informers.OperatorGroupInformer
OperatorConditionInformer operatorsv2informers.OperatorConditionInformer
SubscriptionInformer operatorsv1alpha1informers.SubscriptionInformer
DeploymentInformer appsv1informers.DeploymentInformer
RoleInformer rbacv1informers.RoleInformer
RoleBindingInformer rbacv1informers.RoleBindingInformer
SecretInformer corev1informers.SecretInformer
ServiceInformer corev1informers.ServiceInformer
ServiceAccountInformer corev1informers.ServiceAccountInformer
OLMConfigInformer operatorsv1informers.OLMConfigInformer
ClusterRoleInformer rbacv1informers.ClusterRoleInformer
ClusterRoleBindingInformer rbacv1informers.ClusterRoleBindingInformer
NamespaceInformer corev1informers.NamespaceInformer
APIServiceInformer apiregistrationv1informers.APIServiceInformer
CRDInformer extensionsv1informers.CustomResourceDefinitionInformer
}

// OperatorConfig gives access to required configuration from the host operator
Expand Down