Skip to content

Commit

Permalink
Cleanup agent heartbeats handling
Browse files Browse the repository at this point in the history
Fixes #718

Signed-off-by: Sergei Lukianov <[email protected]>
  • Loading branch information
Frostman committed Jan 25, 2025
1 parent e23e0dc commit 2eb2203
Showing 1 changed file with 34 additions and 58 deletions.
92 changes: 34 additions & 58 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err
return errors.Wrap(err, "failed to set install and run IDs")
}

slog.Info("Starting watch for config changes in K8s")

kubeconfigPath := filepath.Join(svc.Basedir, KubeconfigFile)
kube, err := kubeutil.NewClient(ctx, kubeconfigPath, agentapi.SchemeBuilder)
if err != nil {
Expand Down Expand Up @@ -175,6 +173,8 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err
return errors.Wrapf(err, "failed to reset agent observability status") // TODO gracefully handle case if resourceVersion changed
}

slog.Debug("Starting watch for config changes in K8s")

watcher, err := kube.Watch(ctx, &agentapi.AgentList{}, client.InNamespace(metav1.NamespaceDefault), client.MatchingFields{
"metadata.name": svc.name,
})
Expand All @@ -184,15 +184,6 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err
defer watcher.Stop()

for {
// process currently loaded agent from K8s
err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s")
}
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after processing")
}

select {
case <-ctx.Done():
slog.Info("Context done, exiting")
Expand All @@ -204,13 +195,25 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s (enforce)")
}
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat after enforce")
}
case <-time.After(15 * time.Second):
if err := svc.hearbeat(ctx, kube, agent); err != nil {
return errors.Wrap(err, "failed to heartbeat")
slog.Debug("Sending heartbeat", "name", agent.Name)
hbStart := time.Now()

if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

err := kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed
}

svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(hbStart).Seconds())
svc.reg.AgentMetrics.HeartbeatsTotal.Inc()
case event, ok := <-watcher.ResultChan():
// TODO check why channel gets closed
if !ok {
Expand Down Expand Up @@ -240,6 +243,11 @@ func (svc *Service) Run(ctx context.Context, getClient func() (*gnmi.Client, err
}

agent = event.Object.(*agentapi.Agent)

err = svc.processAgentFromKube(ctx, kube, agent, &currentGen, false)
if err != nil {
return errors.Wrap(err, "failed to process agent config from k8s")
}
}
}
}
Expand Down Expand Up @@ -270,35 +278,6 @@ func (svc *Service) setInstallAndRunIDs() error {
return nil
}

func (svc *Service) hearbeat(ctx context.Context, kube client.Client, agent *agentapi.Agent) error {
if agent == nil {
slog.Debug("No agent to heartbeat")

return nil
}

slog.Debug("Sending heartbeat", "name", agent.Name)
start := time.Now()

if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

err := kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to update agent heartbeat") // TODO gracefully handle case if resourceVersion changed
}

svc.reg.AgentMetrics.HeartbeatDuration.Observe(time.Since(start).Seconds())
svc.reg.AgentMetrics.HeartbeatsTotal.Inc()

return nil
}

func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, readyCheck bool) error {
start := time.Now()
slog.Info("Processing agent config", "name", agent.Name, "gen", agent.Generation, "res", agent.ResourceVersion)
Expand All @@ -320,17 +299,6 @@ func (svc *Service) processAgent(ctx context.Context, agent *agentapi.Agent, rea
}
}

// Make sure we have switch state
if agent.Status.State.NOS.HwskuVersion == "" {
if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}
}

desired, err := svc.processor.PlanDesiredState(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to plan spec")
Expand Down Expand Up @@ -479,15 +447,23 @@ func (svc *Service) processAgentFromKube(ctx context.Context, kube client.Client
Message: fmt.Sprintf("Config applied, gen=%d", agent.Generation),
})

svc.reg.AgentMetrics.KubeApplyDuration.Observe(time.Since(start).Seconds())

if err := svc.processor.UpdateSwitchState(ctx, agent, svc.reg); err != nil {
return errors.Wrapf(err, "failed to update switch state")
}
if st := svc.reg.GetSwitchState(); st != nil {
agent.Status.State = *st
}
agent.Status.LastHeartbeat = metav1.Time{Time: time.Now()}

err = kube.Status().Update(ctx, agent)
if err != nil {
return errors.Wrapf(err, "failed to update status") // TODO gracefully handle case if resourceVersion changed
}

*currentGen = agent.Generation

svc.reg.AgentMetrics.KubeApplyDuration.Observe(time.Since(start).Seconds())

return nil
}

Expand Down

0 comments on commit 2eb2203

Please sign in to comment.