Skip to content

Commit

Permalink
switch queue informer to use types.NamespacedName; relocate deletion handler
Browse files Browse the repository at this point in the history

Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Jan 9, 2025
1 parent 1cfabfe commit 18cc8e9
Show file tree
Hide file tree
Showing 18 changed files with 226 additions and 462 deletions.
48 changes: 26 additions & 22 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -114,7 +115,7 @@ type Operator struct {
subQueueSet *queueinformer.ResourceQueueSet
ipQueueSet *queueinformer.ResourceQueueSet
ogQueueSet *queueinformer.ResourceQueueSet
nsResolveQueue workqueue.TypedRateLimitingInterface[any]
nsResolveQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]
namespace string
recorder record.EventRecorder
sources *grpc.SourceStore
Expand Down Expand Up @@ -268,8 +269,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire InstallPlans
ipInformer := crInformerFactory.Operators().V1alpha1().InstallPlans()
op.lister.OperatorsV1alpha1().RegisterInstallPlanLister(metav1.NamespaceAll, ipInformer.Lister())
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ipQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "ips",
})
op.ipQueueSet.Set(metav1.NamespaceAll, ipQueue)
Expand All @@ -290,8 +291,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo

operatorGroupInformer := crInformerFactory.Operators().V1().OperatorGroups()
op.lister.OperatorsV1().RegisterOperatorGroupLister(metav1.NamespaceAll, operatorGroupInformer.Lister())
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
ogQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "ogs",
})
op.ogQueueSet.Set(metav1.NamespaceAll, ogQueue)
Expand All @@ -312,8 +313,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catsrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "catsrcs",
})
op.catsrcQueueSet.Set(metav1.NamespaceAll, catsrcQueue)
Expand All @@ -323,7 +324,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
queueinformer.WithLogger(op.logger),
queueinformer.WithQueue(catsrcQueue),
queueinformer.WithInformer(catsrcInformer.Informer()),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncerWithDelete(op.handleCatSrcDeletion)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncCatalogSources).ToSyncer()),
queueinformer.WithDeletionHandler(op.handleCatSrcDeletion),
)
if err != nil {
return nil, err
Expand All @@ -341,8 +343,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subIndexer := subInformer.Informer().GetIndexer()
op.catalogSubscriberIndexer[metav1.NamespaceAll] = subIndexer

subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
subQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "subs",
})
op.subQueueSet.Set(metav1.NamespaceAll, subQueue)
Expand All @@ -355,7 +357,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
subscription.WithCatalogInformer(catsrcInformer.Informer()),
subscription.WithInstallPlanInformer(ipInformer.Informer()),
subscription.WithSubscriptionQueue(subQueue),
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions, nil)),
subscription.WithAppendedReconcilers(subscription.ReconcilerFromLegacySyncHandler(op.syncSubscriptions)),
subscription.WithRegistryReconcilerFactory(op.reconciler),
subscription.WithGlobalCatalogNamespace(op.namespace),
subscription.WithSourceProvider(op.resolverSourceProvider),
Expand Down Expand Up @@ -415,7 +417,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String(), "index": idx})
logger.Info("registering labeller")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -560,7 +562,7 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
logger := op.logger.WithFields(logrus.Fields{"gvr": gvr.String()})
logger.Info("registering owner reference fixer")

queue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{
queue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](), workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: gvr.String(),
})
queueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -670,13 +672,14 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// Generate and register QueueInformers for k8s resources
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)
k8sSyncer := queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()
for _, informer := range sharedIndexInformers {
queueInformer, err := queueinformer.NewQueueInformer(
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(informer),
queueinformer.WithSyncer(k8sSyncer),
queueinformer.WithDeletionHandler(op.handleDeletion),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -724,7 +727,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
ctx,
queueinformer.WithLogger(op.logger),
queueinformer.WithInformer(crdInformer),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncerWithDelete(op.handleDeletion)),
queueinformer.WithSyncer(queueinformer.LegacySyncHandler(op.syncObject).ToSyncer()),
queueinformer.WithDeletionHandler(op.handleDeletion),
)
if err != nil {
return nil, err
Expand All @@ -745,8 +749,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
// Namespace sync for resolving subscriptions
namespaceInformer := informers.NewSharedInformerFactory(op.opClient.KubernetesInterface(), resyncPeriod()).Core().V1().Namespaces()
op.lister.CoreV1().RegisterNamespaceLister(namespaceInformer.Lister())
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
op.nsResolveQueue = workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "resolve",
})
namespaceQueueInformer, err := queueinformer.NewQueueInformer(
Expand Down Expand Up @@ -787,12 +791,12 @@ func (o *Operator) syncSourceState(state grpc.SourceState) {

if err == nil {
for ns := range namespaces {
o.nsResolveQueue.Add(ns)
o.nsResolveQueue.Add(types.NamespacedName{Name: ns})
}
}
}

o.nsResolveQueue.Add(state.Key.Namespace)
o.nsResolveQueue.Add(types.NamespacedName{Name: state.Key.Namespace})
}
if err := o.catsrcQueueSet.Requeue(state.Key.Namespace, state.Key.Name); err != nil {
o.logger.WithError(err).Info("couldn't requeue catalogsource from catalog status change")
Expand Down Expand Up @@ -1411,7 +1415,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error {
}

logger.Info("unpacking is not complete yet, requeueing")
o.nsResolveQueue.AddAfter(namespace, 5*time.Second)
o.nsResolveQueue.AddAfter(types.NamespacedName{Name: namespace}, 5*time.Second)
return nil
}
}
Expand Down Expand Up @@ -1506,7 +1510,7 @@ func (o *Operator) syncSubscriptions(obj interface{}) error {
return fmt.Errorf("casting Subscription failed")
}

o.nsResolveQueue.Add(sub.GetNamespace())
o.nsResolveQueue.Add(types.NamespacedName{Name: sub.GetNamespace()})

return nil
}
Expand All @@ -1520,7 +1524,7 @@ func (o *Operator) syncOperatorGroups(obj interface{}) error {
return fmt.Errorf("casting OperatorGroup failed")
}

o.nsResolveQueue.Add(og.GetNamespace())
o.nsResolveQueue.Add(types.NamespacedName{Name: og.GetNamespace()})

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/operators/catalog/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2156,13 +2156,13 @@ func NewFakeOperator(ctx context.Context, namespace string, namespaces []string,
client: clientFake,
lister: lister,
namespace: namespace,
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](
workqueue.NewTypedMaxOfRateLimiter[any](
workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Second, 1000*time.Second),
nsResolveQueue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](
workqueue.NewTypedMaxOfRateLimiter[types.NamespacedName](
workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](1*time.Second, 1000*time.Second),
// 1 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
&workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(1), 100)},
),
workqueue.TypedRateLimitingQueueConfig[any]{
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "resolver",
}),
resolver: config.resolver,
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/operators/catalog/subscription/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
utilclock "k8s.io/utils/clock"
Expand All @@ -23,7 +24,7 @@ type syncerConfig struct {
subscriptionInformer cache.SharedIndexInformer
catalogInformer cache.SharedIndexInformer
installPlanInformer cache.SharedIndexInformer
subscriptionQueue workqueue.TypedRateLimitingInterface[any]
subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]
reconcilers kubestate.ReconcilerChain
registryReconcilerFactory reconciler.RegistryReconcilerFactory
globalCatalogNamespace string
Expand Down Expand Up @@ -97,7 +98,7 @@ func WithOperatorLister(lister operatorlister.OperatorLister) SyncerOption {
}

// WithSubscriptionQueue sets a syncer's subscription queue.
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[any]) SyncerOption {
func WithSubscriptionQueue(subscriptionQueue workqueue.TypedRateLimitingInterface[types.NamespacedName]) SyncerOption {
return func(config *syncerConfig) {
config.subscriptionQueue = subscriptionQueue
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/controller/operators/catalog/subscription/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,14 @@ import (

// ReconcilerFromLegacySyncHandler returns a reconciler that invokes the given legacy sync handler and on delete funcs.
// Since the reconciler does not return an updated kubestate, it MUST be the last reconciler in a given chain.
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler, onDelete func(obj interface{})) kubestate.Reconciler {
func ReconcilerFromLegacySyncHandler(sync queueinformer.LegacySyncHandler) kubestate.Reconciler {
var rec kubestate.ReconcilerFunc = func(ctx context.Context, in kubestate.State) (out kubestate.State, err error) {
out = in
switch s := in.(type) {
case SubscriptionExistsState:
if sync != nil {
err = sync(s.Subscription())
}
case SubscriptionDeletedState:
if onDelete != nil {
onDelete(s.Subscription())
}
case SubscriptionState:
if sync != nil {
err = sync(s.Subscription())
Expand Down
20 changes: 0 additions & 20 deletions pkg/controller/operators/catalog/subscription/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type SubscriptionState interface {
Subscription() *v1alpha1.Subscription
Add() SubscriptionExistsState
Update() SubscriptionExistsState
Delete() SubscriptionDeletedState
}

// SubscriptionExistsState describes subscription states in which the subscription exists on the cluster.
Expand All @@ -49,13 +48,6 @@ type SubscriptionUpdatedState interface {
isSubscriptionUpdatedState()
}

// SubscriptionDeletedState describes subscription states in which the subscription no longer exists and was deleted from the cluster.
type SubscriptionDeletedState interface {
SubscriptionState

isSubscriptionDeletedState()
}

// CatalogHealthState describes subscription states that represent a subscription with respect to catalog health.
type CatalogHealthState interface {
SubscriptionExistsState
Expand Down Expand Up @@ -176,12 +168,6 @@ func (s *subscriptionState) Update() SubscriptionExistsState {
}
}

func (s *subscriptionState) Delete() SubscriptionDeletedState {
return &subscriptionDeletedState{
SubscriptionState: s,
}
}

func NewSubscriptionState(sub *v1alpha1.Subscription) SubscriptionState {
return &subscriptionState{
State: kubestate.NewState(),
Expand All @@ -207,12 +193,6 @@ type subscriptionUpdatedState struct {

func (c *subscriptionUpdatedState) isSubscriptionUpdatedState() {}

type subscriptionDeletedState struct {
SubscriptionState
}

func (c *subscriptionDeletedState) isSubscriptionDeletedState() {}

type catalogHealthState struct {
SubscriptionExistsState
}
Expand Down
32 changes: 20 additions & 12 deletions pkg/controller/operators/catalog/subscription/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
utilclock "k8s.io/utils/clock"

Expand Down Expand Up @@ -71,9 +72,6 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE
case kubestate.ResourceUpdated:
initial = initial.Update()
metrics.UpdateSubsSyncCounterStorage(res)
case kubestate.ResourceDeleted:
initial = initial.Delete()
metrics.DeleteSubsMetric(res)
}

reconciled, err := s.reconcilers.Reconcile(ctx, initial)
Expand All @@ -89,18 +87,28 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE
return nil
}

func (s *subscriptionSyncer) Notify(event kubestate.ResourceEvent) {
func (s *subscriptionSyncer) Notify(event types.NamespacedName) {
s.notify(event)
}

// catalogSubscriptionKeys returns the set of explicit subscription keys, cluster-wide, that are possibly affected by catalogs in the given namespace.
func (s *subscriptionSyncer) catalogSubscriptionKeys(namespace string) ([]string, error) {
var keys []string
func (s *subscriptionSyncer) catalogSubscriptionKeys(namespace string) ([]types.NamespacedName, error) {
var cacheKeys []string
var err error
if namespace == s.globalCatalogNamespace {
keys = s.subscriptionCache.ListKeys()
cacheKeys = s.subscriptionCache.ListKeys()
} else {
keys, err = s.subscriptionCache.IndexKeys(cache.NamespaceIndex, namespace)
cacheKeys, err = s.subscriptionCache.IndexKeys(cache.NamespaceIndex, namespace)
}

keys := make([]types.NamespacedName, len(cacheKeys))
for _, k := range cacheKeys {
ns, name, err := cache.SplitMetaNamespaceKey(k)
if err != nil {
s.logger.Warnf("could not split meta key %q", k)
continue
}
keys = append(keys, types.NamespacedName{Namespace: ns, Name: name})
}

return keys, err
Expand Down Expand Up @@ -132,7 +140,7 @@ func (s *subscriptionSyncer) notifyOnCatalog(ctx context.Context, obj interface{
logger.Trace("notifing dependent subscriptions")
for _, subKey := range dependentKeys {
logger.Tracef("notifying subscription %s", subKey)
s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, subKey))
s.Notify(subKey)
}
logger.Trace("dependent subscriptions notified")
}
Expand Down Expand Up @@ -177,9 +185,9 @@ func (s *subscriptionSyncer) notifyOnInstallPlan(ctx context.Context, obj interf
// Notify dependent owner Subscriptions
owners := ownerutil.GetOwnersByKind(plan, v1alpha1.SubscriptionKind)
for _, owner := range owners {
subKey := fmt.Sprintf("%s/%s", plan.GetNamespace(), owner.Name)
subKey := types.NamespacedName{Namespace: plan.GetNamespace(), Name: owner.Name}
logger.Tracef("notifying subscription %s", subKey)
s.Notify(kubestate.NewResourceEvent(kubestate.ResourceUpdated, cache.ExplicitKey(subKey)))
s.Notify(subKey)
}
}

Expand Down Expand Up @@ -217,7 +225,7 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
subscriptionCache: config.subscriptionInformer.GetIndexer(),
installPlanLister: config.lister.OperatorsV1alpha1().InstallPlanLister(),
sourceProvider: config.sourceProvider,
notify: func(event kubestate.ResourceEvent) {
notify: func(event types.NamespacedName) {
// Notify Subscriptions by enqueuing to the Subscription queue.
config.subscriptionQueue.Add(event)
},
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/operators/catalogtemplate/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -101,8 +102,8 @@ func NewOperator(ctx context.Context, kubeconfigPath string, logger *logrus.Logg
// Wire CatalogSources
catsrcInformer := crInformerFactory.Operators().V1alpha1().CatalogSources()
op.lister.OperatorsV1alpha1().RegisterCatalogSourceLister(metav1.NamespaceAll, catsrcInformer.Lister())
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](),
workqueue.TypedRateLimitingQueueConfig[any]{
catalogTemplateSrcQueue := workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Name: "catalogSourceTemplate",
})
op.catalogSourceTemplateQueueSet.Set(metav1.NamespaceAll, catalogTemplateSrcQueue)
Expand Down
Loading

0 comments on commit 18cc8e9

Please sign in to comment.