Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix all lint issues and add lint GHA #34

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Orb Agent - lint
on:
push:
branches:
- "!release"
pull_request:

permissions:
contents: read

jobs:
golangci:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Setup Go
uses: actions/setup-go@v4
with:
go-version: '1.23'
check-latest: true
- name: Lint
uses: golangci/golangci-lint-action@v3
with:
version: v1.62
working-directory: .
args: --config .github/golangci.yaml
skip-pkg-cache: true
skip-build-cache: true
55 changes: 22 additions & 33 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
_ "github.com/mattn/go-sqlite3"
"github.com/mitchellh/mapstructure"
"github.com/orb-community/orb/fleet"
"go.uber.org/zap"
Expand All @@ -20,8 +19,9 @@ import (
"github.com/netboxlabs/orb-agent/agent/version"
)

var ErrMqttConnection = errors.New("failed to connect to a broker")
const routineKey config.ContextKey = "routine"

// Agent is the interface that all agents must implement
type Agent interface {
Start(ctx context.Context, cancelFunc context.CancelFunc) error
Stop(ctx context.Context)
Expand All @@ -33,7 +33,7 @@ type orbAgent struct {
logger *zap.Logger
config config.Config
client mqtt.Client
agent_id string
agentID string
backends map[string]backend.Backend
backendState map[string]*backend.State
backendsCommon config.BackendCommons
Expand All @@ -47,40 +47,29 @@ type orbAgent struct {
heartbeatCancel context.CancelFunc

// Agent RPC channel, configured from command line
baseTopic string
rpcToCoreTopic string
rpcFromCoreTopic string
capabilitiesTopic string
heartbeatsTopic string
logTopic string
baseTopic string
rpcFromCoreTopic string
heartbeatsTopic string

// Retry Mechanism to ensure the Request is received
groupRequestTicker *time.Ticker
groupRequestSucceeded context.CancelFunc
policyRequestTicker *time.Ticker
policyRequestSucceeded context.CancelFunc

// AgentGroup channels sent from core
groupsInfos map[string]GroupInfo
groupsInfos map[string]groupInfo

policyManager manager.PolicyManager
configManager config.ConfigManager
configManager config.Manager
}

const (
retryRequestDuration = time.Second
retryRequestFixedTime = 15
retryDurationIncrPerAttempts = 10
retryMaxAttempts = 4
)

type GroupInfo struct {
type groupInfo struct {
Name string
ChannelID string
}

var _ Agent = (*orbAgent)(nil)

// New creates a new agent
func New(logger *zap.Logger, c config.Config) (Agent, error) {
pm, err := manager.New(logger, c)
if err != nil {
Expand All @@ -93,7 +82,7 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) {
}
cm := config.New(logger, c.OrbAgent.ConfigManager)

return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]GroupInfo)}, nil
return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]groupInfo)}, nil
}

func (a *orbAgent) managePolicies() error {
Expand Down Expand Up @@ -146,7 +135,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error {
a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err))
return err
}
backendCtx := context.WithValue(agentCtx, "routine", name)
backendCtx := context.WithValue(agentCtx, routineKey, name)
backendCtx = a.configManager.GetContext(backendCtx)
a.backends[name] = be
initialState := be.GetInitialState()
Expand Down Expand Up @@ -176,12 +165,12 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
defer func(t time.Time) {
a.logger.Debug("Startup of agent execution duration", zap.String("Start() execution duration", time.Since(t).String()))
}(startTime)
agentCtx := context.WithValue(ctx, "routine", "agentRoutine")
asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, "routine", "asyncParent"))
agentCtx := context.WithValue(ctx, routineKey, "agentRoutine")
asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, routineKey, "asyncParent"))
a.asyncContext = asyncCtx
a.rpcFromCancelFunc = cancelAllAsync
a.cancelFunction = cancelFunc
a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value("routine")))
a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value(routineKey)))
mqtt.CRITICAL = &agentLoggerCritical{a: a}
mqtt.ERROR = &agentLoggerError{a: a}

Expand Down Expand Up @@ -211,7 +200,7 @@ func (a *orbAgent) logonWithHeartbeat() {
}

func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value("routine")))
a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value(routineKey)))
if a.heartbeatCtx != nil {
a.heartbeatCancel()
}
Expand All @@ -223,7 +212,7 @@ func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) {
}

func (a *orbAgent) Stop(ctx context.Context) {
a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value("routine")))
a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value(routineKey)))
if a.rpcFromCancelFunc != nil {
a.rpcFromCancelFunc()
}
Expand Down Expand Up @@ -256,7 +245,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin

be := a.backends[name]
a.logger.Info("restarting backend", zap.String("backend", name), zap.String("reason", reason))
a.backendState[name].RestartCount += 1
a.backendState[name].RestartCount++
a.backendState[name].LastRestartTS = time.Now()
a.backendState[name].LastRestartReason = reason
a.logger.Info("removing policies", zap.String("backend", name))
Expand All @@ -272,7 +261,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin
a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err)
a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err))
}
be.SetCommsClient(a.agent_id, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))
be.SetCommsClient(a.agentID, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name))

return nil
}
Expand All @@ -294,7 +283,7 @@ func (a *orbAgent) RestartAll(ctx context.Context, reason string) error {
}

func (a *orbAgent) extendContext(routine string) (context.Context, context.CancelFunc) {
uuidTraceId := uuid.NewString()
a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceId))
return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, "routine", routine), "trace-id", uuidTraceId))
uuidTraceID := uuid.NewString()
a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceID))
return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, routineKey, routine), config.ContextKey("trace-id"), uuidTraceID))
}
17 changes: 8 additions & 9 deletions agent/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/netboxlabs/orb-agent/agent/policies"
)

// Running Status types
const (
Unknown RunningStatus = iota
Running
Expand All @@ -20,6 +21,7 @@ const (
Waiting
)

// RunningStatus is the status of the backend
type RunningStatus int

var runningStatusMap = [...]string{
Expand All @@ -31,15 +33,7 @@ var runningStatusMap = [...]string{
"waiting",
}

var runningStatusRevMap = map[string]RunningStatus{
"unknown": Unknown,
"running": Running,
"backend_error": BackendError,
"agent_error": AgentError,
"offline": Offline,
"waiting": Waiting,
}

// State represents the state of the backend
type State struct {
Status RunningStatus
RestartCount int64
Expand All @@ -52,6 +46,7 @@ func (s RunningStatus) String() string {
return runningStatusMap[s]
}

// Backend is the interface that all backends must implement
type Backend interface {
Configure(*zap.Logger, policies.PolicyRepo, map[string]interface{}, config.BackendCommons) error
SetCommsClient(string, *mqtt.Client, string)
Expand All @@ -71,10 +66,12 @@ type Backend interface {

var registry = make(map[string]Backend)

// Register registers backend
func Register(name string, b Backend) {
registry[name] = b
}

// GetList returns list of registered backends
func GetList() []string {
keys := make([]string, 0, len(registry))
for k := range registry {
Expand All @@ -83,11 +80,13 @@ func GetList() []string {
return keys
}

// HaveBackend checks if backend is registered
func HaveBackend(name string) bool {
_, prs := registry[name]
return prs
}

// GetBackend returns a registered backend
func GetBackend(name string) Backend {
return registry[name]
}
44 changes: 22 additions & 22 deletions agent/backend/devicediscovery/device_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ import (
var _ backend.Backend = (*deviceDiscoveryBackend)(nil)

const (
VersionTimeout = 2
CapabilitiesTimeout = 5
ReadinessBackoff = 10
ReadinessTimeout = 10
ApplyPolicyTimeout = 10
RemovePolicyTimeout = 20
DefaultExec = "device-discovery"
DefaultAPIHost = "localhost"
DefaultAPIPort = "8072"
versionTimeout = 2
capabilitiesTimeout = 5
readinessBackoff = 10
readinessTimeout = 10
applyPolicyTimeout = 10
removePolicyTimeout = 20
defaultExec = "device-discovery"
defaultAPIHost = "localhost"
defaultAPIPort = "8072"
)

type deviceDiscoveryBackend struct {
logger *zap.Logger
policyRepo policies.PolicyRepo
exec string
configFile string

apiHost string
apiPort string
Expand All @@ -57,15 +56,16 @@ type deviceDiscoveryBackend struct {
otlpMetricsTopic string
}

type Info struct {
type info struct {
Version string `json:"version"`
UpTimeMin float64 `json:"up_time_min"`
}

// Register registers the backend
func Register() bool {
backend.Register("device_discovery", &deviceDiscoveryBackend{
apiProtocol: "http",
exec: DefaultExec,
exec: defaultExec,
})
return true
}
Expand All @@ -76,10 +76,10 @@ func (d *deviceDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Pol

var prs bool
if d.apiHost, prs = config["host"].(string); !prs {
d.apiHost = DefaultAPIHost
d.apiHost = defaultAPIHost
}
if d.apiPort, prs = config["port"].(string); !prs {
d.apiPort = DefaultAPIPort
d.apiPort = defaultAPIPort
}

d.diodeTarget = common.Diode.Target
Expand All @@ -96,8 +96,8 @@ func (d *deviceDiscoveryBackend) SetCommsClient(agentID string, client *mqtt.Cli
}

func (d *deviceDiscoveryBackend) Version() (string, error) {
var info Info
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", VersionTimeout)
var info info
err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C
d.logger.Info("device-discovery process started", zap.Int("pid", status.PID))

var readinessErr error
for backoff := 0; backoff < ReadinessBackoff; backoff++ {
for backoff := 0; backoff < readinessBackoff; backoff++ {
version, readinessErr := d.Version()
if readinessErr == nil {
d.logger.Info("device-discovery readiness ok, got version ", zap.String("device_discovery_version", version))
Expand All @@ -196,7 +196,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C
}

func (d *deviceDiscoveryBackend) Stop(ctx context.Context) error {
d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value("routine")))
d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value(config.ContextKey("routine"))))
defer d.cancelFunc()
err := d.proc.Stop()
finalStatus := <-d.statusChan
Expand All @@ -216,7 +216,7 @@ func (d *deviceDiscoveryBackend) FullReset(ctx context.Context) error {
}
}
// for each policy, restart the scraper
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "device-discovery"))
backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "device-discovery"))
// start it
if err := d.Start(backendCtx, cancelFunc); err != nil {
d.logger.Error("failed to start backend on restart procedure", zap.Error(err))
Expand All @@ -231,7 +231,7 @@ func (d *deviceDiscoveryBackend) GetStartTime() time.Time {

func (d *deviceDiscoveryBackend) GetCapabilities() (map[string]interface{}, error) {
caps := make(map[string]interface{})
err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", CapabilitiesTimeout)
err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", capabilitiesTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func (d *deviceDiscoveryBackend) ApplyPolicy(data policies.PolicyData, updatePol
}

var resp map[string]interface{}
err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", ApplyPolicyTimeout)
err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", applyPolicyTimeout)
if err != nil {
d.logger.Warn("yaml policy application failure", zap.String("policy_id", data.ID), zap.ByteString("policy", policyYaml))
return err
Expand All @@ -299,7 +299,7 @@ func (d *deviceDiscoveryBackend) RemovePolicy(data policies.PolicyData) error {
} else {
name = data.Name
}
err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout)
err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", removePolicyTimeout)
if err != nil {
return err
}
Expand Down
Loading
Loading