diff --git a/cmd/controlloop/controlloop.go b/cmd/controlloop/controlloop.go index 06dcff955..e6a72c3ea 100644 --- a/cmd/controlloop/controlloop.go +++ b/cmd/controlloop/controlloop.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "os/signal" - "strings" "time" "github.com/fsnotify/fsnotify" @@ -22,11 +21,9 @@ import ( wbclient "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/clientset/versioned" wbinformers "github.com/k8snetworkplumbingwg/whereabouts/pkg/client/informers/externalversions" - "github.com/k8snetworkplumbingwg/whereabouts/pkg/config" "github.com/k8snetworkplumbingwg/whereabouts/pkg/controlloop" "github.com/k8snetworkplumbingwg/whereabouts/pkg/logging" "github.com/k8snetworkplumbingwg/whereabouts/pkg/reconciler" - "github.com/k8snetworkplumbingwg/whereabouts/pkg/types" ) const ( @@ -38,11 +35,9 @@ const ( const ( _ int = iota couldNotCreateController - couldNotGetFlatIPAM - cronExpressionError cronSchedulerCreationError fileWatcherError - fileWatcherAddWatcherError + couldNotCreateConfigWatcherError ) const ( @@ -75,21 +70,6 @@ func main() { if err != nil { os.Exit(cronSchedulerCreationError) } - schedule := determineCronExpression() - - job, err := s.NewJob( - gocron.CronJob(schedule, false), - gocron.NewTask(func() { - reconciler.ReconcileIPs(errorChan) - }), - ) - if err != nil { - _ = logging.Errorf("error with cron expression schedule: %v", err) - os.Exit(cronExpressionError) - } - - logging.Verbosef("started cron with job ID: %q", job.ID().String()) - s.Start() watcher, err := fsnotify.NewWatcher() if err != nil { @@ -98,11 +78,24 @@ func main() { } defer watcher.Close() - go syncConfiguration(watcher, s, job, errorChan) - if err := watcher.Add(reconcilerCronConfiguration); err != nil { - _ = logging.Errorf("error adding watcher to config %q: %v", reconcilerCronConfiguration, err) - os.Exit(fileWatcherAddWatcherError) + reconcilerConfigWatcher, err := reconciler.NewConfigWatcher( + reconcilerCronConfiguration, + s, + watcher, + func() { + reconciler.ReconcileIPs(errorChan) + }, + ) + if err != nil { + os.Exit(couldNotCreateConfigWatcherError) } + s.Start() + + const reconcilerConfigMntFile = "/cron-schedule/..data" + p := func(e fsnotify.Event) bool { + return e.Name == reconcilerConfigMntFile && e.Op&fsnotify.Create == fsnotify.Create + } + reconcilerConfigWatcher.SyncConfiguration(p) for { select { @@ -191,63 +184,3 @@ func newEventBroadcaster(k8sClientset kubernetes.Interface) record.EventBroadcas func newEventRecorder(broadcaster record.EventBroadcaster) record.EventRecorder { return broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName}) } - -func determineCronExpression() string { - // We read the expression from a file if present, otherwise we use ReconcilerCronExpression - fileContents, err := os.ReadFile(reconcilerCronConfiguration) - if err != nil { - flatipam, _, err := config.GetFlatIPAM(true, &types.IPAMConfig{}, "") - if err != nil { - _ = logging.Errorf("could not get flatipam config: %v", err) - os.Exit(couldNotGetFlatIPAM) - } - _ = logging.Errorf("could not read file: %v, using expression from flatfile: %v", err, flatipam.IPAM.ReconcilerCronExpression) - return flatipam.IPAM.ReconcilerCronExpression - } - logging.Verbosef("using expression: %v", strings.TrimSpace(string(fileContents))) // do i need to trim spaces? idk i think the file would JUST be the expression? - return strings.TrimSpace(string(fileContents)) -} - -func syncConfiguration( - watcher *fsnotify.Watcher, - scheduler gocron.Scheduler, - job gocron.Job, - errorChannel chan error, -) { - for { - select { - case event, ok := <-watcher.Events: - if !ok { - return - } - - updatedSchedule := determineCronExpression() - logging.Verbosef( - "configuration updated to file %q. New cron expression: %s", - event.Name, - updatedSchedule, - ) - updatedJob, err := scheduler.Update( - job.ID(), - gocron.CronJob(updatedSchedule, false), - gocron.NewTask(func() { - reconciler.ReconcileIPs(errorChannel) - }), - ) - if err != nil { - _ = logging.Errorf("error updating job %q configuration: %v", job.ID().String(), err) - } - - logging.Verbosef( - "successfully updated CRON configuration id %q - new cron expression: %s", - updatedJob.ID().String(), - updatedSchedule, - ) - case err, ok := <-watcher.Errors: - _ = logging.Errorf("error when listening to config changes: %v", err) - if !ok { - return - } - } - } -} diff --git a/pkg/reconciler/config.go b/pkg/reconciler/config.go new file mode 100644 index 000000000..9223b66de --- /dev/null +++ b/pkg/reconciler/config.go @@ -0,0 +1,122 @@ +package reconciler + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "github.com/fsnotify/fsnotify" + "github.com/go-co-op/gocron/v2" + + "github.com/k8snetworkplumbingwg/whereabouts/pkg/config" + "github.com/k8snetworkplumbingwg/whereabouts/pkg/logging" + "github.com/k8snetworkplumbingwg/whereabouts/pkg/types" +) + +type ConfigWatcher struct { + configDir string + configPath string + currentSchedule string + job gocron.Job + scheduler gocron.Scheduler + handlerFunc func() + watcher *fsnotify.Watcher +} + +func NewConfigWatcher(configPath string, scheduler gocron.Scheduler, configWatcher *fsnotify.Watcher, handlerFunc func()) (*ConfigWatcher, error) { + schedule, err := determineCronExpression(configPath) + if err != nil { + return nil, err + } + + job, err := scheduler.NewJob( + gocron.CronJob(schedule, false), + gocron.NewTask(handlerFunc), + ) + if err != nil { + return nil, fmt.Errorf("error creating job: %v", err) + } + + return &ConfigWatcher{ + configDir: filepath.Dir(configPath), + configPath: configPath, + currentSchedule: schedule, + job: job, + scheduler: scheduler, + watcher: configWatcher, + handlerFunc: handlerFunc, + }, nil +} + +func determineCronExpression(configPath string) (string, error) { + // We read the expression from a file if present, otherwise we use ReconcilerCronExpression + fileContents, err := os.ReadFile(configPath) + if err != nil { + flatipam, _, err := config.GetFlatIPAM(true, &types.IPAMConfig{}, "") + if err != nil { + return "", logging.Errorf("could not get flatipam config: %v", err) + } + + _ = logging.Errorf("could not read file: %v, using expression from flatfile: %v", err, flatipam.IPAM.ReconcilerCronExpression) + return flatipam.IPAM.ReconcilerCronExpression, nil + } + logging.Verbosef("using expression: %v", strings.TrimSpace(string(fileContents))) // do i need to trim spaces? idk i think the file would JUST be the expression? + return strings.TrimSpace(string(fileContents)), nil +} + +func (c *ConfigWatcher) SyncConfiguration(relevantEventPredicate func(event fsnotify.Event) bool) { + go c.syncConfig(relevantEventPredicate) + if err := c.watcher.Add(c.configDir); err != nil { + _ = logging.Errorf("error adding watcher to config %q: %v", c.configPath, err) + } +} + +func (c *ConfigWatcher) syncConfig(relevantEventPredicate func(event fsnotify.Event) bool) { + for { + select { + case event, ok := <-c.watcher.Events: + if !ok { + return + } + + if !relevantEventPredicate(event) { + logging.Verbosef("event not relevant: %v", event) + continue + } + updatedSchedule, err := determineCronExpression(c.configPath) + if err != nil { + _ = logging.Errorf("error determining cron expression from %q: %v", c.configPath, err) + } + logging.Verbosef( + "configuration updated to file %q. New cron expression: %s", + event.Name, + updatedSchedule, + ) + + if updatedSchedule == c.currentSchedule { + logging.Debugf("no changes in schedule, nothing to do.") + continue + } + updatedJob, err := c.scheduler.Update( + c.job.ID(), + gocron.CronJob(updatedSchedule, false), + gocron.NewTask(c.handlerFunc), + ) + if err != nil { + _ = logging.Errorf("error updating job %q configuration: %v", c.job.ID().String(), err) + } + c.currentSchedule = updatedSchedule + logging.Verbosef( + "successfully updated CRON configuration id %q - new cron expression: %s", + updatedJob.ID().String(), + updatedSchedule, + ) + case err, ok := <-c.watcher.Errors: + _ = logging.Errorf("error when listening to config changes: %v", err) + if !ok { + return + } + } + } +} diff --git a/pkg/reconciler/config_test.go b/pkg/reconciler/config_test.go new file mode 100644 index 000000000..dbe9f4d91 --- /dev/null +++ b/pkg/reconciler/config_test.go @@ -0,0 +1,70 @@ +package reconciler + +import ( + "os" + "path/filepath" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/fsnotify/fsnotify" + "github.com/go-co-op/gocron/v2" +) + +var _ = Describe("Reconciler configuration watcher", func() { + var ( + config *ConfigWatcher + configDir string + dummyConfig *os.File + mailbox chan struct{} + watcher *fsnotify.Watcher + ) + + BeforeEach(func() { + var err error + + mailbox = make(chan struct{}) + + configDir, err = os.MkdirTemp("", "config") + Expect(err).NotTo(HaveOccurred()) + const initialCron = "0/1 2 3 * *" + dummyConfig, err = os.Create(filepath.Join(configDir, filepath.Base("..data"))) + Expect(err).NotTo(HaveOccurred()) + + Expect(dummyConfig.Write([]byte(initialCron))).To(Equal(len(initialCron))) + scheduler, err := gocron.NewScheduler() + Expect(err).NotTo(HaveOccurred()) + watcher, err = fsnotify.NewWatcher() + Expect(err).NotTo(HaveOccurred()) + config, err = NewConfigWatcher( + dummyConfig.Name(), + scheduler, + watcher, + func() { mailbox <- struct{}{} }, + ) + scheduler.Start() + Expect(err).NotTo(HaveOccurred()) + config.SyncConfiguration(func(event fsnotify.Event) bool { + return event.Name == dummyConfig.Name() && event.Op&fsnotify.Write == fsnotify.Write + }) + }) + + AfterEach(func() { + watcher.Close() + dummyConfig.Close() + }) + + When("the cron job expression is updated in the file-system", func() { + const updatedCron = "0/1 * * * *" + + BeforeEach(func() { + Expect(dummyConfig.WriteAt([]byte(updatedCron), 0)).To(Equal(len(updatedCron))) + }) + + It("the current schedule is updated, and the handler function executed", func() { + Eventually(func() string { return config.currentSchedule }).Should(Equal(updatedCron)) + Eventually(mailbox).WithTimeout(time.Minute).Should(Receive()) + }) + }) +})