Skip to content

Commit

Permalink
Check for resource staleness, requeue if stale
Browse files Browse the repository at this point in the history
  • Loading branch information
djwhatle committed Mar 26, 2021
1 parent 3855b9e commit 2a81bf1
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 30 deletions.
79 changes: 79 additions & 0 deletions pkg/cache/stale.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2021 Red Hat Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"sync"

"k8s.io/apimachinery/pkg/types"
)

// Functions in this file solve the problem of duplicate reconciles when the cached
// client hasn't received the latest resource and feeds a stale resource to controller.
// We will reject reconciling a resource if its generation is older than one we've
// successfully reconciled.

// UIDToGenerationMap keeps track of highest observed generation of a particular resource UID
type UIDToGenerationMap struct {
mutex sync.RWMutex
resourceUIDtoGeneration map[types.UID]int64
}

// CreateUIDToGenerationMap creates a new UID => generation map
func CreateUIDToGenerationMap() *UIDToGenerationMap {
uidGenMap := UIDToGenerationMap{}
uidGenMap.resourceUIDtoGeneration = make(map[types.UID]int64)
return &uidGenMap
}

// getGenerationForUID returns the latest successfully reconciled resource generation
// returns -1 if not found.
func (u *UIDToGenerationMap) getGenerationForUID(resourceUID types.UID) int64 {
u.mutex.Lock()
defer u.mutex.Unlock()
generation, ok := u.resourceUIDtoGeneration[resourceUID]
if !ok {
return -1
}
return generation
}

// setGenerationForUID returns the latest successfully reconciled resource generation
func (u *UIDToGenerationMap) setGenerationForUID(resourceUID types.UID, generation int64) {
u.mutex.Lock()
defer u.mutex.Unlock()
u.resourceUIDtoGeneration[resourceUID] = generation
}

// IsCacheStale checks if the cached client is providing a resource we've successfully reconciled.
func (u *UIDToGenerationMap) IsCacheStale(resourceUID types.UID, generation int64) bool {
reconciledGeneration := u.getGenerationForUID(resourceUID)
// Resource should be reconciled if we've never seen it
if generation == -1 {
return false
}
// Resource is stale if we've reconciled a newer generation
if generation < reconciledGeneration {
return true
}
return false
}

// RecordReconciledGeneration records that a resource UID generation was pushed back to the APIserver
func (u *UIDToGenerationMap) RecordReconciledGeneration(resourceUID types.UID, generation int64) {
u.setGenerationForUID(resourceUID, generation)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
migref "github.com/konveyor/mig-controller/pkg/reference"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -46,9 +47,10 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectImageMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("directimagemigration_controller"),
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("directimagemigration_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

Expand Down Expand Up @@ -99,8 +101,9 @@ var _ reconcile.Reconciler = &ReconcileDirectImageMigration{}
type ReconcileDirectImageMigration struct {
client.Client
record.EventRecorder
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectImageMigration object and makes changes based on the state read
Expand All @@ -126,6 +129,13 @@ func (r *ReconcileDirectImageMigration) Reconcile(request reconcile.Request) (re
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(imageMigration.UID, imageMigration.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(imageMigration.UID, imageMigration.Generation)

// Set up jaeger tracing
reconcileSpan := r.initTracer(imageMigration)
if reconcileSpan != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
migref "github.com/konveyor/mig-controller/pkg/reference"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -46,9 +47,10 @@ func Add(mgr manager.Manager) error {
// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectImageStreamMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("directimagestreammigration_controller"),
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("directimagestreammigration_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

Expand Down Expand Up @@ -90,8 +92,9 @@ var _ reconcile.Reconciler = &ReconcileDirectImageStreamMigration{}
type ReconcileDirectImageStreamMigration struct {
client.Client
record.EventRecorder
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectImageStreamMigration object and makes changes based on the state read
Expand All @@ -117,6 +120,14 @@ func (r *ReconcileDirectImageStreamMigration) Reconcile(request reconcile.Reques
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(imageStreamMigration.UID, imageStreamMigration.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(
imageStreamMigration.UID, imageStreamMigration.Generation)

// Set up jaeger tracing
reconcileSpan, err := r.initTracer(*imageStreamMigration)
if reconcileSpan != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
migapi "github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
"github.com/opentracing/opentracing-go"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -43,7 +44,11 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectVolumeMigration{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileDirectVolumeMigration{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -79,8 +84,9 @@ var _ reconcile.Reconciler = &ReconcileDirectVolumeMigration{}
// ReconcileDirectVolumeMigration reconciles a DirectVolumeMigration object
type ReconcileDirectVolumeMigration struct {
client.Client
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectVolumeMigration object and makes changes based on the state read
Expand All @@ -107,6 +113,13 @@ func (r *ReconcileDirectVolumeMigration) Reconcile(request reconcile.Request) (r
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(direct.UID, direct.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(direct.UID, direct.Generation)

// Set up jaeger tracing
reconcileSpan := r.initTracer(direct)
if reconcileSpan != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"time"

"github.com/konveyor/mig-controller/pkg/cache"
"github.com/konveyor/mig-controller/pkg/errorutil"
"github.com/opentracing/opentracing-go"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -78,7 +79,11 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileDirectVolumeMigrationProgress{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
return &ReconcileDirectVolumeMigrationProgress{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -113,8 +118,9 @@ var _ reconcile.Reconciler = &ReconcileDirectVolumeMigrationProgress{}
// ReconcileDirectVolumeMigrationProgress reconciles a DirectVolumeMigrationProgress object
type ReconcileDirectVolumeMigrationProgress struct {
client.Client
scheme *runtime.Scheme
tracer opentracing.Tracer
scheme *runtime.Scheme
tracer opentracing.Tracer
uidGenerationMap *cache.UIDToGenerationMap
}

// Reconcile reads that state of the cluster for a DirectVolumeMigrationProgress object and makes changes based on the state read
Expand All @@ -137,6 +143,13 @@ func (r *ReconcileDirectVolumeMigrationProgress) Reconcile(request reconcile.Req
return reconcile.Result{Requeue: true}, err
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(pvProgress.UID, pvProgress.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(pvProgress.UID, pvProgress.Generation)

// Set up jaeger tracing
reconcileSpan, err := r.initTracer(*pvProgress)
if reconcileSpan != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/controller/miganalytic/miganalytics_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/konveyor/controller/pkg/logging"
"github.com/konveyor/mig-controller/pkg/apis/migration/v1alpha1"
"github.com/konveyor/mig-controller/pkg/cache"
"github.com/konveyor/mig-controller/pkg/compat"
"github.com/konveyor/mig-controller/pkg/errorutil"
"github.com/konveyor/mig-controller/pkg/gvk"
Expand Down Expand Up @@ -72,7 +73,12 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileMigAnalytic{Client: mgr.GetClient(), scheme: mgr.GetScheme(), EventRecorder: mgr.GetRecorder("miganalytic_controller")}
return &ReconcileMigAnalytic{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("miganalytic_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -107,7 +113,8 @@ type ReconcileMigAnalytic struct {
client.Client
record.EventRecorder

scheme *runtime.Scheme
scheme *runtime.Scheme
uidGenerationMap *cache.UIDToGenerationMap
}

// MigAnalyticPersistentVolumeDetails defines extended properties of a volume discovered by MigAnalytic
Expand Down Expand Up @@ -138,6 +145,13 @@ func (r *ReconcileMigAnalytic) Reconcile(request reconcile.Request) (reconcile.R
return reconcile.Result{Requeue: true}, nil
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(analytic.UID, analytic.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(analytic.UID, analytic.Generation)

// Exit early if the MigAnalytic already has a ready condition
// and Refresh boolean is unset
if analytic.Status.IsReady() && !analytic.Spec.Refresh {
Expand Down
20 changes: 17 additions & 3 deletions pkg/controller/migcluster/migcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"time"

"github.com/konveyor/mig-controller/pkg/cache"
"github.com/konveyor/mig-controller/pkg/errorutil"

"github.com/konveyor/controller/pkg/logging"
Expand Down Expand Up @@ -48,7 +49,12 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) *ReconcileMigCluster {
return &ReconcileMigCluster{Client: mgr.GetClient(), scheme: mgr.GetScheme(), EventRecorder: mgr.GetRecorder("migcluster_controller")}
return &ReconcileMigCluster{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
EventRecorder: mgr.GetRecorder("migcluster_controller"),
uidGenerationMap: cache.CreateUIDToGenerationMap(),
}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand Down Expand Up @@ -101,8 +107,9 @@ type ReconcileMigCluster struct {
k8sclient.Client
record.EventRecorder

scheme *runtime.Scheme
Controller controller.Controller
scheme *runtime.Scheme
Controller controller.Controller
uidGenerationMap *cache.UIDToGenerationMap
}

func (r *ReconcileMigCluster) Reconcile(request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -121,6 +128,13 @@ func (r *ReconcileMigCluster) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{Requeue: true}, nil
}

// Check if cache is still catching up
if r.uidGenerationMap.IsCacheStale(cluster.UID, cluster.Generation) {
return reconcile.Result{Requeue: true}, nil
}
// Record reconciled generation
defer r.uidGenerationMap.RecordReconciledGeneration(cluster.UID, cluster.Generation)

// Report reconcile error.
defer func() {
log.Info("CR", "conditions", cluster.Status.Conditions)
Expand Down
Loading

0 comments on commit 2a81bf1

Please sign in to comment.