From b2aa21dcef1741537924af78066e24cb3774ef17 Mon Sep 17 00:00:00 2001 From: Justin Kulikauskas Date: Thu, 5 Oct 2023 11:53:21 -0400 Subject: [PATCH] Fix code smells and PR comments The bigger change is that the status is now updated before sending the events to the replicated policy reconciler - this should help prevent some requeues. Otherwise, these are largely just organizational changes. Signed-off-by: Justin Kulikauskas --- controllers/propagator/propagation.go | 81 ++++++++++--------- .../propagator/replicatedpolicy_controller.go | 80 +++++++++--------- controllers/propagator/rootpolicy_setup.go | 8 +- main.go | 2 +- test/e2e/case1_propagation_test.go | 17 +++- 5 files changed, 104 insertions(+), 84 deletions(-) diff --git a/controllers/propagator/propagation.go b/controllers/propagator/propagation.go index 8df4cf06..aef8dae7 100644 --- a/controllers/propagator/propagation.go +++ b/controllers/propagator/propagation.go @@ -50,7 +50,6 @@ type Propagator struct { client.Client Scheme *runtime.Scheme Recorder record.EventRecorder - DynamicWatcher k8sdepwatches.DynamicWatcher RootPolicyLocks *sync.Map ReplicatedPolicyUpdates chan event.GenericEvent } @@ -182,12 +181,12 @@ func (r *RootPolicyReconciler) getAllClusterDecisions( decisions = make(map[appsv1.PlacementDecision]policiesv1.BindingOverrides) // Process all placement bindings without subFilter - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.SubFilter == policiesv1.Restricted { continue } - plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb) + plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i]) if err != nil { return nil, nil, err } @@ -226,14 +225,14 @@ func (r *RootPolicyReconciler) getAllClusterDecisions( } // Process all placement bindings with subFilter:restricted - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.SubFilter != policiesv1.Restricted { continue } foundInDecisions := false - plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb) + plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i]) if err != nil { return nil, nil, err } @@ -267,9 +266,8 @@ func (r *RootPolicyReconciler) getAllClusterDecisions( return decisions, placements, nil } -// handleDecisions identifies all managed clusters which should have a replicated policy, and sends -// events to the replicated policy reconciler for them to be created or updated. -func (r *RootPolicyReconciler) handleDecisions( +// getDecisions identifies all managed clusters which should have a replicated policy +func (r *RootPolicyReconciler) getDecisions( instance *policiesv1.Policy, ) ( []*policiesv1.Placement, decisionSet, error, @@ -299,25 +297,6 @@ func (r *RootPolicyReconciler) handleDecisions( decisions[dec] = true } - log.Info("Sending reconcile events to replicated policies", "decisionsCount", len(allClusterDecisions)) - - for decision := range allClusterDecisions { - simpleObj := &GuttedObject{ - TypeMeta: metav1.TypeMeta{ - Kind: policiesv1.Kind, - APIVersion: policiesv1.GroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: common.FullNameForPolicy(instance), - Namespace: decision.ClusterNamespace, - }, - } - - log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName()) - - r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj} - } - return placements, decisions, nil } @@ -326,11 +305,11 @@ func (r *RootPolicyReconciler) handleDecisions( // decisions, then it's considered stale and an event is sent to the replicated policy reconciler // so the policy will be removed. func (r *RootPolicyReconciler) cleanUpOrphanedRplPolicies( - instance *policiesv1.Policy, allDecisions decisionSet, + instance *policiesv1.Policy, originalCPCS []*policiesv1.CompliancePerClusterStatus, allDecisions decisionSet, ) error { log := log.WithValues("policyName", instance.GetName(), "policyNamespace", instance.GetNamespace()) - for _, cluster := range instance.Status.Status { + for _, cluster := range originalCPCS { key := appsv1.PlacementDecision{ ClusterName: cluster.ClusterNamespace, ClusterNamespace: cluster.ClusterNamespace, @@ -388,25 +367,19 @@ func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) err } } - placements, decisions, err := r.handleDecisions(instance) + placements, decisions, err := r.getDecisions(instance) if err != nil { log.Info("Failed to get any placement decisions. Giving up on the request.") return errors.New("could not get the placement decisions") } - err = r.cleanUpOrphanedRplPolicies(instance, decisions) - if err != nil { - log.Error(err, "Failed to delete orphaned replicated policies") - - return err - } - log.V(1).Info("Updating the root policy status") cpcs, cpcsErr := r.calculatePerClusterStatus(instance, decisions) if cpcsErr != nil { - log.Error(cpcsErr, "Failed to get at least one replicated policy") + // If there is a new replicated policy, then its lookup is expected to fail - it hasn't been created yet. + log.Error(cpcsErr, "Failed to get at least one replicated policy, but that may be expected. Ignoring.") } err = r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, instance) @@ -414,6 +387,10 @@ func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) err log.Error(err, "Failed to refresh the cached policy. Will use existing policy.") } + // make a copy of the original status + originalCPCS := make([]*policiesv1.CompliancePerClusterStatus, len(instance.Status.Status)) + copy(originalCPCS, instance.Status.Status) + instance.Status.Status = cpcs instance.Status.ComplianceState = CalculateRootCompliance(cpcs) instance.Status.Placement = placements @@ -423,7 +400,33 @@ func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) err return err } - return cpcsErr + log.Info("Sending reconcile events to replicated policies", "decisionsCount", len(decisions)) + + for decision := range decisions { + simpleObj := &GuttedObject{ + TypeMeta: metav1.TypeMeta{ + Kind: policiesv1.Kind, + APIVersion: policiesv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: common.FullNameForPolicy(instance), + Namespace: decision.ClusterNamespace, + }, + } + + log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName()) + + r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj} + } + + err = r.cleanUpOrphanedRplPolicies(instance, originalCPCS, decisions) + if err != nil { + log.Error(err, "Failed to delete orphaned replicated policies") + + return err + } + + return nil } // a helper to quickly check if there are any templates in any of the policy templates diff --git a/controllers/propagator/replicatedpolicy_controller.go b/controllers/propagator/replicatedpolicy_controller.go index ddb484a1..69a4c1b1 100644 --- a/controllers/propagator/replicatedpolicy_controller.go +++ b/controllers/propagator/replicatedpolicy_controller.go @@ -24,6 +24,7 @@ var _ reconcile.Reconciler = &ReplicatedPolicyReconciler{} type ReplicatedPolicyReconciler struct { Propagator ResourceVersions *sync.Map + DynamicWatcher k8sdepwatches.DynamicWatcher } func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { @@ -52,13 +53,18 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl } rootName, rootNS, err := common.ParseRootPolicyLabel(request.Name) - if err != nil && replicatedExists { - if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { - if !k8serrors.IsNotFound(err) { - log.Error(err, "Failed to delete the invalid replicated policy, requeueing") + if err != nil { + if !replicatedExists { + log.Error(err, "Invalid replicated policy sent for reconcile, rejecting") - return reconcile.Result{}, err - } + return reconcile.Result{}, nil + } + + cleanUpErr := r.cleanUpReplicated(ctx, replicatedPolicy) + if cleanUpErr != nil && !k8serrors.IsNotFound(cleanUpErr) { + log.Error(err, "Failed to delete the invalid replicated policy, requeueing") + + return reconcile.Result{}, err } log.Info("Invalid replicated policy deleted") @@ -73,48 +79,48 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl rootNN := types.NamespacedName{Namespace: rootNS, Name: rootName} if err := r.Get(ctx, rootNN, rootPolicy); err != nil { - if k8serrors.IsNotFound(err) { - if replicatedExists { - // do not handle a replicated policy which does not belong to the current cluster - inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace) - if err != nil { - return reconcile.Result{}, err - } - - if !inClusterNS { - log.Info("Found a replicated policy in non-cluster namespace, skipping it") - - return reconcile.Result{}, nil - } - - // otherwise, we need to clean it up - if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { - if !k8serrors.IsNotFound(err) { - log.Error(err, "Failed to delete the orphaned replicated policy, requeueing") - - return reconcile.Result{}, err - } - } - - log.Info("Orphaned replicated policy deleted") + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to get the root policy, requeueing") - return reconcile.Result{}, nil - } + return reconcile.Result{}, err + } + if !replicatedExists { version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) defer version.Unlock() // Store this to ensure the cache matches a known possible state for this situation version.resourceVersion = "deleted" - log.Info("Root policy and replicated policy already missing") + log.V(1).Info("Root policy and replicated policy already missing") return reconcile.Result{}, nil } - log.Error(err, "Failed to get the root policy, requeueing") + // do not handle a replicated policy which does not belong to the current cluster + inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace) + if err != nil { + return reconcile.Result{}, err + } + + if !inClusterNS { + log.V(1).Info("Found a replicated policy in non-cluster namespace, skipping it") - return reconcile.Result{}, err + return reconcile.Result{}, nil + } + + // otherwise, we need to clean it up + if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to delete the orphaned replicated policy, requeueing") + + return reconcile.Result{}, err + } + } + + log.Info("Orphaned replicated policy deleted") + + return reconcile.Result{}, nil } if rootPolicy.Spec.Disabled { @@ -138,7 +144,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl // Store this to ensure the cache matches a known possible state for this situation version.resourceVersion = "deleted" - log.Info("Root policy is disabled, and replicated policy correctly not found.") + log.V(1).Info("Root policy is disabled, and replicated policy correctly not found.") return reconcile.Result{}, nil } @@ -173,7 +179,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl // Store this to ensure the cache matches a known possible state for this situation version.resourceVersion = "deleted" - log.Info("Replicated policy should not exist on this managed cluster, and does not.") + log.V(1).Info("Replicated policy should not exist on this managed cluster, and does not.") return reconcile.Result{}, nil } diff --git a/controllers/propagator/rootpolicy_setup.go b/controllers/propagator/rootpolicy_setup.go index 26321da6..2ec7d396 100644 --- a/controllers/propagator/rootpolicy_setup.go +++ b/controllers/propagator/rootpolicy_setup.go @@ -197,13 +197,13 @@ func mapPlacementRuleToPolicies(c client.Client) handler.MapFunc { var result []reconcile.Request // loop through pbs and collect policies from each matching one. - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.PlacementRef.APIGroup != appsv1.SchemeGroupVersion.Group || pb.PlacementRef.Kind != "PlacementRule" || pb.PlacementRef.Name != object.GetName() { continue } - result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...) + result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...) } return result @@ -238,13 +238,13 @@ func mapPlacementDecisionToPolicies(c client.Client) handler.MapFunc { var result []reconcile.Request // loop through pbs and collect policies from each matching one. - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.PlacementRef.APIGroup != clusterv1beta1.SchemeGroupVersion.Group || pb.PlacementRef.Kind != "Placement" || pb.PlacementRef.Name != placementName { continue } - result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...) + result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...) } return result diff --git a/main.go b/main.go index 7dc85b19..20a3a457 100644 --- a/main.go +++ b/main.go @@ -281,7 +281,6 @@ func main() { Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Recorder: mgr.GetEventRecorderFor(propagatorctrl.ControllerName), - DynamicWatcher: dynamicWatcher, RootPolicyLocks: policiesLock, ReplicatedPolicyUpdates: replicatedPolicyUpdates, } @@ -296,6 +295,7 @@ func main() { if err = (&propagatorctrl.ReplicatedPolicyReconciler{ Propagator: propagator, ResourceVersions: replicatedResourceVersions, + DynamicWatcher: dynamicWatcher, }).SetupWithManager(mgr, replPolicyMaxConcurrency, dynamicWatcherSource, replicatedUpdatesSource); err != nil { log.Error(err, "Unable to create the controller", "controller", "replicated-policy") os.Exit(1) diff --git a/test/e2e/case1_propagation_test.go b/test/e2e/case1_propagation_test.go index 95a638d4..1be45aa3 100644 --- a/test/e2e/case1_propagation_test.go +++ b/test/e2e/case1_propagation_test.go @@ -747,13 +747,24 @@ var _ = Describe("Test policy propagation", func() { policyMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(policy) Expect(err).ToNot(HaveOccurred()) - policyRV, err := policyClient().Create( + _, err = policyClient().Create( context.TODO(), &unstructured.Unstructured{Object: policyMap}, metav1.CreateOptions{}, ) Expect(err).ToNot(HaveOccurred()) - _, found, _ := unstructured.NestedBool(policyRV.Object, "spec", "copyPolicyMetadata") - Expect(found).To(BeFalse()) + Eventually(func(g Gomega) { + replicatedPlc := utils.GetWithTimeout( + clientHubDynamic, + gvrPolicy, + testNamespace+"."+policyName, + "managed1", + true, + defaultTimeoutSeconds, + ) + + _, found, _ := unstructured.NestedBool(replicatedPlc.Object, "spec", "copyPolicyMetadata") + g.Expect(found).To(BeFalse()) + }, defaultTimeoutSeconds, 1).Should(Succeed()) }) It("verifies that the labels and annotations are copied with spec.copyPolicyMetadata=true", func() {