Skip to content

Commit

Permalink
Check for resource staleness, requeue if stale
Browse files Browse the repository at this point in the history
Move recording of generation to end of reconcile
  • Loading branch information
djwhatle committed Jun 8, 2021
1 parent e84d49e commit 57806fb
Show file tree
Hide file tree
Showing 11 changed files with 257 additions and 37 deletions.
79 changes: 79 additions & 0 deletions pkg/cache/stale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2021 Red Hat Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"sync"

"k8s.io/apimachinery/pkg/types"
)

// Functions in this file solve the problem of duplicate reconciles when the cached
// client hasn't received the latest resource and feeds a stale resource to controller.
// We will reject reconciling a resource if its generation is older than one we've
// successfully reconciled.

// UIDToGenerationMap keeps track of highest reconciled generation of a particular resource UID
type UIDToGenerationMap struct {
mutex sync.RWMutex
resourceUIDtoGeneration map[types.UID]int64
}

// CreateUIDToGenerationMap creates a new UID => generation map
func CreateUIDToGenerationMap() *UIDToGenerationMap {
uidGenMap := UIDToGenerationMap{}
uidGenMap.resourceUIDtoGeneration = make(map[types.UID]int64)
return &uidGenMap
}

// getGenerationForUID returns the latest successfully reconciled resource generation
// returns -1 if not found.
func (u *UIDToGenerationMap) getGenerationForUID(resourceUID types.UID) int64 {
u.mutex.Lock()
defer u.mutex.Unlock()
generation, ok := u.resourceUIDtoGeneration[resourceUID]
if !ok {
return -1
}
return generation
}

// setGenerationForUID returns the latest successfully reconciled resource generation
func (u *UIDToGenerationMap) setGenerationForUID(resourceUID types.UID, generation int64) {
u.mutex.Lock()
defer u.mutex.Unlock()
u.resourceUIDtoGeneration[resourceUID] = generation
}

// IsCacheStale checks if the cached client is providing a resource we've successfully reconciled.
func (u *UIDToGenerationMap) IsCacheStale(resourceUID types.UID, generation int64) bool {
reconciledGeneration := u.getGenerationForUID(resourceUID)
// Resource should be reconciled if we've never seen it
if generation == -1 {
return false
}
// Resource is stale if we've reconciled a newer generation
if generation < reconciledGeneration {
return true
}
return false
}

// RecordReconciledGeneration records that a resource UID generation was pushed back to the APIserver
func (u *UIDToGenerationMap) RecordReconciledGeneration(resourceUID types.UID, generation int64) {
u.setGenerationForUID(resourceUID, generation)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
migref "github.com/konveyor/mig-controller/pkg/reference"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -46,9 +47,10 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectImageMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("directimagemigration_controller"),
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("directimagemigration_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

Expand Down Expand Up @@ -95,8 +97,9 @@ var _ reconcile.Reconciler = &ReconcileDirectImageMigration{}
type ReconcileDirectImageMigration struct {
client.Client
record.EventRecorder
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectImageMigration object and makes changes based on the state read
Expand All @@ -121,6 +124,11 @@ func (r *ReconcileDirectImageMigration) Reconcile(ctx context.Context, request r
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(imageMigration.UID, imageMigration.Generation) {
return reconcile.Result{Requeue: true}, nil
}

// Set MigMigration name key on logger
migration, err := imageMigration.GetMigrationForDIM(r)
if migration != nil {
Expand Down Expand Up @@ -177,6 +185,9 @@ func (r *ReconcileDirectImageMigration) Reconcile(ctx context.Context, request r
return reconcile.Result{Requeue: true}, nil
}

// Record reconciled generation
r.uidGenerationMap.RecordReconciledGeneration(imageMigration.UID, imageMigration.Generation)

// Requeue
if requeueAfter > 0 {
return reconcile.Result{RequeueAfter: requeueAfter}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
migref "github.com/konveyor/mig-controller/pkg/reference"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -46,9 +47,10 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectImageStreamMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("directimagestreammigration_controller"),
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("directimagestreammigration_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

Expand Down Expand Up @@ -86,8 +88,9 @@ var _ reconcile.Reconciler = &ReconcileDirectImageStreamMigration{}
type ReconcileDirectImageStreamMigration struct {
client.Client
record.EventRecorder
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectImageStreamMigration object and makes changes based on the state read
Expand All @@ -112,6 +115,11 @@ func (r *ReconcileDirectImageStreamMigration) Reconcile(ctx context.Context, req
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(imageStreamMigration.UID, imageStreamMigration.Generation) {
return reconcile.Result{Requeue: true}, nil
}

// Set MigMigration name key on logger
migration, err := imageStreamMigration.GetMigrationForDISM(r)
if migration != nil {
Expand Down Expand Up @@ -168,6 +176,10 @@ func (r *ReconcileDirectImageStreamMigration) Reconcile(ctx context.Context, req
return reconcile.Result{Requeue: true}, nil
}

// Record reconciled generation
r.uidGenerationMap.RecordReconciledGeneration(
imageStreamMigration.UID, imageStreamMigration.Generation)

// Requeue
if requeueAfter > 0 {
return reconcile.Result{RequeueAfter: requeueAfter}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -43,7 +44,11 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectVolumeMigration{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileDirectVolumeMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -79,8 +84,9 @@ var _ reconcile.Reconciler = &ReconcileDirectVolumeMigration{}
// ReconcileDirectVolumeMigration reconciles a DirectVolumeMigration object
type ReconcileDirectVolumeMigration struct {
client.Client
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectVolumeMigration object and makes changes based on the state read
Expand Down Expand Up @@ -109,6 +115,11 @@ func (r *ReconcileDirectVolumeMigration) Reconcile(ctx context.Context, request
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(direct.UID, direct.Generation) {
return reconcile.Result{Requeue: true}, nil
}

// Set MigMigration name key on logger
migration, err := direct.GetMigrationForDVM(r)
if migration != nil {
Expand Down Expand Up @@ -165,6 +176,9 @@ func (r *ReconcileDirectVolumeMigration) Reconcile(ctx context.Context, request
return reconcile.Result{Requeue: true}, nil
}

// Record reconciled generation
r.uidGenerationMap.RecordReconciledGeneration(direct.UID, direct.Generation)

// Requeue
if requeueAfter > 0 {
return reconcile.Result{RequeueAfter: requeueAfter}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"github.com/konveyor/mig-controller/pkg/cache"
"github.com/konveyor/mig-controller/pkg/compat"
"github.com/konveyor/mig-controller/pkg/errorutil"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -72,7 +73,11 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectVolumeMigrationProgress{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileDirectVolumeMigrationProgress{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand All @@ -97,8 +102,9 @@ var _ reconcile.Reconciler = &ReconcileDirectVolumeMigrationProgress{}
// ReconcileDirectVolumeMigrationProgress reconciles a DirectVolumeMigrationProgress object
type ReconcileDirectVolumeMigrationProgress struct {
client.Client
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectVolumeMigrationProgress object and makes changes based on the state read
Expand All @@ -120,6 +126,11 @@ func (r *ReconcileDirectVolumeMigrationProgress) Reconcile(ctx context.Context,
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(pvProgress.UID, pvProgress.Generation) {
return reconcile.Result{Requeue: true}, nil
}

// Set MigMigration name key on logger
migration, err := pvProgress.GetMigrationforDVMP(r)
if migration != nil {
Expand Down Expand Up @@ -181,7 +192,10 @@ func (r *ReconcileDirectVolumeMigrationProgress) Reconcile(ctx context.Context,
return reconcile.Result{Requeue: true}, nil
}

// we will requeue this every 5 seconds
// Record reconciled generation
r.uidGenerationMap.RecordReconciledGeneration(pvProgress.UID, pvProgress.Generation)

// Requeue every 5 seconds
return reconcile.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil
}

Expand Down
21 changes: 18 additions & 3 deletions pkg/controller/miganalytic/miganalytics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/konveyor/controller/pkg/logging"
"github.com/konveyor/mig-controller/pkg/cache"
"github.com/konveyor/mig-controller/pkg/compat"
"github.com/konveyor/mig-controller/pkg/errorutil"
"github.com/konveyor/mig-controller/pkg/gvk"
Expand Down Expand Up @@ -73,7 +74,12 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileMigAnalytic{Client: mgr.GetClient(), scheme: mgr.GetScheme(), EventRecorder: mgr.GetEventRecorderFor("miganalytic_controller")}
return &ReconcileMigAnalytic{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("miganalytic_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -108,8 +114,9 @@ type ReconcileMigAnalytic struct {
client.Client
record.EventRecorder

scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// MigAnalyticPersistentVolumeDetails defines extended properties of a volume discovered by MigAnalytic
Expand Down Expand Up @@ -139,6 +146,11 @@ func (r *ReconcileMigAnalytic) Reconcile(ctx context.Context, request reconcile.
return reconcile.Result{Requeue: true}, nil
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(analytic.UID, analytic.Generation) {
return reconcile.Result{Requeue: true}, nil
}

// Get jaeger span for reconcile, add to ctx
reconcileSpan := r.initTracer(analytic)
if reconcileSpan != nil {
Expand Down Expand Up @@ -218,6 +230,9 @@ func (r *ReconcileMigAnalytic) Reconcile(ctx context.Context, request reconcile.
return reconcile.Result{Requeue: true}, nil
}

// Record reconciled generation
r.uidGenerationMap.RecordReconciledGeneration(analytic.UID, analytic.Generation)

// Done
return reconcile.Result{Requeue: false}, nil
}
Expand Down
Loading

0 comments on commit 57806fb

Please sign in to comment.