Skip to content

Commit

Permalink
Fix code smells and PR comments
Browse files Browse the repository at this point in the history
The bigger change is that the status is now updated before sending the
events to the replicated policy reconciler - this should help prevent
some requeues. Otherwise, these are largely just organizational changes.

Signed-off-by: Justin Kulikauskas <[email protected]>
(cherry picked from commit b2aa21d)
  • Loading branch information
JustinKuli committed Oct 10, 2023
1 parent 3651c4f commit 56b9c23
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 84 deletions.
81 changes: 42 additions & 39 deletions controllers/propagator/propagation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type Propagator struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder
DynamicWatcher k8sdepwatches.DynamicWatcher
RootPolicyLocks *sync.Map
ReplicatedPolicyUpdates chan event.GenericEvent
}
Expand Down Expand Up @@ -182,12 +181,12 @@ func (r *RootPolicyReconciler) getAllClusterDecisions(
decisions = make(map[appsv1.PlacementDecision]policiesv1.BindingOverrides)

// Process all placement bindings without subFilter
for _, pb := range pbList.Items {
for i, pb := range pbList.Items {
if pb.SubFilter == policiesv1.Restricted {
continue
}

plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb)
plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i])
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -226,14 +225,14 @@ func (r *RootPolicyReconciler) getAllClusterDecisions(
}

// Process all placement bindings with subFilter:restricted
for _, pb := range pbList.Items {
for i, pb := range pbList.Items {
if pb.SubFilter != policiesv1.Restricted {
continue
}

foundInDecisions := false

plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb)
plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i])
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -267,9 +266,8 @@ func (r *RootPolicyReconciler) getAllClusterDecisions(
return decisions, placements, nil
}

// handleDecisions identifies all managed clusters which should have a replicated policy, and sends
// events to the replicated policy reconciler for them to be created or updated.
func (r *RootPolicyReconciler) handleDecisions(
// getDecisions identifies all managed clusters which should have a replicated policy
func (r *RootPolicyReconciler) getDecisions(
instance *policiesv1.Policy,
) (
[]*policiesv1.Placement, decisionSet, error,
Expand Down Expand Up @@ -299,25 +297,6 @@ func (r *RootPolicyReconciler) handleDecisions(
decisions[dec] = true
}

log.Info("Sending reconcile events to replicated policies", "decisionsCount", len(allClusterDecisions))

for decision := range allClusterDecisions {
simpleObj := &GuttedObject{
TypeMeta: metav1.TypeMeta{
Kind: policiesv1.Kind,
APIVersion: policiesv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: common.FullNameForPolicy(instance),
Namespace: decision.ClusterNamespace,
},
}

log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName())

r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj}
}

return placements, decisions, nil
}

Expand All @@ -326,11 +305,11 @@ func (r *RootPolicyReconciler) handleDecisions(
// decisions, then it's considered stale and an event is sent to the replicated policy reconciler
// so the policy will be removed.
func (r *RootPolicyReconciler) cleanUpOrphanedRplPolicies(
instance *policiesv1.Policy, allDecisions decisionSet,
instance *policiesv1.Policy, originalCPCS []*policiesv1.CompliancePerClusterStatus, allDecisions decisionSet,
) error {
log := log.WithValues("policyName", instance.GetName(), "policyNamespace", instance.GetNamespace())

for _, cluster := range instance.Status.Status {
for _, cluster := range originalCPCS {
key := appsv1.PlacementDecision{
ClusterName: cluster.ClusterNamespace,
ClusterNamespace: cluster.ClusterNamespace,
Expand Down Expand Up @@ -388,32 +367,30 @@ func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) err
}
}

placements, decisions, err := r.handleDecisions(instance)
placements, decisions, err := r.getDecisions(instance)
if err != nil {
log.Info("Failed to get any placement decisions. Giving up on the request.")

return errors.New("could not get the placement decisions")
}

err = r.cleanUpOrphanedRplPolicies(instance, decisions)
if err != nil {
log.Error(err, "Failed to delete orphaned replicated policies")

return err
}

log.V(1).Info("Updating the root policy status")

cpcs, cpcsErr := r.calculatePerClusterStatus(instance, decisions)
if cpcsErr != nil {
log.Error(cpcsErr, "Failed to get at least one replicated policy")
// If there is a new replicated policy, then its lookup is expected to fail - it hasn't been created yet.
log.Error(cpcsErr, "Failed to get at least one replicated policy, but that may be expected. Ignoring.")
}

err = r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, instance)
if err != nil {
log.Error(err, "Failed to refresh the cached policy. Will use existing policy.")
}

// make a copy of the original status
originalCPCS := make([]*policiesv1.CompliancePerClusterStatus, len(instance.Status.Status))
copy(originalCPCS, instance.Status.Status)

instance.Status.Status = cpcs
instance.Status.ComplianceState = CalculateRootCompliance(cpcs)
instance.Status.Placement = placements
Expand All @@ -423,7 +400,33 @@ func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) err
return err
}

return cpcsErr
log.Info("Sending reconcile events to replicated policies", "decisionsCount", len(decisions))

for decision := range decisions {
simpleObj := &GuttedObject{
TypeMeta: metav1.TypeMeta{
Kind: policiesv1.Kind,
APIVersion: policiesv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: common.FullNameForPolicy(instance),
Namespace: decision.ClusterNamespace,
},
}

log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName())

r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj}
}

err = r.cleanUpOrphanedRplPolicies(instance, originalCPCS, decisions)
if err != nil {
log.Error(err, "Failed to delete orphaned replicated policies")

return err
}

return nil
}

// a helper to quickly check if there are any templates in any of the policy templates
Expand Down
80 changes: 43 additions & 37 deletions controllers/propagator/replicatedpolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var _ reconcile.Reconciler = &ReplicatedPolicyReconciler{}
type ReplicatedPolicyReconciler struct {
Propagator
ResourceVersions *sync.Map
DynamicWatcher k8sdepwatches.DynamicWatcher
}

func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -52,13 +53,18 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl
}

rootName, rootNS, err := common.ParseRootPolicyLabel(request.Name)
if err != nil && replicatedExists {
if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil {
if !k8serrors.IsNotFound(err) {
log.Error(err, "Failed to delete the invalid replicated policy, requeueing")
if err != nil {
if !replicatedExists {
log.Error(err, "Invalid replicated policy sent for reconcile, rejecting")

return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}

cleanUpErr := r.cleanUpReplicated(ctx, replicatedPolicy)
if cleanUpErr != nil && !k8serrors.IsNotFound(cleanUpErr) {
log.Error(err, "Failed to delete the invalid replicated policy, requeueing")

return reconcile.Result{}, err
}

log.Info("Invalid replicated policy deleted")
Expand All @@ -73,48 +79,48 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl
rootNN := types.NamespacedName{Namespace: rootNS, Name: rootName}

if err := r.Get(ctx, rootNN, rootPolicy); err != nil {
if k8serrors.IsNotFound(err) {
if replicatedExists {
// do not handle a replicated policy which does not belong to the current cluster
inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace)
if err != nil {
return reconcile.Result{}, err
}

if !inClusterNS {
log.Info("Found a replicated policy in non-cluster namespace, skipping it")

return reconcile.Result{}, nil
}

// otherwise, we need to clean it up
if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil {
if !k8serrors.IsNotFound(err) {
log.Error(err, "Failed to delete the orphaned replicated policy, requeueing")

return reconcile.Result{}, err
}
}

log.Info("Orphaned replicated policy deleted")
if !k8serrors.IsNotFound(err) {
log.Error(err, "Failed to get the root policy, requeueing")

return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if !replicatedExists {
version := safeWriteLoad(r.ResourceVersions, rsrcVersKey)
defer version.Unlock()

// Store this to ensure the cache matches a known possible state for this situation
version.resourceVersion = "deleted"

log.Info("Root policy and replicated policy already missing")
log.V(1).Info("Root policy and replicated policy already missing")

return reconcile.Result{}, nil
}

log.Error(err, "Failed to get the root policy, requeueing")
// do not handle a replicated policy which does not belong to the current cluster
inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace)
if err != nil {
return reconcile.Result{}, err
}

if !inClusterNS {
log.V(1).Info("Found a replicated policy in non-cluster namespace, skipping it")

return reconcile.Result{}, err
return reconcile.Result{}, nil
}

// otherwise, we need to clean it up
if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil {
if !k8serrors.IsNotFound(err) {
log.Error(err, "Failed to delete the orphaned replicated policy, requeueing")

return reconcile.Result{}, err
}
}

log.Info("Orphaned replicated policy deleted")

return reconcile.Result{}, nil
}

if rootPolicy.Spec.Disabled {
Expand All @@ -138,7 +144,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl
// Store this to ensure the cache matches a known possible state for this situation
version.resourceVersion = "deleted"

log.Info("Root policy is disabled, and replicated policy correctly not found.")
log.V(1).Info("Root policy is disabled, and replicated policy correctly not found.")

return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -173,7 +179,7 @@ func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl
// Store this to ensure the cache matches a known possible state for this situation
version.resourceVersion = "deleted"

log.Info("Replicated policy should not exist on this managed cluster, and does not.")
log.V(1).Info("Replicated policy should not exist on this managed cluster, and does not.")

return reconcile.Result{}, nil
}
Expand Down
8 changes: 4 additions & 4 deletions controllers/propagator/rootpolicy_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ func mapPlacementRuleToPolicies(c client.Client) handler.MapFunc {

var result []reconcile.Request
// loop through pbs and collect policies from each matching one.
for _, pb := range pbList.Items {
for i, pb := range pbList.Items {
if pb.PlacementRef.APIGroup != appsv1.SchemeGroupVersion.Group ||
pb.PlacementRef.Kind != "PlacementRule" || pb.PlacementRef.Name != object.GetName() {
continue
}

result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...)
result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...)
}

return result
Expand Down Expand Up @@ -238,13 +238,13 @@ func mapPlacementDecisionToPolicies(c client.Client) handler.MapFunc {

var result []reconcile.Request
// loop through pbs and collect policies from each matching one.
for _, pb := range pbList.Items {
for i, pb := range pbList.Items {
if pb.PlacementRef.APIGroup != clusterv1beta1.SchemeGroupVersion.Group ||
pb.PlacementRef.Kind != "Placement" || pb.PlacementRef.Name != placementName {
continue
}

result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...)
result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...)
}

return result
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ func main() {
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor(propagatorctrl.ControllerName),
DynamicWatcher: dynamicWatcher,
RootPolicyLocks: policiesLock,
ReplicatedPolicyUpdates: replicatedPolicyUpdates,
}
Expand All @@ -296,6 +295,7 @@ func main() {
if err = (&propagatorctrl.ReplicatedPolicyReconciler{
Propagator: propagator,
ResourceVersions: replicatedResourceVersions,
DynamicWatcher: dynamicWatcher,
}).SetupWithManager(mgr, replPolicyMaxConcurrency, dynamicWatcherSource, replicatedUpdatesSource); err != nil {
log.Error(err, "Unable to create the controller", "controller", "replicated-policy")
os.Exit(1)
Expand Down
17 changes: 14 additions & 3 deletions test/e2e/case1_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,24 @@ var _ = Describe("Test policy propagation", func() {
policyMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(policy)
Expect(err).ToNot(HaveOccurred())

policyRV, err := policyClient().Create(
_, err = policyClient().Create(
context.TODO(), &unstructured.Unstructured{Object: policyMap}, metav1.CreateOptions{},
)
Expect(err).ToNot(HaveOccurred())

_, found, _ := unstructured.NestedBool(policyRV.Object, "spec", "copyPolicyMetadata")
Expect(found).To(BeFalse())
Eventually(func(g Gomega) {
replicatedPlc := utils.GetWithTimeout(
clientHubDynamic,
gvrPolicy,
testNamespace+"."+policyName,
"managed1",
true,
defaultTimeoutSeconds,
)

_, found, _ := unstructured.NestedBool(replicatedPlc.Object, "spec", "copyPolicyMetadata")
g.Expect(found).To(BeFalse())
}, defaultTimeoutSeconds, 1).Should(Succeed())
})

It("verifies that the labels and annotations are copied with spec.copyPolicyMetadata=true", func() {
Expand Down

0 comments on commit 56b9c23

Please sign in to comment.