Skip to content

Commit

Permalink
remove ResourceEvent
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 9, 2025
1 parent 9e61c41 commit 7e2546f
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 68 deletions.
15 changes: 5 additions & 10 deletions pkg/controller/operators/catalog/subscription/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -47,9 +49,9 @@ func (s *subscriptionSyncer) now() *metav1.Time {

// Sync reconciles Subscription events by invoking a sequence of reconcilers, passing the result of each
// successful reconciliation as an argument to its successor.
func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceEvent) error {
func (s *subscriptionSyncer) Sync(ctx context.Context, obj client.Object) error {
res := &v1alpha1.Subscription{}
if err := scheme.Convert(event.Resource(), res, nil); err != nil {
if err := scheme.Convert(obj, res, nil); err != nil {
return err
}

Expand All @@ -58,20 +60,13 @@ func (s *subscriptionSyncer) Sync(ctx context.Context, event kubestate.ResourceE
logger := s.logger.WithFields(logrus.Fields{
"reconciling": fmt.Sprintf("%T", res),
"selflink": res.GetSelfLink(),
"event": event.Type(),
})
logger.Info("syncing")

// Enter initial state based on subscription and event type
// TODO: Consider generalizing initial generic add, update, delete transitions in the kubestate package.
// Possibly make a resource event aware bridge between Sync and reconciler.
initial := NewSubscriptionState(res.DeepCopy())
switch event.Type() {
case kubestate.ResourceAdded:
initial = initial.Add()
case kubestate.ResourceUpdated:
initial = initial.Update()
}
initial := NewSubscriptionState(res.DeepCopy()).Update()

reconciled, err := s.reconcilers.Reconcile(ctx, initial)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/operators/catalog/subscription/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"testing"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

Expand All @@ -16,7 +18,7 @@ func TestSync(t *testing.T) {
syncer kubestate.Syncer
}
type args struct {
event kubestate.ResourceEvent
obj client.Object
}
type want struct {
err error
Expand All @@ -36,10 +38,7 @@ func TestSync(t *testing.T) {
},
},
args: args{
event: kubestate.NewResourceEvent(
kubestate.ResourceAdded,
&v1alpha1.Subscription{},
),
obj: &v1alpha1.Subscription{},
},
want: want{
err: nil,
Expand All @@ -52,7 +51,7 @@ func TestSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

require.Equal(t, tt.want.err, tt.fields.syncer.Sync(ctx, tt.args.event))
require.Equal(t, tt.want.err, tt.fields.syncer.Sync(ctx, tt.args.obj))
})
}
}
44 changes: 5 additions & 39 deletions pkg/lib/kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubestate
import (
"context"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type State interface {
Expand Down Expand Up @@ -134,56 +135,21 @@ func (r ReconcilerChain) Reconcile(ctx context.Context, in State) (out State, er
return
}

// ResourceEventType tells an operator what kind of event has occurred on a given resource.
type ResourceEventType string

const (
// ResourceAdded tells the operator that a given resources has been added.
ResourceAdded ResourceEventType = "add"
// ResourceUpdated tells the operator that a given resources has been updated.
ResourceUpdated ResourceEventType = "update"
)

type ResourceEvent interface {
Type() ResourceEventType
Resource() interface{}
}

type resourceEvent struct {
eventType ResourceEventType
resource interface{}
}

func (r resourceEvent) Type() ResourceEventType {
return r.eventType
}

func (r resourceEvent) Resource() interface{} {
return r.resource
}

func NewResourceEvent(eventType ResourceEventType, resource interface{}) ResourceEvent {
return resourceEvent{
eventType: eventType,
resource: resource,
}
}

type Notifier interface {
Notify(event types.NamespacedName)
}

type NotifyFunc func(event types.NamespacedName)

// SyncFunc syncs resource events.
type SyncFunc func(ctx context.Context, event ResourceEvent) error
type SyncFunc func(ctx context.Context, obj client.Object) error

// Sync lets a sync func implement Syncer.
func (s SyncFunc) Sync(ctx context.Context, event ResourceEvent) error {
return s(ctx, event)
func (s SyncFunc) Sync(ctx context.Context, obj client.Object) error {
return s(ctx, obj)
}

// Syncer describes something that syncs resource events.
type Syncer interface {
Sync(ctx context.Context, event ResourceEvent) error
Sync(ctx context.Context, obj client.Object) error
}
15 changes: 5 additions & 10 deletions pkg/lib/queueinformer/queueinformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -30,8 +30,8 @@ type QueueInformer struct {
}

// Sync invokes all registered sync handlers in the QueueInformer's chain
func (q *QueueInformer) Sync(ctx context.Context, event kubestate.ResourceEvent) error {
return q.syncer.Sync(ctx, event)
func (q *QueueInformer) Sync(ctx context.Context, obj client.Object) error {
return q.syncer.Sync(ctx, obj)
}

// Enqueue adds a key to the queue. If obj is a key already it gets added directly.
Expand Down Expand Up @@ -131,12 +131,7 @@ type LegacySyncHandler func(obj interface{}) error

// ToSyncer returns the Syncer equivalent of the sync handler.
func (l LegacySyncHandler) ToSyncer() kubestate.Syncer {
return kubestate.SyncFunc(func(ctx context.Context, event kubestate.ResourceEvent) error {
switch event.Type() {
case kubestate.ResourceAdded, kubestate.ResourceUpdated:
return l(event.Resource())
default:
return errors.Errorf("unexpected resource event type: %s", event.Type())
}
return kubestate.SyncFunc(func(ctx context.Context, obj client.Object) error {
return l(obj)
})
}
11 changes: 8 additions & 3 deletions pkg/lib/queueinformer/queueinformer_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package queueinformer
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"

"github.com/operator-framework/operator-lifecycle-manager/pkg/lib/kubestate"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -298,10 +298,15 @@ func (o *operator) processNextWorkItem(ctx context.Context, loop *QueueInformer)
queue.Forget(item)
return true
}
event := kubestate.NewResourceEvent(kubestate.ResourceUpdated, resource)
obj, ok := resource.(client.Object)
if !ok {
logger.Warn("cached object is not a kubernetes resource (client.Object)")
queue.Forget(item)
return true
}

// Sync and requeue on error
err = loop.Sync(ctx, event)
err = loop.Sync(ctx, obj)
if requeues := queue.NumRequeues(item); err != nil && requeues < 8 {
logger.WithField("requeues", requeues).Trace("requeuing with rate limiting")
utilruntime.HandleError(errors.Wrap(err, fmt.Sprintf("sync %q failed", item)))
Expand Down

0 comments on commit 7e2546f

Please sign in to comment.