Skip to content

Commit

Permalink
fix onDelete guard, subscription metrics, and some small things
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 9, 2025
1 parent 18cc8e9 commit a32b81b
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 16 deletions.
18 changes: 8 additions & 10 deletions pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -877,18 +877,16 @@ func (o *Operator) handleDeletion(obj interface{}) {
func (o *Operator) handleCatSrcDeletion(obj interface{}) {
catsrc, ok := obj.(metav1.Object)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}
utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
return
}

catsrc, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
return
}
catsrc, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Namespace %#v", obj))
return
}
}
sourceKey := registry.CatalogKey{Name: catsrc.GetName(), Namespace: catsrc.GetNamespace()}
Expand Down
18 changes: 17 additions & 1 deletion pkg/controller/operators/catalog/subscription/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE
initial = initial.Add()
case kubestate.ResourceUpdated:
initial = initial.Update()
metrics.UpdateSubsSyncCounterStorage(res)
}

reconciled, err := s.reconcilers.Reconcile(ctx, initial)
Expand Down Expand Up @@ -231,6 +230,23 @@ func newSyncerWithConfig(ctx context.Context, config *syncerConfig) (kubestate.S
},
}

// Add metrics handler to subscription informer
// NOTE: This is different from how metrics are handled for other resources (install plan, catalog source, etc.)
// which use metrics provider and through the QueueInformer
config.subscriptionInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
UpdateFunc: func(oldObj, newObj interface{}) {
if sub, ok := newObj.(*v1alpha1.Subscription); ok {
metrics.UpdateSubsSyncCounterStorage(sub)
}
},
DeleteFunc: func(obj interface{}) {
if sub, ok := obj.(*v1alpha1.Subscription); ok {
metrics.DeleteSubsMetric(sub)
}
},
})

// Build a reconciler chain from the default and configured reconcilers
// Default reconcilers should always come first in the chain
defaultReconcilers := kubestate.ReconcilerChain{
Expand Down
4 changes: 1 addition & 3 deletions pkg/lib/queueinformer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (c *queueInformerConfig) complete() {
// Extract indexer from informer if
c.indexer = c.informer.GetIndexer()
}
if c.onDelete != nil {
c.onDelete = func(obj interface{}) {}
}
}

// validate returns an error if the config isn't valid.
Expand All @@ -67,6 +64,7 @@ func (c *queueInformerConfig) validateQueueInformer() (err error) {
func defaultConfig() *queueInformerConfig {
return &queueInformerConfig{
provider: metrics.NewMetricsNil(),
onDelete: func(obj interface{}) {},
queue: workqueue.NewTypedRateLimitingQueueWithConfig[types.NamespacedName](
workqueue.DefaultTypedControllerRateLimiter[types.NamespacedName](),
workqueue.TypedRateLimitingQueueConfig[types.NamespacedName]{
Expand Down
1 change: 0 additions & 1 deletion pkg/lib/queueinformer/queueinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func (q *QueueInformer) Sync(ctx context.Context, event kubestate.ResourceEvent)

// Enqueue adds a key to the queue. If obj is a key already it gets added directly.
func (q *QueueInformer) Enqueue(item types.NamespacedName) {
// Create new resource event and add to queue
q.logger.WithField("item", item).Trace("enqueuing item")
q.queue.Add(item)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
}
event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource)

// Sync and requeue on error (throw out failed deletion syncs)
// Sync and requeue on error
err = loop.Sync(ctx, event)
if requeues := queue.NumRequeues(item); err != nil && requeues < 8 {
logger.WithField("requeues", requeues).Trace("requeuing with rate limiting")
Expand Down

0 comments on commit a32b81b

Please sign in to comment.