Skip to content

Commit

Permalink
fix reconcilitation race
Browse files Browse the repository at this point in the history
  • Loading branch information
GavinFrazar committed Jan 13, 2025
1 parent d015b06 commit 13f9f97
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 22 deletions.
27 changes: 16 additions & 11 deletions lib/srv/db/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,10 @@ func (m *monitoredDatabases) setCloud(databases types.Databases) {
m.cloud = databases
}

func (m *monitoredDatabases) isCloud(database types.Database) bool {
m.mu.RLock()
defer m.mu.RUnlock()
// isCloudLocked returns whether a database was discovered by the cloud
// watchers, aka legacy database discovery done by the db service.
// The lock must be held when calling this function.
func (m *monitoredDatabases) isCloudLocked(database types.Database) bool {
for i := range m.cloud {
if m.cloud[i] == database {
return true
Expand All @@ -402,13 +403,17 @@ func (m *monitoredDatabases) isCloud(database types.Database) bool {
return false
}

func (m *monitoredDatabases) isDiscoveryResource(database types.Database) bool {
return database.Origin() == types.OriginCloud && m.isResource(database)
// isDiscoveryResourceLocked returns whether a database was discovered by the
// discovery service.
// The lock must be held when calling this function.
func (m *monitoredDatabases) isDiscoveryResourceLocked(database types.Database) bool {
return database.Origin() == types.OriginCloud && m.isResourceLocked(database)
}

func (m *monitoredDatabases) isResource(database types.Database) bool {
m.mu.RLock()
defer m.mu.RUnlock()
// isResourceLocked returns whether a database is a dynamic database, aka a db
// object.
// The lock must be held when calling this function.
func (m *monitoredDatabases) isResourceLocked(database types.Database) bool {
for i := range m.resources {
if m.resources[i] == database {
return true
Expand All @@ -417,9 +422,9 @@ func (m *monitoredDatabases) isResource(database types.Database) bool {
return false
}

func (m *monitoredDatabases) get() map[string]types.Database {
m.mu.RLock()
defer m.mu.RUnlock()
// getLocked returns a slice containing all of the monitored databases.
// The lock must be held when calling this function.
func (m *monitoredDatabases) getLocked() map[string]types.Database {
return utils.FromSlice(append(append(m.static, m.resources...), m.cloud...), types.Database.GetName)
}

Expand Down
27 changes: 16 additions & 11 deletions lib/srv/db/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (s *Server) startReconciler(ctx context.Context) error {
reconciler, err := services.NewReconciler(services.ReconcilerConfig[types.Database]{
Matcher: s.matcher,
GetCurrentResources: s.getResources,
GetNewResources: s.monitoredDatabases.get,
GetNewResources: s.monitoredDatabases.getLocked,
OnCreate: s.onCreate,
OnUpdate: s.onUpdate,
OnDelete: s.onDelete,
Expand All @@ -53,12 +53,17 @@ func (s *Server) startReconciler(ctx context.Context) error {
for {
select {
case <-s.reconcileCh:
if err := reconciler.Reconcile(ctx); err != nil {
s.log.ErrorContext(ctx, "Failed to reconcile.", "error", err)
}
if s.cfg.OnReconcile != nil {
s.cfg.OnReconcile(s.getProxiedDatabases())
}
func() {
// don't let monitored dbs change during reconciliation
s.monitoredDatabases.mu.RLock()
defer s.monitoredDatabases.mu.RUnlock()
if err := reconciler.Reconcile(ctx); err != nil {
s.log.ErrorContext(ctx, "Failed to reconcile.", "error", err)
}
if s.cfg.OnReconcile != nil {
s.cfg.OnReconcile(s.getProxiedDatabases())
}
}()
case <-ctx.Done():
s.log.DebugContext(ctx, "Reconciler done.")
return
Expand Down Expand Up @@ -171,13 +176,13 @@ func (s *Server) onCreate(ctx context.Context, database types.Database) error {
databaseCopy := database.Copy()

// only apply resource matcher settings to dynamic resources.
if s.monitoredDatabases.isResource(database) {
if s.monitoredDatabases.isResourceLocked(database) {
s.applyResourceMatcherSettings(databaseCopy)
}

// Run DiscoveryResourceChecker after resource matchers are applied to make
// sure the correct AssumeRoleARN is used.
if s.monitoredDatabases.isDiscoveryResource(database) {
if s.monitoredDatabases.isDiscoveryResourceLocked(database) {
if err := s.cfg.discoveryResourceChecker.Check(ctx, databaseCopy); err != nil {
return trace.Wrap(err)
}
Expand All @@ -193,7 +198,7 @@ func (s *Server) onUpdate(ctx context.Context, database, _ types.Database) error
databaseCopy := database.Copy()

// only apply resource matcher settings to dynamic resources.
if s.monitoredDatabases.isResource(database) {
if s.monitoredDatabases.isResourceLocked(database) {
s.applyResourceMatcherSettings(databaseCopy)
}
return s.updateDatabase(ctx, databaseCopy)
Expand All @@ -208,7 +213,7 @@ func (s *Server) onDelete(ctx context.Context, database types.Database) error {
func (s *Server) matcher(database types.Database) bool {
// In the case of databases discovered by this database server, matchers
// should be skipped.
if s.monitoredDatabases.isCloud(database) {
if s.monitoredDatabases.isCloudLocked(database) {
return true // Cloud fetchers return only matching databases.
}

Expand Down

0 comments on commit 13f9f97

Please sign in to comment.