Convert lib/inventory to use slog (#50315)
rosstimothy authored Dec 17, 2024
1 parent 22bd499 commit b8ffebd
Showing 3 changed files with 103 additions and 39 deletions.
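For readers skimming the diff: the change swaps logrus's printf-style logging for Go's structured log/slog package. The sketch below illustrates the conversion pattern applied throughout; the function and variable names are hypothetical, not code taken from the diff.

```go
// Illustrative sketch of the logrus -> slog conversion this commit applies.
// Names here are hypothetical stand-ins.
package example

import (
	"context"
	"log/slog"

	log "github.com/sirupsen/logrus"
)

func logBefore(serverID string, err error) {
	// logrus: printf-style message, no context, fields baked into the string.
	log.Warnf("Failed to upsert server %q on heartbeat: %v.", serverID, err)
}

func logAfter(ctx context.Context, serverID string, err error) {
	// slog: context-aware call with structured key/value attributes.
	slog.WarnContext(ctx, "Failed to upsert server on heartbeat",
		"server_id", serverID,
		"error", err,
	)
}
```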
119 changes: 92 additions & 27 deletions lib/inventory/controller.go
@@ -20,14 +20,14 @@ package inventory

import (
"context"
"log/slog"
"math/rand/v2"
"os"
"strings"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
@@ -39,6 +39,7 @@ import (
usagereporter "github.com/gravitational/teleport/lib/usagereporter/teleport"
"github.com/gravitational/teleport/lib/utils"
"github.com/gravitational/teleport/lib/utils/interval"
logutils "github.com/gravitational/teleport/lib/utils/log"
)

// Auth is an interface representing the subset of the auth API that must be made available
@@ -373,27 +374,36 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {

defer func() {
if handle.goodbye.GetDeleteResources() {
log.WithFields(log.Fields{
"apps": len(handle.appServers),
"dbs": len(handle.databaseServers),
"kube": len(handle.kubernetesServers),
"server_id": handle.Hello().ServerID,
}).Debug("Cleaning up resources in response to instance termination")
slog.DebugContext(c.closeContext, "Cleaning up resources in response to instance termination",
"apps", len(handle.appServers),
"dbs", len(handle.databaseServers),
"kube", len(handle.kubernetesServers),
"server_id", handle.Hello().ServerID,
)
for _, app := range handle.appServers {
if err := c.auth.DeleteApplicationServer(c.closeContext, apidefaults.Namespace, app.resource.GetHostID(), app.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove app server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove app server on termination",
"app_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, db := range handle.databaseServers {
if err := c.auth.DeleteDatabaseServer(c.closeContext, apidefaults.Namespace, db.resource.GetHostID(), db.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove db server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove db server on termination",
"db_server", handle.Hello().ServerID,
"error", err,
)
}
}

for _, kube := range handle.kubernetesServers {
if err := c.auth.DeleteKubernetesServer(c.closeContext, kube.resource.GetHostID(), kube.resource.GetName()); err != nil && !trace.IsNotFound(err) {
log.Warnf("Failed to remove kube server %q on termination: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to remove kube server on termination",
"kube_server", handle.Hello().ServerID,
"error", err,
)
}
}
}
@@ -432,7 +442,7 @@ func (c *Controller) handleControlStream(handle *upstreamHandle) {
case msg := <-handle.Recv():
switch m := msg.(type) {
case proto.UpstreamInventoryHello:
log.Warnf("Unexpected upstream hello on control stream of server %q.", handle.Hello().ServerID)
slog.WarnContext(c.closeContext, "Unexpected upstream hello on control stream of server", "server_id", handle.Hello().ServerID)
handle.CloseWithError(trace.BadParameter("unexpected upstream hello"))
return
case proto.UpstreamInventoryAgentMetadata:
@@ -477,7 +487,10 @@
case proto.UpstreamInventoryGoodbye:
handle.goodbye = m
default:
log.Warnf("Unexpected upstream message type %T on control stream of server %q.", m, handle.Hello().ServerID)
slog.WarnContext(c.closeContext, "Unexpected upstream message type on control stream",
"message_type", logutils.TypeAttr(m),
"server_id", handle.Hello().ServerID,
)
handle.CloseWithError(trace.BadParameter("unexpected upstream message type %T", m))
return
}
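The default case above records the unexpected message's Go type via logutils.TypeAttr from lib/utils/log. The helper's implementation is not shown in this diff; a minimal sketch of what such a helper can look like (an assumption, not Teleport's code) is:

```go
// Hypothetical sketch of a TypeAttr-style helper: it turns a value's dynamic
// type into a slog attribute so it can be logged structurally.
package logutils

import (
	"fmt"
	"log/slog"
)

func TypeAttr(val any) slog.Attr {
	return slog.String("type", fmt.Sprintf("%T", val))
}
```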
@@ -579,7 +592,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim

instance, err := tracker.nextHeartbeat(now, handle.Hello(), c.authID)
if err != nil {
log.Warnf("Failed to construct next heartbeat value for instance %q: %v (this is a bug)", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to construct next heartbeat value for instance (this is a bug)",
"server_id", handle.Hello().ServerID,
"error", err,
)
return trace.Wrap(err)
}

@@ -592,7 +608,10 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim
})

if err != nil {
log.Warnf("Failed to hb instance %q: %v", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to hb instance",
"server_id", handle.Hello().ServerID,
"error", err,
)
c.testEvent(instanceHeartbeatErr)
if !tracker.retryHeartbeat {
// suppress failure and retry exactly once
@@ -614,7 +633,9 @@ func (c *Controller) heartbeatInstanceState(handle *upstreamHandle, now time.Tim
func (c *Controller) handlePong(handle *upstreamHandle, msg proto.UpstreamInventoryPong) {
pending, ok := handle.pings[msg.ID]
if !ok {
log.Warnf("Unexpected upstream pong from server %q (id=%d).", handle.Hello().ServerID, msg.ID)
slog.WarnContext(c.closeContext, "Unexpected upstream pong",
"server_id", handle.Hello().ServerID,
"pong_id", msg.ID)
return
}
now := c.clock.Now()
@@ -718,7 +739,10 @@ func (c *Controller) handleSSHServerHB(handle *upstreamHandle, sshServer *types.
handle.sshServer.retryUpsert = false
} else {
c.testEvent(sshUpsertErr)
log.Warnf("Failed to upsert ssh server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert ssh server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
@@ -765,7 +789,10 @@ func (c *Controller) handleAppServerHB(handle *upstreamHandle, appServer *types.
srv.resource = appServer
} else {
c.testEvent(appUpsertErr)
log.Warnf("Failed to upsert app server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert app server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
@@ -813,7 +840,10 @@ func (c *Controller) handleDatabaseServerHB(handle *upstreamHandle, databaseServ
srv.resource = databaseServer
} else {
c.testEvent(dbUpsertErr)
log.Warnf("Failed to upsert database server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert database server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
@@ -861,7 +891,10 @@ func (c *Controller) handleKubernetesServerHB(handle *upstreamHandle, kubernetes
srv.resource = kubernetesServer
} else {
c.testEvent(kubeUpsertErr)
log.Warnf("Failed to upsert kubernetes server %q on heartbeat: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on heartbeat",
"server_id", handle.Hello().ServerID,
"error", err,
)

// blank old lease if any and set retry state. next time handleKeepAlive is called
// we will attempt to upsert the server again.
@@ -908,7 +941,12 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e
srv.keepAliveErrs++
handle.appServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive app server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive app server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(appKeepAliveDel)
@@ -924,7 +962,10 @@ func (c *Controller) keepAliveAppServer(handle *upstreamHandle, now time.Time) e
lease, err := c.auth.UpsertApplicationServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(appUpsertRetryErr)
log.Warnf("Failed to upsert app server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert app server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
@@ -951,7 +992,12 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti
srv.keepAliveErrs++
handle.databaseServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive database server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive database server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(dbKeepAliveDel)
@@ -967,7 +1013,10 @@ func (c *Controller) keepAliveDatabaseServer(handle *upstreamHandle, now time.Ti
lease, err := c.auth.UpsertDatabaseServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(dbUpsertRetryErr)
log.Warnf("Failed to upsert database server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert database server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
@@ -994,7 +1043,12 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time.
srv.keepAliveErrs++
handle.kubernetesServers[name] = srv
shouldRemove := srv.keepAliveErrs > c.maxKeepAliveErrs
log.Warnf("Failed to keep alive kubernetes server %q: %v (count=%d, removing=%v).", handle.Hello().ServerID, err, srv.keepAliveErrs, shouldRemove)
slog.WarnContext(c.closeContext, "Failed to keep alive kubernetes server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", srv.keepAliveErrs,
"should_remove", shouldRemove,
)

if shouldRemove {
c.testEvent(kubeKeepAliveDel)
@@ -1010,7 +1064,10 @@ func (c *Controller) keepAliveKubernetesServer(handle *upstreamHandle, now time.
lease, err := c.auth.UpsertKubernetesServer(c.closeContext, srv.resource)
if err != nil {
c.testEvent(kubeUpsertRetryErr)
log.Warnf("Failed to upsert kubernetes server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert kubernetes server on retry.",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
@@ -1039,7 +1096,12 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
handle.sshServer.keepAliveErrs++
shouldClose := handle.sshServer.keepAliveErrs > c.maxKeepAliveErrs

log.Warnf("Failed to keep alive ssh server %q: %v (count=%d, closing=%v).", handle.Hello().ServerID, err, handle.sshServer.keepAliveErrs, shouldClose)
slog.WarnContext(c.closeContext, "Failed to keep alive ssh server",
"server_id", handle.Hello().ServerID,
"error", err,
"error_count", handle.sshServer.keepAliveErrs,
"should_remove", shouldClose,
)

if shouldClose {
return trace.Errorf("failed to keep alive ssh server: %v", err)
@@ -1053,7 +1115,10 @@ func (c *Controller) keepAliveSSHServer(handle *upstreamHandle, now time.Time) e
lease, err := c.auth.UpsertNode(c.closeContext, handle.sshServer.resource)
if err != nil {
c.testEvent(sshUpsertRetryErr)
log.Warnf("Failed to upsert ssh server %q on retry: %v.", handle.Hello().ServerID, err)
slog.WarnContext(c.closeContext, "Failed to upsert ssh server on retry",
"server_id", handle.Hello().ServerID,
"error", err,
)
// since this is retry-specific logic, an error here means that upsert failed twice in
// a row. Missing upserts is more problematic than missing keepalives so we don't bother
// attempting a third time.
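Most of the converted warnings in controller.go sit inside the same keep-alive bookkeeping: consecutive keep-alive failures are counted, the resource is removed once the count exceeds maxKeepAliveErrs, and a failed upsert is retried exactly once. A simplified, self-contained sketch of that pattern (hypothetical types, not Teleport's) follows:

```go
// Simplified sketch of the keep-alive bookkeeping that the converted log
// calls surround. All names here are hypothetical stand-ins.
package example

import (
	"context"
	"log/slog"
)

type tracked struct {
	keepAliveErrs int  // consecutive keep-alive failures
	retryUpsert   bool // a failed upsert is retried exactly once
}

func keepAlive(ctx context.Context, srv *tracked, maxErrs int, ping func() error) bool {
	if err := ping(); err != nil {
		srv.keepAliveErrs++
		remove := srv.keepAliveErrs > maxErrs
		slog.WarnContext(ctx, "Failed to keep alive server",
			"error", err,
			"error_count", srv.keepAliveErrs,
			"should_remove", remove,
		)
		return !remove // caller deletes the resource when false
	}
	srv.keepAliveErrs = 0 // healthy again; reset the failure counter
	return true
}
```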
14 changes: 7 additions & 7 deletions lib/inventory/inventory.go
@@ -22,13 +22,13 @@ import (
"context"
"errors"
"io"
"log/slog"
"sync"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
log "github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
@@ -163,7 +163,7 @@ func (h *downstreamHandle) autoEmitMetadata() {
md, err := h.metadataGetter(h.CloseContext())
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Warnf("Failed to get agent metadata: %v", err)
slog.WarnContext(h.CloseContext(), "Failed to get agent metadata", "error", err)
}
return
}
@@ -188,7 +188,7 @@

// Send metadata.
if err := sender.Send(h.CloseContext(), msg); err != nil && !errors.Is(err, context.Canceled) {
log.Warnf("Failed to send agent metadata: %v", err)
slog.WarnContext(h.CloseContext(), "Failed to send agent metadata", "error", err)
}

// Block for the duration of the stream.
@@ -209,7 +209,7 @@ func (h *downstreamHandle) run(fn DownstreamCreateFunc, hello proto.UpstreamInve
return
}

log.Debugf("Re-attempt control stream acquisition in ~%s.", retry.Duration())
slog.DebugContext(h.closeContext, "Re-attempt control stream acquisition", "backoff", retry.Duration())
select {
case <-retry.After():
retry.Inc()
@@ -223,14 +223,14 @@ func (h *downstreamHandle) tryRun(fn DownstreamCreateFunc, hello proto.UpstreamI
stream, err := fn(h.CloseContext())
if err != nil {
if !h.closing() {
log.Warnf("Failed to create inventory control stream: %v.", err)
slog.WarnContext(h.CloseContext(), "Failed to create inventory control stream", "error", err)
}
return
}

if err := h.handleStream(stream, hello); err != nil {
if !h.closing() {
log.Warnf("Inventory control stream failed: %v", err)
slog.WarnContext(h.CloseContext(), "Inventory control stream failed", "error", err)
}
return
}
@@ -298,7 +298,7 @@ func (h *downstreamHandle) handlePing(sender DownstreamSender, msg proto.Downstr
h.mu.Lock()
defer h.mu.Unlock()
if len(h.pingHandlers) == 0 {
log.Warnf("Got ping with no handlers registered (id=%d).", msg.ID)
slog.WarnContext(h.closeContext, "Got ping with no handlers registered", "ping_id", msg.ID)
return
}
for _, handler := range h.pingHandlers {
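In inventory.go the converted calls live in the downstream handle's reconnect loop: each failed attempt is logged, then the handle waits out a backoff before trying again. A rough sketch of that shape using only the standard library (the real code uses Teleport's retry utilities and DownstreamCreateFunc):

```go
// Rough sketch of a reconnect loop that logs its failures and backoff via slog.
// The backoff logic here is a stand-in for Teleport's retry helpers.
package example

import (
	"context"
	"log/slog"
	"time"
)

func runStream(ctx context.Context, connect func(context.Context) error) {
	backoff := time.Second
	for {
		if err := connect(ctx); err != nil {
			slog.WarnContext(ctx, "Inventory control stream failed", "error", err)
		}
		if ctx.Err() != nil {
			return // handle is closing; stop retrying
		}
		slog.DebugContext(ctx, "Re-attempt control stream acquisition", "backoff", backoff)
		select {
		case <-time.After(backoff):
			backoff = min(2*backoff, 30*time.Second) // capped exponential backoff
		case <-ctx.Done():
			return
		}
	}
}
```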
9 changes: 4 additions & 5 deletions lib/inventory/metadata/metadata_other.go
@@ -22,19 +22,18 @@
package metadata

import (
"runtime"

log "github.com/sirupsen/logrus"
"context"
"log/slog"
)

// fetchOSVersion returns "" if not on linux and not on darwin.
func (c *fetchConfig) fetchOSVersion() string {
log.Warningf("fetchOSVersion is not implemented for %s", runtime.GOOS)
slog.WarnContext(context.Background(), "fetchOSVersion is not implemented")
return ""
}

// fetchGlibcVersion returns "" if not on linux and not on darwin.
func (c *fetchConfig) fetchGlibcVersion() string {
log.Warningf("fetchGlibcVersion is not implemented for %s", runtime.GOOS)
slog.WarnContext(context.Background(), "fetchGlibcVersion is not implemented")
return ""
}
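metadata_other.go is the catch-all for platforms without a dedicated implementation, so its fetchers only log a warning and return an empty string; files like it are normally guarded by a build constraint that sits outside the hunk shown here. A minimal sketch of that stub pattern (the constraint line is an assumption, not visible in the diff):

```go
//go:build !linux && !darwin

// Hypothetical stub file: on unsupported platforms the fetcher logs a warning
// via a background context (there is no request context here) and returns "".
package metadata

import (
	"context"
	"log/slog"
)

type fetchConfig struct{}

func (c *fetchConfig) fetchOSVersion() string {
	slog.WarnContext(context.Background(), "fetchOSVersion is not implemented")
	return ""
}
```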
