Skip to content

Commit

Permalink
chore: fix all lint issues and add lint GHA
Browse files Browse the repository at this point in the history
  • Loading branch information
leoparente committed Dec 18, 2024
1 parent 7051b69 commit 92380e3
Show file tree
Hide file tree
Showing 26 changed files with 240 additions and 243 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Orb Agent - lint
on:
push:
branches:
- "!release"
pull_request:

permissions:
contents: read

jobs:
golangci:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.23'
check-latest: true
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.62
working-directory: .
args: --config .github/golangci.yaml
skip-pkg-cache: true
skip-build-cache: true
55 changes: 22 additions & 33 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
_ "github.com/mattn/go-sqlite3"
"github.com/mitchellh/mapstructure"
"github.com/orb-community/orb/fleet"
"go.uber.org/zap"
Expand All @@ -20,8 +19,9 @@ import (
"github.com/netboxlabs/orb-agent/agent/version"
)

var ErrMqttConnection = errors.New("failed to connect to a broker")
const routineKey config.ContextKey = "routine"

// Agent is the interface that all agents must implement
type Agent interface {
Start(ctx context.Context, cancelFunc context.CancelFunc) error
Stop(ctx context.Context)
Expand All @@ -33,7 +33,7 @@ type orbAgent struct {
logger *zap.Logger
config config.Config
client mqtt.Client
agent_id string
agentID string
backends map[string]backend.Backend
backendState map[string]*backend.State
backendsCommon config.BackendCommons
Expand All @@ -47,40 +47,29 @@ type orbAgent struct {
heartbeatCancel context.CancelFunc

// Agent RPC channel, configured from command line
baseTopic string
rpcToCoreTopic string
rpcFromCoreTopic string
capabilitiesTopic string
heartbeatsTopic string
logTopic string
baseTopic string
rpcFromCoreTopic string
heartbeatsTopic string

// Retry Mechanism to ensure the Request is received
groupRequestTicker *time.Ticker
groupRequestSucceeded context.CancelFunc
policyRequestTicker *time.Ticker
policyRequestSucceeded context.CancelFunc

// AgentGroup channels sent from core
groupsInfos map[string]GroupInfo
groupsInfos map[string]groupInfo

policyManager manager.PolicyManager
configManager config.ConfigManager
configManager config.Manager
}

const (
retryRequestDuration = time.Second
retryRequestFixedTime = 15
retryDurationIncrPerAttempts = 10
retryMaxAttempts = 4
)

type GroupInfo struct {
type groupInfo struct {
Name string
ChannelID string
}

var _ Agent = (*orbAgent)(nil)

// New creates a new agent
func New(logger *zap.Logger, c config.Config) (Agent, error) {
pm, err := manager.New(logger, c)
if err != nil {
Expand All @@ -93,7 +82,7 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) {
}
cm := config.New(logger, c.OrbAgent.ConfigManager)

return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]GroupInfo)}, nil
return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]groupInfo)}, nil
}

func (a *orbAgent) managePolicies() error {
Expand Down Expand Up @@ -146,7 +135,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err))
return err
}
backendCtx := context.WithValue(agentCtx, "routine", name)
backendCtx := context.WithValue(agentCtx, routineKey, name)
backendCtx = a.configManager.GetContext(backendCtx)
a.backends[name] = be
initialState := be.GetInitialState()
Expand Down Expand Up @@ -176,12 +165,12 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
defer func(t time.Time) {
a.logger.Debug("Startup of agent execution duration", zap.String("Start() execution duration", time.Since(t).String()))
}(startTime)
agentCtx := context.WithValue(ctx, "routine", "agentRoutine")
asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, "routine", "asyncParent"))
agentCtx := context.WithValue(ctx, routineKey, "agentRoutine")
asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, routineKey, "asyncParent"))
a.asyncContext = asyncCtx
a.rpcFromCancelFunc = cancelAllAsync
a.cancelFunction = cancelFunc
a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value("routine")))
a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value(routineKey)))
mqtt.CRITICAL = &agentLoggerCritical{a: a}
mqtt.ERROR = &agentLoggerError{a: a}

Expand Down Expand Up @@ -211,7 +200,7 @@ func (a *orbAgent) logonWithHeartbeat() {
}

func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value("routine")))
a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value(routineKey)))
if a.heartbeatCtx != nil {
a.heartbeatCancel()
}
Expand All @@ -223,7 +212,7 @@ func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
}

func (a *orbAgent) Stop(ctx context.Context) {
a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value("routine")))
a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value(routineKey)))
if a.rpcFromCancelFunc != nil {
a.rpcFromCancelFunc()
}
Expand Down Expand Up @@ -256,7 +245,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin

be := a.backends[name]
a.logger.Info("restarting backend", zap.String("backend", name), zap.String("reason", reason))
a.backendState[name].RestartCount += 1
a.backendState[name].RestartCount++
a.backendState[name].LastRestartTS = time.Now()
a.backendState[name].LastRestartReason = reason
a.logger.Info("removing policies", zap.String("backend", name))
Expand All @@ -272,7 +261,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin
a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err)
a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err))
}
be.SetCommsClient(a.agent_id, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))
be.SetCommsClient(a.agentID, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))

return nil
}
Expand All @@ -294,7 +283,7 @@ func (a *orbAgent) RestartAll(ctx context.Context, reason string) error {
}

func (a *orbAgent) extendContext(routine string) (context.Context, context.CancelFunc) {
uuidTraceId := uuid.NewString()
a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceId))
return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, "routine", routine), "trace-id", uuidTraceId))
uuidTraceID := uuid.NewString()
a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceID))
return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, routineKey, routine), config.ContextKey("trace-id"), uuidTraceID))
}
17 changes: 8 additions & 9 deletions agent/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/netboxlabs/orb-agent/agent/policies"
)

// Running Status types
const (
Unknown RunningStatus = iota
Running
Expand All @@ -20,6 +21,7 @@ const (
Waiting
)

// RunningStatus is the status of the backend
type RunningStatus int

var runningStatusMap = [...]string{
Expand All @@ -31,15 +33,7 @@ var runningStatusMap = [...]string{
"waiting",
}

var runningStatusRevMap = map[string]RunningStatus{
"unknown": Unknown,
"running": Running,
"backend_error": BackendError,
"agent_error": AgentError,
"offline": Offline,
"waiting": Waiting,
}

// State represents the state of the backend
type State struct {
Status RunningStatus
RestartCount int64
Expand All @@ -52,6 +46,7 @@ func (s RunningStatus) String() string {
return runningStatusMap[s]
}

// Backend is the interface that all backends must implement
type Backend interface {
Configure(*zap.Logger, policies.PolicyRepo, map[string]interface{}, config.BackendCommons) error
SetCommsClient(string, *mqtt.Client, string)
Expand All @@ -71,10 +66,12 @@ type Backend interface {

var registry = make(map[string]Backend)

// Register registers backend
func Register(name string, b Backend) {
registry[name] = b
}

// GetList returns list of registered backends
func GetList() []string {
keys := make([]string, 0, len(registry))
for k := range registry {
Expand All @@ -83,11 +80,13 @@ func GetList() []string {
return keys
}

// HaveBackend checks if backend is registered
func HaveBackend(name string) bool {
_, prs := registry[name]
return prs
}

// GetBackend returns a registered backend
func GetBackend(name string) Backend {
return registry[name]
}
44 changes: 22 additions & 22 deletions agent/backend/devicediscovery/device_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ import (
var _ backend.Backend = (*deviceDiscoveryBackend)(nil)

const (
VersionTimeout = 2
CapabilitiesTimeout = 5
ReadinessBackoff = 10
ReadinessTimeout = 10
ApplyPolicyTimeout = 10
RemovePolicyTimeout = 20
DefaultExec = "device-discovery"
DefaultAPIHost = "localhost"
DefaultAPIPort = "8072"
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessTimeout = 10
applyPolicyTimeout = 10
removePolicyTimeout = 20
defaultExec = "device-discovery"
defaultAPIHost = "localhost"
defaultAPIPort = "8072"
)

type deviceDiscoveryBackend struct {
logger *zap.Logger
policyRepo policies.PolicyRepo
exec string
configFile string

apiHost string
apiPort string
Expand All @@ -57,15 +56,16 @@ type deviceDiscoveryBackend struct {
otlpMetricsTopic string
}

type Info struct {
type info struct {
Version string `json:"version"`
UpTimeMin float64 `json:"up_time_min"`
}

// Register registers the backend
func Register() bool {
backend.Register("device_discovery", &deviceDiscoveryBackend{
apiProtocol: "http",
exec: DefaultExec,
exec: defaultExec,
})
return true
}
Expand All @@ -76,10 +76,10 @@ func (d *deviceDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Pol

var prs bool
if d.apiHost, prs = config["host"].(string); !prs {
d.apiHost = DefaultAPIHost
d.apiHost = defaultAPIHost
}
if d.apiPort, prs = config["port"].(string); !prs {
d.apiPort = DefaultAPIPort
d.apiPort = defaultAPIPort
}

d.diodeTarget = common.Diode.Target
Expand All @@ -96,8 +96,8 @@ func (d *deviceDiscoveryBackend) SetCommsClient(agentID string, client *mqtt.Cli
}

func (d *deviceDiscoveryBackend) Version() (string, error) {
var info Info
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", VersionTimeout)
var info info
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C
d.logger.Info("device-discovery process started", zap.Int("pid", status.PID))

var readinessErr error
for backoff := 0; backoff < ReadinessBackoff; backoff++ {
for backoff := 0; backoff < readinessBackoff; backoff++ {
version, readinessErr := d.Version()
if readinessErr == nil {
d.logger.Info("device-discovery readiness ok, got version ", zap.String("device_discovery_version", version))
Expand All @@ -196,7 +196,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C
}

func (d *deviceDiscoveryBackend) Stop(ctx context.Context) error {
d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value("routine")))
d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value(config.ContextKey("routine"))))
defer d.cancelFunc()
err := d.proc.Stop()
finalStatus := <-d.statusChan
Expand All @@ -216,7 +216,7 @@ func (d *deviceDiscoveryBackend) FullReset(ctx context.Context) error {
}
}
// for each policy, restart the scraper
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "device-discovery"))
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "device-discovery"))
// start it
if err := d.Start(backendCtx, cancelFunc); err != nil {
d.logger.Error("failed to start backend on restart procedure", zap.Error(err))
Expand All @@ -231,7 +231,7 @@ func (d *deviceDiscoveryBackend) GetStartTime() time.Time {

func (d *deviceDiscoveryBackend) GetCapabilities() (map[string]interface{}, error) {
caps := make(map[string]interface{})
err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", CapabilitiesTimeout)
err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", capabilitiesTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (d *deviceDiscoveryBackend) ApplyPolicy(data policies.PolicyData, updatePol
}

var resp map[string]interface{}
err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", ApplyPolicyTimeout)
err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", applyPolicyTimeout)
if err != nil {
d.logger.Warn("yaml policy application failure", zap.String("policy_id", data.ID), zap.ByteString("policy", policyYaml))
return err
Expand All @@ -299,7 +299,7 @@ func (d *deviceDiscoveryBackend) RemovePolicy(data policies.PolicyData) error {
} else {
name = data.Name
}
err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout)
err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", removePolicyTimeout)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 92380e3

Please sign in to comment.