Skip to content

Commit

Permalink
feat: added shutdown for the target manager
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Bressi <[email protected]>
  • Loading branch information
puffitos committed Dec 14, 2023
1 parent 97a48e2 commit 7a29916
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 53 deletions.
12 changes: 6 additions & 6 deletions pkg/sparrow/gitlab/test/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ type MockClient struct {
}

func (m *MockClient) PutFile(ctx context.Context, _ gitlab.File) error { //nolint: gocritic // irrelevant
log := logger.FromContext(ctx).With("name", "MockPutFile")
log.Debug("MockPutFile called", "err", m.putFileErr)
log := logger.FromContext(ctx)
log.Info("MockPutFile called", "err", m.putFileErr)
return m.putFileErr
}

func (m *MockClient) PostFile(ctx context.Context, _ gitlab.File) error { //nolint: gocritic // irrelevant
log := logger.FromContext(ctx).With("name", "MockPostFile")
log.Debug("MockPostFile called", "err", m.postFileErr)
log := logger.FromContext(ctx)
log.Info("MockPostFile called", "err", m.postFileErr)
return m.postFileErr
}

func (m *MockClient) FetchFiles(ctx context.Context) ([]checks.GlobalTarget, error) {
log := logger.FromContext(ctx).With("name", "MockFetchFiles")
log.Debug("MockFetchFiles called", "targets", len(m.targets), "err", m.fetchFilesErr)
log := logger.FromContext(ctx)
log.Info("MockFetchFiles called", "targets", len(m.targets), "err", m.fetchFilesErr)
return m.targets, m.fetchFilesErr
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/sparrow/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
gitlabRegistrationProjectID = 1
globalTargetsCheckInterval = 5 * time.Minute
registrationUnhealthyThreshold = 15 * time.Minute
registrationInterval = 5 * time.Minute
)

type Sparrow struct {
Expand Down Expand Up @@ -71,7 +72,13 @@ func New(cfg *config.Config) *Sparrow {
cCfgChecks: make(chan map[string]any, 1),
routingTree: api.NewRoutingTree(),
router: chi.NewRouter(),
targets: targets.NewGitlabManager(gitlab.New("targetsRepo", "gitlabToken", gitlabRegistrationProjectID), globalTargetsCheckInterval, registrationUnhealthyThreshold),
targets: targets.NewGitlabManager(
gitlab.New("targetsRepo", "gitlabToken", gitlabRegistrationProjectID),
"DNS-Name",
globalTargetsCheckInterval,
registrationUnhealthyThreshold,
registrationInterval,
),
}

sparrow.loader = config.NewLoader(cfg, sparrow.cCfgChecks)
Expand Down
137 changes: 91 additions & 46 deletions pkg/sparrow/targets/gitlab.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package targets
import (
"context"
"fmt"
"sync"
"time"

"github.com/caas-team/sparrow/pkg/checks"
Expand All @@ -16,6 +17,8 @@ var _ TargetManager = &gitlabTargetManager{}
// gitlabTargetManager implements TargetManager
type gitlabTargetManager struct {
targets []checks.GlobalTarget
mu sync.RWMutex
done chan struct{}
gitlab gitlab.Gitlab
// the DNS name used for self-registration
name string
Expand All @@ -31,49 +34,18 @@ type gitlabTargetManager struct {
}

// NewGitlabManager creates a new gitlabTargetManager
func NewGitlabManager(g gitlab.Gitlab, checkInterval, unhealthyThreshold time.Duration) TargetManager {
func NewGitlabManager(g gitlab.Gitlab, name string, checkInterval, unhealthyThreshold, regInterval time.Duration) *gitlabTargetManager {
return &gitlabTargetManager{
gitlab: g,
checkInterval: checkInterval,
unhealthyThreshold: unhealthyThreshold,
gitlab: g,
name: name,
checkInterval: checkInterval,
registrationInterval: regInterval,
unhealthyThreshold: unhealthyThreshold,
mu: sync.RWMutex{},
done: make(chan struct{}, 1),
}
}

// updateRegistration registers the current instance as a global target
func (t *gitlabTargetManager) updateRegistration(ctx context.Context) error {
log := logger.FromContext(ctx).With("name", t.name, "registered", t.registered)
log.Debug("Updating registration")

f := gitlab.File{
Branch: "main",
AuthorEmail: fmt.Sprintf("%s@sparrow", t.name),
AuthorName: t.name,
Content: checks.GlobalTarget{Url: fmt.Sprintf("https://%s", t.name), LastSeen: time.Now().UTC()},
}

if t.registered {
f.CommitMessage = "Updated registration"
err := t.gitlab.PutFile(ctx, f)
if err != nil {
log.Error("Failed to update registration", "error", err)
return err
}
log.Debug("Successfully updated registration")
return nil
}

f.CommitMessage = "Initial registration"
err := t.gitlab.PostFile(ctx, f)
if err != nil {
log.Error("Failed to register global gitlabTargetManager", "error", err)
return err
}

log.Debug("Successfully registered")
t.registered = true
return nil
}

// Reconcile reconciles the targets of the gitlabTargetManager.
// The global targets are parsed from a gitlab repository.
//
Expand All @@ -83,43 +55,109 @@ func (t *gitlabTargetManager) Reconcile(ctx context.Context) {
log := logger.FromContext(ctx).With("name", "ReconcileGlobalTargets")
log.Debug("Starting global gitlabTargetManager reconciler")

checkTimer := time.NewTimer(t.checkInterval)
registrationTimer := time.NewTimer(t.registrationInterval)

defer checkTimer.Stop()
defer registrationTimer.Stop()

for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
log.Error("Context canceled", "error", err)
return
err = t.Shutdown(ctx)
if err != nil {
log.Error("Failed to shutdown gracefully", "error", err)
return
}
}
// check if this blocks when context is canceled
case <-time.After(t.checkInterval):
log.Debug("Getting global gitlabTargetManager")
case <-t.done:
log.Info("Ending Reconcile routine.")
return
case <-checkTimer.C:
err := t.refreshTargets(ctx)
if err != nil {
log.Error("Failed to get global gitlabTargetManager", "error", err)
continue
}
case <-time.After(t.registrationInterval):
log.Debug("Registering global gitlabTargetManager")
checkTimer.Reset(t.checkInterval)
case <-registrationTimer.C:
err := t.updateRegistration(ctx)
if err != nil {
log.Error("Failed to register global gitlabTargetManager", "error", err)
continue
}
registrationTimer.Reset(t.registrationInterval)
}
}
}

// GetTargets returns the current targets of the gitlabTargetManager
func (t *gitlabTargetManager) GetTargets() []checks.GlobalTarget {
t.mu.RLock()
defer t.mu.RUnlock()
return t.targets
}

// Shutdown shuts down the gitlabTargetManager and deletes the file containing
// the sparrow's registration from Gitlab
func (t *gitlabTargetManager) Shutdown(ctx context.Context) error {
log := logger.FromContext(ctx).With("name", "Shutdown")
log.Debug("Shutting down global gitlabTargetManager")
t.mu.Lock()
defer t.mu.Unlock()
t.registered = false
t.done <- struct{}{}
return nil
}

// updateRegistration registers the current instance as a global target
func (t *gitlabTargetManager) updateRegistration(ctx context.Context) error {
log := logger.FromContext(ctx).With("name", t.name, "registered", t.registered)
log.Debug("Updating registration")

f := gitlab.File{
Branch: "main",
AuthorEmail: fmt.Sprintf("%s@sparrow", t.name),
AuthorName: t.name,
Content: checks.GlobalTarget{Url: fmt.Sprintf("https://%s", t.name), LastSeen: time.Now().UTC()},
}

if t.Registered() {
t.mu.Lock()
defer t.mu.Unlock()
f.CommitMessage = "Updated registration"
err := t.gitlab.PutFile(ctx, f)
if err != nil {
log.Error("Failed to update registration", "error", err)
return err
}
log.Debug("Successfully updated registration")
return nil
}

t.mu.Lock()
defer t.mu.Unlock()
f.CommitMessage = "Initial registration"
err := t.gitlab.PostFile(ctx, f)
if err != nil {
log.Error("Failed to register global gitlabTargetManager", "error", err)
return err
}

log.Debug("Successfully registered")
t.registered = true
return nil
}

// refreshTargets updates the targets of the gitlabTargetManager
// with the latest available healthy targets
func (t *gitlabTargetManager) refreshTargets(ctx context.Context) error {
log := logger.FromContext(ctx).With("name", "updateGlobalTargets")
t.mu.Lock()
var healthyTargets []checks.GlobalTarget

defer t.mu.Unlock()
targets, err := t.gitlab.FetchFiles(ctx)
if err != nil {
log.Error("Failed to update global targets", "error", err)
Expand All @@ -129,6 +167,7 @@ func (t *gitlabTargetManager) refreshTargets(ctx context.Context) error {
// filter unhealthy targets - this may be removed in the future
for _, target := range targets {
if time.Now().Add(-t.unhealthyThreshold).After(target.LastSeen) {
log.Debug("Skipping unhealthy target", "target", target)
continue
}
healthyTargets = append(healthyTargets, target)
Expand All @@ -138,3 +177,9 @@ func (t *gitlabTargetManager) refreshTargets(ctx context.Context) error {
log.Debug("Updated global targets", "targets", len(t.targets))
return nil
}

func (t *gitlabTargetManager) Registered() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.registered
}
Loading

0 comments on commit 7a29916

Please sign in to comment.