diff --git a/pkg/controller/operators/catalog/operator.go b/pkg/controller/operators/catalog/operator.go index ebdd6313a4..85d0e1bdeb 100644 --- a/pkg/controller/operators/catalog/operator.go +++ b/pkg/controller/operators/catalog/operator.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -114,7 +115,7 @@ type Operator struct { subQueueSet *queueinformer.ResourceQueueSet ipQueueSet *queueinformer.ResourceQueueSet ogQueueSet *queueinformer.ResourceQueueSet - nsResolveQueue workqueue.TypedRateLimitingInterface[any] + nsResolveQueue workqueue.TypedRateLimitingInterface[types.NamespacedName] namespace string recorder record.EventRecorder sources *grpc.SourceStore @@ -268,8 +269,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire InstallPlans ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans() op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister()) - ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "ips", }) op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue) @@ -290,8 +291,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups() op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "ogs", }) op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue) @@ -312,8 +313,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "catsrcs", }) op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue) @@ -323,7 +324,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo queueinformer.WithLogger(op.logger), queueinformer.WithQueue(catsrcQueue), queueinformer.WithInformer(catsrcInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()), + queueinformer.WithDeletionHandler(op.handleCatSrcDeletion), ) if err != nil { return nil, err @@ -341,8 +343,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subIndexer := subInformer.Informer().GetIndexer() op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer - subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "subs", }) op.subQueueSet.Set(metav1.NamespaceAll, subQueue) @@ -355,7 +357,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo subscription.WithCatalogInformer(catsrcInformer.Informer()), subscription.WithInstallPlanInformer(ipInformer.Informer()), subscription.WithSubscriptionQueue(subQueue), - subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)), + subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)), subscription.WithRegistryReconcilerFactory(op.reconciler), subscription.WithGlobalCatalogNamespace(op.namespace), subscription.WithSourceProvider(op.resolverSourceProvider), @@ -415,7 +417,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ + queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: gvr.String(), }) queueInformer, err := queueinformer.NewQueueInformer( @@ -560,7 +562,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()}) logger.Info("registering owner reference fixer") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ + queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: gvr.String(), }) queueInformer, err := queueinformer.NewQueueInformer( @@ -670,13 +672,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo } // Generate and register QueueInformers for k8s resources - k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion) + k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer() for _, informer := range sharedIndexInformers { queueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithLogger(op.logger), queueinformer.WithInformer(informer), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -724,7 +727,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo ctx, queueinformer.WithLogger(op.logger), queueinformer.WithInformer(crdInformer), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -745,8 +749,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo // Namespace sync for resolving subscriptions namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces() op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "resolve", }) namespaceQueueInformer, err := queueinformer.NewQueueInformer( @@ -787,12 +791,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) { if err == nil { for ns := range namespaces { - o.nsResolveQueue.Add(ns) + o.nsResolveQueue.Add(types.NamespacedName{Name: ns}) } } } - o.nsResolveQueue.Add(state.Key.Namespace) + o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace}) } if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil { o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change") @@ -1411,7 +1415,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { } logger.Info("unpacking is not complete yet, requeueing") - o.nsResolveQueue.AddAfter(namespace, 5*time.Second) + o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second) return nil } } @@ -1506,7 +1510,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error { return fmt.Errorf("casting Subscription failed") } - o.nsResolveQueue.Add(sub.GetNamespace()) + o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()}) return nil } @@ -1520,7 +1524,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error { return fmt.Errorf("casting OperatorGroup failed") } - o.nsResolveQueue.Add(og.GetNamespace()) + o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()}) return nil } diff --git a/pkg/controller/operators/catalog/operator_test.go b/pkg/controller/operators/catalog/operator_test.go index c251cfc77b..e0e543b47c 100644 --- a/pkg/controller/operators/catalog/operator_test.go +++ b/pkg/controller/operators/catalog/operator_test.go @@ -2156,13 +2156,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string, client: clientFake, lister: lister, namespace: namespace, - nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.NewTypedMaxOfRateLimiter[any]( - workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second), + nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.NewTypedMaxOfRateLimiter[types.NamespacedName]( + workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](1*time.Second, 1000*time.Second), // 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item) - &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, + &workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(1), 100)}, ), - workqueue.TypedRateLimitingQueueConfig[any]{ + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "resolver", }), resolver: config.resolver, diff --git a/pkg/controller/operators/catalog/subscription/config.go b/pkg/controller/operators/catalog/subscription/config.go index c4c1877b64..01c8a21899 100644 --- a/pkg/controller/operators/catalog/subscription/config.go +++ b/pkg/controller/operators/catalog/subscription/config.go @@ -4,6 +4,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" utilclock "k8s.io/utils/clock" @@ -23,7 +24,7 @@ type syncerConfig struct { subscriptionInformer cache.SharedIndexInformer catalogInformer cache.SharedIndexInformer installPlanInformer cache.SharedIndexInformer - subscriptionQueue workqueue.TypedRateLimitingInterface[any] + subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName] reconcilers kubestate.ReconcilerChain registryReconcilerFactory reconciler.RegistryReconcilerFactory globalCatalogNamespace string @@ -97,7 +98,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption { } // WithSubscriptionQueue sets a syncer's subscription queue. -func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption { +func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]) SyncerOption { return func(config *syncerConfig) { config.subscriptionQueue = subscriptionQueue } diff --git a/pkg/controller/operators/catalog/subscription/reconciler.go b/pkg/controller/operators/catalog/subscription/reconciler.go index 2d3ccf9724..1a6741a650 100644 --- a/pkg/controller/operators/catalog/subscription/reconciler.go +++ b/pkg/controller/operators/catalog/subscription/reconciler.go @@ -28,7 +28,7 @@ import ( // ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs. // Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain. -func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler { +func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler { var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) { out = in switch s := in.(type) { @@ -36,10 +36,6 @@ func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDel if sync != nil { err = sync(s.Subscription()) } - case SubscriptionDeletedState: - if onDelete != nil { - onDelete(s.Subscription()) - } case SubscriptionState: if sync != nil { err = sync(s.Subscription()) diff --git a/pkg/controller/operators/catalog/subscription/state.go b/pkg/controller/operators/catalog/subscription/state.go index 50e26b67b9..64e2d26ae8 100644 --- a/pkg/controller/operators/catalog/subscription/state.go +++ b/pkg/controller/operators/catalog/subscription/state.go @@ -25,7 +25,6 @@ type SubscriptionState interface { Subscription() *v1alpha1.Subscription Add() SubscriptionExistsState Update() SubscriptionExistsState - Delete() SubscriptionDeletedState } // SubscriptionExistsState describes subscription states in which the subscription exists on the cluster. @@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface { isSubscriptionUpdatedState() } -// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster. -type SubscriptionDeletedState interface { - SubscriptionState - - isSubscriptionDeletedState() -} - // CatalogHealthState describes subscription states that represent a subscription with respect to catalog health. type CatalogHealthState interface { SubscriptionExistsState @@ -176,12 +168,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState { } } -func (s *subscriptionState) Delete() SubscriptionDeletedState { - return &subscriptionDeletedState{ - SubscriptionState: s, - } -} - func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState { return &subscriptionState{ State: kubestate.NewState(), @@ -207,12 +193,6 @@ type subscriptionUpdatedState struct { func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {} -type subscriptionDeletedState struct { - SubscriptionState -} - -func (c *subscriptionDeletedState) isSubscriptionDeletedState() {} - type catalogHealthState struct { SubscriptionExistsState } diff --git a/pkg/controller/operators/catalog/subscription/syncer.go b/pkg/controller/operators/catalog/subscription/syncer.go index 04e8edeb5a..b53974215a 100644 --- a/pkg/controller/operators/catalog/subscription/syncer.go +++ b/pkg/controller/operators/catalog/subscription/syncer.go @@ -7,6 +7,7 @@ import ( "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" utilclock "k8s.io/utils/clock" @@ -71,9 +72,6 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE case kubestate.ResourceUpdated: initial = initial.Update() metrics.UpdateSubsSyncCounterStorage(res) - case kubestate.ResourceDeleted: - initial = initial.Delete() - metrics.DeleteSubsMetric(res) } reconciled, err := s.reconcilers.Reconcile(ctx, initial) @@ -89,18 +87,28 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE return nil } -func (s *subscriptionSyncer) Notify(event kubestate.ResourceEvent) { +func (s *subscriptionSyncer) Notify(event types.NamespacedName) { s.notify(event) } // catalogSubscriptionKeys returns the set of explicit subscription keys, cluster-wide, that are possibly affected by catalogs in the given namespace. -func (s *subscriptionSyncer) catalogSubscriptionKeys(namespace string) ([]string, error) { - var keys []string +func (s *subscriptionSyncer) catalogSubscriptionKeys(namespace string) ([]types.NamespacedName, error) { + var cacheKeys []string var err error if namespace == s.globalCatalogNamespace { - keys = s.subscriptionCache.ListKeys() + cacheKeys = s.subscriptionCache.ListKeys() } else { - keys, err = s.subscriptionCache.IndexKeys(cache.NamespaceIndex, namespace) + cacheKeys, err = s.subscriptionCache.IndexKeys(cache.NamespaceIndex, namespace) + } + + keys := make([]types.NamespacedName, len(cacheKeys)) + for _, k := range cacheKeys { + ns, name, err := cache.SplitMetaNamespaceKey(k) + if err != nil { + s.logger.Warnf("could not split meta key %q", k) + continue + } + keys = append(keys, types.NamespacedName{Namespace: ns, Name: name}) } return keys, err @@ -132,7 +140,7 @@ func (s *subscriptionSyncer) notifyOnCatalog(ctx context.Context, obj interface{ logger.Trace("notifing dependent subscriptions") for _, subKey := range dependentKeys { logger.Tracef("notifying subscription %s", subKey) - s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, subKey)) + s.Notify(subKey) } logger.Trace("dependent subscriptions notified") } @@ -177,9 +185,9 @@ func (s *subscriptionSyncer) notifyOnInstallPlan(ctx context.Context, obj interf // Notify dependent owner Subscriptions owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind) for _, owner := range owners { - subKey := fmt.Sprintf("%s/%s", plan.GetNamespace(), owner.Name) + subKey := types.NamespacedName{Namespace: plan.GetNamespace(), Name: owner.Name} logger.Tracef("notifying subscription %s", subKey) - s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, cache.ExplicitKey(subKey))) + s.Notify(subKey) } } @@ -217,7 +225,7 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S subscriptionCache: config.subscriptionInformer.GetIndexer(), installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(), sourceProvider: config.sourceProvider, - notify: func(event kubestate.ResourceEvent) { + notify: func(event types.NamespacedName) { // Notify Subscriptions by enqueuing to the Subscription queue. config.subscriptionQueue.Add(event) }, diff --git a/pkg/controller/operators/catalogtemplate/operator.go b/pkg/controller/operators/catalogtemplate/operator.go index ea10454506..66a14cde1c 100644 --- a/pkg/controller/operators/catalogtemplate/operator.go +++ b/pkg/controller/operators/catalogtemplate/operator.go @@ -10,6 +10,7 @@ import ( "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" @@ -101,8 +102,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg // Wire CatalogSources catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources() op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister()) - catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "catalogSourceTemplate", }) op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue) diff --git a/pkg/controller/operators/olm/operator.go b/pkg/controller/operators/olm/operator.go index e6dfaaf1d0..2bb23a555e 100644 --- a/pkg/controller/operators/olm/operator.go +++ b/pkg/controller/operators/olm/operator.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" appsv1applyconfigurations "k8s.io/client-go/applyconfigurations/apps/v1" @@ -83,11 +84,11 @@ type Operator struct { copiedCSVLister metadatalister.Lister ogQueueSet *queueinformer.ResourceQueueSet csvQueueSet *queueinformer.ResourceQueueSet - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[types.NamespacedName] csvCopyQueueSet *queueinformer.ResourceQueueSet copiedCSVGCQueueSet *queueinformer.ResourceQueueSet - nsQueueSet workqueue.TypedRateLimitingInterface[any] - apiServiceQueue workqueue.TypedRateLimitingInterface[any] + nsQueueSet workqueue.TypedRateLimitingInterface[types.NamespacedName] + apiServiceQueue workqueue.TypedRateLimitingInterface[types.NamespacedName] csvIndexers map[string]cache.Indexer recorder record.EventRecorder resolver install.StrategyResolverInterface @@ -198,17 +199,17 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat client: config.externalClient, ogQueueSet: queueinformer.NewEmptyResourceQueueSet(), csvQueueSet: queueinformer.NewEmptyResourceQueueSet(), - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "olmConfig", }), csvCopyQueueSet: queueinformer.NewEmptyResourceQueueSet(), copiedCSVGCQueueSet: queueinformer.NewEmptyResourceQueueSet(), - apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + apiServiceQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "apiservice", }), resolver: config.strategyResolver, @@ -232,7 +233,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace := map[string]*plugins.Informers{} // Set up syncing for namespace-scoped resources - k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion) + k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer() for _, namespace := range config.watchedNamespaces { informersByNamespace[namespace] = &plugins.Informers{} // Wire CSVs @@ -246,9 +247,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ).Operators().V1alpha1().ClusterServiceVersions() informersByNamespace[namespace].CSVInformer = csvInformer op.lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, csvInformer.Lister()) - csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + csvQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: fmt.Sprintf("%s/csv", namespace), }) op.csvQueueSet.Set(namespace, csvQueue) @@ -258,7 +259,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithQueue(csvQueue), queueinformer.WithInformer(csvInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncClusterServiceVersion).ToSyncerWithDelete(op.handleClusterServiceVersionDeletion)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncClusterServiceVersion).ToSyncer()), + queueinformer.WithDeletionHandler(op.handleClusterServiceVersionDeletion), ) if err != nil { return nil, err @@ -273,7 +275,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat op.csvIndexers[namespace] = csvIndexer // Register separate queue for copying csvs - csvCopyQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any](), fmt.Sprintf("%s/csv-copy", namespace)) + csvCopyQueue := workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ + Name: fmt.Sprintf("%s/csv-copy", namespace), + }) op.csvCopyQueueSet.Set(namespace, csvCopyQueue) csvCopyQueueInformer, err := queueinformer.NewQueueInformer( ctx, @@ -307,9 +311,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat informersByNamespace[namespace].CopiedCSVLister = op.copiedCSVLister // Register separate queue for gcing copied csvs - copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + copiedCSVGCQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: fmt.Sprintf("%s/csv-gc", namespace), }) op.copiedCSVGCQueueSet.Set(namespace, copiedCSVGCQueue) @@ -333,9 +337,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat operatorGroupInformer := extInformerFactory.Operators().V1().OperatorGroups() informersByNamespace[namespace].OperatorGroupInformer = operatorGroupInformer op.lister.OperatorsV1().RegisterOperatorGroupLister(namespace, operatorGroupInformer.Lister()) - ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: fmt.Sprintf("%s/og", namespace), }) op.ogQueueSet.Set(namespace, ogQueue) @@ -344,7 +348,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithQueue(ogQueue), queueinformer.WithInformer(operatorGroupInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroups).ToSyncerWithDelete(op.operatorGroupDeleted)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncOperatorGroups).ToSyncer()), + queueinformer.WithDeletionHandler(op.operatorGroupDeleted), ) if err != nil { return nil, err @@ -362,6 +367,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(opConditionInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -377,7 +383,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ctx, queueinformer.WithLogger(op.logger), queueinformer.WithInformer(subInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncSubscription).ToSyncerWithDelete(op.syncSubscriptionDeleted)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncSubscription).ToSyncer()), + queueinformer.WithDeletionHandler(op.syncSubscriptionDeleted), ) if err != nil { return nil, err @@ -406,6 +413,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(depInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -423,6 +431,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(roleInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -439,6 +448,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(roleBindingInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -458,6 +468,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(secretInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -475,6 +486,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(serviceInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -492,6 +504,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(serviceAccountInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -522,7 +535,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx}) logger.Info("registering labeller") - queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{ + queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: gvr.String(), }) queueInformer, err := queueinformer.NewQueueInformer( @@ -609,6 +622,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(clusterRoleInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -654,6 +668,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(clusterRoleBindingInformer.Informer()), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -696,9 +711,9 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), config.resyncPeriod()).Core().V1().Namespaces() informersByNamespace[metav1.NamespaceAll].NamespaceInformer = namespaceInformer op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister()) - op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + op.nsQueueSet = workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "resolver", }) namespaceInformer.Informer().AddEventHandler( @@ -730,7 +745,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithQueue(op.apiServiceQueue), queueinformer.WithInformer(apiServiceInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncAPIService).ToSyncerWithDelete(op.handleDeletion)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncAPIService).ToSyncer()), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -759,6 +775,7 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat queueinformer.WithLogger(op.logger), queueinformer.WithInformer(crdInformer), queueinformer.WithSyncer(k8sSyncer), + queueinformer.WithDeletionHandler(op.handleDeletion), ) if err != nil { return nil, err @@ -792,7 +809,8 @@ func newOperatorWithConfig(ctx context.Context, config *operatorConfig) (*Operat ctx, queueinformer.WithLogger(op.logger), queueinformer.WithInformer(proxyInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(proxySyncer.SyncProxy).ToSyncerWithDelete(proxySyncer.HandleProxyDelete)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(proxySyncer.SyncProxy).ToSyncer()), + queueinformer.WithDeletionHandler(proxySyncer.HandleProxyDelete), ) if err != nil { return nil, err @@ -1665,7 +1683,8 @@ func (a *Operator) syncCopyCSV(obj interface{}) (syncError error) { } if err == nil { - go a.olmConfigQueue.AddAfter(olmConfig, time.Second*5) + key := types.NamespacedName{Namespace: olmConfig.GetNamespace(), Name: olmConfig.GetName()} + go a.olmConfigQueue.AddAfter(key, time.Second*5) } logger := a.logger.WithFields(logrus.Fields{ @@ -2756,7 +2775,7 @@ func (a *Operator) requeueCSVsByLabelSet(logger *logrus.Entry, labelSets ...labe } for _, key := range keys { - if err := a.csvQueueSet.RequeueByKey(key); err != nil { + if err := a.csvQueueSet.Requeue(key.Namespace, key.Name); err != nil { logger.WithError(err).Debug("cannot requeue requiring/providing csv") } else { logger.WithField("key", key).Debug("csv successfully requeued on crd change") diff --git a/pkg/controller/operators/olm/operatorgroup.go b/pkg/controller/operators/olm/operatorgroup.go index 18c8b19008..8429a59ce8 100644 --- a/pkg/controller/operators/olm/operatorgroup.go +++ b/pkg/controller/operators/olm/operatorgroup.go @@ -8,20 +8,18 @@ import ( "reflect" "strings" - "k8s.io/apimachinery/pkg/api/equality" - "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/errors" - utillabels "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/labels" - operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -30,6 +28,7 @@ import ( "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/decorators" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" hashutil "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/hash" + utillabels "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubernetes/pkg/util/labels" "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/ownerutil" ) @@ -182,7 +181,7 @@ func (a *Operator) syncOperatorGroups(obj interface{}) error { logger.Debug("Requeueing out of sync namespaces") for _, ns := range outOfSyncNamespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(types.NamespacedName{Name: ns}) } // CSV requeue is handled by the succeeding sync in `annotateCSVs` @@ -263,7 +262,7 @@ func (a *Operator) operatorGroupDeleted(obj interface{}) { logger.Debug("OperatorGroup deleted, requeueing out of sync namespaces") for _, ns := range op.Status.Namespaces { logger.WithField("namespace", ns).Debug("requeueing") - a.nsQueueSet.Add(ns) + a.nsQueueSet.Add(types.NamespacedName{Name: ns}) } } diff --git a/pkg/lib/index/label.go b/pkg/lib/index/label.go index 52ccd29bea..ac891b5f46 100644 --- a/pkg/lib/index/label.go +++ b/pkg/lib/index/label.go @@ -2,6 +2,7 @@ package indexer import ( "fmt" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" @@ -29,9 +30,9 @@ func MetaLabelIndexFunc(obj interface{}) ([]string, error) { } // LabelIndexKeys returns the union of indexed cache keys in the given indexers matching the same labels as the given selector -func LabelIndexKeys(indexers map[string]cache.Indexer, labelSets ...labels.Set) ([]string, error) { - keySet := map[string]struct{}{} - keys := []string{} +func LabelIndexKeys(indexers map[string]cache.Indexer, labelSets ...labels.Set) ([]types.NamespacedName, error) { + stringKeySet := map[string]struct{}{} + stringKeys := []string{} for _, indexer := range indexers { for _, labelSet := range labelSets { for key, value := range labelSet { @@ -43,18 +44,26 @@ func LabelIndexKeys(indexers map[string]cache.Indexer, labelSets ...labels.Set) for _, cacheKey := range cacheKeys { // Detect duplication - if _, ok := keySet[cacheKey]; ok { + if _, ok := stringKeySet[cacheKey]; ok { continue } // Add to set - keySet[cacheKey] = struct{}{} - keys = append(keys, cacheKey) + stringKeySet[cacheKey] = struct{}{} + stringKeys = append(stringKeys, cacheKey) } } } } + keys := make([]types.NamespacedName, 0, len(stringKeys)) + for _, k := range stringKeys { + ns, name, err := cache.SplitMetaNamespaceKey(k) + if err != nil { + return nil, err + } + keys = append(keys, types.NamespacedName{Namespace: ns, Name: name}) + } return keys, nil } diff --git a/pkg/lib/kubestate/kubestate.go b/pkg/lib/kubestate/kubestate.go index 3f656069de..6d95cc918d 100644 --- a/pkg/lib/kubestate/kubestate.go +++ b/pkg/lib/kubestate/kubestate.go @@ -2,6 +2,7 @@ package kubestate import ( "context" + "k8s.io/apimachinery/pkg/types" ) type State interface { @@ -141,8 +142,6 @@ const ( ResourceAdded ResourceEventType = "add" // ResourceUpdated tells the operator that a given resources has been updated. ResourceUpdated ResourceEventType = "update" - // ResourceDeleted tells the operator that a given resources has been deleted. - ResourceDeleted ResourceEventType = "delete" ) type ResourceEvent interface { @@ -171,10 +170,10 @@ func NewResourceEvent(eventType ResourceEventType, resource interface{}) Resourc } type Notifier interface { - Notify(event ResourceEvent) + Notify(event types.NamespacedName) } -type NotifyFunc func(event ResourceEvent) +type NotifyFunc func(event types.NamespacedName) // SyncFunc syncs resource events. type SyncFunc func(ctx context.Context, event ResourceEvent) error diff --git a/pkg/lib/queueinformer/config.go b/pkg/lib/queueinformer/config.go index bd69d2403b..bed1398542 100644 --- a/pkg/lib/queueinformer/config.go +++ b/pkg/lib/queueinformer/config.go @@ -3,6 +3,7 @@ package queueinformer import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -14,11 +15,11 @@ import ( type queueInformerConfig struct { provider metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[types.NamespacedName] informer cache.SharedIndexInformer indexer cache.Indexer - keyFunc KeyFunc syncer kubestate.Syncer + onDelete func(interface{}) } // Option applies an option to the given queue informer config. @@ -40,6 +41,9 @@ func (c *queueInformerConfig) complete() { // Extract indexer from informer if c.indexer = c.informer.GetIndexer() } + if c.onDelete != nil { + c.onDelete = func(obj interface{}) {} + } } // validate returns an error if the config isn't valid. @@ -51,10 +55,8 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) { err = newInvalidConfigError("nil logger") case config.queue == nil: err = newInvalidConfigError("nil queue") - case config.indexer == nil && config.informer == nil: - err = newInvalidConfigError("nil indexer and informer") - case config.keyFunc == nil: - err = newInvalidConfigError("nil key function") + case config.indexer == nil: + err = newInvalidConfigError("nil indexer") case config.syncer == nil: err = newInvalidConfigError("nil syncer") } @@ -62,56 +64,15 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) { return } -// difference from above is that this intentionally verifies without index/informer -func (c *queueInformerConfig) validateQueue() (err error) { - switch config := c; { - case config.provider == nil: - err = newInvalidConfigError("nil metrics provider") - case config.logger == nil: - err = newInvalidConfigError("nil logger") - case config.queue == nil: - err = newInvalidConfigError("nil queue") - case config.keyFunc == nil: - err = newInvalidConfigError("nil key function") - case config.syncer == nil: - err = newInvalidConfigError("nil syncer") - } - - return -} - -func defaultKeyFunc(obj interface{}) (string, bool) { - // Get keys nested in resource events up to depth 2 - keyable := false - for d := 0; d < 2 && !keyable; d++ { - switch v := obj.(type) { - case string: - return v, true - case kubestate.ResourceEvent: - obj = v.Resource() - default: - keyable = true - } - } - - k, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err != nil { - return k, false - } - - return k, true -} - func defaultConfig() *queueInformerConfig { return &queueInformerConfig{ provider: metrics.NewMetricsNil(), - queue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + queue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "default", }), - logger: logrus.New(), - keyFunc: defaultKeyFunc, + logger: logrus.New(), } } @@ -130,7 +91,7 @@ func WithLogger(logger *logrus.Logger) Option { } // WithQueue sets the queue used by a QueueInformer. -func WithQueue(queue workqueue.RateLimitingInterface) Option { +func WithQueue(queue workqueue.TypedRateLimitingInterface[types.NamespacedName]) Option { return func(config *queueInformerConfig) { config.queue = queue } @@ -150,17 +111,16 @@ func WithIndexer(indexer cache.Indexer) Option { } } -// WithKeyFunc sets the key func used by a QueueInformer. -func WithKeyFunc(keyFunc KeyFunc) Option { +// WithSyncer sets the syncer invoked by a QueueInformer. +func WithSyncer(syncer kubestate.Syncer) Option { return func(config *queueInformerConfig) { - config.keyFunc = keyFunc + config.syncer = syncer } } -// WithSyncer sets the syncer invoked by a QueueInformer. -func WithSyncer(syncer kubestate.Syncer) Option { +func WithDeletionHandler(onDelete func(obj interface{})) Option { return func(config *queueInformerConfig) { - config.syncer = syncer + config.onDelete = onDelete } } diff --git a/pkg/lib/queueinformer/config_test.go b/pkg/lib/queueinformer/config_test.go deleted file mode 100644 index ab31e131d4..0000000000 --- a/pkg/lib/queueinformer/config_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package queueinformer - -import ( - "testing" - - "github.com/stretchr/testify/require" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/cache" - - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" -) - -func TestDefaultKeyFunc(t *testing.T) { - tests := []struct { - description string - obj interface{} - expectedKey string - expectedCreated bool - }{ - { - description: "String/Created", - obj: "a-string-key", - expectedKey: "a-string-key", - expectedCreated: true, - }, - { - description: "ExplicitKey/Created", - obj: cache.ExplicitKey("an-explicit-key"), - expectedKey: "an-explicit-key", - expectedCreated: true, - }, - { - description: "Meta/Created", - obj: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}}, - expectedKey: "default/a-pod", - expectedCreated: true, - }, - { - description: "Meta/NonNamespaced/Created", - obj: &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "a-namespace"}}, - expectedKey: "a-namespace", - expectedCreated: true, - }, - { - description: "ResourceEvent/String/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, "a-string-key"), - expectedKey: "a-string-key", - expectedCreated: true, - }, - { - description: "ResourceEvent/ExplicitKey/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, cache.ExplicitKey("an-explicit-key")), - expectedKey: "an-explicit-key", - expectedCreated: true, - }, - { - description: "ResourceEvent/Meta/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}}), - expectedKey: "default/a-pod", - expectedCreated: true, - }, - { - description: "ResourceEvent/Meta/NonNamespaced/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "a-namespace"}}), - expectedKey: "a-namespace", - expectedCreated: true, - }, - { - description: "ResourceEvent/ResourceEvent/ExplicitKey/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, cache.ExplicitKey("an-explicit-key"))), - expectedKey: "an-explicit-key", - expectedCreated: true, - }, - { - description: "ResourceEvent/ResourceEvent/Meta/Created", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}})), - expectedKey: "default/a-pod", - expectedCreated: true, - }, - { - description: "Arbitrary/NotCreated", - obj: struct{}{}, - expectedKey: "", - expectedCreated: false, - }, - { - description: "ResourceEvent/Arbitrary/NotCreated", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, struct{}{}), - expectedKey: "", - expectedCreated: false, - }, - { - description: "ResourceEvent/ResourceEvent/Arbitrary/NotCreated", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, struct{}{})), - expectedKey: "", - expectedCreated: false, - }, - { - description: "ResourceEvent/ResourceEvent/ResourceEvent/String/NotCreated", - obj: kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, kubestate.NewResourceEvent(kubestate.ResourceAdded, &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "a-pod"}}))), - expectedKey: "", - expectedCreated: false, - }, - } - - for _, tt := range tests { - t.Run(tt.description, func(t *testing.T) { - key, created := defaultKeyFunc(tt.obj) - require.Equal(t, tt.expectedKey, key) - require.Equal(t, tt.expectedCreated, created) - }) - } -} diff --git a/pkg/lib/queueinformer/queueinformer.go b/pkg/lib/queueinformer/queueinformer.go index 02a66cb527..54b16d9231 100644 --- a/pkg/lib/queueinformer/queueinformer.go +++ b/pkg/lib/queueinformer/queueinformer.go @@ -2,6 +2,9 @@ package queueinformer import ( "context" + "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -12,10 +15,6 @@ import ( "github.com/operator-framework/operator-lifecycle-manager/pkg/metrics" ) -// KeyFunc returns a key for the given object and a bool which is true if the key was -// successfully generated and false otherwise. -type KeyFunc func(obj interface{}) (string, bool) - // QueueInformer ties an informer to a queue in order to process events from the informer // the informer watches objects of interest and adds objects to the queue for processing // the syncHandler is called for all objects on the queue @@ -23,11 +22,11 @@ type QueueInformer struct { metrics.MetricsProvider logger *logrus.Logger - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[types.NamespacedName] informer cache.SharedIndexInformer indexer cache.Indexer - keyFunc KeyFunc syncer kubestate.Syncer + onDelete func(interface{}) } // Sync invokes all registered sync handlers in the QueueInformer's chain @@ -36,49 +35,38 @@ func (q *QueueInformer) Sync(ctx context.Context, event kubestate.ResourceEvent) } // Enqueue adds a key to the queue. If obj is a key already it gets added directly. -// Otherwise, the key is extracted via keyFunc. -func (q *QueueInformer) Enqueue(event kubestate.ResourceEvent) { - if event == nil { - // Don't enqueue nil events - return - } - - resource := event.Resource() - if event.Type() == kubestate.ResourceDeleted { - // Get object from tombstone if possible - if tombstone, ok := resource.(cache.DeletedFinalStateUnknown); ok { - resource = tombstone - } - } else { - // Extract key for add and update events - if key, ok := q.key(resource); ok { - resource = key - } - } - +func (q *QueueInformer) Enqueue(item types.NamespacedName) { // Create new resource event and add to queue - e := kubestate.NewResourceEvent(event.Type(), resource) - q.logger.WithField("event", e).Trace("enqueuing resource event") - q.queue.Add(e) -} - -// key turns an object into a key for the indexer. -func (q *QueueInformer) key(obj interface{}) (string, bool) { - return q.keyFunc(obj) + q.logger.WithField("item", item).Trace("enqueuing item") + q.queue.Add(item) } // resourceHandlers provides the default implementation for responding to events // these simply Log the event and add the object's key to the queue for later processing. -func (q *QueueInformer) resourceHandlers(ctx context.Context) *cache.ResourceEventHandlerFuncs { +func (q *QueueInformer) resourceHandlers(_ context.Context) *cache.ResourceEventHandlerFuncs { return &cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, obj)) + metaObj, ok := obj.(metav1.Object) + if !ok { + panic(fmt.Errorf("unexpected object type in add event: %T", obj)) + } + q.Enqueue(types.NamespacedName{ + Namespace: metaObj.GetNamespace(), + Name: metaObj.GetName(), + }) }, - UpdateFunc: func(oldObj, newObj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceUpdated, newObj)) + UpdateFunc: func(_, newObj interface{}) { + metaObj, ok := newObj.(metav1.Object) + if !ok { + panic(fmt.Errorf("unexpected object type in update event: %T", newObj)) + } + q.Enqueue(types.NamespacedName{ + Namespace: metaObj.GetNamespace(), + Name: metaObj.GetName(), + }) }, DeleteFunc: func(obj interface{}) { - q.Enqueue(kubestate.NewResourceEvent(kubestate.ResourceDeleted, obj)) + q.onDelete(obj) }, } } @@ -104,25 +92,6 @@ func (q *QueueInformer) metricHandlers() *cache.ResourceEventHandlerFuncs { } } -func NewQueue(ctx context.Context, options ...Option) (*QueueInformer, error) { - config := defaultConfig() - config.apply(options) - - if err := config.validateQueue(); err != nil { - return nil, err - } - - queue := &QueueInformer{ - MetricsProvider: config.provider, - logger: config.logger, - queue: config.queue, - keyFunc: config.keyFunc, - syncer: config.syncer, - } - - return queue, nil -} - // NewQueueInformer returns a new QueueInformer configured with options. func NewQueueInformer(ctx context.Context, options ...Option) (*QueueInformer, error) { // Get default config and apply given options @@ -145,8 +114,8 @@ func newQueueInformerFromConfig(ctx context.Context, config *queueInformerConfig queue: config.queue, indexer: config.indexer, informer: config.informer, - keyFunc: config.keyFunc, syncer: config.syncer, + onDelete: config.onDelete, } // Register event handlers for resource and metrics @@ -163,28 +132,12 @@ type LegacySyncHandler func(obj interface{}) error // ToSyncer returns the Syncer equivalent of the sync handler. func (l LegacySyncHandler) ToSyncer() kubestate.Syncer { - return l.ToSyncerWithDelete(nil) -} - -// ToSyncerWithDelete returns the Syncer equivalent of the given sync handler and delete function. -func (l LegacySyncHandler) ToSyncerWithDelete(onDelete func(obj interface{})) kubestate.Syncer { - var syncer kubestate.SyncFunc = func(ctx context.Context, event kubestate.ResourceEvent) error { + return kubestate.SyncFunc(func(ctx context.Context, event kubestate.ResourceEvent) error { switch event.Type() { - case kubestate.ResourceDeleted: - if onDelete != nil { - onDelete(event.Resource()) - } - case kubestate.ResourceAdded: - // Added and updated are treated the same - fallthrough - case kubestate.ResourceUpdated: + case kubestate.ResourceAdded, kubestate.ResourceUpdated: return l(event.Resource()) default: return errors.Errorf("unexpected resource event type: %s", event.Type()) } - - return nil - } - - return syncer + }) } diff --git a/pkg/lib/queueinformer/queueinformer_operator.go b/pkg/lib/queueinformer/queueinformer_operator.go index ecdb4eb896..3dc81d6982 100644 --- a/pkg/lib/queueinformer/queueinformer_operator.go +++ b/pkg/lib/queueinformer/queueinformer_operator.go @@ -9,6 +9,7 @@ import ( "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" @@ -261,6 +262,13 @@ func (o *operator) worker(ctx context.Context, loop *QueueInformer) { } } +func keyForNamespacedName(item types.NamespacedName) string { + if item.Namespace == "" { + return item.Name + } + return item.String() +} + func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) bool { queue := loop.queue item, quit := queue.Get() @@ -273,48 +281,28 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer) logger := o.logger.WithField("item", item) logger.WithField("queue-length", queue.Len()).Trace("popped queue") - event, ok := item.(kubestate.ResourceEvent) - if !ok || event.Type() != kubestate.ResourceDeleted { - // Get the key - key, keyable := loop.key(item) - if !keyable { - logger.WithField("item", item).Warn("could not form key") - queue.Forget(item) - return true - } - - logger = logger.WithField("cache-key", key) - - var resource interface{} - if loop.indexer == nil { - resource = event.Resource() - } else { - // Get the current cached version of the resource - var exists bool - var err error - resource, exists, err = loop.indexer.GetByKey(key) - if err != nil { - logger.WithError(err).Error("cache get failed") - queue.Forget(item) - return true - } - if !exists { - logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") - queue.Forget(item) - return true - } - } + key := keyForNamespacedName(item) + logger = logger.WithField("cache-key", key) - if !ok { - event = kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) - } else { - event = kubestate.NewResourceEvent(event.Type(), resource) - } + // Get the current cached version of the resource + var exists bool + var err error + resource, exists, err := loop.indexer.GetByKey(key) + if err != nil { + logger.WithError(err).Error("cache get failed") + queue.Forget(item) + return true + } + if !exists { + logger.WithField("existing-cache-keys", loop.indexer.ListKeys()).Debug("cache get failed, key not in cache") + queue.Forget(item) + return true } + event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource) // Sync and requeue on error (throw out failed deletion syncs) - err := loop.Sync(ctx, event) - if requeues := queue.NumRequeues(item); err != nil && requeues < 8 && event.Type() != kubestate.ResourceDeleted { + err = loop.Sync(ctx, event) + if requeues := queue.NumRequeues(item); err != nil && requeues < 8 { logger.WithField("requeues", requeues).Trace("requeuing with rate limiting") utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item))) queue.AddRateLimited(item) diff --git a/pkg/lib/queueinformer/resourcequeue.go b/pkg/lib/queueinformer/resourcequeue.go index 0e4da56cde..957d6483ae 100644 --- a/pkg/lib/queueinformer/resourcequeue.go +++ b/pkg/lib/queueinformer/resourcequeue.go @@ -2,72 +2,56 @@ package queueinformer import ( "fmt" - "strings" + "k8s.io/apimachinery/pkg/types" "sync" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/util/workqueue" - - "github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate" ) // ResourceQueueSet is a set of workqueues that is assumed to be keyed by namespace type ResourceQueueSet struct { - queueSet map[string]workqueue.RateLimitingInterface + queueSet map[string]workqueue.TypedRateLimitingInterface[types.NamespacedName] mutex sync.RWMutex } // NewResourceQueueSet returns a new queue set with the given queue map -func NewResourceQueueSet(queueSet map[string]workqueue.RateLimitingInterface) *ResourceQueueSet { +func NewResourceQueueSet(queueSet map[string]workqueue.TypedRateLimitingInterface[types.NamespacedName]) *ResourceQueueSet { return &ResourceQueueSet{queueSet: queueSet} } // NewEmptyResourceQueueSet returns a new queue set with an empty but initialized queue map func NewEmptyResourceQueueSet() *ResourceQueueSet { - return &ResourceQueueSet{queueSet: make(map[string]workqueue.RateLimitingInterface)} + return &ResourceQueueSet{queueSet: make(map[string]workqueue.TypedRateLimitingInterface[types.NamespacedName])} } // Set sets the queue at the given key -func (r *ResourceQueueSet) Set(key string, queue workqueue.RateLimitingInterface) { +func (r *ResourceQueueSet) Set(key string, queue workqueue.TypedRateLimitingInterface[types.NamespacedName]) { r.mutex.Lock() defer r.mutex.Unlock() r.queueSet[key] = queue } -func (r *ResourceQueueSet) RequeueEvent(namespace string, resourceEvent kubestate.ResourceEvent) error { - r.mutex.RLock() - defer r.mutex.RUnlock() - - if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok { - queue.AddRateLimited(resourceEvent) - return nil - } - - if queue, ok := r.queueSet[namespace]; ok { - queue.AddRateLimited(resourceEvent) - return nil - } - - return fmt.Errorf("couldn't find queue '%v' for event: %v", namespace, resourceEvent) -} - // Requeue requeues the resource in the set with the given name and namespace func (r *ResourceQueueSet) Requeue(namespace, name string) error { r.mutex.RLock() defer r.mutex.RUnlock() // We can build the key directly, will need to change if queue uses different key scheme - key := fmt.Sprintf("%s/%s", namespace, name) - event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, key) + key := types.NamespacedName{Namespace: namespace, Name: name} if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok { - queue.Add(event) + queue.Add(key) return nil } + if namespace == "" { + return fmt.Errorf("non-namespaced key %s cannot be used with namespaced queues", key) + } + if queue, ok := r.queueSet[namespace]; ok { - queue.Add(event) + queue.Add(key) return nil } @@ -81,8 +65,7 @@ func (r *ResourceQueueSet) RequeueAfter(namespace, name string, duration time.Du defer r.mutex.RUnlock() // We can build the key directly, will need to change if queue uses different key scheme - key := fmt.Sprintf("%s/%s", namespace, name) - event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, key) + event := types.NamespacedName{Namespace: namespace, Name: name} if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok { queue.AddAfter(event, duration) @@ -96,27 +79,3 @@ func (r *ResourceQueueSet) RequeueAfter(namespace, name string, duration time.Du return fmt.Errorf("couldn't find queue for resource") } - -// RequeueByKey adds the given key to the resource queue that should contain it -func (r *ResourceQueueSet) RequeueByKey(key string) error { - r.mutex.RLock() - defer r.mutex.RUnlock() - - event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, key) - if queue, ok := r.queueSet[metav1.NamespaceAll]; len(r.queueSet) == 1 && ok { - queue.Add(event) - return nil - } - - parts := strings.Split(key, "/") - if len(parts) != 2 { - return fmt.Errorf("non-namespaced key %s cannot be used with namespaced queues", key) - } - - if queue, ok := r.queueSet[parts[0]]; ok { - queue.Add(event) - return nil - } - - return fmt.Errorf("couldn't find queue for resource") -} diff --git a/pkg/package-server/provider/registry.go b/pkg/package-server/provider/registry.go index 2163db9435..1dad37c048 100644 --- a/pkg/package-server/provider/registry.go +++ b/pkg/package-server/provider/registry.go @@ -146,7 +146,8 @@ func NewRegistryProvider(ctx context.Context, crClient versioned.Interface, oper catsrcQueueInformer, err := queueinformer.NewQueueInformer( ctx, queueinformer.WithInformer(catsrcInformer.Informer()), - queueinformer.WithSyncer(queueinformer.LegacySyncHandler(p.syncCatalogSource).ToSyncerWithDelete(p.catalogSourceDeleted)), + queueinformer.WithSyncer(queueinformer.LegacySyncHandler(p.syncCatalogSource).ToSyncer()), + queueinformer.WithDeletionHandler(p.catalogSourceDeleted), ) if err != nil { return nil, err diff --git a/pkg/package-server/server/server.go b/pkg/package-server/server/server.go index 85ad4931dd..81ea882c78 100644 --- a/pkg/package-server/server/server.go +++ b/pkg/package-server/server/server.go @@ -11,6 +11,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" genericfeatures "k8s.io/apiserver/pkg/features" genericserver "k8s.io/apiserver/pkg/server" @@ -35,7 +36,7 @@ const DefaultWakeupInterval = 12 * time.Hour type Operator struct { queueinformer.Operator - olmConfigQueue workqueue.TypedRateLimitingInterface[any] + olmConfigQueue workqueue.TypedRateLimitingInterface[types.NamespacedName] options *PackageServerOptions } @@ -239,9 +240,9 @@ func (o *PackageServerOptions) Run(ctx context.Context) error { op := &Operator{ Operator: queueOperator, - olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any]( - workqueue.DefaultTypedControllerRateLimiter[any](), - workqueue.TypedRateLimitingQueueConfig[any]{ + olmConfigQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName]( + workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), + workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{ Name: "olmConfig", }), options: o,