diff --git a/README.md b/README.md index 8a594c27..71557644 100644 --- a/README.md +++ b/README.md @@ -76,18 +76,6 @@ make kind-delete-cluster ### Updating Deployment resources Some of the deployment resources are generated by kubebuilder - the crds are generated into `./deploy/crds` and the rbac details from kubebuilder comments are compiled into `./deploy/rbac/role.yaml`. Other details are managed independently - in particular, the details in `./deploy/manager/manager.yaml`. When any of those details need to be changed, the main deployment yaml `./deploy/operator.yaml` must be regenerated through the `make generate-operator-yaml` target. The `./deploy/operator.yaml` SHOULD NOT be manually updated. -## Configuration - -The following environment variables can be set to configure the controller: - -* `CONTROLLER_CONFIG_CONCURRENCY_PER_POLICY` - The maximum number of placement decisions that can be - processed concurrently per policy. This defaults to `5`. -* `CONTROLLER_CONFIG_REQUEUE_ERROR_DELAY` - The number of minutes to delay before retrying to - process a reconcile event after one or more placement decisions failed to be processed. This is - not a blocking delay. This defaults to `5`. -* `CONTROLLER_CONFIG_RETRY_ATTEMPTS` - The number of times to retry a failed Kubernetes API call - when processing a placement decision. This defaults to `3`. - ## Running the Compliance Events API Create the KinD cluster and install Postgres with the following commands: diff --git a/controllers/common/common.go b/controllers/common/common.go index 046316f0..cc40cb81 100644 --- a/controllers/common/common.go +++ b/controllers/common/common.go @@ -221,18 +221,6 @@ func GetDecisions(c client.Client, pb *policiesv1.PlacementBinding) ([]appsv1.Pl return nil, fmt.Errorf("placement binding %s/%s reference is not valid", pb.Name, pb.Namespace) } -// GetNumWorkers is a helper function to return the number of workers to handle concurrent tasks -func GetNumWorkers(listLength int, concurrencyPerPolicy int) int { - var numWorkers int - if listLength > concurrencyPerPolicy { - numWorkers = concurrencyPerPolicy - } else { - numWorkers = listLength - } - - return numWorkers -} - func ParseRootPolicyLabel(rootPlc string) (name, namespace string, err error) { // namespaces can't have a `.` (but names can) so this always correctly pulls the namespace out namespace, name, found := strings.Cut(rootPlc, ".") diff --git a/controllers/common/policy_mapper.go b/controllers/common/policy_mapper.go index 2ab46c9d..7e1e1984 100644 --- a/controllers/common/policy_mapper.go +++ b/controllers/common/policy_mapper.go @@ -15,7 +15,7 @@ import ( // PolicyMapper looks at object and returns a slice of reconcile.Request to reconcile // owners of object from label: policy.open-cluster-management.io/root-policy -func PolicyMapper(c client.Client) handler.MapFunc { +func MapToRootPolicy(c client.Client) handler.MapFunc { return func(ctx context.Context, object client.Object) []reconcile.Request { log := ctrl.Log.WithValues("name", object.GetName(), "namespace", object.GetNamespace()) diff --git a/controllers/encryptionkeys/encryptionkeys_controller.go b/controllers/encryptionkeys/encryptionkeys_controller.go index bafe7a82..735d1d36 100644 --- a/controllers/encryptionkeys/encryptionkeys_controller.go +++ b/controllers/encryptionkeys/encryptionkeys_controller.go @@ -35,17 +35,14 @@ const ( var ( log = ctrl.Log.WithName(ControllerName) errLastRotationParseError = fmt.Errorf(`failed to parse the "%s" annotation`, 
propagator.LastRotatedAnnotation) - // The number of retries when performing an operation that can fail temporarily and can't be - // requeued for a retry later. This is not a const so it can be overwritten during the tests. - retries uint = 4 ) // SetupWithManager sets up the controller with the Manager. -func (r *EncryptionKeysReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *EncryptionKeysReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles uint) error { return ctrl.NewControllerManagedBy(mgr). // The work queue prevents the same item being reconciled concurrently: // https://github.com/kubernetes-sigs/controller-runtime/issues/1416#issuecomment-899833144 - WithOptions(controller.Options{MaxConcurrentReconciles: int(r.MaxConcurrentReconciles)}). + WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}). Named(ControllerName). For(&corev1.Secret{}). Complete(r) @@ -58,9 +55,8 @@ var _ reconcile.Reconciler = &EncryptionKeysReconciler{} // for all managed clusters. type EncryptionKeysReconciler struct { //nolint:golint,revive client.Client - KeyRotationDays uint - MaxConcurrentReconciles uint - Scheme *runtime.Scheme + KeyRotationDays uint + Scheme *runtime.Scheme } //+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=get;list;patch diff --git a/controllers/encryptionkeys/encryptionkeys_controller_test.go b/controllers/encryptionkeys/encryptionkeys_controller_test.go index 0cd3d032..311f94f5 100644 --- a/controllers/encryptionkeys/encryptionkeys_controller_test.go +++ b/controllers/encryptionkeys/encryptionkeys_controller_test.go @@ -205,10 +205,9 @@ func getReconciler(encryptionSecret *corev1.Secret) *EncryptionKeysReconciler { client := builder.Build() return &EncryptionKeysReconciler{ - Client: client, - KeyRotationDays: 30, - MaxConcurrentReconciles: 1, - Scheme: scheme, + Client: client, + KeyRotationDays: 30, + Scheme: scheme, } } @@ -464,11 +463,6 @@ func TestReconcileAPIFails(t *testing.T) { t.Parallel() RegisterFailHandler(Fail) - originalRetries := retries - retries = 0 - - t.Cleanup(func() { retries = originalRetries }) - tests := []struct { ExpectedRotation bool GetError bool diff --git a/controllers/policymetrics/policymetrics_controller.go b/controllers/policymetrics/policymetrics_controller.go index d46c8f14..dd91b00d 100644 --- a/controllers/policymetrics/policymetrics_controller.go +++ b/controllers/policymetrics/policymetrics_controller.go @@ -24,11 +24,11 @@ const ControllerName string = "policy-metrics" var log = ctrl.Log.WithName(ControllerName) // SetupWithManager sets up the controller with the Manager. -func (r *MetricReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *MetricReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles uint) error { return ctrl.NewControllerManagedBy(mgr). // The work queue prevents the same item being reconciled concurrently: // https://github.com/kubernetes-sigs/controller-runtime/issues/1416#issuecomment-899833144 - WithOptions(controller.Options{MaxConcurrentReconciles: int(r.MaxConcurrentReconciles)}). + WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}). Named(ControllerName). For(&policiesv1.Policy{}). 
Complete(r) @@ -40,8 +40,7 @@ var _ reconcile.Reconciler = &MetricReconciler{} // MetricReconciler reconciles the metrics for the Policy type MetricReconciler struct { client.Client - MaxConcurrentReconciles uint - Scheme *runtime.Scheme + Scheme *runtime.Scheme } //+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=get;list;watch;create;update;patch;delete diff --git a/controllers/propagator/aggregation.go b/controllers/propagator/aggregation.go index 4cb84973..fa207321 100644 --- a/controllers/propagator/aggregation.go +++ b/controllers/propagator/aggregation.go @@ -4,61 +4,50 @@ import ( "context" "sort" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" ) // calculatePerClusterStatus lists up all policies replicated from the input policy, and stores -// their compliance states in the result list. Additionally, clusters in the failedClusters input -// will be marked as NonCompliant in the result. The result is sorted by cluster name. An error -// will be returned if lookup of the replicated policies fails, and the retries also fail. -func (r *PolicyReconciler) calculatePerClusterStatus( - instance *policiesv1.Policy, allDecisions, failedClusters decisionSet, +// their compliance states in the result list. The result is sorted by cluster name. An error +// will be returned if lookup of a replicated policy fails, but all lookups will still be attempted. +func (r *Propagator) calculatePerClusterStatus( + instance *policiesv1.Policy, decisions decisionSet, ) ([]*policiesv1.CompliancePerClusterStatus, error) { if instance.Spec.Disabled { return nil, nil } - status := make([]*policiesv1.CompliancePerClusterStatus, 0, len(allDecisions)) + status := make([]*policiesv1.CompliancePerClusterStatus, 0, len(decisions)) + var lookupErr error // save until end, to attempt all lookups // Update the status based on the processed decisions - for decision := range allDecisions { - if failedClusters[decision] { - // Skip the replicated policies that failed to be properly replicated - // for now. This will be handled later. - continue - } - - rPlc := &policiesv1.Policy{} + for dec := range decisions { + replicatedPolicy := &policiesv1.Policy{} key := types.NamespacedName{ - Namespace: decision.ClusterNamespace, Name: instance.Namespace + "." + instance.Name, + Namespace: dec.ClusterNamespace, Name: instance.Namespace + "." + instance.Name, } - err := r.Get(context.TODO(), key, rPlc) + err := r.Get(context.TODO(), key, replicatedPolicy) if err != nil { - return nil, err - } + if errors.IsNotFound(err) { + status = append(status, &policiesv1.CompliancePerClusterStatus{ + ClusterName: dec.ClusterName, + ClusterNamespace: dec.ClusterNamespace, + }) - status = append(status, &policiesv1.CompliancePerClusterStatus{ - ComplianceState: rPlc.Status.ComplianceState, - ClusterName: decision.ClusterName, - ClusterNamespace: decision.ClusterNamespace, - }) - } + continue + } - // Add cluster statuses for the clusters that did not get their policies properly - // replicated. This is not done in the previous loop since some replicated polices may not - // have been created at all. 
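// --- Illustrative sketch (not part of this PR's files) ---
// The rewritten calculatePerClusterStatus above follows an "attempt every
// lookup, remember the last unexpected error" pattern: a missing replicated
// policy (IsNotFound) is recorded with no compliance state, any other error is
// saved and only returned after all clusters have been visited. The names
// clusterStatus, collectStatuses, and the get callback below are hypothetical.
package propagatorsketch

import (
	"sort"

	k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

// clusterStatus is a stand-in for policiesv1.CompliancePerClusterStatus.
type clusterStatus struct {
	ClusterName string
	Compliance  string
}

// collectStatuses visits every cluster and records whatever it can; the last
// unexpected error is surfaced only after all lookups were attempted, so one
// failing cluster does not hide the status of the others.
func collectStatuses(clusters []string, get func(cluster string) (string, error)) ([]clusterStatus, error) {
	statuses := make([]clusterStatus, 0, len(clusters))

	var lookupErr error // saved until the end so every cluster is attempted

	for _, cluster := range clusters {
		compliance, err := get(cluster)
		if err != nil {
			if k8serrors.IsNotFound(err) {
				// Not replicated yet: record the cluster with no compliance state.
				statuses = append(statuses, clusterStatus{ClusterName: cluster})

				continue
			}

			lookupErr = err
		}

		statuses = append(statuses, clusterStatus{ClusterName: cluster, Compliance: compliance})
	}

	sort.Slice(statuses, func(i, j int) bool { return statuses[i].ClusterName < statuses[j].ClusterName })

	return statuses, lookupErr
}
// --- end sketch ---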
- for clusterDecision := range failedClusters { - log.Info( - "Setting the policy to noncompliant since the replication failed", "cluster", clusterDecision, - ) + lookupErr = err + } status = append(status, &policiesv1.CompliancePerClusterStatus{ - ComplianceState: policiesv1.NonCompliant, - ClusterName: clusterDecision.ClusterName, - ClusterNamespace: clusterDecision.ClusterNamespace, + ComplianceState: replicatedPolicy.Status.ComplianceState, + ClusterName: dec.ClusterName, + ClusterNamespace: dec.ClusterNamespace, }) } @@ -66,7 +55,7 @@ func (r *PolicyReconciler) calculatePerClusterStatus( return status[i].ClusterName < status[j].ClusterName }) - return status, nil + return status, lookupErr } // CalculateRootCompliance uses the input per-cluster statuses to determine what a root policy's diff --git a/controllers/propagator/encryption.go b/controllers/propagator/encryption.go index a695c3e4..d3e1b808 100644 --- a/controllers/propagator/encryption.go +++ b/controllers/propagator/encryption.go @@ -26,7 +26,7 @@ const ( // getEncryptionKey will get the encryption key for a managed cluster used for policy template encryption. If it doesn't // already exist as a secret on the Hub cluster, it will be generated. -func (r *PolicyReconciler) getEncryptionKey(clusterName string) ([]byte, error) { +func (r *Propagator) getEncryptionKey(clusterName string) ([]byte, error) { ctx := context.TODO() objectKey := types.NamespacedName{ Name: EncryptionKeySecret, @@ -90,7 +90,7 @@ func GenerateEncryptionKey() ([]byte, error) { // getInitializationVector retrieves the initialization vector from the annotation // "policy.open-cluster-management.io/encryption-iv" if the annotation exists or generates a new // initialization vector and adds it to the annotations object if it's missing. -func (r *PolicyReconciler) getInitializationVector( +func (r *Propagator) getInitializationVector( policyName string, clusterName string, annotations map[string]string, ) ([]byte, error) { log := log.WithValues("policy", policyName, "cluster", clusterName) diff --git a/controllers/propagator/encryption_test.go b/controllers/propagator/encryption_test.go index db9216ad..31ceb86c 100644 --- a/controllers/propagator/encryption_test.go +++ b/controllers/propagator/encryption_test.go @@ -28,7 +28,7 @@ func TestGetEncryptionKeyNoSecret(_ *testing.T) { RegisterFailHandler(Fail) client := fake.NewClientBuilder().Build() - r := PolicyReconciler{Client: client} + r := Propagator{Client: client} key, err := r.getEncryptionKey(clusterName) Expect(err).ToNot(HaveOccurred()) @@ -68,7 +68,7 @@ func TestGetEncryptionKeySecretExists(_ *testing.T) { client := fake.NewClientBuilder().WithObjects(encryptionSecret).Build() - r := PolicyReconciler{Client: client} + r := Propagator{Client: client} key, err = r.getEncryptionKey(clusterName) Expect(err).ToNot(HaveOccurred()) @@ -103,7 +103,7 @@ func TestGetInitializationVector(t *testing.T) { }, } - r := PolicyReconciler{} + r := Propagator{} for _, test := range tests { subTest := test diff --git a/controllers/propagator/placementBindingMapper.go b/controllers/propagator/placementBindingMapper.go deleted file mode 100644 index 9d68c301..00000000 --- a/controllers/propagator/placementBindingMapper.go +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. 
-// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "context" - - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - "open-cluster-management.io/governance-policy-propagator/controllers/common" -) - -func placementBindingMapper(c client.Client) handler.MapFunc { - return func(ctx context.Context, obj client.Object) []reconcile.Request { - //nolint:forcetypeassert - pb := obj.(*policiesv1.PlacementBinding) - - log := log.WithValues("placementBindingName", pb.GetName(), "namespace", pb.GetNamespace()) - log.V(2).Info("Reconcile request for a PlacementBinding") - - return common.GetPoliciesInPlacementBinding(ctx, c, pb) - } -} diff --git a/controllers/propagator/placementBindingPredicate.go b/controllers/propagator/placementBindingPredicate.go deleted file mode 100644 index 04682da9..00000000 --- a/controllers/propagator/placementBindingPredicate.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - "open-cluster-management.io/governance-policy-propagator/controllers/common" -) - -// we only want to watch for pb contains policy and policyset as subjects -var pbPredicateFuncs = predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - //nolint:forcetypeassert - pbObjNew := e.ObjectNew.(*policiesv1.PlacementBinding) - //nolint:forcetypeassert - pbObjOld := e.ObjectOld.(*policiesv1.PlacementBinding) - - return common.IsForPolicyOrPolicySet(pbObjNew) || common.IsForPolicyOrPolicySet(pbObjOld) - }, - CreateFunc: func(e event.CreateEvent) bool { - //nolint:forcetypeassert - pbObj := e.Object.(*policiesv1.PlacementBinding) - - return common.IsForPolicyOrPolicySet(pbObj) - }, - DeleteFunc: func(e event.DeleteEvent) bool { - //nolint:forcetypeassert - pbObj := e.Object.(*policiesv1.PlacementBinding) - - return common.IsForPolicyOrPolicySet(pbObj) - }, -} diff --git a/controllers/propagator/placementDecisionMapper.go b/controllers/propagator/placementDecisionMapper.go deleted file mode 100644 index 21f4e35c..00000000 --- a/controllers/propagator/placementDecisionMapper.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. 
-// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "context" - - clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - "open-cluster-management.io/governance-policy-propagator/controllers/common" -) - -func placementDecisionMapper(c client.Client) handler.MapFunc { - return func(ctx context.Context, object client.Object) []reconcile.Request { - log := log.WithValues("placementDecisionName", object.GetName(), "namespace", object.GetNamespace()) - - log.V(2).Info("Reconcile request for a placement decision") - - // get the placement name from the placementdecision - placementName := object.GetLabels()["cluster.open-cluster-management.io/placement"] - if placementName == "" { - return nil - } - - pbList := &policiesv1.PlacementBindingList{} - // find pb in the same namespace of placementrule - lopts := &client.ListOptions{Namespace: object.GetNamespace()} - opts := client.MatchingFields{"placementRef.name": placementName} - opts.ApplyToList(lopts) - - err := c.List(ctx, pbList, lopts) - if err != nil { - return nil - } - - var result []reconcile.Request - // loop through pbs and collect policies from each matching one. - for _, pb := range pbList.Items { - if pb.PlacementRef.APIGroup != clusterv1beta1.SchemeGroupVersion.Group || - pb.PlacementRef.Kind != "Placement" || pb.PlacementRef.Name != placementName { - continue - } - - result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...) - } - - return result - } -} diff --git a/controllers/propagator/placementRuleMapper.go b/controllers/propagator/placementRuleMapper.go deleted file mode 100644 index bf5f1d75..00000000 --- a/controllers/propagator/placementRuleMapper.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "context" - - appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - "open-cluster-management.io/governance-policy-propagator/controllers/common" -) - -func placementRuleMapper(c client.Client) handler.MapFunc { - return func(ctx context.Context, object client.Object) []reconcile.Request { - log := log.WithValues("placementRuleName", object.GetName(), "namespace", object.GetNamespace()) - - log.V(2).Info("Reconcile Request for PlacementRule") - - // list pb - pbList := &policiesv1.PlacementBindingList{} - - // find pb in the same namespace of placementrule - err := c.List(ctx, pbList, &client.ListOptions{Namespace: object.GetNamespace()}) - if err != nil { - return nil - } - - var result []reconcile.Request - // loop through pbs and collect policies from each matching one. - for _, pb := range pbList.Items { - if pb.PlacementRef.APIGroup != appsv1.SchemeGroupVersion.Group || - pb.PlacementRef.Kind != "PlacementRule" || pb.PlacementRef.Name != object.GetName() { - continue - } - - result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pb)...) 
- } - - return result - } -} diff --git a/controllers/propagator/policyPredicate.go b/controllers/propagator/policyPredicate.go deleted file mode 100644 index 5887accb..00000000 --- a/controllers/propagator/policyPredicate.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "k8s.io/apimachinery/pkg/api/equality" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" -) - -// policyPredicates filters out updates to policies that are pure status updates. -func policyPredicates() predicate.Funcs { - return predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - //nolint:forcetypeassert - oldPolicy := e.ObjectOld.(*policiesv1.Policy) - //nolint:forcetypeassert - updatedPolicy := e.ObjectNew.(*policiesv1.Policy) - - // Ignore pure status updates since those are handled by a separate controller - return oldPolicy.Generation != updatedPolicy.Generation || - !equality.Semantic.DeepEqual(oldPolicy.ObjectMeta.Labels, updatedPolicy.ObjectMeta.Labels) || - !equality.Semantic.DeepEqual(oldPolicy.ObjectMeta.Annotations, updatedPolicy.ObjectMeta.Annotations) - }, - } -} diff --git a/controllers/propagator/policySetMapper.go b/controllers/propagator/policySetMapper.go deleted file mode 100644 index b97a6817..00000000 --- a/controllers/propagator/policySetMapper.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "context" - - "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - - policiesv1beta1 "open-cluster-management.io/governance-policy-propagator/api/v1beta1" -) - -func policySetMapper(_ client.Client) handler.MapFunc { - return func(ctx context.Context, object client.Object) []reconcile.Request { - log := log.WithValues("policySetName", object.GetName(), "namespace", object.GetNamespace()) - log.V(2).Info("Reconcile Request for PolicySet") - - var result []reconcile.Request - - for _, plc := range object.(*policiesv1beta1.PolicySet).Spec.Policies { - log.V(2).Info("Found reconciliation request from a policyset", "policyName", string(plc)) - - request := reconcile.Request{NamespacedName: types.NamespacedName{ - Name: string(plc), - Namespace: object.GetNamespace(), - }} - result = append(result, request) - } - - return result - } -} diff --git a/controllers/propagator/policySetPredicate.go b/controllers/propagator/policySetPredicate.go deleted file mode 100644 index aa201a13..00000000 --- a/controllers/propagator/policySetPredicate.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright (c) 2020 Red Hat, Inc. 
-// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "k8s.io/apimachinery/pkg/api/equality" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - policiesv1beta1 "open-cluster-management.io/governance-policy-propagator/api/v1beta1" -) - -// we only want to watch for policyset objects with Spec.Policies field change -var policySetPredicateFuncs = predicate.Funcs{ - UpdateFunc: func(e event.UpdateEvent) bool { - //nolint:forcetypeassert - policySetObjNew := e.ObjectNew.(*policiesv1beta1.PolicySet) - //nolint:forcetypeassert - policySetObjOld := e.ObjectOld.(*policiesv1beta1.PolicySet) - - return !equality.Semantic.DeepEqual(policySetObjNew.Spec.Policies, policySetObjOld.Spec.Policies) - }, - CreateFunc: func(e event.CreateEvent) bool { - return true - }, - DeleteFunc: func(e event.DeleteEvent) bool { - return true - }, -} diff --git a/controllers/propagator/policy_controller.go b/controllers/propagator/policy_controller.go deleted file mode 100644 index 0277b827..00000000 --- a/controllers/propagator/policy_controller.go +++ /dev/null @@ -1,182 +0,0 @@ -// Copyright (c) 2021 Red Hat, Inc. -// Copyright Contributors to the Open Cluster Management project - -package propagator - -import ( - "context" - "sync" - - k8sdepwatches "github.com/stolostron/kubernetes-dependency-watches/client" - k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" - appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" - - policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" - policiesv1beta1 "open-cluster-management.io/governance-policy-propagator/api/v1beta1" - "open-cluster-management.io/governance-policy-propagator/controllers/common" -) - -const ControllerName string = "policy-propagator" - -var log = ctrl.Log.WithName(ControllerName) - -//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies/finalizers,verbs=update -//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=placementbindings,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policysets,verbs=get;list;watch -//+kubebuilder:rbac:groups=cluster.open-cluster-management.io,resources=managedclusters;placementdecisions;placements,verbs=get;list;watch -//+kubebuilder:rbac:groups=apps.open-cluster-management.io,resources=placementrules,verbs=get;list;watch -//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch - -// SetupWithManager sets up the controller with the Manager. 
-func (r *PolicyReconciler) SetupWithManager(mgr ctrl.Manager, additionalSources ...source.Source) error { - builder := ctrl.NewControllerManagedBy(mgr). - Named(ControllerName). - For( - &policiesv1.Policy{}, - builder.WithPredicates(common.NeverEnqueue)). - // This is a workaround - the controller-runtime requires a "For", but does not allow it to - // modify the eventhandler. Currently we need to enqueue requests for Policies in a very - // particular way, so we will define that in a separate "Watches" - Watches( - &policiesv1.Policy{}, - handler.EnqueueRequestsFromMapFunc(common.PolicyMapper(mgr.GetClient())), - builder.WithPredicates(policyPredicates())). - Watches( - &policiesv1beta1.PolicySet{}, - handler.EnqueueRequestsFromMapFunc(policySetMapper(mgr.GetClient())), - builder.WithPredicates(policySetPredicateFuncs)). - Watches( - &policiesv1.PlacementBinding{}, - handler.EnqueueRequestsFromMapFunc(placementBindingMapper(mgr.GetClient())), - builder.WithPredicates(pbPredicateFuncs)). - Watches( - &appsv1.PlacementRule{}, - handler.EnqueueRequestsFromMapFunc(placementRuleMapper(mgr.GetClient()))). - Watches( - &clusterv1beta1.PlacementDecision{}, - handler.EnqueueRequestsFromMapFunc(placementDecisionMapper(mgr.GetClient())), - ) - - for _, source := range additionalSources { - builder.WatchesRawSource(source, &handler.EnqueueRequestForObject{}) - } - - return builder.Complete(r) -} - -// blank assignment to verify that ReconcilePolicy implements reconcile.Reconciler -var _ reconcile.Reconciler = &PolicyReconciler{} - -// PolicyReconciler reconciles a Policy object -type PolicyReconciler struct { - client.Client - Scheme *runtime.Scheme - Recorder record.EventRecorder - DynamicWatcher k8sdepwatches.DynamicWatcher - RootPolicyLocks *sync.Map -} - -// Reconcile reads that state of the cluster for a Policy object and makes changes based on the state read -// and what is in the Policy.Spec -// Note: -// The Controller will requeue the Request to be processed again if the returned error is non-nil or -// Result.Requeue is true, otherwise upon completion it will remove the work from the queue. -func (r *PolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - log := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) - - log.V(3).Info("Acquiring the lock for the root policy") - - lock, _ := r.RootPolicyLocks.LoadOrStore(request.NamespacedName, &sync.Mutex{}) - - lock.(*sync.Mutex).Lock() - defer lock.(*sync.Mutex).Unlock() - - // Set the hub template watch metric after reconcile - defer func() { - hubTempWatches := r.DynamicWatcher.GetWatchCount() - log.V(3).Info("Setting hub template watch metric", "value", hubTempWatches) - - hubTemplateActiveWatchesMetric.Set(float64(hubTempWatches)) - }() - - log.Info("Reconciling the policy") - - // Fetch the Policy instance - instance := &policiesv1.Policy{} - - err := r.Get(ctx, request.NamespacedName, instance) - if err != nil { - if k8serrors.IsNotFound(err) { - // Request object not found, could have been deleted after reconcile request. - // Owned objects are automatically garbage collected. - log.Info("Policy not found, so it may have been deleted. 
Deleting the replicated policies.") - - _, err := r.cleanUpPolicy(&policiesv1.Policy{ - TypeMeta: metav1.TypeMeta{ - Kind: policiesv1.Kind, - APIVersion: policiesv1.GroupVersion.Group + "/" + policiesv1.GroupVersion.Version, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: request.Name, - Namespace: request.Namespace, - }, - }) - if err != nil { - log.Error(err, "Failure during replicated policy cleanup") - - return reconcile.Result{}, err - } - - return reconcile.Result{}, nil - } - - log.Error(err, "Failed to get the policy") - - // Error reading the object - requeue the request. - return reconcile.Result{}, err - } - - inClusterNs, err := common.IsInClusterNamespace(r.Client, instance.Namespace) - if err != nil { - log.Error(err, "Failed to determine if the policy is in a managed cluster namespace. Requeueing the request.") - - return reconcile.Result{}, err - } - - if !inClusterNs { - err := r.handleRootPolicy(instance) - if err != nil { - log.Error(err, "Failure during root policy handling") - - propagationFailureMetric.WithLabelValues(instance.GetName(), instance.GetNamespace()).Inc() - } - - return reconcile.Result{}, err - } - - log = log.WithValues("name", instance.GetName(), "namespace", instance.GetNamespace()) - - log.Info("The policy was found in the cluster namespace but doesn't belong to any root policy, deleting it") - - err = r.Delete(ctx, instance) - if err != nil && !k8serrors.IsNotFound(err) { - log.Error(err, "Failed to delete the policy") - - return reconcile.Result{}, err - } - - return reconcile.Result{}, nil -} diff --git a/controllers/propagator/propagation.go b/controllers/propagator/propagation.go index 99f22525..aef8dae7 100644 --- a/controllers/propagator/propagation.go +++ b/controllers/propagator/propagation.go @@ -7,10 +7,10 @@ import ( "context" "errors" "fmt" - "os" "sort" "strconv" "strings" + "sync" "time" templates "github.com/stolostron/go-template-utils/v3/pkg/templates" @@ -18,27 +18,23 @@ import ( k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" clusterv1 "open-cluster-management.io/api/cluster/v1" clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" policiesv1beta1 "open-cluster-management.io/governance-policy-propagator/api/v1beta1" "open-cluster-management.io/governance-policy-propagator/controllers/common" ) -// The configuration of the maximum number of Go routines to spawn when handling placement decisions -// per policy. 
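// --- Illustrative sketch (not part of this PR's files) ---
// Concurrency is no longer read from the CONTROLLER_CONFIG_CONCURRENCY_PER_POLICY
// environment variable and fed into hand-rolled worker pools; instead each
// controller receives its reconcile concurrency as a SetupWithManager argument
// and applies it through controller-runtime's controller.Options, as the
// encryptionkeys and policymetrics controllers now do. ExampleReconciler and
// "example-controller" below are hypothetical names.
package propagatorsketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
)

type ExampleReconciler struct {
	client.Client
}

func (r *ExampleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// Each object is reconciled independently; the work queue already prevents
	// the same item from being handled by two workers at once.
	return ctrl.Result{}, nil
}

// SetupWithManager wires the controller with caller-supplied concurrency.
func (r *ExampleReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles uint) error {
	return ctrl.NewControllerManagedBy(mgr).
		WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}).
		Named("example-controller").
		For(&corev1.Secret{}).
		Complete(r)
}
// --- end sketch ---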
-const ( - concurrencyPerPolicyEnvName = "CONTROLLER_CONFIG_CONCURRENCY_PER_POLICY" - concurrencyPerPolicyDefault = 5 -) - const ( startDelim = "{{hub" stopDelim = "hub}}" @@ -46,15 +42,21 @@ const ( ) var ( - concurrencyPerPolicy int - kubeConfig *rest.Config - kubeClient *kubernetes.Interface + kubeConfig *rest.Config + kubeClient *kubernetes.Interface ) +type Propagator struct { + client.Client + Scheme *runtime.Scheme + Recorder record.EventRecorder + RootPolicyLocks *sync.Map + ReplicatedPolicyUpdates chan event.GenericEvent +} + func Initialize(kubeconfig *rest.Config, kubeclient *kubernetes.Interface) { kubeConfig = kubeconfig kubeClient = kubeclient - concurrencyPerPolicy = getEnvVarPosInt(concurrencyPerPolicyEnvName, concurrencyPerPolicyDefault) } // getTemplateCfg returns the default policy template configuration. @@ -70,153 +72,6 @@ func getTemplateCfg() templates.Config { } } -func getEnvVarPosInt(name string, defaultValue int) int { - envValue := os.Getenv(name) - if envValue == "" { - return defaultValue - } - - envInt, err := strconv.Atoi(envValue) - if err == nil && envInt > 0 { - return envInt - } - - log.Info("The environment variable is invalid. Using default.", "name", name) - - return defaultValue -} - -func (r *PolicyReconciler) deletePolicy(plc *policiesv1.Policy) error { - // #nosec G601 -- no memory addresses are stored in collections - err := r.Delete(context.TODO(), plc) - if err != nil && !k8serrors.IsNotFound(err) { - log.Error( - err, - "Failed to delete the replicated policy", - "name", plc.GetName(), - "namespace", plc.GetNamespace(), - ) - - return err - } - - return nil -} - -type policyDeleter interface { - deletePolicy(instance *policiesv1.Policy) error -} - -type deletionResult struct { - Identifier string - Err error -} - -func plcDeletionWrapper( - deletionHandler policyDeleter, - policies <-chan policiesv1.Policy, - results chan<- deletionResult, -) { - for policy := range policies { - identifier := fmt.Sprintf("%s/%s", policy.GetNamespace(), policy.GetName()) - err := deletionHandler.deletePolicy(&policy) - results <- deletionResult{identifier, err} - } -} - -// cleanUpPolicy will delete all replicated policies associated with provided policy and return a boolean -// indicating all of the replicated policies were deleted, if any -func (r *PolicyReconciler) cleanUpPolicy(instance *policiesv1.Policy) (bool, error) { - log := log.WithValues("policyName", instance.GetName(), "policyNamespace", instance.GetNamespace()) - replicatedPlcList := &policiesv1.PolicyList{} - - instanceGVK := instance.GroupVersionKind() - - err := r.DynamicWatcher.RemoveWatcher(k8sdepwatches.ObjectIdentifier{ - Group: instanceGVK.Group, - Version: instanceGVK.Version, - Kind: instanceGVK.Kind, - Namespace: instance.Namespace, - Name: instance.Name, - }) - if err != nil { - log.Error( - err, - fmt.Sprintf( - "Failed to remove watches for the policy %s/%s from the dynamic watcher", - instance.Namespace, - instance.Name, - ), - ) - - return false, err - } - - err = r.List( - context.TODO(), replicatedPlcList, client.MatchingLabels(common.LabelsForRootPolicy(instance)), - ) - if err != nil { - log.Error(err, "Failed to list the replicated policies") - - return false, err - } - - if len(replicatedPlcList.Items) == 0 { - log.V(2).Info("No replicated policies to delete.") - - return false, nil - } - - log.V(2).Info( - "Deleting replicated policies because root policy was deleted", "count", len(replicatedPlcList.Items)) - - policiesChan := make(chan policiesv1.Policy, 
len(replicatedPlcList.Items)) - deletionResultsChan := make(chan deletionResult, len(replicatedPlcList.Items)) - - numWorkers := common.GetNumWorkers(len(replicatedPlcList.Items), concurrencyPerPolicy) - - for i := 0; i < numWorkers; i++ { - go plcDeletionWrapper(r, policiesChan, deletionResultsChan) - } - - log.V(2).Info("Scheduling work to handle deleting replicated policies") - - for _, plc := range replicatedPlcList.Items { - policiesChan <- plc - } - - // Wait for all the deletions to be processed. - log.V(1).Info("Waiting for the result of deleting the replicated policies", "count", len(policiesChan)) - - processedResults := 0 - failures := 0 - - for result := range deletionResultsChan { - if result.Err != nil { - log.V(2).Info("Failed to delete replicated policy " + result.Identifier) - failures++ - } - - processedResults++ - - // Once all the deletions have been processed, it's safe to close - // the channels and stop blocking in this goroutine. - if processedResults == len(replicatedPlcList.Items) { - close(policiesChan) - close(deletionResultsChan) - log.V(2).Info("All replicated policy deletions have been handled", "count", len(replicatedPlcList.Items)) - } - } - - if failures > 0 { - return false, errors.New("failed to delete one or more replicated policies") - } - - propagationFailureMetric.DeleteLabelValues(instance.GetName(), instance.GetNamespace()) - - return true, nil -} - // clusterDecision contains a single decision where the replicated policy // should be processed and any overrides to the root policy type clusterDecision struct { @@ -224,71 +79,12 @@ type clusterDecision struct { PolicyOverrides policiesv1.BindingOverrides } -type decisionHandler interface { - handleDecision(instance *policiesv1.Policy, decision clusterDecision) ( - templateRefObjs map[k8sdepwatches.ObjectIdentifier]bool, err error, - ) -} - -// decisionResult contains the result of handling a placement decision of a policy. It is intended -// to be sent in a channel by handleDecisionWrapper for the calling Go routine to determine if the -// processing was successful. Identifier is the PlacementDecision, with the ClusterNamespace and -// the ClusterName. TemplateRefObjs is a set of identifiers of objects accessed by hub policy -// templates. Err is the error associated with handling the decision. This can be nil to denote success. -type decisionResult struct { - Identifier appsv1.PlacementDecision - TemplateRefObjs map[k8sdepwatches.ObjectIdentifier]bool - Err error -} - -// handleDecisionWrapper wraps the handleDecision method for concurrency. decisionHandler is an -// object with the handleDecision method. This is used instead of making this a method on the -// PolicyReconciler struct in order for easier unit testing. instance is the policy the placement -// decision is about. decisions is the channel with the placement decisions for the input policy to -// process. When this channel closes, it means that all decisions have been processed. results is a -// channel this method will send the outcome of handling each placement decision. The calling Go -// routine can use this to determine success. 
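// --- Illustrative sketch (not part of this PR's files) ---
// For reference, a stdlib-only rendering of the fan-out pattern that the
// deleted helpers (GetNumWorkers, plcDeletionWrapper, handleDecisionWrapper)
// implemented: a capped number of workers drain a buffered channel of items and
// report per-item results. The new code drops this in favor of per-object
// reconciles bounded by MaxConcurrentReconciles. fanOut and result are
// hypothetical names.
package propagatorsketch

type result struct {
	ID  string
	Err error
}

// fanOut assumes maxWorkers >= 1 (the old concurrencyPerPolicy default was 5).
func fanOut(items []string, maxWorkers int, handle func(string) error) []result {
	workers := maxWorkers
	if len(items) < workers {
		workers = len(items) // what GetNumWorkers used to compute
	}

	if workers < 1 {
		workers = 1
	}

	itemsChan := make(chan string, len(items))
	resultsChan := make(chan result, len(items))

	for i := 0; i < workers; i++ {
		go func() {
			for item := range itemsChan {
				resultsChan <- result{ID: item, Err: handle(item)}
			}
		}()
	}

	for _, item := range items {
		itemsChan <- item
	}
	close(itemsChan)

	// Collect exactly one result per scheduled item.
	results := make([]result, 0, len(items))
	for range items {
		results = append(results, <-resultsChan)
	}

	return results
}
// --- end sketch ---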
-func handleDecisionWrapper( - decisionHandler decisionHandler, - instance *policiesv1.Policy, - decisions <-chan clusterDecision, - results chan<- decisionResult, -) { - for decision := range decisions { - log := log.WithValues( - "policyName", instance.GetName(), - "policyNamespace", instance.GetNamespace(), - "decision", decision.Cluster, - "policyOverrides", decision.PolicyOverrides, - ) - log.V(1).Info("Handling the decision") - - templateRefObjs, err := decisionHandler.handleDecision(instance, decision) - if err == nil { - log.V(1).Info("Replicated the policy") - } - - results <- decisionResult{decision.Cluster, templateRefObjs, err} - } -} - type decisionSet map[appsv1.PlacementDecision]bool -func (set decisionSet) namespaces() []string { - namespaces := make([]string, 0) - - for decision, isTrue := range set { - if isTrue { - namespaces = append(namespaces, decision.ClusterNamespace) - } - } - - return namespaces -} - -// getPolicyPlacementDecisions retrieves the placement decisions for a input -// placement binding when the policy is bound within it. -func (r *PolicyReconciler) getPolicyPlacementDecisions( +// getPolicyPlacementDecisions retrieves the placement decisions for a input PlacementBinding when +// the policy is bound within it. It can return an error if the PlacementBinding is invalid, or if +// a required lookup fails. +func (r *RootPolicyReconciler) getPolicyPlacementDecisions( instance *policiesv1.Policy, pb *policiesv1.PlacementBinding, ) (decisions []appsv1.PlacementDecision, placements []*policiesv1.Placement, err error) { if !common.HasValidPlacementRef(pb) { @@ -373,36 +169,24 @@ func (r *PolicyReconciler) getPolicyPlacementDecisions( return decisions, placements, err } -// getAllClusterDecisions retrieves all cluster decisions for the input policy, taking into -// account subFilter and bindingOverrides specified in the placement binding from the input -// placement binding list. -// It first processes all placement bindings with disabled subFilter to obtain a list of bound -// clusters along with their policy overrides if any, then processes all placement bindings -// with subFilter:restricted to override the policy for the subset of bound clusters as needed. -// It returns: -// - allClusterDecisions: a slice of all the cluster decisions should be handled -// - placements: a slice of all the placement decisions discovered -// - err: error -// -// The rules for policy overrides are as follows: -// -// - remediationAction: If any placement binding that the cluster is bound to has -// bindingOverrides.remediationAction set to "enforce", the remediationAction -// for the replicated policy will be set to "enforce". -func (r *PolicyReconciler) getAllClusterDecisions( +// getAllClusterDecisions calculates which managed clusters should have a replicated policy, and +// whether there are any BindingOverrides for that cluster. The placements array it returns is +// sorted by PlacementBinding name. It can return an error if the PlacementBinding is invalid, or if +// a required lookup fails. 
+func (r *RootPolicyReconciler) getAllClusterDecisions( instance *policiesv1.Policy, pbList *policiesv1.PlacementBindingList, ) ( - allClusterDecisions []clusterDecision, placements []*policiesv1.Placement, err error, + decisions map[appsv1.PlacementDecision]policiesv1.BindingOverrides, placements []*policiesv1.Placement, err error, ) { - allClusterDecisionsMap := map[appsv1.PlacementDecision]policiesv1.BindingOverrides{} + decisions = make(map[appsv1.PlacementDecision]policiesv1.BindingOverrides) // Process all placement bindings without subFilter - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.SubFilter == policiesv1.Restricted { continue } - plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb) + plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i]) if err != nil { return nil, nil, err } @@ -413,15 +197,15 @@ func (r *PolicyReconciler) getAllClusterDecisions( } for _, decision := range plcDecisions { - if overrides, ok := allClusterDecisionsMap[decision]; ok { + if overrides, ok := decisions[decision]; ok { // Found cluster in the decision map if strings.EqualFold(pb.BindingOverrides.RemediationAction, string(policiesv1.Enforce)) { overrides.RemediationAction = strings.ToLower(string(policiesv1.Enforce)) - allClusterDecisionsMap[decision] = overrides + decisions[decision] = overrides } } else { // No found cluster in the decision map, add it to the map - allClusterDecisionsMap[decision] = policiesv1.BindingOverrides{ + decisions[decision] = policiesv1.BindingOverrides{ // empty string if pb.BindingOverrides.RemediationAction is not defined RemediationAction: strings.ToLower(pb.BindingOverrides.RemediationAction), } @@ -431,15 +215,24 @@ func (r *PolicyReconciler) getAllClusterDecisions( placements = append(placements, plcPlacements...) } + if len(decisions) == 0 { + sort.Slice(placements, func(i, j int) bool { + return placements[i].PlacementBinding < placements[j].PlacementBinding + }) + + // No decisions, and subfilters can't add decisions, so we can stop early. 
+ return nil, placements, nil + } + // Process all placement bindings with subFilter:restricted - for _, pb := range pbList.Items { + for i, pb := range pbList.Items { if pb.SubFilter != policiesv1.Restricted { continue } foundInDecisions := false - plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pb) + plcDecisions, plcPlacements, err := r.getPolicyPlacementDecisions(instance, &pbList.Items[i]) if err != nil { return nil, nil, err } @@ -450,13 +243,13 @@ func (r *PolicyReconciler) getAllClusterDecisions( } for _, decision := range plcDecisions { - if overrides, ok := allClusterDecisionsMap[decision]; ok { + if overrides, ok := decisions[decision]; ok { // Found cluster in the decision map foundInDecisions = true if strings.EqualFold(pb.BindingOverrides.RemediationAction, string(policiesv1.Enforce)) { overrides.RemediationAction = strings.ToLower(string(policiesv1.Enforce)) - allClusterDecisionsMap[decision] = overrides + decisions[decision] = overrides } } } @@ -466,153 +259,57 @@ func (r *PolicyReconciler) getAllClusterDecisions( } } - // Covert the decision map to a slice of clusterDecision - for cluster, overrides := range allClusterDecisionsMap { - decision := clusterDecision{ - Cluster: cluster, - PolicyOverrides: overrides, - } - allClusterDecisions = append(allClusterDecisions, decision) - } + sort.Slice(placements, func(i, j int) bool { + return placements[i].PlacementBinding < placements[j].PlacementBinding + }) - return allClusterDecisions, placements, nil + return decisions, placements, nil } -// handleDecisions will get all the placement decisions based on the input policy and placement -// binding list and propagate the policy. Note that this method performs concurrent operations. -// It returns the following: -// - placements - a slice of all the placement decisions discovered -// - allDecisions - a set of all the placement decisions encountered -// - failedClusters - a set of all the clusters that encountered an error during propagation -// - allFailed - a bool that determines if all clusters encountered an error during propagation -func (r *PolicyReconciler) handleDecisions( - instance *policiesv1.Policy, pbList *policiesv1.PlacementBindingList, +// getDecisions identifies all managed clusters which should have a replicated policy +func (r *RootPolicyReconciler) getDecisions( + instance *policiesv1.Policy, ) ( - placements []*policiesv1.Placement, allDecisions decisionSet, failedClusters decisionSet, allFailed bool, + []*policiesv1.Placement, decisionSet, error, ) { log := log.WithValues("policyName", instance.GetName(), "policyNamespace", instance.GetNamespace()) - allDecisions = map[appsv1.PlacementDecision]bool{} - failedClusters = map[appsv1.PlacementDecision]bool{} + decisions := make(map[appsv1.PlacementDecision]bool) - allTemplateRefObjs := getPolicySetDependencies(instance) + pbList := &policiesv1.PlacementBindingList{} - allClusterDecisions, placements, err := r.getAllClusterDecisions(instance, pbList) + err := r.List(context.TODO(), pbList, &client.ListOptions{Namespace: instance.GetNamespace()}) if err != nil { - allFailed = true - - return - } - - if len(allClusterDecisions) != 0 { - // Setup the workers which will call r.handleDecision. The number of workers depends - // on the number of decisions and the limit defined in concurrencyPerPolicy. - // decisionsChan acts as the work queue of decisions to process. resultsChan contains - // the results from the decisions being processed. 
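// --- Illustrative sketch (not part of this PR's files) ---
// The override rule getAllClusterDecisions applies above: once any bound
// PlacementBinding sets bindingOverrides.remediationAction to "enforce" for a
// cluster, that cluster's replicated policy stays enforced no matter what other
// bindings say. mergeRemediation is a hypothetical helper used only to
// illustrate the rule.
package propagatorsketch

import "strings"

// mergeRemediation returns the remediation override to record for a cluster,
// given the value already recorded and the value from another binding.
func mergeRemediation(existing, incoming string) string {
	if strings.EqualFold(existing, "enforce") || strings.EqualFold(incoming, "enforce") {
		return "enforce" // stored lowercase, matching the propagator's convention
	}

	// Otherwise keep whatever was recorded first ("" means no override).
	return existing
}
// --- end sketch ---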
- decisionsChan := make(chan clusterDecision, len(allClusterDecisions)) - resultsChan := make(chan decisionResult, len(allClusterDecisions)) - numWorkers := common.GetNumWorkers(len(allClusterDecisions), concurrencyPerPolicy) - - for i := 0; i < numWorkers; i++ { - go handleDecisionWrapper(r, instance, decisionsChan, resultsChan) - } - - log.Info("Handling the placement decisions", "count", len(allClusterDecisions)) - - for _, decision := range allClusterDecisions { - log.V(2).Info( - "Scheduling work to handle the decision for the cluster", - "name", decision.Cluster.ClusterName, - ) - decisionsChan <- decision - } - - // Wait for all the decisions to be processed. - log.V(2).Info("Waiting for the result of handling the decision(s)", "count", len(allClusterDecisions)) - - processedResults := 0 - - for result := range resultsChan { - allDecisions[result.Identifier] = true - - if result.Err != nil { - failedClusters[result.Identifier] = true - } - - processedResults++ - - for refObject := range result.TemplateRefObjs { - allTemplateRefObjs[refObject] = true - } + log.Error(err, "Could not list the placement bindings") - // Once all the decisions have been processed, it's safe to close - // the channels and stop blocking in this goroutine. - if processedResults == len(allClusterDecisions) { - close(decisionsChan) - close(resultsChan) - log.Info("All the placement decisions have been handled", "count", len(allClusterDecisions)) - } - } + return nil, decisions, err } - instanceGVK := instance.GroupVersionKind() - instanceObjID := k8sdepwatches.ObjectIdentifier{ - Group: instanceGVK.Group, - Version: instanceGVK.Version, - Kind: instanceGVK.Kind, - Namespace: instance.Namespace, - Name: instance.Name, + allClusterDecisions, placements, err := r.getAllClusterDecisions(instance, pbList) + if err != nil { + return placements, decisions, err } - refObjs := make([]k8sdepwatches.ObjectIdentifier, 0, len(allTemplateRefObjs)) - for refObj := range allTemplateRefObjs { - refObjs = append(refObjs, refObj) + if allClusterDecisions == nil { + allClusterDecisions = make(map[appsv1.PlacementDecision]policiesv1.BindingOverrides) } - if len(refObjs) != 0 { - err := r.DynamicWatcher.AddOrUpdateWatcher(instanceObjID, refObjs...) - if err != nil { - log.Error( - err, - fmt.Sprintf( - "Failed to update the dynamic watches for the policy %s/%s on objects referenced by hub policy "+ - "templates", - instance.Namespace, - instance.Name, - ), - ) - - allFailed = true - } - } else { - err := r.DynamicWatcher.RemoveWatcher(instanceObjID) - if err != nil { - log.Error( - err, - fmt.Sprintf( - "Failed to remove the dynamic watches for the policy %s/%s on objects referenced by hub policy "+ - "templates", - instance.Namespace, - instance.Name, - ), - ) - - allFailed = true - } + for dec := range allClusterDecisions { + decisions[dec] = true } - return + return placements, decisions, nil } // cleanUpOrphanedRplPolicies compares the status of the input policy against the input placement // decisions. If the cluster exists in the status but doesn't exist in the input placement -// decisions, then it's considered stale and will be removed. -func (r *PolicyReconciler) cleanUpOrphanedRplPolicies( - instance *policiesv1.Policy, allDecisions decisionSet, +// decisions, then it's considered stale and an event is sent to the replicated policy reconciler +// so the policy will be removed. 
+func (r *RootPolicyReconciler) cleanUpOrphanedRplPolicies( + instance *policiesv1.Policy, originalCPCS []*policiesv1.CompliancePerClusterStatus, allDecisions decisionSet, ) error { log := log.WithValues("policyName", instance.GetName(), "policyNamespace", instance.GetNamespace()) - successful := true - for _, cluster := range instance.Status.Status { + for _, cluster := range originalCPCS { key := appsv1.PlacementDecision{ ClusterName: cluster.ClusterNamespace, ClusterNamespace: cluster.ClusterNamespace, @@ -620,37 +317,29 @@ func (r *PolicyReconciler) cleanUpOrphanedRplPolicies( if allDecisions[key] { continue } - // not found in allDecisions, orphan, delete it - name := common.FullNameForPolicy(instance) - log := log.WithValues("name", name, "namespace", cluster.ClusterNamespace) - log.Info("Deleting the orphaned replicated policy") - err := r.Delete(context.TODO(), &policiesv1.Policy{ + // not found in allDecisions, orphan, send an event for it to delete itself + simpleObj := &GuttedObject{ TypeMeta: metav1.TypeMeta{ Kind: policiesv1.Kind, - APIVersion: policiesv1.SchemeGroupVersion.Group, + APIVersion: policiesv1.GroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: common.FullNameForPolicy(instance), Namespace: cluster.ClusterNamespace, }, - }) - if err != nil && !k8serrors.IsNotFound(err) { - successful = false - - log.Error(err, "Failed to delete the orphaned replicated policy") } - } - if !successful { - return errors.New("one or more orphaned replicated policies failed to be deleted") + log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName()) + + r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj} } return nil } // handleRootPolicy will properly replicate or clean up when a root policy is updated. -func (r *PolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) error { +func (r *RootPolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) error { // Generate a metric for elapsed handling time for each policy entryTS := time.Now() defer func() { @@ -665,62 +354,43 @@ func (r *PolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) error { if instance.Spec.Disabled { log.Info("The policy is disabled, doing clean up") - allReplicasDeleted, err := r.cleanUpPolicy(instance) + updateCount, err := r.updateExistingReplicas(context.TODO(), instance.Namespace+"."+instance.Name) if err != nil { - log.Info("One or more replicated policies could not be deleted") - return err } // Checks if replicated policies exist in the event that // a double reconcile to prevent emitting the same event twice - if allReplicasDeleted { + if updateCount > 0 { r.Recorder.Event(instance, "Normal", "PolicyPropagation", fmt.Sprintf("Policy %s/%s was disabled", instance.GetNamespace(), instance.GetName())) } } - // Get the placement binding in order to later get the placement decisions - pbList := &policiesv1.PlacementBindingList{} - - log.V(1).Info("Getting the placement bindings", "namespace", instance.GetNamespace()) - - err := r.List(context.TODO(), pbList, &client.ListOptions{Namespace: instance.GetNamespace()}) + placements, decisions, err := r.getDecisions(instance) if err != nil { - log.Error(err, "Could not list the placement bindings") - - return err - } - - placements, allDecisions, failedClusters, allFailed := r.handleDecisions(instance, pbList) - if allFailed { log.Info("Failed to get any placement decisions. 
Giving up on the request.") return errors.New("could not get the placement decisions") } - // Clean up before the status update in case the status update fails - err = r.cleanUpOrphanedRplPolicies(instance, allDecisions) - if err != nil { - log.Error(err, "Failed to delete orphaned replicated policies") - - return err - } - log.V(1).Info("Updating the root policy status") - cpcs, _ := r.calculatePerClusterStatus(instance, allDecisions, failedClusters) - - // loop through all pb, update status.placement - sort.Slice(placements, func(i, j int) bool { - return placements[i].PlacementBinding < placements[j].PlacementBinding - }) + cpcs, cpcsErr := r.calculatePerClusterStatus(instance, decisions) + if cpcsErr != nil { + // If there is a new replicated policy, then its lookup is expected to fail - it hasn't been created yet. + log.Error(cpcsErr, "Failed to get at least one replicated policy, but that may be expected. Ignoring.") + } err = r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name}, instance) if err != nil { log.Error(err, "Failed to refresh the cached policy. Will use existing policy.") } + // make a copy of the original status + originalCPCS := make([]*policiesv1.CompliancePerClusterStatus, len(instance.Status.Status)) + copy(originalCPCS, instance.Status.Status) + instance.Status.Status = cpcs instance.Status.ComplianceState = CalculateRootCompliance(cpcs) instance.Status.Placement = placements @@ -730,123 +400,33 @@ func (r *PolicyReconciler) handleRootPolicy(instance *policiesv1.Policy) error { return err } - if len(failedClusters) != 0 { - return errors.New("failed to handle cluster namespaces:" + strings.Join(failedClusters.namespaces(), ",")) - } - - log.Info("Reconciliation complete") - - return nil -} - -// handleDecision puts the policy on the cluster, creating it or updating it as required, -// including resolving hub templates. It will return an error if an API call fails; no -// internal states will result in errors (eg invalid templates don't cause errors here) -func (r *PolicyReconciler) handleDecision( - rootPlc *policiesv1.Policy, clusterDec clusterDecision, -) ( - map[k8sdepwatches.ObjectIdentifier]bool, error, -) { - decision := clusterDec.Cluster + log.Info("Sending reconcile events to replicated policies", "decisionsCount", len(decisions)) - log := log.WithValues( - "policyName", rootPlc.GetName(), - "policyNamespace", rootPlc.GetNamespace(), - "replicatedPolicyNamespace", decision.ClusterNamespace, - ) - // retrieve replicated policy in cluster namespace - replicatedPlc := &policiesv1.Policy{} - templateRefObjs := map[k8sdepwatches.ObjectIdentifier]bool{} - - err := r.Get(context.TODO(), types.NamespacedName{ - Namespace: decision.ClusterNamespace, - Name: common.FullNameForPolicy(rootPlc), - }, replicatedPlc) - if err != nil { - if k8serrors.IsNotFound(err) { - replicatedPlc, err = r.buildReplicatedPolicy(rootPlc, clusterDec) - if err != nil { - return templateRefObjs, err - } - - // do a quick check for any template delims in the policy before putting it through - // template processor - if policyHasTemplates(rootPlc) { - // resolve hubTemplate before replicating - // #nosec G104 -- any errors are logged and recorded in the processTemplates method, - // but the ignored status will be handled appropriately by the policy controllers on - // the managed cluster(s). 
- templateRefObjs, _ = r.processTemplates(replicatedPlc, decision, rootPlc) - } - - log.Info("Creating the replicated policy") - - err = r.Create(context.TODO(), replicatedPlc) - if err != nil { - log.Error(err, "Failed to create the replicated policy") - - return templateRefObjs, err - } - - r.Recorder.Event(rootPlc, "Normal", "PolicyPropagation", - fmt.Sprintf("Policy %s/%s was propagated to cluster %s/%s", rootPlc.GetNamespace(), - rootPlc.GetName(), decision.ClusterNamespace, decision.ClusterName)) - - // exit after handling the create path, shouldnt be going to through the update path - return templateRefObjs, nil + for decision := range decisions { + simpleObj := &GuttedObject{ + TypeMeta: metav1.TypeMeta{ + Kind: policiesv1.Kind, + APIVersion: policiesv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: common.FullNameForPolicy(instance), + Namespace: decision.ClusterNamespace, + }, } - // failed to get replicated object, requeue - log.Error(err, "Failed to get the replicated policy") + log.V(2).Info("Sending reconcile for replicated policy", "replicatedPolicyName", simpleObj.GetName()) - return templateRefObjs, err + r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj} } - // replicated policy already created, need to compare and patch - desiredReplicatedPolicy, err := r.buildReplicatedPolicy(rootPlc, clusterDec) + err = r.cleanUpOrphanedRplPolicies(instance, originalCPCS, decisions) if err != nil { - return templateRefObjs, err - } - - if policyHasTemplates(desiredReplicatedPolicy) { - // If the replicated policy has an initialization vector specified, set it for processing - if initializationVector, ok := replicatedPlc.Annotations[IVAnnotation]; ok { - tempAnnotations := desiredReplicatedPolicy.GetAnnotations() - if tempAnnotations == nil { - tempAnnotations = make(map[string]string) - } - - tempAnnotations[IVAnnotation] = initializationVector - - desiredReplicatedPolicy.SetAnnotations(tempAnnotations) - } - // resolve hubTemplate before replicating - // #nosec G104 -- any errors are logged and recorded in the processTemplates method, - // but the ignored status will be handled appropriately by the policy controllers on - // the managed cluster(s). - templateRefObjs, _ = r.processTemplates(desiredReplicatedPolicy, decision, rootPlc) - } - - if !equivalentReplicatedPolicies(desiredReplicatedPolicy, replicatedPlc) { - // update needed - log.Info("Root policy and replicated policy mismatch, updating replicated policy") - replicatedPlc.SetAnnotations(desiredReplicatedPolicy.GetAnnotations()) - replicatedPlc.SetLabels(desiredReplicatedPolicy.GetLabels()) - replicatedPlc.Spec = desiredReplicatedPolicy.Spec - - err = r.Update(context.TODO(), replicatedPlc) - if err != nil { - log.Error(err, "Failed to update the replicated policy") - - return templateRefObjs, err - } + log.Error(err, "Failed to delete orphaned replicated policies") - r.Recorder.Event(rootPlc, "Normal", "PolicyPropagation", - fmt.Sprintf("Policy %s/%s was updated for cluster %s/%s", rootPlc.GetNamespace(), - rootPlc.GetName(), decision.ClusterNamespace, decision.ClusterName)) + return err } - return templateRefObjs, nil + return nil } // a helper to quickly check if there are any templates in any of the policy templates @@ -864,7 +444,7 @@ func policyHasTemplates(instance *policiesv1.Policy) bool { // policy.open-cluster-management.io/trigger-update is used to trigger reprocessing of the templates // and ensure that replicated-policies in the cluster are updated only if there is a change. 
This // annotation is deleted from the replicated policies and not propagated to the cluster namespaces. -func (r *PolicyReconciler) processTemplates( +func (r *ReplicatedPolicyReconciler) processTemplates( replicatedPlc *policiesv1.Policy, decision appsv1.PlacementDecision, rootPlc *policiesv1.Policy, ) ( map[k8sdepwatches.ObjectIdentifier]bool, error, @@ -1104,7 +684,7 @@ func isConfigurationPolicy(policyT *policiesv1.PolicyTemplate) bool { return jsonDef != nil && jsonDef["kind"] == "ConfigurationPolicy" } -func (r *PolicyReconciler) isPolicyInPolicySet(policyName, policySetName, namespace string) bool { +func (r *Propagator) isPolicyInPolicySet(policyName, policySetName, namespace string) bool { log := log.WithValues("policyName", policyName, "policySetName", policySetName, "policyNamespace", namespace) policySet := policiesv1beta1.PolicySet{} diff --git a/controllers/propagator/propagation_test.go b/controllers/propagator/propagation_test.go index 07e412fd..850112e8 100644 --- a/controllers/propagator/propagation_test.go +++ b/controllers/propagator/propagation_test.go @@ -3,250 +3,18 @@ package propagator import ( - "errors" "fmt" - "os" "testing" - "time" - k8sdepwatches "github.com/stolostron/kubernetes-dependency-watches/client" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" ) -func TestInitializeConcurrencyPerPolicyEnvName(t *testing.T) { - tests := []struct { - envVarValue string - expected int - }{ - {"", concurrencyPerPolicyDefault}, - {fmt.Sprint(concurrencyPerPolicyDefault + 2), concurrencyPerPolicyDefault + 2}, - {"0", concurrencyPerPolicyDefault}, - {"-3", concurrencyPerPolicyDefault}, - } - - for _, test := range tests { - t.Run( - fmt.Sprintf(`%s="%s"`, concurrencyPerPolicyEnvName, test.envVarValue), - func(t *testing.T) { - defer func() { - // Reset to the default values - concurrencyPerPolicy = 0 - - err := os.Unsetenv(concurrencyPerPolicyEnvName) - if err != nil { - t.Fatalf("failed to unset the environment variable: %v", err) - } - }() - - t.Setenv(concurrencyPerPolicyEnvName, test.envVarValue) - - var k8sInterface kubernetes.Interface - Initialize(&rest.Config{}, &k8sInterface) - - if concurrencyPerPolicy != test.expected { - t.Fatalf("Expected concurrencyPerPolicy=%d, got %d", test.expected, concurrencyPerPolicy) - } - }, - ) - } -} - -// A mock implementation of the PolicyReconciler for the handleDecisionWrapper function. -type MockPolicyReconciler struct { - Err error -} - -func (r MockPolicyReconciler) handleDecision( - _ *policiesv1.Policy, _ clusterDecision, -) ( - map[k8sdepwatches.ObjectIdentifier]bool, error, -) { - return map[k8sdepwatches.ObjectIdentifier]bool{}, r.Err -} - -func TestHandleDecisionWrapper(t *testing.T) { - tests := []struct { - Error error - ExpectedError bool - }{ - {nil, false}, - {errors.New("some error"), true}, - } - - for _, test := range tests { - // Simulate three placement decisions for the policy. 
- clusterDecisions := []clusterDecision{ - { - Cluster: appsv1.PlacementDecision{ClusterName: "cluster1", ClusterNamespace: "cluster1"}, - PolicyOverrides: policiesv1.BindingOverrides{}, - }, - { - Cluster: appsv1.PlacementDecision{ClusterName: "cluster2", ClusterNamespace: "cluster2"}, - PolicyOverrides: policiesv1.BindingOverrides{}, - }, - { - Cluster: appsv1.PlacementDecision{ClusterName: "cluster3", ClusterNamespace: "cluster3"}, - PolicyOverrides: policiesv1.BindingOverrides{}, - }, - } - policy := policiesv1.Policy{ - ObjectMeta: metav1.ObjectMeta{Name: "gambling-age", Namespace: "laws"}, - } - - // Load up the decisionsChan channel with all the decisions so that handleDecisionWrapper - // will call handleDecision with each. - decisionsChan := make(chan clusterDecision, len(clusterDecisions)) - - for _, decision := range clusterDecisions { - decisionsChan <- decision - } - - resultsChan := make(chan decisionResult, len(clusterDecisions)) - - // Instantiate the mock PolicyReconciler to pass to handleDecisionWrapper. - reconciler := MockPolicyReconciler{Err: test.Error} - - go func() { - start := time.Now() - // Wait until handleDecisionWrapper has completed its work. Then close - // the channel so that handleDecisionWrapper returns. This times out - // after five seconds. - for len(resultsChan) != len(clusterDecisions) { - if time.Since(start) > (time.Second * 5) { - close(decisionsChan) - } - } - close(decisionsChan) - }() - - handleDecisionWrapper(reconciler, &policy, decisionsChan, resultsChan) - - // Expect a 1x1 mapping of results to decisions. - if len(resultsChan) != len(clusterDecisions) { - t.Fatalf( - "Expected the results channel length of %d, got %d", len(clusterDecisions), len(resultsChan), - ) - } - - // Ensure all the results from the channel are as expected. - for i := 0; i < len(clusterDecisions); i++ { - result := <-resultsChan - if test.ExpectedError { - if result.Err == nil { - t.Fatal("Expected an error but didn't get one") - } else if result.Err != test.Error { //nolint:errorlint - t.Fatalf("Expected the error %v but got: %v", test.Error, result.Err) - } - } else if result.Err != nil { - t.Fatalf("Didn't expect but got: %v", result.Err) - } - - expectedIdentifier := appsv1.PlacementDecision{ - ClusterName: fmt.Sprintf("cluster%d", i+1), - ClusterNamespace: fmt.Sprintf("cluster%d", i+1), - } - if result.Identifier != expectedIdentifier { - t.Fatalf("Expected the identifier %s, got %s", result.Identifier, expectedIdentifier) - } - } - close(resultsChan) - } -} - -func (r MockPolicyReconciler) deletePolicy( - _ *policiesv1.Policy, -) error { - return r.Err -} - -func TestPlcDeletionWrapper(t *testing.T) { - tests := []struct { - Error error - ExpectedError bool - }{ - {nil, false}, - {errors.New("some error"), true}, - } - - for _, test := range tests { - // Simulate three replicated policies - policies := []policiesv1.Policy{ - { - ObjectMeta: metav1.ObjectMeta{Name: "laws.gambling-age", Namespace: "cluster1"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "laws.gambling-age", Namespace: "cluster2"}, - }, - { - ObjectMeta: metav1.ObjectMeta{Name: "laws.gambling-age", Namespace: "cluster3"}, - }, - } - - // Load up the plcChan channel with all the decisions so that plcDeletionWrapper - // will call deletePolicy with each. 
- plcChan := make(chan policiesv1.Policy, len(policies)) - - for _, policy := range policies { - plcChan <- policy - } - - resultsChan := make(chan deletionResult, len(policies)) - - // Instantiate the mock PolicyReconciler to pass to plcDeletionWrapper - reconciler := MockPolicyReconciler{Err: test.Error} - - go func() { - start := time.Now() - // Wait until plcDeletionWrapper has completed its work. Then close - // the channel so that plcDeletionWrapper returns. This times out - // after five seconds. - for len(resultsChan) != len(policies) { - if time.Since(start) > (time.Second * 5) { - close(plcChan) - } - } - close(plcChan) - }() - - plcDeletionWrapper(reconciler, plcChan, resultsChan) - - // Expect a 1x1 mapping of results to replicated policies. - if len(resultsChan) != len(policies) { - t.Fatalf( - "Expected the results channel length of %d, got %d", len(policies), len(resultsChan), - ) - } - - // Ensure all the results from the channel are as expected. - for i := 0; i < len(policies); i++ { - result := <-resultsChan - if test.ExpectedError { - if result.Err == nil { - t.Fatal("Expected an error but didn't get one") - } else if result.Err != test.Error { //nolint:errorlint - t.Fatalf("Expected the error %v but got: %v", test.Error, result.Err) - } - } else if result.Err != nil { - t.Fatalf("Didn't expect but got: %v", result.Err) - } - - expectedIdentifier := fmt.Sprintf("cluster%d/laws.gambling-age", i+1) - if result.Identifier != expectedIdentifier { - t.Fatalf("Expected the identifier %s, got %s", result.Identifier, expectedIdentifier) - } - } - close(resultsChan) - } -} - func fakeRootPolicy(name, namespace string) policiesv1.Policy { return policiesv1.Policy{ ObjectMeta: metav1.ObjectMeta{ @@ -355,12 +123,12 @@ func TestGetAllClusterDecisions(t *testing.T) { t.Fatalf("Unexpected error building scheme: %v", err) } - reconciler := &PolicyReconciler{ + reconciler := &RootPolicyReconciler{Propagator{ Client: fake.NewClientBuilder(). WithScheme(testscheme). WithObjects(&prInitial, &prSub, &prSub2, &prExtended). 
Build(), - } + }} tests := map[string]struct { policy policiesv1.Policy @@ -554,7 +322,16 @@ func TestGetAllClusterDecisions(t *testing.T) { t.Fatal("Got unexpected error", err.Error()) } - assert.ElementsMatch(t, actualAllClusterDecisions, test.expectedClusterDecisions) + actualDecisions := make([]clusterDecision, 0, len(actualAllClusterDecisions)) + + for decision, overrides := range actualAllClusterDecisions { + actualDecisions = append(actualDecisions, clusterDecision{ + Cluster: decision, + PolicyOverrides: overrides, + }) + } + + assert.ElementsMatch(t, actualDecisions, test.expectedClusterDecisions) assert.ElementsMatch(t, actualPlacements, test.expectedPlacements) }) } diff --git a/controllers/propagator/replicatedpolicy_controller.go b/controllers/propagator/replicatedpolicy_controller.go new file mode 100644 index 00000000..e0fe9f7b --- /dev/null +++ b/controllers/propagator/replicatedpolicy_controller.go @@ -0,0 +1,501 @@ +package propagator + +import ( + "context" + "fmt" + "strings" + "sync" + + k8sdepwatches "github.com/stolostron/kubernetes-dependency-watches/client" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" + appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + "open-cluster-management.io/governance-policy-propagator/controllers/common" +) + +var _ reconcile.Reconciler = &ReplicatedPolicyReconciler{} + +type ReplicatedPolicyReconciler struct { + Propagator + ResourceVersions *sync.Map + DynamicWatcher k8sdepwatches.DynamicWatcher +} + +func (r *ReplicatedPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + log := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) + log.Info("Reconciling the replicated policy") + + // Set the hub template watch metric after reconcile + defer func() { + hubTempWatches := r.DynamicWatcher.GetWatchCount() + log.V(3).Info("Setting hub template watch metric", "value", hubTempWatches) + + hubTemplateActiveWatchesMetric.Set(float64(hubTempWatches)) + }() + + replicatedExists := true + replicatedPolicy := &policiesv1.Policy{} + + if err := r.Get(ctx, request.NamespacedName, replicatedPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to get the replicated policy") + + return reconcile.Result{}, err + } + + replicatedExists = false + } + + rootName, rootNS, err := common.ParseRootPolicyLabel(request.Name) + if err != nil { + if !replicatedExists { + log.Error(err, "Invalid replicated policy sent for reconcile, rejecting") + + return reconcile.Result{}, nil + } + + cleanUpErr := r.cleanUpReplicated(ctx, replicatedPolicy) + if cleanUpErr != nil && !k8serrors.IsNotFound(cleanUpErr) { + log.Error(err, "Failed to delete the invalid replicated policy, requeueing") + + return reconcile.Result{}, err + } + + log.Info("Invalid replicated policy deleted") + + return reconcile.Result{}, nil + } + + rsrcVersKey := request.Namespace + "/" + request.Name + + // Fetch the Root Policy instance + rootPolicy := &policiesv1.Policy{} + rootNN := types.NamespacedName{Namespace: rootNS, Name: rootName} + + if err := r.Get(ctx, rootNN, rootPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + 
log.Error(err, "Failed to get the root policy, requeueing") + + return reconcile.Result{}, err + } + + if !replicatedExists { + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + // Store this to ensure the cache matches a known possible state for this situation + version.resourceVersion = "deleted" + + log.V(1).Info("Root policy and replicated policy already missing") + + return reconcile.Result{}, nil + } + + inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace) + if err != nil { + return reconcile.Result{}, err + } + + if !inClusterNS { + // "Hub of hubs" scenario: this cluster is managed by another cluster, + // which has the root policy for the policy being reconciled. + log.V(1).Info("Found a replicated policy in non-cluster namespace, skipping it") + + return reconcile.Result{}, nil + } + + // otherwise, we need to clean it up + if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to delete the orphaned replicated policy, requeueing") + + return reconcile.Result{}, err + } + } + + log.Info("Orphaned replicated policy deleted") + + return reconcile.Result{}, nil + } + + if rootPolicy.Spec.Disabled { + if replicatedExists { + if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to delete the disabled replicated policy, requeueing") + + return reconcile.Result{}, err + } + } + + log.Info("Disabled replicated policy deleted") + + return reconcile.Result{}, nil + } + + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + // Store this to ensure the cache matches a known possible state for this situation + version.resourceVersion = "deleted" + + log.V(1).Info("Root policy is disabled, and replicated policy correctly not found.") + + return reconcile.Result{}, nil + } + + // calculate the decision for this specific cluster + decision, err := r.singleClusterDecision(ctx, rootPolicy, request.Namespace) + if err != nil { + log.Error(err, "Failed to determine if policy should be replicated, requeueing") + + return reconcile.Result{}, err + } + + // an empty decision means the policy should not be replicated + if decision.Cluster.ClusterName == "" { + if replicatedExists { + inClusterNS, err := common.IsInClusterNamespace(r.Client, request.Namespace) + if err != nil { + return reconcile.Result{}, err + } + + if !inClusterNS { + // "Hosted mode" scenario: this cluster is hosting another cluster, which is syncing + // this policy to a cluster namespace that this propagator doesn't know about. 
+ log.V(1).Info("Found a possible replicated policy for a hosted cluster, skipping it") + + return reconcile.Result{}, nil + } + + if err := r.cleanUpReplicated(ctx, replicatedPolicy); err != nil { + if !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to remove the replicated policy for this managed cluster, requeueing") + + return reconcile.Result{}, err + } + } + + log.Info("Removed replicated policy from managed cluster") + + return reconcile.Result{}, nil + } + + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + // Store this to ensure the cache matches a known possible state for this situation + version.resourceVersion = "deleted" + + log.V(1).Info("Replicated policy should not exist on this managed cluster, and does not.") + + return reconcile.Result{}, nil + } + + objsToWatch := getPolicySetDependencies(rootPolicy) + + desiredReplicatedPolicy, err := r.buildReplicatedPolicy(rootPolicy, decision) + if err != nil { + log.Error(err, "Unable to build desired replicated policy, requeueing") + + return reconcile.Result{}, err + } + + if policyHasTemplates(rootPolicy) { + if replicatedExists { + // If the replicated policy has an initialization vector specified, set it for processing + if initializationVector, ok := replicatedPolicy.Annotations[IVAnnotation]; ok { + tempAnnotations := desiredReplicatedPolicy.GetAnnotations() + if tempAnnotations == nil { + tempAnnotations = make(map[string]string) + } + + tempAnnotations[IVAnnotation] = initializationVector + + desiredReplicatedPolicy.SetAnnotations(tempAnnotations) + } + } + + // resolve hubTemplate before replicating + // #nosec G104 -- any errors are logged and recorded in the processTemplates method, + // but the ignored status will be handled appropriately by the policy controllers on + // the managed cluster(s). + templObjsToWatch, _ := r.processTemplates(desiredReplicatedPolicy, decision.Cluster, rootPolicy) + + for objID, val := range templObjsToWatch { + if val { + objsToWatch[objID] = true + } + } + } + + instanceGVK := desiredReplicatedPolicy.GroupVersionKind() + instanceObjID := k8sdepwatches.ObjectIdentifier{ + Group: instanceGVK.Group, + Version: instanceGVK.Version, + Kind: instanceGVK.Kind, + Namespace: request.Namespace, + Name: request.Name, + } + refObjs := make([]k8sdepwatches.ObjectIdentifier, 0, len(objsToWatch)) + + for refObj := range objsToWatch { + refObjs = append(refObjs, refObj) + } + + // save the watcherError for later, so that the policy can still be updated now. + var watcherErr error + + if len(refObjs) != 0 { + watcherErr = r.DynamicWatcher.AddOrUpdateWatcher(instanceObjID, refObjs...) 
+ if watcherErr != nil { + log.Error(watcherErr, "Failed to update the dynamic watches for the hub policy templates") + } + } else { + watcherErr = r.DynamicWatcher.RemoveWatcher(instanceObjID) + if watcherErr != nil { + log.Error(watcherErr, "Failed to remove the dynamic watches for the hub policy templates") + } + } + + if !replicatedExists { + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + err = r.Create(ctx, desiredReplicatedPolicy) + if err != nil { + log.Error(err, "Failed to create the replicated policy, requeueing") + + return reconcile.Result{}, err + } + + version.resourceVersion = desiredReplicatedPolicy.GetResourceVersion() + + log.Info("Created replicated policy") + + return reconcile.Result{}, watcherErr + } + + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + // replicated policy already created, need to compare and possibly update + if !equivalentReplicatedPolicies(desiredReplicatedPolicy, replicatedPolicy) { + replicatedPolicy.SetAnnotations(desiredReplicatedPolicy.GetAnnotations()) + replicatedPolicy.SetLabels(desiredReplicatedPolicy.GetLabels()) + replicatedPolicy.Spec = desiredReplicatedPolicy.Spec + + err = r.Update(ctx, replicatedPolicy) + if err != nil { + log.Error(err, "Failed to update the replicated policy, requeueing") + + return reconcile.Result{}, err + } + + log.Info("Replicated policy updated") + } else { + log.Info("Replicated policy matches, no update needed") + } + + // whether it was updated or not, this resourceVersion can be cached + version.resourceVersion = replicatedPolicy.GetResourceVersion() + + if watcherErr != nil { + log.Info("Requeueing for the dynamic watcher error") + } + + return reconcile.Result{}, watcherErr +} + +func (r *ReplicatedPolicyReconciler) cleanUpReplicated(ctx context.Context, replicatedPolicy *policiesv1.Policy) error { + gvk := replicatedPolicy.GroupVersionKind() + + watcherErr := r.DynamicWatcher.RemoveWatcher(k8sdepwatches.ObjectIdentifier{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, + Namespace: replicatedPolicy.Namespace, + Name: replicatedPolicy.Name, + }) + + rsrcVersKey := replicatedPolicy.GetNamespace() + "/" + replicatedPolicy.GetName() + + version := safeWriteLoad(r.ResourceVersions, rsrcVersKey) + defer version.Unlock() + + deleteErr := r.Delete(ctx, replicatedPolicy) + + version.resourceVersion = "deleted" + + if watcherErr != nil { + return watcherErr + } + + return deleteErr +} + +func (r *ReplicatedPolicyReconciler) singleClusterDecision( + ctx context.Context, rootPlc *policiesv1.Policy, clusterName string, +) (decision clusterDecision, err error) { + positiveDecision := clusterDecision{ + Cluster: appsv1.PlacementDecision{ + ClusterName: clusterName, + ClusterNamespace: clusterName, + }, + } + + pbList := &policiesv1.PlacementBindingList{} + + err = r.List(ctx, pbList, &client.ListOptions{Namespace: rootPlc.GetNamespace()}) + if err != nil { + return clusterDecision{}, err + } + + foundWithoutSubFilter := false + + // Process all placement bindings without subFilter + for i, pb := range pbList.Items { + if pb.SubFilter == policiesv1.Restricted { + continue + } + + found, err := r.isSingleClusterInDecisions(ctx, &pbList.Items[i], rootPlc.GetName(), clusterName) + if err != nil { + return clusterDecision{}, err + } + + if !found { + continue + } + + if strings.EqualFold(pb.BindingOverrides.RemediationAction, string(policiesv1.Enforce)) { + positiveDecision.PolicyOverrides = pb.BindingOverrides + // If an override is 
found, then no other decisions can currently change this result.
+			// NOTE: if additional overrides are added in the future, this will require additional logic.
+			return positiveDecision, nil
+		}
+
+		foundWithoutSubFilter = true
+	}
+
+	if !foundWithoutSubFilter {
+		// No need to look through the subFilter bindings.
+		return clusterDecision{}, nil
+	}
+
+	// Process all placement bindings with subFilter
+	for i, pb := range pbList.Items {
+		if pb.SubFilter != policiesv1.Restricted {
+			continue
+		}
+
+		found, err := r.isSingleClusterInDecisions(ctx, &pbList.Items[i], rootPlc.GetName(), clusterName)
+		if err != nil {
+			return clusterDecision{}, err
+		}
+
+		if !found {
+			continue
+		}
+
+		if strings.EqualFold(pb.BindingOverrides.RemediationAction, string(policiesv1.Enforce)) {
+			positiveDecision.PolicyOverrides = pb.BindingOverrides
+			// If an override is found, then no other decisions can currently change this result.
+			// NOTE: if additional overrides are added in the future, this will require additional logic.
+			return positiveDecision, nil
+		}
+	}
+
+	// None of the bindings had any overrides.
+	return positiveDecision, nil
+}
+
+func (r *ReplicatedPolicyReconciler) isSingleClusterInDecisions(
+	ctx context.Context, pb *policiesv1.PlacementBinding, policyName, clusterName string,
+) (found bool, err error) {
+	if !common.HasValidPlacementRef(pb) {
+		return false, nil
+	}
+
+	subjectFound := false
+
+	for _, subject := range pb.Subjects {
+		if subject.APIGroup != policiesv1.SchemeGroupVersion.Group {
+			continue
+		}
+
+		switch subject.Kind {
+		case policiesv1.Kind:
+			if subject.Name == policyName {
+				subjectFound = true
+			}
+		case policiesv1.PolicySetKind:
+			if r.isPolicyInPolicySet(policyName, subject.Name, pb.GetNamespace()) {
+				subjectFound = true
+			}
+		}
+
+		if subjectFound {
+			break
+		}
+	}
+
+	if !subjectFound {
+		return false, nil
+	}
+
+	refNN := types.NamespacedName{
+		Namespace: pb.GetNamespace(),
+		Name:      pb.PlacementRef.Name,
+	}
+
+	switch pb.PlacementRef.Kind {
+	case "PlacementRule":
+		plr := &appsv1.PlacementRule{}
+		if err := r.Get(ctx, refNN, plr); err != nil && !k8serrors.IsNotFound(err) {
+			return false, fmt.Errorf("failed to get PlacementRule '%v': %w", pb.PlacementRef.Name, err)
+		}
+
+		for _, decision := range plr.Status.Decisions {
+			if decision.ClusterName == clusterName {
+				return true, nil
+			}
+		}
+	case "Placement":
+		pl := &clusterv1beta1.Placement{}
+		if err = r.Get(ctx, refNN, pl); err != nil && !k8serrors.IsNotFound(err) {
+			return false, fmt.Errorf("failed to get Placement '%v': %w", pb.PlacementRef.Name, err)
+		}
+
+		if k8serrors.IsNotFound(err) {
+			return false, nil
+		}
+
+		list := &clusterv1beta1.PlacementDecisionList{}
+		lopts := &client.ListOptions{Namespace: pb.GetNamespace()}
+
+		opts := client.MatchingLabels{"cluster.open-cluster-management.io/placement": pl.GetName()}
+		opts.ApplyToList(lopts)
+
+		err = r.List(ctx, list, lopts)
+		if err != nil && !k8serrors.IsNotFound(err) {
+			return false, fmt.Errorf("failed to list the PlacementDecisions for '%v', %w", pb.PlacementRef.Name, err)
+		}
+
+		for _, item := range list.Items {
+			for _, cluster := range item.Status.Decisions {
+				if cluster.ClusterName == clusterName {
+					return true, nil
+				}
+			}
+		}
+	}
+
+	return false, nil
+}
diff --git a/controllers/propagator/replicatedpolicy_setup.go b/controllers/propagator/replicatedpolicy_setup.go
new file mode 100644
index 00000000..d4778da3
--- /dev/null
+++ b/controllers/propagator/replicatedpolicy_setup.go
@@ -0,0 +1,166 @@
+package propagator
+
+import (
+	"context"
+	"sync"
+
"time" + + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + "open-cluster-management.io/governance-policy-propagator/controllers/common" +) + +func (r *ReplicatedPolicyReconciler) SetupWithManager( + mgr ctrl.Manager, maxConcurrentReconciles uint, dynWatcherSrc source.Source, updateSrc source.Source, +) error { + return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}). + Named("replicated-policy"). + For( + &policiesv1.Policy{}, + builder.WithPredicates(replicatedPolicyPredicates(r.ResourceVersions))). + WatchesRawSource( + dynWatcherSrc, + // The dependency-watcher could create an event before the same sort of watch in the + // controller-runtime triggers an update in the cache. This tries to ensure the cache is + // updated before the reconcile is triggered. + &delayGeneric{ + EventHandler: &handler.EnqueueRequestForObject{}, + delay: time.Second * 3, + }). + WatchesRawSource( + updateSrc, + &handler.EnqueueRequestForObject{}). + Complete(r) +} + +// replicatedPolicyPredicates triggers reconciliation if the policy is a replicated policy, and is +// not a pure status update. It will use the ResourceVersions cache to try and skip events caused +// by the replicated policy reconciler itself. 
+func replicatedPolicyPredicates(resourceVersions *sync.Map) predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + _, isReplicated := e.Object.GetLabels()[common.RootPolicyLabel] + if !isReplicated { + return false + } + + key := e.Object.GetNamespace() + "/" + e.Object.GetName() + version, loaded := safeReadLoad(resourceVersions, key) + defer version.RUnlock() + + return !loaded || version.resourceVersion != e.Object.GetResourceVersion() + }, + DeleteFunc: func(e event.DeleteEvent) bool { + _, isReplicated := e.Object.GetLabels()[common.RootPolicyLabel] + if !isReplicated { + return false + } + + key := e.Object.GetNamespace() + "/" + e.Object.GetName() + version, loaded := safeReadLoad(resourceVersions, key) + defer version.RUnlock() + + return !loaded || version.resourceVersion != "deleted" + }, + UpdateFunc: func(e event.UpdateEvent) bool { + _, newIsReplicated := e.ObjectNew.GetLabels()[common.RootPolicyLabel] + _, oldIsReplicated := e.ObjectOld.GetLabels()[common.RootPolicyLabel] + + // if neither has the label, it is not a replicated policy + if !(oldIsReplicated || newIsReplicated) { + return false + } + + key := e.ObjectNew.GetNamespace() + "/" + e.ObjectNew.GetName() + version, loaded := safeReadLoad(resourceVersions, key) + defer version.RUnlock() + + if loaded && version.resourceVersion == e.ObjectNew.GetResourceVersion() { + return false + } + + // Ignore pure status updates since those are handled by a separate controller + return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration() || + !equality.Semantic.DeepEqual(e.ObjectOld.GetLabels(), e.ObjectNew.GetLabels()) || + !equality.Semantic.DeepEqual(e.ObjectOld.GetAnnotations(), e.ObjectNew.GetAnnotations()) + }, + } +} + +type lockingRsrcVersion struct { + *sync.RWMutex + resourceVersion string +} + +// safeReadLoad gets the lockingRsrcVersion from the sync.Map if it already exists, or it puts a +// new empty lockingRsrcVersion in the sync.Map if it was missing. In either case, this function +// obtains the RLock before returning - the caller *must* call RUnlock() themselves. The bool +// returned indicates if the key already existed in the sync.Map. +func safeReadLoad(resourceVersions *sync.Map, key string) (*lockingRsrcVersion, bool) { + newRsrc := &lockingRsrcVersion{ + RWMutex: &sync.RWMutex{}, + resourceVersion: "", + } + + newRsrc.RLock() + + rsrc, loaded := resourceVersions.LoadOrStore(key, newRsrc) + if loaded { + newRsrc.RUnlock() + + version := rsrc.(*lockingRsrcVersion) + version.RLock() + + return version, true + } + + return newRsrc, false +} + +// safeWriteLoad gets the lockingRsrcVersion from the sync.Map if it already exists, or it puts a +// new empty lockingRsrcVersion in the sync.Map if it was missing. In either case, this function +// obtains the Lock (a write lock) before returning - the caller *must* call Unlock() themselves. 
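+//
+// A typical write path, as used by the replicated policy Reconcile and cleanUpReplicated
+// (shown as an illustrative sketch; the key is hypothetical):
+//
+//	version := safeWriteLoad(r.ResourceVersions, "cluster-ns/ns.policy-name")
+//	defer version.Unlock()
+//	// ... create, update, or delete the replicated policy ...
+//	version.resourceVersion = replicatedPolicy.GetResourceVersion() // or "deleted"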
+func safeWriteLoad(resourceVersions *sync.Map, key string) *lockingRsrcVersion { + newRsrc := &lockingRsrcVersion{ + RWMutex: &sync.RWMutex{}, + resourceVersion: "", + } + + newRsrc.Lock() + + rsrc, loaded := resourceVersions.LoadOrStore(key, newRsrc) + if loaded { + newRsrc.Unlock() + + version := rsrc.(*lockingRsrcVersion) + version.Lock() + + return version + } + + return newRsrc +} + +type delayGeneric struct { + handler.EventHandler + delay time.Duration +} + +func (d *delayGeneric) Generic(_ context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { + q.AddAfter(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}, d.delay) +} diff --git a/controllers/propagator/replication.go b/controllers/propagator/replication.go index c63e614f..b41ded7b 100644 --- a/controllers/propagator/replication.go +++ b/controllers/propagator/replication.go @@ -37,7 +37,7 @@ func equivalentReplicatedPolicies(plc1 *policiesv1.Policy, plc2 *policiesv1.Poli // In particular, it adds labels that the policy framework uses, and ensures that policy dependencies // are in a consistent format suited for use on managed clusters. // It can return an error if it needed to canonicalize a dependency, but a PolicySet lookup failed. -func (r *PolicyReconciler) buildReplicatedPolicy( +func (r *ReplicatedPolicyReconciler) buildReplicatedPolicy( root *policiesv1.Policy, clusterDec clusterDecision, ) (*policiesv1.Policy, error) { decision := clusterDec.Cluster @@ -135,7 +135,7 @@ func depIsPolicy(dep policiesv1.PolicyDependency) bool { // format as in replicated Policies), and that PolicySets are replaced with their constituent // Policies. If a PolicySet could not be found, that dependency will be copied as-is. It will // return an error if there is an unexpected error looking up a PolicySet to replace. -func (r *PolicyReconciler) canonicalizeDependencies( +func (r *Propagator) canonicalizeDependencies( rawDeps []policiesv1.PolicyDependency, defaultNamespace string, ) ([]policiesv1.PolicyDependency, error) { deps := make([]policiesv1.PolicyDependency, 0) diff --git a/controllers/propagator/replication_test.go b/controllers/propagator/replication_test.go index cc0a0018..135b0655 100644 --- a/controllers/propagator/replication_test.go +++ b/controllers/propagator/replication_test.go @@ -148,7 +148,7 @@ func TestCanonicalizeDependencies(t *testing.T) { WithObjects(fooSet, barSet). Build() - fakeReconciler := PolicyReconciler{Client: fakeClient} + fakeReconciler := Propagator{Client: fakeClient} tests := map[string]struct { input []policiesv1.PolicyDependency diff --git a/controllers/propagator/rootpolicy_controller.go b/controllers/propagator/rootpolicy_controller.go new file mode 100644 index 00000000..0bb413fb --- /dev/null +++ b/controllers/propagator/rootpolicy_controller.go @@ -0,0 +1,142 @@ +// Copyright (c) 2021 Red Hat, Inc. 
+// Copyright Contributors to the Open Cluster Management project + +package propagator + +import ( + "context" + "sync" + + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + "open-cluster-management.io/governance-policy-propagator/controllers/common" +) + +const ControllerName string = "policy-propagator" + +var log = ctrl.Log.WithName(ControllerName) + +type RootPolicyReconciler struct { + Propagator +} + +// Reconcile handles root policies, sending events to the replicated policy reconciler to ensure +// that the desired policies are on the correct clusters. It also populates the status of the root +// policy with placement information. +func (r *RootPolicyReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + log := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name) + + log.V(3).Info("Acquiring the lock for the root policy") + + lock, _ := r.RootPolicyLocks.LoadOrStore(request.NamespacedName, &sync.Mutex{}) + + lock.(*sync.Mutex).Lock() + defer lock.(*sync.Mutex).Unlock() + + log.Info("Reconciling the policy") + + // Fetch the Policy instance + instance := &policiesv1.Policy{} + + err := r.Get(ctx, request.NamespacedName, instance) + if err != nil { + if k8serrors.IsNotFound(err) { + count, err := r.updateExistingReplicas(ctx, request.Namespace+"."+request.Name) + if err != nil { + log.Error(err, "Failed to send update events to replicated policies, requeueing") + + return reconcile.Result{}, err + } + + log.Info("Replicated policies sent for deletion", "count", count) + + return reconcile.Result{}, nil + } + + log.Error(err, "Failed to get the policy") + + // Error reading the object - requeue the request. + return reconcile.Result{}, err + } + + inClusterNs, err := common.IsInClusterNamespace(r.Client, instance.Namespace) + if err != nil { + log.Error(err, "Failed to determine if the policy is in a managed cluster namespace. Requeueing the request.") + + return reconcile.Result{}, err + } + + if !inClusterNs { + err := r.handleRootPolicy(instance) + if err != nil { + log.Error(err, "Failure during root policy handling") + + propagationFailureMetric.WithLabelValues(instance.GetName(), instance.GetNamespace()).Inc() + } + + log.Info("Reconciliation complete") + + return reconcile.Result{}, err + } + + log = log.WithValues("name", instance.GetName(), "namespace", instance.GetNamespace()) + + log.Info("The policy was found in the cluster namespace but doesn't belong to any root policy, deleting it") + + err = r.Delete(ctx, instance) + if err != nil && !k8serrors.IsNotFound(err) { + log.Error(err, "Failed to delete the policy") + + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +//+kubebuilder:object:root=true + +type GuttedObject struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` +} + +// updateExistingReplicas lists all existing replicated policies for this root policy, and sends +// events for each of them to the replicated policy reconciler. This will trigger updates on those +// replicated policies, for example when the root policy spec changes, or when the replicated +// policies might need to be deleted. 
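+//
+// The events written to ReplicatedPolicyUpdates are consumed by the replicated-policy
+// controller through the source.Channel wired up in main.go, roughly (variable names
+// simplified here):
+//
+//	updates := make(chan event.GenericEvent, 1024)
+//	updateSrc := &source.Channel{Source: updates, DestBufferSize: 1024}
+//	// The replicated-policy controller registers updateSrc via WatchesRawSource, so each
+//	// GenericEvent sent here becomes a reconcile request for that replicated policy.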
+func (r *RootPolicyReconciler) updateExistingReplicas(ctx context.Context, rootPolicyFullName string) (int, error) { + // Get all the replicated policies for this root policy + policyList := &policiesv1.PolicyList{} + opts := &client.ListOptions{} + + matcher := client.MatchingLabels{common.RootPolicyLabel: rootPolicyFullName} + matcher.ApplyToList(opts) + + err := r.List(ctx, policyList, opts) + if err != nil && !k8serrors.IsNotFound(err) { + return 0, err + } + + for _, replicated := range policyList.Items { + simpleObj := &GuttedObject{ + TypeMeta: metav1.TypeMeta{ + Kind: replicated.Kind, + APIVersion: replicated.APIVersion, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: replicated.Name, + Namespace: replicated.Namespace, + }, + } + + r.ReplicatedPolicyUpdates <- event.GenericEvent{Object: simpleObj} + } + + return len(policyList.Items), nil +} diff --git a/controllers/propagator/rootpolicy_setup.go b/controllers/propagator/rootpolicy_setup.go new file mode 100644 index 00000000..2ec7d396 --- /dev/null +++ b/controllers/propagator/rootpolicy_setup.go @@ -0,0 +1,252 @@ +// Copyright (c) 2023 Red Hat, Inc. +// Copyright Contributors to the Open Cluster Management project + +package propagator + +import ( + "context" + + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/types" + clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1" + appsv1 "open-cluster-management.io/multicloud-operators-subscription/pkg/apis/apps/placementrule/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" + policiesv1beta1 "open-cluster-management.io/governance-policy-propagator/api/v1beta1" + "open-cluster-management.io/governance-policy-propagator/controllers/common" +) + +//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies/finalizers,verbs=update +//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=placementbindings,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policysets,verbs=get;list;watch +//+kubebuilder:rbac:groups=cluster.open-cluster-management.io,resources=managedclusters;placementdecisions;placements,verbs=get;list;watch +//+kubebuilder:rbac:groups=apps.open-cluster-management.io,resources=placementrules,verbs=get;list;watch +//+kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=*,resources=*,verbs=get;list;watch + +// SetupWithManager sets up the controller with the Manager. +func (r *RootPolicyReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles uint) error { + return ctrl.NewControllerManagedBy(mgr). + WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}). + Named("root-policy-spec"). + For( + &policiesv1.Policy{}, + builder.WithPredicates(rootPolicyNonStatusUpdates())). 
+ Watches( + &policiesv1beta1.PolicySet{}, + handler.EnqueueRequestsFromMapFunc(mapPolicySetToPolicies), + builder.WithPredicates(policySetPolicyListChanged())). + Watches( + &policiesv1.PlacementBinding{}, + handler.EnqueueRequestsFromMapFunc(mapBindingToPolicies(mgr.GetClient())), + builder.WithPredicates(bindingForPolicy())). + Watches( + &appsv1.PlacementRule{}, + handler.EnqueueRequestsFromMapFunc(mapPlacementRuleToPolicies(mgr.GetClient()))). + Watches( + &clusterv1beta1.PlacementDecision{}, + handler.EnqueueRequestsFromMapFunc(mapPlacementDecisionToPolicies(mgr.GetClient()))). + Complete(r) +} + +// policyNonStatusUpdates triggers reconciliation if the Policy has had a change that is not just +// a status update. +func rootPolicyNonStatusUpdates() predicate.Funcs { + return predicate.Funcs{ + CreateFunc: func(e event.CreateEvent) bool { + _, isReplicated := e.Object.GetLabels()[common.RootPolicyLabel] + + return !isReplicated + }, + DeleteFunc: func(e event.DeleteEvent) bool { + _, isReplicated := e.Object.GetLabels()[common.RootPolicyLabel] + + return !isReplicated + }, + UpdateFunc: func(e event.UpdateEvent) bool { + _, newIsReplicated := e.ObjectNew.GetLabels()[common.RootPolicyLabel] + _, oldIsReplicated := e.ObjectOld.GetLabels()[common.RootPolicyLabel] + + // if either has the label, it is a replicated policy + if oldIsReplicated || newIsReplicated { + return false + } + + // Ignore pure status updates since those are handled by a separate controller + return e.ObjectOld.GetGeneration() != e.ObjectNew.GetGeneration() || + !equality.Semantic.DeepEqual(e.ObjectOld.GetLabels(), e.ObjectNew.GetLabels()) || + !equality.Semantic.DeepEqual(e.ObjectOld.GetAnnotations(), e.ObjectNew.GetAnnotations()) + }, + } +} + +// mapPolicySetToPolicies maps a PolicySet to all the Policies in its policies list. +func mapPolicySetToPolicies(_ context.Context, object client.Object) []reconcile.Request { + log := log.WithValues("policySetName", object.GetName(), "namespace", object.GetNamespace()) + log.V(2).Info("Reconcile Request for PolicySet") + + var result []reconcile.Request + + //nolint:forcetypeassert + policySet := object.(*policiesv1beta1.PolicySet) + + for _, plc := range policySet.Spec.Policies { + log.V(2).Info("Found reconciliation request from a policyset", "policyName", string(plc)) + + request := reconcile.Request{NamespacedName: types.NamespacedName{ + Name: string(plc), + Namespace: object.GetNamespace(), + }} + result = append(result, request) + } + + return result +} + +// policySetPolicyListChanged triggers reconciliation if the list of policies in the PolicySet has +// changed, or if the PolicySet was just created or deleted. +func policySetPolicyListChanged() predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + //nolint:forcetypeassert + policySetObjNew := e.ObjectNew.(*policiesv1beta1.PolicySet) + //nolint:forcetypeassert + policySetObjOld := e.ObjectOld.(*policiesv1beta1.PolicySet) + + return !equality.Semantic.DeepEqual(policySetObjNew.Spec.Policies, policySetObjOld.Spec.Policies) + }, + CreateFunc: func(e event.CreateEvent) bool { + return true + }, + DeleteFunc: func(e event.DeleteEvent) bool { + return true + }, + } +} + +// mapBindingToPolicies maps a PlacementBinding to the Policies that are either directly in its +// subjects list, or are in a PolicySet which is a subject of this PlacementBinding. 
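+//
+// For example (names are illustrative), a PlacementBinding whose subjects are the Policy
+// "policy-one" and the PolicySet "set-a" containing "policy-two" maps to reconcile requests
+// for "policy-one" and "policy-two" in the binding's namespace.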
+func mapBindingToPolicies(c client.Client) handler.MapFunc { + return func(ctx context.Context, obj client.Object) []reconcile.Request { + //nolint:forcetypeassert + pb := obj.(*policiesv1.PlacementBinding) + + log := log.WithValues("placementBindingName", pb.GetName(), "namespace", pb.GetNamespace()) + log.V(2).Info("Reconcile request for a PlacementBinding") + + return common.GetPoliciesInPlacementBinding(ctx, c, pb) + } +} + +// bindingForPolicy triggers reconciliation if the binding has any Policies or PolicySets in its +// subjects list. +func bindingForPolicy() predicate.Funcs { + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + //nolint:forcetypeassert + pbObjNew := e.ObjectNew.(*policiesv1.PlacementBinding) + //nolint:forcetypeassert + pbObjOld := e.ObjectOld.(*policiesv1.PlacementBinding) + + return common.IsForPolicyOrPolicySet(pbObjNew) || common.IsForPolicyOrPolicySet(pbObjOld) + }, + CreateFunc: func(e event.CreateEvent) bool { + //nolint:forcetypeassert + pbObj := e.Object.(*policiesv1.PlacementBinding) + + return common.IsForPolicyOrPolicySet(pbObj) + }, + DeleteFunc: func(e event.DeleteEvent) bool { + //nolint:forcetypeassert + pbObj := e.Object.(*policiesv1.PlacementBinding) + + return common.IsForPolicyOrPolicySet(pbObj) + }, + } +} + +// mapPlacementRuleToPolicies maps a PlacementRule to all Policies which are either direct subjects +// of PlacementBindings for the PlacementRule, or are in PolicySets bound to the PlacementRule. +func mapPlacementRuleToPolicies(c client.Client) handler.MapFunc { + return func(ctx context.Context, object client.Object) []reconcile.Request { + log := log.WithValues("placementRuleName", object.GetName(), "namespace", object.GetNamespace()) + + log.V(2).Info("Reconcile Request for PlacementRule") + + // list pb + pbList := &policiesv1.PlacementBindingList{} + lopts := &client.ListOptions{Namespace: object.GetNamespace()} + opts := client.MatchingFields{"placementRef.name": object.GetName()} + opts.ApplyToList(lopts) + + // find pb in the same namespace of placementrule + err := c.List(ctx, pbList, &client.ListOptions{Namespace: object.GetNamespace()}) + if err != nil { + return nil + } + + var result []reconcile.Request + // loop through pbs and collect policies from each matching one. + for i, pb := range pbList.Items { + if pb.PlacementRef.APIGroup != appsv1.SchemeGroupVersion.Group || + pb.PlacementRef.Kind != "PlacementRule" || pb.PlacementRef.Name != object.GetName() { + continue + } + + result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...) + } + + return result + } +} + +// mapPlacementDecisionToPolicies maps a PlacementDecision to all Policies which are either direct +// subjects of PlacementBindings on the decision's Placement, or are in PolicySets which are bound +// to that Placement. 
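+//
+// The lookup chain is: the PlacementDecision's "cluster.open-cluster-management.io/placement"
+// label gives the Placement name; PlacementBindings in the same namespace whose placementRef
+// matches that Placement are listed; and the Policies bound by those bindings are enqueued.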
+func mapPlacementDecisionToPolicies(c client.Client) handler.MapFunc { + return func(ctx context.Context, object client.Object) []reconcile.Request { + log := log.WithValues("placementDecisionName", object.GetName(), "namespace", object.GetNamespace()) + + log.V(2).Info("Reconcile request for a placement decision") + + // get the placement name from the placementdecision + placementName := object.GetLabels()["cluster.open-cluster-management.io/placement"] + if placementName == "" { + return nil + } + + pbList := &policiesv1.PlacementBindingList{} + // find pb in the same namespace of placementrule + lopts := &client.ListOptions{Namespace: object.GetNamespace()} + opts := client.MatchingFields{"placementRef.name": placementName} + opts.ApplyToList(lopts) + + err := c.List(ctx, pbList, lopts) + if err != nil { + return nil + } + + var result []reconcile.Request + // loop through pbs and collect policies from each matching one. + for i, pb := range pbList.Items { + if pb.PlacementRef.APIGroup != clusterv1beta1.SchemeGroupVersion.Group || + pb.PlacementRef.Kind != "Placement" || pb.PlacementRef.Name != placementName { + continue + } + + result = append(result, common.GetPoliciesInPlacementBinding(ctx, c, &pbList.Items[i])...) + } + + return result + } +} diff --git a/controllers/propagator/zz_generated.deepcopy.go b/controllers/propagator/zz_generated.deepcopy.go new file mode 100644 index 00000000..414da9ec --- /dev/null +++ b/controllers/propagator/zz_generated.deepcopy.go @@ -0,0 +1,38 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +// Copyright (c) 2021 Red Hat, Inc. +// Copyright Contributors to the Open Cluster Management project + +// Code generated by controller-gen. DO NOT EDIT. + +package propagator + +import ( + "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GuttedObject) DeepCopyInto(out *GuttedObject) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GuttedObject. +func (in *GuttedObject) DeepCopy() *GuttedObject { + if in == nil { + return nil + } + out := new(GuttedObject) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *GuttedObject) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/controllers/rootpolicystatus/root_policy_status_controller.go b/controllers/rootpolicystatus/root_policy_status_controller.go index 353bfe6c..b284e247 100644 --- a/controllers/rootpolicystatus/root_policy_status_controller.go +++ b/controllers/rootpolicystatus/root_policy_status_controller.go @@ -15,7 +15,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" policiesv1 "open-cluster-management.io/governance-policy-propagator/api/v1" "open-cluster-management.io/governance-policy-propagator/controllers/common" @@ -30,9 +29,9 @@ var log = ctrl.Log.WithName(ControllerName) //+kubebuilder:rbac:groups=policy.open-cluster-management.io,resources=policies/status,verbs=get;update;patch // SetupWithManager sets up the controller with the Manager. 
-func (r *RootPolicyStatusReconciler) SetupWithManager(mgr ctrl.Manager, _ ...source.Source) error { +func (r *RootPolicyStatusReconciler) SetupWithManager(mgr ctrl.Manager, maxConcurrentReconciles uint) error { return ctrl.NewControllerManagedBy(mgr). - WithOptions(controller.Options{MaxConcurrentReconciles: int(r.MaxConcurrentReconciles)}). + WithOptions(controller.Options{MaxConcurrentReconciles: int(maxConcurrentReconciles)}). Named(ControllerName). For( &policiesv1.Policy{}, @@ -43,7 +42,7 @@ func (r *RootPolicyStatusReconciler) SetupWithManager(mgr ctrl.Manager, _ ...sou // particular way, so we will define that in a separate "Watches" Watches( &policiesv1.Policy{}, - handler.EnqueueRequestsFromMapFunc(common.PolicyMapper(mgr.GetClient())), + handler.EnqueueRequestsFromMapFunc(common.MapToRootPolicy(mgr.GetClient())), builder.WithPredicates(policyStatusPredicate()), ). Complete(r) @@ -55,7 +54,6 @@ var _ reconcile.Reconciler = &RootPolicyStatusReconciler{} // RootPolicyStatusReconciler handles replicated policy status updates and updates the root policy status. type RootPolicyStatusReconciler struct { client.Client - MaxConcurrentReconciles uint // Use a shared lock with the main policy controller to avoid conflicting updates. RootPolicyLocks *sync.Map Scheme *runtime.Scheme diff --git a/main.go b/main.go index 4cc8960c..20a3a457 100644 --- a/main.go +++ b/main.go @@ -33,7 +33,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/source" //+kubebuilder:scaffold:imports policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1" @@ -87,11 +89,19 @@ func main() { pflag.CommandLine.AddGoFlagSet(flag.CommandLine) - var metricsAddr string - var enableLeaderElection bool - var probeAddr string - var keyRotationDays, keyRotationMaxConcurrency, policyMetricsMaxConcurrency, policyStatusMaxConcurrency uint - var enableComplianceEventsStore, enableWebhooks bool + var ( + metricsAddr string + enableLeaderElection bool + probeAddr string + keyRotationDays uint + keyRotationMaxConcurrency uint + policyMetricsMaxConcurrency uint + policyStatusMaxConcurrency uint + rootPolicyMaxConcurrency uint + replPolicyMaxConcurrency uint + enableComplianceEventsStore bool + enableWebhooks bool + ) pflag.StringVar(&metricsAddr, "metrics-bind-address", ":8383", "The address the metric endpoint binds to.") pflag.StringVar(&probeAddr, "health-probe-bind-address", ":8081", "The address the probe endpoint binds to.") @@ -126,6 +136,18 @@ func main() { 5, "The maximum number of concurrent reconciles for the policy-status controller", ) + pflag.UintVar( + &rootPolicyMaxConcurrency, + "root-policy-max-concurrency", + 2, + "The maximum number of concurrent reconciles for the root-policy controller", + ) + pflag.UintVar( + &replPolicyMaxConcurrency, + "replicated-policy-max-concurrency", + 10, + "The maximum number of concurrent reconciles for the replicated-policy controller", + ) pflag.Parse() @@ -245,24 +267,45 @@ func main() { }() policiesLock := &sync.Map{} + replicatedResourceVersions := &sync.Map{} - if err = (&propagatorctrl.PolicyReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Recorder: mgr.GetEventRecorderFor(propagatorctrl.ControllerName), - DynamicWatcher: dynamicWatcher, - RootPolicyLocks: policiesLock, - }).SetupWithManager(mgr, dynamicWatcherSource); 
err != nil { - log.Error(err, "Unable to create the controller", "controller", propagatorctrl.ControllerName) + bufferSize := 1024 + + replicatedPolicyUpdates := make(chan event.GenericEvent, bufferSize) + replicatedUpdatesSource := &source.Channel{ + Source: replicatedPolicyUpdates, + DestBufferSize: bufferSize, + } + + propagator := propagatorctrl.Propagator{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Recorder: mgr.GetEventRecorderFor(propagatorctrl.ControllerName), + RootPolicyLocks: policiesLock, + ReplicatedPolicyUpdates: replicatedPolicyUpdates, + } + + if err = (&propagatorctrl.RootPolicyReconciler{ + Propagator: propagator, + }).SetupWithManager(mgr, rootPolicyMaxConcurrency); err != nil { + log.Error(err, "Unable to create the controller", "controller", "root-policy-spec") + os.Exit(1) + } + + if err = (&propagatorctrl.ReplicatedPolicyReconciler{ + Propagator: propagator, + ResourceVersions: replicatedResourceVersions, + DynamicWatcher: dynamicWatcher, + }).SetupWithManager(mgr, replPolicyMaxConcurrency, dynamicWatcherSource, replicatedUpdatesSource); err != nil { + log.Error(err, "Unable to create the controller", "controller", "replicated-policy") os.Exit(1) } if reportMetrics() { if err = (&metricsctrl.MetricReconciler{ - Client: mgr.GetClient(), - MaxConcurrentReconciles: policyMetricsMaxConcurrency, - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr, policyMetricsMaxConcurrency); err != nil { log.Error(err, "Unable to create the controller", "controller", metricsctrl.ControllerName) os.Exit(1) } @@ -288,21 +331,19 @@ func main() { } if err = (&encryptionkeysctrl.EncryptionKeysReconciler{ - Client: mgr.GetClient(), - KeyRotationDays: keyRotationDays, - MaxConcurrentReconciles: keyRotationMaxConcurrency, - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + Client: mgr.GetClient(), + KeyRotationDays: keyRotationDays, + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr, keyRotationMaxConcurrency); err != nil { log.Error(err, "Unable to create controller", "controller", encryptionkeysctrl.ControllerName) os.Exit(1) } if err = (&rootpolicystatusctrl.RootPolicyStatusReconciler{ - Client: mgr.GetClient(), - MaxConcurrentReconciles: policyStatusMaxConcurrency, - RootPolicyLocks: policiesLock, - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + Client: mgr.GetClient(), + RootPolicyLocks: policiesLock, + Scheme: mgr.GetScheme(), + }).SetupWithManager(mgr, policyStatusMaxConcurrency); err != nil { log.Error(err, "Unable to create controller", "controller", rootpolicystatusctrl.ControllerName) os.Exit(1) } diff --git a/test/e2e/case12_encryptionkeys_controller_test.go b/test/e2e/case12_encryptionkeys_controller_test.go index 168e5325..1e8db74f 100644 --- a/test/e2e/case12_encryptionkeys_controller_test.go +++ b/test/e2e/case12_encryptionkeys_controller_test.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "open-cluster-management.io/governance-policy-propagator/controllers/common" "open-cluster-management.io/governance-policy-propagator/test/utils" ) @@ -30,65 +31,86 @@ var _ = Describe("Test policy encryption key rotation", func() { keyB64 := base64.StdEncoding.EncodeToString(key) previousKey := bytes.Repeat([]byte{byte('B')}, 256/8) - It("should create a some sample policies", func() { - policyConfigs := []struct { - Name string - IVAnnoation string - RootPolicy 
bool - }{ - // This policy should get the triggered update annotation - {"policy-one", "", true}, - {testNamespace + ".policy-one", "7cznVUq5SXEE4RMZNkGOrQ==", false}, - {"policy-two", "", true}, - {testNamespace + ".policy-two", "", false}, - } + rsrcPath := "../resources/case12_encryptionkeys_controller/" + policyOneYaml := rsrcPath + "policy-one.yaml" + policyTwoYaml := rsrcPath + "policy-two.yaml" + policyOneName := "policy-one" + policyTwoName := "policy-two" + replicatedPolicyOneYaml := rsrcPath + "replicated-policy-one.yaml" + replicatedPolicyOneName := "policy-propagator-test.policy-one" + + It("should create some sample policies", func() { + By("Creating the root policies with placement rules and bindings") + utils.Kubectl("apply", "-f", policyOneYaml, "-n", testNamespace) + rootOne := utils.GetWithTimeout( + clientHubDynamic, gvrPolicy, policyOneName, testNamespace, true, defaultTimeoutSeconds, + ) + Expect(rootOne).NotTo(BeNil()) - for _, policyConf := range policyConfigs { - annotations := map[string]interface{}{} - if policyConf.IVAnnoation != "" { - annotations[IVAnnotation] = policyConf.IVAnnoation - } + utils.Kubectl("apply", "-f", policyTwoYaml, "-n", testNamespace) + rootTwo := utils.GetWithTimeout( + clientHubDynamic, gvrPolicy, policyTwoName, testNamespace, true, defaultTimeoutSeconds, + ) + Expect(rootTwo).NotTo(BeNil()) - labels := map[string]interface{}{} - if !policyConf.RootPolicy { - labels[RootPolicyLabel] = policyConf.Name - } + By("Patching in the decision for policy-one") + plrOne := utils.GetWithTimeout( + clientHubDynamic, gvrPlacementRule, policyOneName+"-plr", testNamespace, true, defaultTimeoutSeconds, + ) + plrOne.Object["status"] = utils.GeneratePlrStatus("managed1") + _, err := clientHubDynamic.Resource(gvrPlacementRule).Namespace(testNamespace).UpdateStatus( + context.TODO(), plrOne, metav1.UpdateOptions{}, + ) + Expect(err).ToNot(HaveOccurred()) + replicatedOne := utils.GetWithTimeout( + clientHubDynamic, gvrPolicy, testNamespace+"."+policyOneName, "managed1", true, defaultTimeoutSeconds, + ) + Expect(replicatedOne).ToNot(BeNil()) + opt := metav1.ListOptions{ + LabelSelector: common.RootPolicyLabel + "=" + testNamespace + "." + policyOneName, + } + utils.ListWithTimeout(clientHubDynamic, gvrPolicy, opt, 1, true, defaultTimeoutSeconds) - policy := &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "policy.open-cluster-management.io/v1", - "kind": "Policy", - "metadata": map[string]interface{}{ - "annotations": annotations, - "labels": labels, - "name": policyConf.Name, - "namespace": testNamespace, - }, - "spec": map[string]interface{}{ - "disabled": false, - "policy-templates": []map[string]interface{}{ - {"objectDefinition": map[string]interface{}{}}, - }, - }, - }, - } - _, err := clientHubDynamic.Resource(gvrPolicy). - Namespace(testNamespace). 
- Create(context.TODO(), policy, metav1.CreateOptions{}) - Expect(err).ShouldNot(HaveOccurred()) + By("Patching in the decision for policy-two") + plrTwo := utils.GetWithTimeout( + clientHubDynamic, gvrPlacementRule, policyTwoName+"-plr", testNamespace, true, defaultTimeoutSeconds, + ) + plrTwo.Object["status"] = utils.GeneratePlrStatus("managed1") + _, err = clientHubDynamic.Resource(gvrPlacementRule).Namespace(testNamespace).UpdateStatus( + context.TODO(), plrTwo, metav1.UpdateOptions{}, + ) + Expect(err).ToNot(HaveOccurred()) + replicatedTwo := utils.GetWithTimeout( + clientHubDynamic, gvrPolicy, testNamespace+"."+policyTwoName, "managed1", true, defaultTimeoutSeconds, + ) + Expect(replicatedTwo).ToNot(BeNil()) + opt = metav1.ListOptions{ + LabelSelector: common.RootPolicyLabel + "=" + testNamespace + "." + policyTwoName, } + utils.ListWithTimeout(clientHubDynamic, gvrPolicy, opt, 1, true, defaultTimeoutSeconds) + + By("Adding the IV Annotation to the replicated policy-one") + utils.Kubectl("apply", "-n", "managed1", "-f", replicatedPolicyOneYaml) + + Eventually(func() interface{} { + replicatedPolicy := utils.GetWithTimeout( + clientHubDynamic, gvrPolicy, replicatedPolicyOneName, "managed1", true, defaultTimeoutSeconds, + ) + + return replicatedPolicy.GetAnnotations() + }, defaultTimeoutSeconds, 1).Should(HaveKey(IVAnnotation)) }) It("should create a "+EncryptionKeySecret+" secret that needs a rotation", func() { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: EncryptionKeySecret, - Namespace: testNamespace, + Namespace: "managed1", Annotations: map[string]string{LastRotatedAnnotation: "2020-04-15T01:02:03Z"}, }, Data: map[string][]byte{"key": key, "previousKey": previousKey}, } - _, err := clientHub.CoreV1().Secrets(testNamespace).Create(context.TODO(), secret, metav1.CreateOptions{}) + _, err := clientHub.CoreV1().Secrets("managed1").Create(context.TODO(), secret, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) @@ -99,7 +121,7 @@ var _ = Describe("Test policy encryption key rotation", func() { clientHubDynamic, gvrSecret, EncryptionKeySecret, - testNamespace, + "managed1", true, defaultTimeoutSeconds, ) @@ -144,7 +166,7 @@ var _ = Describe("Test policy encryption key rotation", func() { }) It("clean up", func() { - err := clientHub.CoreV1().Secrets(testNamespace).Delete( + err := clientHub.CoreV1().Secrets("managed1").Delete( context.TODO(), EncryptionKeySecret, metav1.DeleteOptions{}, ) Expect(err).ShouldNot(HaveOccurred()) diff --git a/test/e2e/case1_propagation_test.go b/test/e2e/case1_propagation_test.go index 95a638d4..1be45aa3 100644 --- a/test/e2e/case1_propagation_test.go +++ b/test/e2e/case1_propagation_test.go @@ -747,13 +747,24 @@ var _ = Describe("Test policy propagation", func() { policyMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(policy) Expect(err).ToNot(HaveOccurred()) - policyRV, err := policyClient().Create( + _, err = policyClient().Create( context.TODO(), &unstructured.Unstructured{Object: policyMap}, metav1.CreateOptions{}, ) Expect(err).ToNot(HaveOccurred()) - _, found, _ := unstructured.NestedBool(policyRV.Object, "spec", "copyPolicyMetadata") - Expect(found).To(BeFalse()) + Eventually(func(g Gomega) { + replicatedPlc := utils.GetWithTimeout( + clientHubDynamic, + gvrPolicy, + testNamespace+"."+policyName, + "managed1", + true, + defaultTimeoutSeconds, + ) + + _, found, _ := unstructured.NestedBool(replicatedPlc.Object, "spec", "copyPolicyMetadata") + g.Expect(found).To(BeFalse()) + }, defaultTimeoutSeconds, 
1).Should(Succeed()) }) It("verifies that the labels and annotations are copied with spec.copyPolicyMetadata=true", func() { diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 906d1527..4ea17067 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -22,6 +22,8 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + + "open-cluster-management.io/governance-policy-propagator/test/utils" ) var ( @@ -102,6 +104,32 @@ var _ = BeforeSuite(func() { Expect(namespaces.Get(context.TODO(), testNamespace, metav1.GetOptions{})).NotTo(BeNil()) }) +var _ = AfterSuite(func() { + By("Collecting workqueue_adds_total metrics") + wqAddsLines, err := utils.MetricsLines("workqueue_adds_total") + if err != nil { + GinkgoWriter.Println("Error getting workqueue_adds_total metrics: ", err) + } + + GinkgoWriter.Println(wqAddsLines) + + By("Collecting controller_runtime_reconcile_total metrics") + ctrlReconcileTotalLines, err := utils.MetricsLines("controller_runtime_reconcile_total") + if err != nil { + GinkgoWriter.Println("Error getting controller_runtime_reconcile_total metrics: ", err) + } + + GinkgoWriter.Println(ctrlReconcileTotalLines) + + By("Collecting controller_runtime_reconcile_time_seconds_sum metrics") + ctrlReconcileTimeLines, err := utils.MetricsLines("controller_runtime_reconcile_time_seconds_sum") + if err != nil { + GinkgoWriter.Println("Error getting controller_runtime_reconcile_time_seconds_sum metrics: ", err) + } + + GinkgoWriter.Println(ctrlReconcileTimeLines) +}) + func NewKubeClient(url, kubeconfig, context string) kubernetes.Interface { klog.V(5).Infof("Create kubeclient for url %s using kubeconfig path %s\n", url, kubeconfig) diff --git a/test/resources/case12_encryptionkeys_controller/policy-one.yaml b/test/resources/case12_encryptionkeys_controller/policy-one.yaml new file mode 100644 index 00000000..af22795c --- /dev/null +++ b/test/resources/case12_encryptionkeys_controller/policy-one.yaml @@ -0,0 +1,34 @@ +apiVersion: policy.open-cluster-management.io/v1 +kind: Policy +metadata: + name: policy-one +spec: + disabled: false + policy-templates: + - objectDefinition: + data: '{{hub fakedata hub}}' +--- +apiVersion: policy.open-cluster-management.io/v1 +kind: PlacementBinding +metadata: + name: policy-one-pb +placementRef: + apiGroup: apps.open-cluster-management.io + kind: PlacementRule + name: policy-one-plr +subjects: +- apiGroup: policy.open-cluster-management.io + kind: Policy + name: policy-one +--- +apiVersion: apps.open-cluster-management.io/v1 +kind: PlacementRule +metadata: + name: policy-one-plr +spec: + clusterConditions: + - status: "True" + type: ManagedClusterConditionAvailable + clusterSelector: + matchExpressions: + [] diff --git a/test/resources/case12_encryptionkeys_controller/policy-two.yaml b/test/resources/case12_encryptionkeys_controller/policy-two.yaml new file mode 100644 index 00000000..ab1210df --- /dev/null +++ b/test/resources/case12_encryptionkeys_controller/policy-two.yaml @@ -0,0 +1,33 @@ +apiVersion: policy.open-cluster-management.io/v1 +kind: Policy +metadata: + name: policy-two +spec: + disabled: false + policy-templates: + - objectDefinition: {} +--- +apiVersion: policy.open-cluster-management.io/v1 +kind: PlacementBinding +metadata: + name: policy-two-pb +placementRef: + apiGroup: apps.open-cluster-management.io + kind: PlacementRule + name: policy-two-plr +subjects: +- apiGroup: policy.open-cluster-management.io + kind: Policy + name: policy-two +--- +apiVersion: 
apps.open-cluster-management.io/v1 +kind: PlacementRule +metadata: + name: policy-two-plr +spec: + clusterConditions: + - status: "True" + type: ManagedClusterConditionAvailable + clusterSelector: + matchExpressions: + [] diff --git a/test/resources/case12_encryptionkeys_controller/replicated-policy-one.yaml b/test/resources/case12_encryptionkeys_controller/replicated-policy-one.yaml new file mode 100644 index 00000000..a9a97760 --- /dev/null +++ b/test/resources/case12_encryptionkeys_controller/replicated-policy-one.yaml @@ -0,0 +1,11 @@ +apiVersion: policy.open-cluster-management.io/v1 +kind: Policy +metadata: + name: policy-propagator-test.policy-one + annotations: + policy.open-cluster-management.io/encryption-iv: '7cznVUq5SXEE4RMZNkGOrQ==' +spec: + disabled: false + policy-templates: + - objectDefinition: + data: '{{hub fakedata hub}}' diff --git a/test/utils/utils.go b/test/utils/utils.go index ac77cc52..cfa6eaca 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -327,3 +327,38 @@ func GetMatchingEvents( return matchingEvents } + +// MetricsLines execs into the propagator pod and curls the metrics endpoint, and returns lines +// that match the pattern. +func MetricsLines(pattern string) (string, error) { + propPodInfo, err := KubectlWithOutput("get", "pod", "-n=open-cluster-management", + "-l=name=governance-policy-propagator", "--no-headers") + if err != nil { + return "", err + } + + var cmd *exec.Cmd + + metricsCmd := fmt.Sprintf(`curl localhost:8383/metrics | grep %q`, pattern) + + // The pod name is "No" when the response is "No resources found" + propPodName := strings.Split(propPodInfo, " ")[0] + if propPodName == "No" { + // A missing pod could mean the controller is running locally + cmd = exec.Command("bash", "-c", metricsCmd) + } else { + cmd = exec.Command("kubectl", "exec", "-n=open-cluster-management", propPodName, "-c", + "governance-policy-propagator", "--", "bash", "-c", metricsCmd) + } + + matchingMetricsRaw, err := cmd.Output() + if err != nil { + if err.Error() == "exit status 1" { + return "", nil // exit 1 indicates that grep couldn't find a match. + } + + return "", err + } + + return string(matchingMetricsRaw), nil +}
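
As a minimal sketch (not part of the patch) of the producer side of the `replicatedPolicyUpdates` channel wired up in `main.go` above: sending an `event.GenericEvent` for a replicated policy lets other components ask the replicated-policy controller to reconcile that object, assuming the controller attaches `replicatedUpdatesSource` with an object-keyed enqueue handler. The package and helper names below are hypothetical; the example name and namespace are taken from the case12 test resources.

```go
// Hypothetical sketch of feeding the replicated-policy update channel
// created in main.go. Sending a GenericEvent keyed to a replicated Policy
// asks the replicated-policy controller to reconcile it, assuming the
// controller registers the channel source with an object-keyed handler.
package sketch

import (
	"sigs.k8s.io/controller-runtime/pkg/event"

	policyv1 "open-cluster-management.io/governance-policy-propagator/api/v1"
)

// RequestReplicatedReconcile is a hypothetical helper, not part of the patch.
func RequestReplicatedReconcile(updates chan<- event.GenericEvent, name, namespace string) {
	replicated := &policyv1.Policy{}
	replicated.SetName(name)           // e.g. "policy-propagator-test.policy-one"
	replicated.SetNamespace(namespace) // e.g. "managed1"

	// The source.Channel watch converts this event into a reconcile request
	// for the named replicated policy.
	updates <- event.GenericEvent{Object: replicated}
}
```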