Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠ Support shutdown watches dynamically (v2) #2159

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(
mgr.GetCache(),
&corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()),
)); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
109 changes: 65 additions & 44 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -54,14 +53,15 @@ const (

// Builder builds a Controller.
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
forInput ForInput
ownsInput []OwnsInput
watchesObjectInput []WatchesObjectInput
watchesSourceInput []WatchesSourceInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
Expand All @@ -80,7 +80,7 @@ type ForInput struct {
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
// update events by *reconciling the object*.
// This is the equivalent of calling
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
// Watches(source.Kind(mgr.GetCache(), apiType, &handler.EnqueueRequestForObject{})).
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
if blder.forInput.object != nil {
blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
Expand Down Expand Up @@ -121,10 +121,10 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
return blder
}

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
// WatchesObjectInput represents the information set by Watches method.
type WatchesObjectInput struct {
object client.Object
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -133,10 +133,15 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
src := source.Kind(blder.mgr.GetCache(), object)
return blder.WatchesRawSource(src, eventHandler, opts...)
// WatchesRawSource(source.Kind(scheme, object, eventhandler, opts...)).
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
input := WatchesObjectInput{object: object, eventhandler: eventhandler}
for _, opt := range opts {
opt.ApplyToWatchesObject(&input)
}

blder.watchesObjectInput = append(blder.watchesObjectInput, input)
return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -166,29 +171,30 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa
// In the first case, controller-runtime will create another cache for the
// concrete type on top of the metadata cache; this increases memory
// consumption and leads to race conditions as caches are not in sync.
func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
func (blder *Builder) WatchesMetadata(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
opts = append(opts, OnlyMetadata)
return blder.Watches(object, eventHandler, opts...)
return blder.Watches(object, eventhandler, opts...)
}

// WatchesSourceInput represents the information set by Watches method.
type WatchesSourceInput struct {
src source.Source
}

// WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
// Specified predicates are registered only for given source.
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}

blder.watchesInput = append(blder.watchesInput, input)
// This method is only exposed for more advanced use cases, most users should use higher level functions.
// This method does generally disregard all the global configuration set by the builder.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.watchesSourceInput = append(blder.watchesSourceInput, WatchesSourceInput{src: src})
return blder
}

// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
// trigger reconciliations. For example, filtering on whether the resource version has changed.
// Given predicate is added for all watched objects.
// The predicates are not applied to sources watched with WatchesRawSource(...).
// Defaults to the empty list.
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
blder.globalPredicates = append(blder.globalPredicates, p)
Expand Down Expand Up @@ -272,11 +278,14 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(
blder.mgr.GetCache(),
obj,
handler.WithPredicates(&handler.EnqueueRequestForObject{}, allPredicates...),
)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -290,7 +299,6 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand All @@ -302,30 +310,43 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(
blder.mgr.GetCache(),
obj,
handler.WithPredicates(hdler, allPredicates...),
)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
if len(blder.watchesObjectInput) == 0 && len(blder.watchesSourceInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
for _, w := range blder.watchesObjectInput {
obj, err := blder.project(w.object, w.objectProjection)
if err != nil {
return err
}

allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
src := source.Kind(
blder.mgr.GetCache(), obj,
handler.WithPredicates(w.eventhandler, allPredicates...),
)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

for _, w := range blder.watchesSourceInput {
if err := blder.ctrl.Watch(w.src); err != nil {
return err
}
}

return nil
}

Expand Down
24 changes: 12 additions & 12 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type OwnsOption interface {
ApplyToOwns(*OwnsInput)
}

// WatchesOption is some configuration that modifies options for a watches request.
type WatchesOption interface {
// ApplyToWatches applies this configuration to the given watches options.
ApplyToWatches(*WatchesInput)
// WatchesObjectOption is some configuration that modifies options for a watches request.
type WatchesObjectOption interface {
// ApplyToWatchesObject applies this configuration to the given watches options.
ApplyToWatchesObject(*WatchesObjectInput)
}

// }}}
Expand Down Expand Up @@ -66,14 +66,14 @@ func (w Predicates) ApplyToOwns(opts *OwnsInput) {
opts.predicates = w.predicates
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
// ApplyToWatchesObject applies this configuration to the given WatchesInput options.
func (w Predicates) ApplyToWatchesObject(opts *WatchesObjectInput) {
opts.predicates = w.predicates
}

var _ ForOption = &Predicates{}
var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}
var _ WatchesObjectOption = &Predicates{}

// }}}

Expand All @@ -94,8 +94,8 @@ func (p projectAs) ApplyToOwns(opts *OwnsInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
// ApplyToWatchesObject applies this configuration to the given WatchesObjectInput options.
func (p projectAs) ApplyToWatchesObject(opts *WatchesObjectInput) {
opts.objectProjection = objectProjection(p)
}

Expand Down Expand Up @@ -132,9 +132,9 @@ var (
// consumption and leads to race conditions as caches are not in sync.
OnlyMetadata = projectAs(projectAsMetadata)

_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesOption = OnlyMetadata
_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesObjectOption = OnlyMetadata
)

// }}}
Expand Down
8 changes: 3 additions & 5 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -89,10 +90,7 @@ func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) e

// WaitForCacheSync implements Informers.
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
return true
}
return *c.Synced
return ptr.Deref(c.Synced, true)
}

// FakeInformerFor implements Informers.
Expand All @@ -116,7 +114,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: ptr.Deref(c.Synced, true)}
return c.InformersByGVK[gvk], nil
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -72,13 +70,15 @@ type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler

// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
// Watch takes events provided by a Source and enqueues reconcile.Requests
// in response to the events.
Watch(src source.Source) error

// StopWatch stops watching a source that was previously registered by Watch().
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// StopWatch may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
StopWatch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down
Loading