diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 3436a872018e..e2dae8143fde 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -664,7 +664,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop RESTMapper: mgr.GetRESTMapper(), EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"), RateLimiterOptions: opts.RateLimiterOpts, - GenericEvent: make(chan event.GenericEvent), } if err := dependenciesDistributor.SetupWithManager(mgr); err != nil { klog.Fatalf("Failed to setup dependencies distributor: %v", err) diff --git a/pkg/dependenciesdistributor/dependencies_distributor.go b/pkg/dependenciesdistributor/dependencies_distributor.go index 715810035a1f..92d54f9f5f78 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor.go +++ b/pkg/dependenciesdistributor/dependencies_distributor.go @@ -13,7 +13,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" @@ -33,7 +32,6 @@ import ( configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" - "github.com/karmada-io/karmada/pkg/detector" "github.com/karmada-io/karmada/pkg/events" "github.com/karmada-io/karmada/pkg/resourceinterpreter" "github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag" @@ -64,15 +62,15 @@ type DependenciesDistributor struct { // DynamicClient used to fetch arbitrary resources. DynamicClient dynamic.Interface InformerManager genericmanager.SingleClusterInformerManager - EventHandler cache.ResourceEventHandler EventRecorder record.EventRecorder - Processor util.AsyncWorker RESTMapper meta.RESTMapper ResourceInterpreter resourceinterpreter.ResourceInterpreter RateLimiterOptions ratelimiterflag.Options - GenericEvent chan event.GenericEvent - stopCh <-chan struct{} + eventHandler cache.ResourceEventHandler + resourceProcessor util.AsyncWorker + genericEvent chan event.GenericEvent + stopCh <-chan struct{} } // Check if our DependenciesDistributor implements necessary interfaces @@ -91,7 +89,7 @@ func (d *DependenciesDistributor) OnAdd(obj interface{}) { if !ok { return } - d.Processor.Enqueue(runtimeObj) + d.resourceProcessor.Enqueue(runtimeObj) } // OnUpdate handles object update event and push the object to queue. @@ -121,45 +119,45 @@ func (d *DependenciesDistributor) reconcile(key util.QueueKey) error { return err } - var errs []error for i := range bindingList.Items { binding := &bindingList.Items[i] if !binding.DeletionTimestamp.IsZero() { continue } - matched, err := dependentObjectReferenceMatches(clusterWideKey, binding) - if err != nil { - klog.Errorf("Failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err) - errs = append(errs, err) - continue - } else if !matched { + matched := dependentObjectReferenceMatches(clusterWideKey, binding) + if !matched { klog.V(4).Infof("No need to sync binding(%s/%s)", binding.Namespace, binding.Name) continue } klog.V(4).Infof("Resource binding(%s/%s) is matched for resource(%s/%s)", binding.Namespace, binding.Name, clusterWideKey.Namespace, clusterWideKey.Name) - d.GenericEvent <- event.GenericEvent{Object: binding} + d.genericEvent <- event.GenericEvent{Object: binding} } - return utilerrors.NewAggregate(errs) + return nil } // dependentObjectReferenceMatches tells if the given object is referred by current resource binding. -func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) { +func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) bool { dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey] if !exist { - return false, nil + return false } var dependenciesSlice []configv1alpha1.DependentObjectReference err := json.Unmarshal([]byte(dependencies), &dependenciesSlice) if err != nil { - return false, err + // If unmarshal fails, retrying with an error return will not solve the problem. + // It will only increase the consumption by repeatedly listing the binding. + // Therefore, it is better to print this error and ignore it. + klog.Errorf("Failed to unmarshal binding(%s/%s) dependencies(%s): %v", + referenceBinding.Namespace, referenceBinding.Name, dependencies, err) + return false } if len(dependenciesSlice) == 0 { - return false, nil + return false } for _, dependence := range dependenciesSlice { @@ -167,11 +165,10 @@ func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBin objectKey.Kind == dependence.Kind && objectKey.Namespace == dependence.Namespace && objectKey.Name == dependence.Name { - return true, nil + return true } } - - return false, nil + return false } // Reconcile performs a full reconciliation for the object referred to by the Request. @@ -180,7 +177,7 @@ func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBin func (d *DependenciesDistributor) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { klog.V(4).Infof("Start to reconcile ResourceBinding(%s)", request.NamespacedName) bindingObject := &workv1alpha2.ResourceBinding{} - err := d.Client.Get(ctx, types.NamespacedName{Namespace: request.Namespace, Name: request.Name}, bindingObject) + err := d.Client.Get(ctx, request.NamespacedName, bindingObject) if err != nil { if apierrors.IsNotFound(err) { klog.V(4).Infof("ResourceBinding(%s) has been removed.", request.NamespacedName) @@ -233,12 +230,12 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding * } }() - if err := d.recordDependenciesForIndependentBinding(binding, dependencies); err != nil { + if err := d.recordDependencies(binding, dependencies); err != nil { return err } // remove orphan attached bindings - orphanBindings, err := d.findOrphanAttachedResourceBindings(binding, dependencies) + orphanBindings, err := d.findOrphanAttachedBindings(binding, dependencies) if err != nil { klog.Errorf("Failed to find orphan attached bindings for resourceBinding(%s/%s). Error: %v.", binding.GetNamespace(), binding.GetName(), err) @@ -267,8 +264,8 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding * errs = append(errs, err) continue } - if !d.InformerManager.IsHandlerExist(gvr, d.EventHandler) { - d.InformerManager.ForResource(gvr, d.EventHandler) + if !d.InformerManager.IsHandlerExist(gvr, d.eventHandler) { + d.InformerManager.ForResource(gvr, d.eventHandler) startInformerManager = true } rawObject, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, resource) @@ -293,12 +290,13 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding * return utilerrors.NewAggregate(errs) } -func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error { +func (d *DependenciesDistributor) recordDependencies(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error { dependenciesBytes, err := json.Marshal(dependencies) if err != nil { klog.Errorf("Failed to marshal dependencies of binding(%s/%s): %v", binding.Namespace, binding.Name, err) return err } + depenciesStr := string(dependenciesBytes) objectAnnotation := binding.GetAnnotations() if objectAnnotation == nil { @@ -306,11 +304,11 @@ func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(bindin } // dependencies are not updated, no need to update annotation. - if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == string(dependenciesBytes) { + if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == depenciesStr { return nil } - objectAnnotation[bindingDependenciesAnnotationKey] = string(dependenciesBytes) + objectAnnotation[bindingDependenciesAnnotationKey] = depenciesStr return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) { binding.SetAnnotations(objectAnnotation) @@ -329,7 +327,7 @@ func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(bindin }) } -func (d *DependenciesDistributor) findOrphanAttachedResourceBindings(independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) ([]*workv1alpha2.ResourceBinding, error) { +func (d *DependenciesDistributor) findOrphanAttachedBindings(independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) ([]*workv1alpha2.ResourceBinding, error) { attachedBindings, err := d.listAttachedBindings(independentBinding.Namespace, independentBinding.Name) if err != nil { return nil, err @@ -352,8 +350,8 @@ func (d *DependenciesDistributor) findOrphanAttachedResourceBindings(independent } func (d *DependenciesDistributor) listAttachedBindings(bindingNamespace, bindingName string) (res []*workv1alpha2.ResourceBinding, err error) { - label := generateBindingDependedByLabel(bindingNamespace, bindingName) - selector := labels.SelectorFromSet(label) + labelSet := generateBindingDependedLabels(bindingNamespace, bindingName) + selector := labels.SelectorFromSet(labelSet) bindingList := &workv1alpha2.ResourceBindingList{} err = d.Client.List(context.TODO(), bindingList, &client.ListOptions{ Namespace: bindingNamespace, @@ -372,7 +370,7 @@ func (d *DependenciesDistributor) removeScheduleResultFromAttachedBindings(bindi return nil } - bindingLabelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName) + bindingLabelKey := generateBindingDependedLabelKey(bindingNamespace, bindingName) var errs []error for index, binding := range attachedBindings { @@ -402,8 +400,7 @@ func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding return err } - updatedBindingSnapshot := mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy) - existBinding.Spec.RequiredBy = updatedBindingSnapshot + existBinding.Spec.RequiredBy = mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy) existBinding.Labels = util.DedupeAndMergeLabels(existBinding.Labels, attachedBinding.Labels) existBinding.Spec.Resource = attachedBinding.Spec.Resource @@ -420,13 +417,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error { klog.Infof("Starting dependencies distributor.") d.stopCh = ctx.Done() resourceWorkerOptions := util.Options{ - Name: "resource detector", - KeyFunc: detector.ClusterWideKeyFunc, + Name: "dependencies resource detector", + KeyFunc: func(obj interface{}) (util.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) }, ReconcileFunc: d.reconcile, } - d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) - d.Processor = util.NewAsyncWorker(resourceWorkerOptions) - d.Processor.Run(2, d.stopCh) + d.eventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete) + d.resourceProcessor = util.NewAsyncWorker(resourceWorkerOptions) + d.resourceProcessor.Run(2, d.stopCh) <-d.stopCh klog.Infof("Stopped as stopCh closed.") @@ -435,6 +432,7 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error { // SetupWithManager creates a controller and register to controller manager. func (d *DependenciesDistributor) SetupWithManager(mgr controllerruntime.Manager) error { + d.genericEvent = make(chan event.GenericEvent) return utilerrors.NewAggregate([]error{ mgr.Add(d), controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha2.ResourceBinding{}). @@ -474,18 +472,18 @@ func (d *DependenciesDistributor) SetupWithManager(mgr controllerruntime.Manager RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(d.RateLimiterOptions), MaxConcurrentReconciles: 2, }). - WatchesRawSource(&source.Channel{Source: d.GenericEvent}, &handler.EnqueueRequestForObject{}). + WatchesRawSource(&source.Channel{Source: d.genericEvent}, &handler.EnqueueRequestForObject{}). Complete(d), }) } -func generateBindingDependedByLabel(bindingNamespace, bindingName string) map[string]string { - labelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName) +func generateBindingDependedLabels(bindingNamespace, bindingName string) map[string]string { + labelKey := generateBindingDependedLabelKey(bindingNamespace, bindingName) labelValue := fmt.Sprintf(bindingNamespace + "_" + bindingName) return map[string]string{labelKey: labelValue} } -func generateBindingDependedByLabelKey(bindingNamespace, bindingName string) string { +func generateBindingDependedLabelKey(bindingNamespace, bindingName string) string { bindHashKey := names.GenerateBindingReferenceKey(bindingNamespace, bindingName) return fmt.Sprintf(bindingDependedByLabelKeyPrefix + bindHashKey) } @@ -499,7 +497,7 @@ func generateDependencyKey(kind, apiVersion, name, namespace string) string { } func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstructured.Unstructured) *workv1alpha2.ResourceBinding { - dependedByLabels := generateBindingDependedByLabel(binding.Namespace, binding.Name) + dependedLabels := generateBindingDependedLabels(binding.Namespace, binding.Name) var result []workv1alpha2.BindingSnapshot result = append(result, workv1alpha2.BindingSnapshot{ @@ -515,7 +513,7 @@ func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstruc OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(object, object.GroupVersionKind()), }, - Labels: dependedByLabels, + Labels: dependedLabels, Finalizers: []string{util.BindingControllerFinalizer}, }, Spec: workv1alpha2.ResourceBindingSpec{ diff --git a/pkg/dependenciesdistributor/dependencies_distributor_test.go b/pkg/dependenciesdistributor/dependencies_distributor_test.go index b232042cef0c..5d4147ecd9f2 100644 --- a/pkg/dependenciesdistributor/dependencies_distributor_test.go +++ b/pkg/dependenciesdistributor/dependencies_distributor_test.go @@ -15,10 +15,9 @@ func Test_dependentObjectReferenceMatches(t *testing.T) { referenceBinding *workv1alpha2.ResourceBinding } tests := []struct { - name string - args args - want bool - wantErr bool + name string + args args + want bool }{ { name: "test custom resource", @@ -36,8 +35,7 @@ func Test_dependentObjectReferenceMatches(t *testing.T) { }}, }, }, - want: true, - wantErr: false, + want: true, }, { name: "test configmap", @@ -55,17 +53,12 @@ func Test_dependentObjectReferenceMatches(t *testing.T) { }}, }, }, - want: true, - wantErr: false, + want: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := dependentObjectReferenceMatches(tt.args.objectKey, tt.args.referenceBinding) - if (err != nil) != tt.wantErr { - t.Errorf("dependentObjectReferenceMatches() error = %v, wantErr %v", err, tt.wantErr) - return - } + got := dependentObjectReferenceMatches(tt.args.objectKey, tt.args.referenceBinding) if got != tt.want { t.Errorf("dependentObjectReferenceMatches() got = %v, want %v", got, tt.want) }