Skip to content

Commit

Permalink
Adding event filter to reduce provisioner logs. (#1630)
Browse files Browse the repository at this point in the history
* Event filter is added for node resource

* Adding uncached client to retreive pods list

* updating constants.go
  • Loading branch information
jintusebastian authored Jun 7, 2022
1 parent f938202 commit a2615db
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ import (
// Reconciler reconciles a Node objects and computes the capacity of cluster
type Reconciler struct {
client.Client
Log logr.Logger
Log logr.Logger
uncachedClient client.Client
}

// Reconcile iterates through all nodes and computes requested resources
Expand Down Expand Up @@ -68,7 +69,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
currentCapacity := make(corev1.ResourceList)
// TODO: Verify pagination works
for more := true; more; more = (nodes.Continue != "") {
err = r.List(ctx, nodes, client.Limit(constants.ListPaginationLimit), client.Continue(nodes.Continue))
err = r.uncachedClient.List(ctx, nodes, client.Limit(constants.ListPaginationLimit), client.Continue(nodes.Continue))
if err != nil {
log.Error(err, "error while fetching nodes")
return ctrl.Result{}, err
Expand All @@ -84,7 +85,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu

// TODO: Verify pagination works
for more := true; more; more = (pods.Continue != "") {
err = r.List(ctx, pods, client.Limit(constants.ListPaginationLimit), client.Continue(pods.Continue))
err = r.uncachedClient.List(ctx, pods, client.Limit(constants.ListPaginationLimit), client.Continue(pods.Continue))
if err != nil {
log.Error(err, "error while fetching pods")
return ctrl.Result{}, err
Expand Down Expand Up @@ -133,6 +134,17 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return nil
}

if r.uncachedClient == nil {
uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
})
if err != nil {
return err
}
r.uncachedClient = uncachedClient
}

watchMapper := handler.EnqueueRequestsFromMapFunc(func(a client.Object) []reconcile.Request {
return []reconcile.Request{
{NamespacedName: types.NamespacedName{
Expand All @@ -145,7 +157,7 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
Named("scheduler_helper_sfclusterusage").
For(&resourcev1alpha1.SFCluster{}).
Watches(&source.Kind{Type: &corev1.Node{}}, watchMapper).
WithEventFilter(watches.NamespaceFilter())
WithEventFilter(watches.NodeFilter())

return builder.Complete(r)
}
Expand Down
7 changes: 4 additions & 3 deletions interoperator/internal/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (r resourceManager) ReconcileResources(client kubernetes.Client, expectedRe
if !force {
updatedResource, toBeUpdated, err = dynamic.DeepUpdate(foundResource.Object, expectedResource.Object)
if err != nil {
log.Error(err, "reconcile- failed to update resource ", "kind ", kind, "namespacedName ", namespacedName)
return nil, err
log.Error(err, "reconcile- failed to update resource ", "kind ", kind, "namespacedName ", namespacedName)
return nil, err
}
}
if toBeUpdated || force {
Expand All @@ -152,7 +152,8 @@ func (r resourceManager) ReconcileResources(client kubernetes.Client, expectedRe
err = client.Update(context.TODO(), expectedResource)
} else {
foundResource.Object = updatedResource.(map[string]interface{})
log.Info("reconcile - updating resource", "resource", foundResource.Object)
// Printing the object leaks credentialsstores in subresources. Disabling it for now.
// log.Info("reconcile - updating resource", "resource", foundResource.Object)
err = client.Update(context.TODO(), foundResource)
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion interoperator/pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const (
PlanWatchDrainTimeout = time.Second * 2
DefaultClusterReconcileInterval = "20m"

ListPaginationLimit = 50
ListPaginationLimit = 100
)

// Configs initialized at startup
Expand Down
34 changes: 34 additions & 0 deletions interoperator/pkg/watches/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package watches

import (
"context"
reflect "reflect"

osbv1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/osb/v1alpha1"
resourcev1alpha1 "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/api/resource/v1alpha1"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/config"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/properties"
rendererFactory "github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/internal/renderer/factory"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/constants"
"github.com/cloudfoundry-incubator/service-fabrik-broker/interoperator/pkg/errors"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -372,3 +375,34 @@ func NamespaceFilter() predicate.Predicate {
}
return p
}

// NodeFilter creates a predicates for filtering objects in interoperator namespace
func NodeFilter() predicate.Predicate {
p := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return true
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
UpdateFunc: func(e event.UpdateEvent) bool {
switch new := e.ObjectNew.(type) {
case *corev1.Node:
old := e.ObjectOld.(*corev1.Node)
if !reflect.DeepEqual(old.Status.Allocatable, new.Status.Allocatable) {
return true
}
case *resourcev1alpha1.SFCluster:
old := e.ObjectOld.(*resourcev1alpha1.SFCluster)
if !reflect.DeepEqual(old.Status, new.Status) {
return true
}
}
return false
},
GenericFunc: func(e event.GenericEvent) bool {
return true
},
}
return p
}

0 comments on commit a2615db

Please sign in to comment.