diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml new file mode 100644 index 0000000..39cd49a --- /dev/null +++ b/.github/workflows/lint.yaml @@ -0,0 +1,29 @@ +name: Orb Agent - lint +on: + push: + branches: + - "!release" + pull_request: + +permissions: + contents: read + +jobs: + golangci: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: '1.23' + check-latest: true + - name: Lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.62 + working-directory: . + args: --config .github/golangci.yaml + skip-pkg-cache: true + skip-build-cache: true diff --git a/agent/agent.go b/agent/agent.go index 2ec7b49..b1e97c9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -9,7 +9,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/google/uuid" - _ "github.com/mattn/go-sqlite3" "github.com/mitchellh/mapstructure" "github.com/orb-community/orb/fleet" "go.uber.org/zap" @@ -20,8 +19,9 @@ import ( "github.com/netboxlabs/orb-agent/agent/version" ) -var ErrMqttConnection = errors.New("failed to connect to a broker") +const routineKey config.ContextKey = "routine" +// Agent is the interface that all agents must implement type Agent interface { Start(ctx context.Context, cancelFunc context.CancelFunc) error Stop(ctx context.Context) @@ -33,7 +33,7 @@ type orbAgent struct { logger *zap.Logger config config.Config client mqtt.Client - agent_id string + agentID string backends map[string]backend.Backend backendState map[string]*backend.State backendsCommon config.BackendCommons @@ -47,40 +47,29 @@ type orbAgent struct { heartbeatCancel context.CancelFunc // Agent RPC channel, configured from command line - baseTopic string - rpcToCoreTopic string - rpcFromCoreTopic string - capabilitiesTopic string - heartbeatsTopic string - logTopic string + baseTopic string + rpcFromCoreTopic string + heartbeatsTopic string // Retry Mechanism to ensure the Request is received - groupRequestTicker *time.Ticker groupRequestSucceeded context.CancelFunc - policyRequestTicker *time.Ticker policyRequestSucceeded context.CancelFunc // AgentGroup channels sent from core - groupsInfos map[string]GroupInfo + groupsInfos map[string]groupInfo policyManager manager.PolicyManager - configManager config.ConfigManager + configManager config.Manager } -const ( - retryRequestDuration = time.Second - retryRequestFixedTime = 15 - retryDurationIncrPerAttempts = 10 - retryMaxAttempts = 4 -) - -type GroupInfo struct { +type groupInfo struct { Name string ChannelID string } var _ Agent = (*orbAgent)(nil) +// New creates a new agent func New(logger *zap.Logger, c config.Config) (Agent, error) { pm, err := manager.New(logger, c) if err != nil { @@ -93,7 +82,7 @@ func New(logger *zap.Logger, c config.Config) (Agent, error) { } cm := config.New(logger, c.OrbAgent.ConfigManager) - return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]GroupInfo)}, nil + return &orbAgent{logger: logger, config: c, policyManager: pm, configManager: cm, groupsInfos: make(map[string]groupInfo)}, nil } func (a *orbAgent) managePolicies() error { @@ -146,7 +135,7 @@ func (a *orbAgent) startBackends(agentCtx context.Context) error { a.logger.Info("failed to configure backend", zap.String("backend", name), zap.Error(err)) return err } - backendCtx := context.WithValue(agentCtx, "routine", name) + backendCtx := context.WithValue(agentCtx, routineKey, name) backendCtx = a.configManager.GetContext(backendCtx) a.backends[name] = be initialState := be.GetInitialState() @@ -176,12 +165,12 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err defer func(t time.Time) { a.logger.Debug("Startup of agent execution duration", zap.String("Start() execution duration", time.Since(t).String())) }(startTime) - agentCtx := context.WithValue(ctx, "routine", "agentRoutine") - asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, "routine", "asyncParent")) + agentCtx := context.WithValue(ctx, routineKey, "agentRoutine") + asyncCtx, cancelAllAsync := context.WithCancel(context.WithValue(ctx, routineKey, "asyncParent")) a.asyncContext = asyncCtx a.rpcFromCancelFunc = cancelAllAsync a.cancelFunction = cancelFunc - a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value("routine"))) + a.logger.Info("agent started", zap.String("version", version.GetBuildVersion()), zap.Any("routine", agentCtx.Value(routineKey))) mqtt.CRITICAL = &agentLoggerCritical{a: a} mqtt.ERROR = &agentLoggerError{a: a} @@ -211,7 +200,7 @@ func (a *orbAgent) logonWithHeartbeat() { } func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) { - a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value("routine"))) + a.logger.Debug("stopping heartbeat, going offline status", zap.Any("routine", ctx.Value(routineKey))) if a.heartbeatCtx != nil { a.heartbeatCancel() } @@ -223,7 +212,7 @@ func (a *orbAgent) logoffWithHeartbeat(ctx context.Context) { } func (a *orbAgent) Stop(ctx context.Context) { - a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value("routine"))) + a.logger.Info("routine call for stop agent", zap.Any("routine", ctx.Value(routineKey))) if a.rpcFromCancelFunc != nil { a.rpcFromCancelFunc() } @@ -256,7 +245,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin be := a.backends[name] a.logger.Info("restarting backend", zap.String("backend", name), zap.String("reason", reason)) - a.backendState[name].RestartCount += 1 + a.backendState[name].RestartCount++ a.backendState[name].LastRestartTS = time.Now() a.backendState[name].LastRestartReason = reason a.logger.Info("removing policies", zap.String("backend", name)) @@ -272,7 +261,7 @@ func (a *orbAgent) RestartBackend(ctx context.Context, name string, reason strin a.backendState[name].LastError = fmt.Sprintf("failed to reset backend: %v", err) a.logger.Error("failed to reset backend", zap.String("backend", name), zap.Error(err)) } - be.SetCommsClient(a.agent_id, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name)) + be.SetCommsClient(a.agentID, &a.client, fmt.Sprintf("%s/?/%s", a.baseTopic, name)) return nil } @@ -294,7 +283,7 @@ func (a *orbAgent) RestartAll(ctx context.Context, reason string) error { } func (a *orbAgent) extendContext(routine string) (context.Context, context.CancelFunc) { - uuidTraceId := uuid.NewString() - a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceId)) - return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, "routine", routine), "trace-id", uuidTraceId)) + uuidTraceID := uuid.NewString() + a.logger.Debug("creating context for receiving message", zap.String("routine", routine), zap.String("trace-id", uuidTraceID)) + return context.WithCancel(context.WithValue(context.WithValue(a.asyncContext, routineKey, routine), config.ContextKey("trace-id"), uuidTraceID)) } diff --git a/agent/backend/backend.go b/agent/backend/backend.go index e3cafb2..c85c277 100644 --- a/agent/backend/backend.go +++ b/agent/backend/backend.go @@ -11,6 +11,7 @@ import ( "github.com/netboxlabs/orb-agent/agent/policies" ) +// Running Status types const ( Unknown RunningStatus = iota Running @@ -20,6 +21,7 @@ const ( Waiting ) +// RunningStatus is the status of the backend type RunningStatus int var runningStatusMap = [...]string{ @@ -31,15 +33,7 @@ var runningStatusMap = [...]string{ "waiting", } -var runningStatusRevMap = map[string]RunningStatus{ - "unknown": Unknown, - "running": Running, - "backend_error": BackendError, - "agent_error": AgentError, - "offline": Offline, - "waiting": Waiting, -} - +// State represents the state of the backend type State struct { Status RunningStatus RestartCount int64 @@ -52,6 +46,7 @@ func (s RunningStatus) String() string { return runningStatusMap[s] } +// Backend is the interface that all backends must implement type Backend interface { Configure(*zap.Logger, policies.PolicyRepo, map[string]interface{}, config.BackendCommons) error SetCommsClient(string, *mqtt.Client, string) @@ -71,10 +66,12 @@ type Backend interface { var registry = make(map[string]Backend) +// Register registers backend func Register(name string, b Backend) { registry[name] = b } +// GetList returns list of registered backends func GetList() []string { keys := make([]string, 0, len(registry)) for k := range registry { @@ -83,11 +80,13 @@ func GetList() []string { return keys } +// HaveBackend checks if backend is registered func HaveBackend(name string) bool { _, prs := registry[name] return prs } +// GetBackend returns a registered backend func GetBackend(name string) Backend { return registry[name] } diff --git a/agent/backend/devicediscovery/device_discovery.go b/agent/backend/devicediscovery/device_discovery.go index 1b8b4b7..2d4cc53 100644 --- a/agent/backend/devicediscovery/device_discovery.go +++ b/agent/backend/devicediscovery/device_discovery.go @@ -22,22 +22,21 @@ import ( var _ backend.Backend = (*deviceDiscoveryBackend)(nil) const ( - VersionTimeout = 2 - CapabilitiesTimeout = 5 - ReadinessBackoff = 10 - ReadinessTimeout = 10 - ApplyPolicyTimeout = 10 - RemovePolicyTimeout = 20 - DefaultExec = "device-discovery" - DefaultAPIHost = "localhost" - DefaultAPIPort = "8072" + versionTimeout = 2 + capabilitiesTimeout = 5 + readinessBackoff = 10 + readinessTimeout = 10 + applyPolicyTimeout = 10 + removePolicyTimeout = 20 + defaultExec = "device-discovery" + defaultAPIHost = "localhost" + defaultAPIPort = "8072" ) type deviceDiscoveryBackend struct { logger *zap.Logger policyRepo policies.PolicyRepo exec string - configFile string apiHost string apiPort string @@ -57,15 +56,16 @@ type deviceDiscoveryBackend struct { otlpMetricsTopic string } -type Info struct { +type info struct { Version string `json:"version"` UpTimeMin float64 `json:"up_time_min"` } +// Register registers the backend func Register() bool { backend.Register("device_discovery", &deviceDiscoveryBackend{ apiProtocol: "http", - exec: DefaultExec, + exec: defaultExec, }) return true } @@ -76,10 +76,10 @@ func (d *deviceDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Pol var prs bool if d.apiHost, prs = config["host"].(string); !prs { - d.apiHost = DefaultAPIHost + d.apiHost = defaultAPIHost } if d.apiPort, prs = config["port"].(string); !prs { - d.apiPort = DefaultAPIPort + d.apiPort = defaultAPIPort } d.diodeTarget = common.Diode.Target @@ -96,8 +96,8 @@ func (d *deviceDiscoveryBackend) SetCommsClient(agentID string, client *mqtt.Cli } func (d *deviceDiscoveryBackend) Version() (string, error) { - var info Info - err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", VersionTimeout) + var info info + err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout) if err != nil { return "", err } @@ -172,7 +172,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C d.logger.Info("device-discovery process started", zap.Int("pid", status.PID)) var readinessErr error - for backoff := 0; backoff < ReadinessBackoff; backoff++ { + for backoff := 0; backoff < readinessBackoff; backoff++ { version, readinessErr := d.Version() if readinessErr == nil { d.logger.Info("device-discovery readiness ok, got version ", zap.String("device_discovery_version", version)) @@ -196,7 +196,7 @@ func (d *deviceDiscoveryBackend) Start(ctx context.Context, cancelFunc context.C } func (d *deviceDiscoveryBackend) Stop(ctx context.Context) error { - d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value("routine"))) + d.logger.Info("routine call to stop device-discovery", zap.Any("routine", ctx.Value(config.ContextKey("routine")))) defer d.cancelFunc() err := d.proc.Stop() finalStatus := <-d.statusChan @@ -216,7 +216,7 @@ func (d *deviceDiscoveryBackend) FullReset(ctx context.Context) error { } } // for each policy, restart the scraper - backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "device-discovery")) + backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "device-discovery")) // start it if err := d.Start(backendCtx, cancelFunc); err != nil { d.logger.Error("failed to start backend on restart procedure", zap.Error(err)) @@ -231,7 +231,7 @@ func (d *deviceDiscoveryBackend) GetStartTime() time.Time { func (d *deviceDiscoveryBackend) GetCapabilities() (map[string]interface{}, error) { caps := make(map[string]interface{}) - err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", CapabilitiesTimeout) + err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", capabilitiesTimeout) if err != nil { return nil, err } @@ -280,7 +280,7 @@ func (d *deviceDiscoveryBackend) ApplyPolicy(data policies.PolicyData, updatePol } var resp map[string]interface{} - err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", ApplyPolicyTimeout) + err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", applyPolicyTimeout) if err != nil { d.logger.Warn("yaml policy application failure", zap.String("policy_id", data.ID), zap.ByteString("policy", policyYaml)) return err @@ -299,7 +299,7 @@ func (d *deviceDiscoveryBackend) RemovePolicy(data policies.PolicyData) error { } else { name = data.Name } - err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout) + err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", removePolicyTimeout) if err != nil { return err } diff --git a/agent/backend/devicediscovery/utils.go b/agent/backend/devicediscovery/utils.go index 7c41a21..d778167 100644 --- a/agent/backend/devicediscovery/utils.go +++ b/agent/backend/devicediscovery/utils.go @@ -2,7 +2,6 @@ package devicediscovery import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -63,19 +62,25 @@ func (d *deviceDiscoveryBackend) request(url string, payload interface{}, method return getErr } + defer func() { + if err := res.Body.Close(); err != nil { + d.logger.Error("failed to close response body", zap.Error(err)) + } + }() + if (res.StatusCode < 200) || (res.StatusCode > 299) { body, err := io.ReadAll(res.Body) if err != nil { - return errors.New(fmt.Sprintf("non 2xx HTTP error code from device-discovery, no or invalid body: %d", res.StatusCode)) + return fmt.Errorf("non 2xx HTTP error code from device-discovery, no or invalid body: %d", res.StatusCode) } if len(body) == 0 { - return errors.New(fmt.Sprintf("%d empty body", res.StatusCode)) + return fmt.Errorf("%d empty body", res.StatusCode) } else if body[0] == '{' { var jsonBody map[string]interface{} err := json.Unmarshal(body, &jsonBody) if err == nil { if errMsg, ok := jsonBody["error"]; ok { - return errors.New(fmt.Sprintf("%d %s", res.StatusCode, errMsg)) + return fmt.Errorf("%d %s", res.StatusCode, errMsg) } } } diff --git a/agent/backend/devicediscovery/vars.go b/agent/backend/devicediscovery/vars.go index a9426c0..a8261d3 100644 --- a/agent/backend/devicediscovery/vars.go +++ b/agent/backend/devicediscovery/vars.go @@ -4,7 +4,8 @@ import ( "github.com/spf13/viper" ) +// RegisterBackendSpecificVariables registers the backend specific variables for the device discovery backend func RegisterBackendSpecificVariables(v *viper.Viper) { - v.SetDefault("orb.backends.device_discovery.host", DefaultAPIHost) - v.SetDefault("orb.backends.device_discovery.port", DefaultAPIPort) + v.SetDefault("orb.backends.device_discovery.host", defaultAPIHost) + v.SetDefault("orb.backends.device_discovery.port", defaultAPIPort) } diff --git a/agent/backend/networkdiscovery/network_discovery.go b/agent/backend/networkdiscovery/network_discovery.go index 2542d70..e0e004e 100644 --- a/agent/backend/networkdiscovery/network_discovery.go +++ b/agent/backend/networkdiscovery/network_discovery.go @@ -22,22 +22,21 @@ import ( var _ backend.Backend = (*networkDiscoveryBackend)(nil) const ( - VersionTimeout = 2 - CapabilitiesTimeout = 5 - ReadinessBackoff = 10 - ReadinessTimeout = 10 - ApplyPolicyTimeout = 10 - RemovePolicyTimeout = 20 - DefaultExec = "network-discovery" - DefaultAPIHost = "localhost" - DefaultAPIPort = "8073" + versionTimeout = 2 + capabilitiesTimeout = 5 + readinessBackoff = 10 + readinessTimeout = 10 + applyPolicyTimeout = 10 + removePolicyTimeout = 20 + defaultExec = "network-discovery" + defaultAPIHost = "localhost" + defaultAPIPort = "8073" ) type networkDiscoveryBackend struct { logger *zap.Logger policyRepo policies.PolicyRepo exec string - configFile string apiHost string apiPort string @@ -57,15 +56,16 @@ type networkDiscoveryBackend struct { otlpMetricsTopic string } -type Info struct { +type info struct { Version string `json:"version"` UpTimeMin float64 `json:"up_time_seconds"` } +// Register registers the network discovery backend func Register() bool { backend.Register("network_discovery", &networkDiscoveryBackend{ apiProtocol: "http", - exec: DefaultExec, + exec: defaultExec, }) return true } @@ -76,10 +76,10 @@ func (d *networkDiscoveryBackend) Configure(logger *zap.Logger, repo policies.Po var prs bool if d.apiHost, prs = config["host"].(string); !prs { - d.apiHost = DefaultAPIHost + d.apiHost = defaultAPIHost } if d.apiPort, prs = config["port"].(string); !prs { - d.apiPort = DefaultAPIPort + d.apiPort = defaultAPIPort } d.diodeTarget = common.Diode.Target @@ -96,8 +96,8 @@ func (d *networkDiscoveryBackend) SetCommsClient(agentID string, client *mqtt.Cl } func (d *networkDiscoveryBackend) Version() (string, error) { - var info Info - err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", VersionTimeout) + var info info + err := d.request("status", &info, http.MethodGet, http.NoBody, "application/json", versionTimeout) if err != nil { return "", err } @@ -172,7 +172,7 @@ func (d *networkDiscoveryBackend) Start(ctx context.Context, cancelFunc context. d.logger.Info("network-discovery process started", zap.Int("pid", status.PID)) var readinessErr error - for backoff := 0; backoff < ReadinessBackoff; backoff++ { + for backoff := 0; backoff < readinessBackoff; backoff++ { version, readinessErr := d.Version() if readinessErr == nil { d.logger.Info("network-discovery readiness ok, got version ", zap.String("network_discovery_version", version)) @@ -196,7 +196,7 @@ func (d *networkDiscoveryBackend) Start(ctx context.Context, cancelFunc context. } func (d *networkDiscoveryBackend) Stop(ctx context.Context) error { - d.logger.Info("routine call to stop network-discovery", zap.Any("routine", ctx.Value("routine"))) + d.logger.Info("routine call to stop network-discovery", zap.Any("routine", ctx.Value(config.ContextKey("routine")))) defer d.cancelFunc() err := d.proc.Stop() finalStatus := <-d.statusChan @@ -216,7 +216,7 @@ func (d *networkDiscoveryBackend) FullReset(ctx context.Context) error { } } // for each policy, restart the scraper - backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "network-discovery")) + backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "network-discovery")) // start it if err := d.Start(backendCtx, cancelFunc); err != nil { d.logger.Error("failed to start backend on restart procedure", zap.Error(err)) @@ -231,7 +231,7 @@ func (d *networkDiscoveryBackend) GetStartTime() time.Time { func (d *networkDiscoveryBackend) GetCapabilities() (map[string]interface{}, error) { caps := make(map[string]interface{}) - err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", CapabilitiesTimeout) + err := d.request("capabilities", &caps, http.MethodGet, http.NoBody, "application/json", capabilitiesTimeout) if err != nil { return nil, err } @@ -280,7 +280,7 @@ func (d *networkDiscoveryBackend) ApplyPolicy(data policies.PolicyData, updatePo } var resp map[string]interface{} - err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", ApplyPolicyTimeout) + err = d.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", applyPolicyTimeout) if err != nil { d.logger.Warn("yaml policy application failure", zap.String("policy_id", data.ID), zap.ByteString("policy", policyYaml)) return err @@ -297,7 +297,7 @@ func (d *networkDiscoveryBackend) RemovePolicy(data policies.PolicyData) error { if data.PreviousPolicyData != nil && data.PreviousPolicyData.Name != data.Name { name = data.PreviousPolicyData.Name } - err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout) + err := d.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", removePolicyTimeout) if err != nil { return err } diff --git a/agent/backend/networkdiscovery/utils.go b/agent/backend/networkdiscovery/utils.go index 2d0d40b..6be0435 100644 --- a/agent/backend/networkdiscovery/utils.go +++ b/agent/backend/networkdiscovery/utils.go @@ -2,7 +2,6 @@ package networkdiscovery import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -63,19 +62,25 @@ func (d *networkDiscoveryBackend) request(url string, payload interface{}, metho return getErr } + defer func() { + if err := res.Body.Close(); err != nil { + d.logger.Error("failed to close response body", zap.Error(err)) + } + }() + if (res.StatusCode < 200) || (res.StatusCode > 299) { body, err := io.ReadAll(res.Body) if err != nil { - return errors.New(fmt.Sprintf("non 2xx HTTP error code from network-discovery, no or invalid body: %d", res.StatusCode)) + return fmt.Errorf("non 2xx HTTP error code from network-discovery, no or invalid body: %d", res.StatusCode) } if len(body) == 0 { - return errors.New(fmt.Sprintf("%d empty body", res.StatusCode)) + return fmt.Errorf("%d empty body", res.StatusCode) } else if body[0] == '{' { var jsonBody map[string]interface{} err := json.Unmarshal(body, &jsonBody) if err == nil { if errMsg, ok := jsonBody["error"]; ok { - return errors.New(fmt.Sprintf("%d %s", res.StatusCode, errMsg)) + return fmt.Errorf("%d %s", res.StatusCode, errMsg) } } } diff --git a/agent/backend/networkdiscovery/vars.go b/agent/backend/networkdiscovery/vars.go index c11c0fe..405c736 100644 --- a/agent/backend/networkdiscovery/vars.go +++ b/agent/backend/networkdiscovery/vars.go @@ -4,7 +4,8 @@ import ( "github.com/spf13/viper" ) +// RegisterBackendSpecificVariables registers the backend specific variables for the network discovery backend func RegisterBackendSpecificVariables(v *viper.Viper) { - v.SetDefault("orb.backends.network_discovery.host", DefaultAPIHost) - v.SetDefault("orb.backends.network_discovery.port", DefaultAPIPort) + v.SetDefault("orb.backends.network_discovery.host", defaultAPIHost) + v.SetDefault("orb.backends.network_discovery.port", defaultAPIPort) } diff --git a/agent/backend/otel/exporter_builder.go b/agent/backend/otel/exporter_builder.go index a1f49b7..60991dc 100644 --- a/agent/backend/otel/exporter_builder.go +++ b/agent/backend/otel/exporter_builder.go @@ -7,6 +7,7 @@ import ( "gopkg.in/yaml.v3" ) +// ExporterBuilder is an interface that defines the methods to build an exporter type ExporterBuilder interface { GetStructFromYaml(yamlString string) (openTelemetryConfig, error) MergeDefaultValueWithPolicy(config openTelemetryConfig, policyName string) (openTelemetryConfig, error) @@ -22,7 +23,7 @@ type openTelemetryConfig struct { type defaultOtlpExporter struct { Endpoint string `yaml:"endpoint"` - Tls *tls `yaml:"tls"` + TLS *tls `yaml:"tls"` } type tls struct { @@ -84,11 +85,11 @@ func (e *exporterBuilder) GetStructFromYaml(yamlString string) (openTelemetryCon return config, nil } -func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig, policyId string, policyName string) (openTelemetryConfig, error) { +func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig, policyID string, policyName string) (openTelemetryConfig, error) { endpoint := e.host + ":" + strconv.Itoa(e.port) defaultOtlpExporter := defaultOtlpExporter{ Endpoint: endpoint, - Tls: &tls{ + TLS: &tls{ Insecure: true, }, } @@ -104,7 +105,7 @@ func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig "metric_statements": map[string]interface{}{ "context": "scope", "statements": []string{ - `set(attributes["policy_id"], "` + policyId + `")`, + `set(attributes["policy_id"], "` + policyID + `")`, `set(attributes["policy_name"], "` + policyName + `")`, }, }, @@ -131,25 +132,3 @@ func (e *exporterBuilder) MergeDefaultValueWithPolicy(config openTelemetryConfig } return config, nil } - -func (o *openTelemetryBackend) buildDefaultExporterAndProcessor(policyYaml string, policyId string, policyName string, telemetryPort int) (openTelemetryConfig, error) { - defaultPolicyYaml, err := yaml.Marshal(policyYaml) - if err != nil { - o.logger.Warn("yaml policy marshal failure", zap.String("policy_id", policyId)) - return openTelemetryConfig{}, err - } - defaultPolicyString := string(defaultPolicyYaml) - builder := getExporterBuilder(o.logger, o.otelReceiverHost, o.otelReceiverPort) - defaultPolicyStruct, err := builder.GetStructFromYaml(defaultPolicyString) - if err != nil { - return openTelemetryConfig{}, err - } - defaultPolicyStruct, err = builder.MergeDefaultValueWithPolicy( - defaultPolicyStruct, - policyId, - policyName) - if err != nil { - return openTelemetryConfig{}, err - } - return defaultPolicyStruct, nil -} diff --git a/agent/backend/otel/exporter_builder_test.go b/agent/backend/otel/exporter_builder_test.go index 835e21e..cbcdd50 100644 --- a/agent/backend/otel/exporter_builder_test.go +++ b/agent/backend/otel/exporter_builder_test.go @@ -10,7 +10,7 @@ func TestBuildDefaultPolicy(t *testing.T) { testCases := []struct { caseName string inputString string - policyId string + policyID string policyName string expectedStruct openTelemetryConfig processedString string @@ -44,7 +44,7 @@ service: receivers: - httpcheck `, - policyId: "test-policy-id", + policyID: "test-policy-id", policyName: "test-policy", }, } @@ -56,7 +56,7 @@ service: if err != nil { t.Errorf("failed to merge default value with policy: %v", err) } - expectedStruct, err := exporterBuilder.MergeDefaultValueWithPolicy(gotOtelConfig, testCase.policyId, testCase.policyName) + expectedStruct, err := exporterBuilder.MergeDefaultValueWithPolicy(gotOtelConfig, testCase.policyID, testCase.policyName) if err != nil { t.Errorf("failed to merge default value with policy: %v", err) } diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index d996aca..7d2f740 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -2,7 +2,6 @@ package otel import ( "context" - _ "embed" "fmt" "os" "os/exec" @@ -11,8 +10,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/go-cmd/cmd" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "github.com/netboxlabs/orb-agent/agent/backend" @@ -23,9 +20,9 @@ import ( var _ backend.Backend = (*openTelemetryBackend)(nil) const ( - DefaultPath = "otelcol-contrib" - DefaultHost = "localhost" - DefaultPort = 4317 + defaultPath = "otelcol-contrib" + defaultHost = "localhost" + defaultPort = 4317 ) type openTelemetryBackend struct { @@ -42,8 +39,6 @@ type openTelemetryBackend struct { runningCollectors map[string]runningPolicy mainCancelFunction context.CancelFunc - // MQTT Config for OTEL MQTT Exporter - mqttConfig config.MQTTConfig mqttClient *mqtt.Client otlpMetricsTopic string @@ -55,13 +50,6 @@ type openTelemetryBackend struct { otelReceiverHost string otelReceiverPort int otelExecutablePath string - - metricsReceiver receiver.Metrics - metricsExporter exporter.Metrics - tracesReceiver receiver.Traces - tracesExporter exporter.Traces - logsReceiver receiver.Logs - logsExporter exporter.Logs } // Configure initializes the backend with the given configuration @@ -74,10 +62,14 @@ func (o *openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Polic var err error o.otelReceiverTaps = []string{"otelcol-contrib", "receivers", "processors", "extensions"} o.policyConfigDirectory, err = os.MkdirTemp("", "otel-policies") + if err != nil { + o.logger.Error("failed to create temporary directory for policy configs", zap.Error(err)) + return err + } if path, ok := config["binary"].(string); ok { o.otelExecutablePath = path } else { - o.otelExecutablePath = DefaultPath + o.otelExecutablePath = defaultPath } _, err = exec.LookPath(o.otelExecutablePath) if err != nil { @@ -94,15 +86,15 @@ func (o *openTelemetryBackend) Configure(logger *zap.Logger, repo policies.Polic o.otelReceiverPort, err = strconv.Atoi(otelPort.(string)) if err != nil { o.logger.Error("failed to parse otlp port using default", zap.Error(err)) - o.otelReceiverPort = DefaultPort + o.otelReceiverPort = defaultPort } } else { - o.otelReceiverPort = DefaultPort + o.otelReceiverPort = defaultPort } if otelHost, ok := config["otlp_host"].(string); ok { o.otelReceiverHost = otelHost } else { - o.otelReceiverHost = DefaultHost + o.otelReceiverHost = defaultHost } return nil @@ -124,6 +116,7 @@ func (o *openTelemetryBackend) Version() (string, error) { case finalStatus := <-status: if finalStatus.Error != nil { o.logger.Error("error during call of otelcol-contrib version", zap.Error(finalStatus.Error)) + cancel() return "", finalStatus.Error } else { output := finalStatus.Stdout @@ -186,13 +179,14 @@ func (o *openTelemetryBackend) Stop(_ context.Context) error { func (o *openTelemetryBackend) FullReset(ctx context.Context) error { o.logger.Info("restarting otel backend", zap.Int("running policies", len(o.runningCollectors))) - backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "otel")) + backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "otel")) if err := o.Start(backendCtx, cancelFunc); err != nil { return err } return nil } +// Register registers otel backend func Register() bool { backend.Register("otel", &openTelemetryBackend{}) return true diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index c177e3c..de8f4a1 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -11,18 +11,18 @@ import ( "golang.org/x/exp/slices" "gopkg.in/yaml.v3" + "github.com/netboxlabs/orb-agent/agent/config" "github.com/netboxlabs/orb-agent/agent/policies" ) const tempFileNamePattern = "otel-%s-config.yml" type runningPolicy struct { - ctx context.Context - cancel context.CancelFunc - policyId string - telemetryPort int - policyData policies.PolicyData - statusChan *cmd.Status + ctx context.Context + cancel context.CancelFunc + policyID string + policyData policies.PolicyData + statusChan *cmd.Status } func (o *openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { @@ -95,13 +95,13 @@ func (o *openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, up } func (o *openTelemetryBackend) addRunner(policyData policies.PolicyData, policyFilePath string) error { - policyContext, policyCancel := context.WithCancel(context.WithValue(o.mainContext, "policy_id", policyData.ID)) + policyContext, policyCancel := context.WithCancel(context.WithValue(o.mainContext, config.ContextKey("policy_id"), policyData.ID)) command := cmd.NewCmdOptions(cmd.Options{Buffered: false, Streaming: true}, o.otelExecutablePath, "--config", policyFilePath) go func(ctx context.Context, logger *zap.Logger) { status := command.Start() o.logger.Info("starting otel policy", zap.String("policy_id", policyData.ID), zap.Any("status", command.Status()), zap.Int("process id", command.Status().PID)) - for command.Status().Complete == false { + for !command.Status().Complete { select { case v := <-ctx.Done(): err := command.Stop() @@ -127,7 +127,7 @@ func (o *openTelemetryBackend) addRunner(policyData policies.PolicyData, policyF policyEntry := runningPolicy{ ctx: policyContext, cancel: policyCancel, - policyId: policyData.ID, + policyID: policyData.ID, policyData: policyData, statusChan: &status, } diff --git a/agent/backend/otel/vars.go b/agent/backend/otel/vars.go index bf7401f..5611485 100644 --- a/agent/backend/otel/vars.go +++ b/agent/backend/otel/vars.go @@ -2,6 +2,7 @@ package otel import "github.com/spf13/viper" +// RegisterBackendSpecificVariables registers the backend specific variables for the otel backend func RegisterBackendSpecificVariables(v *viper.Viper) { v.SetDefault("orb.backends.otel.otlp_port", "4316") } diff --git a/agent/backend/pktvisor/pktvisor.go b/agent/backend/pktvisor/pktvisor.go index e671161..38b68a6 100644 --- a/agent/backend/pktvisor/pktvisor.go +++ b/agent/backend/pktvisor/pktvisor.go @@ -13,8 +13,6 @@ import ( mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/go-cmd/cmd" - "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" "github.com/netboxlabs/orb-agent/agent/backend" @@ -25,17 +23,17 @@ import ( var _ backend.Backend = (*pktvisorBackend)(nil) const ( - DefaultBinary = "pktvisord" - ReadinessBackoff = 10 - ReadinessTimeout = 10 - ApplyPolicyTimeout = 10 - RemovePolicyTimeout = 20 - VersionTimeout = 2 - ScrapeTimeout = 5 - TapsTimeout = 5 - DefaultConfigPath = "/opt/orb/agent.yaml" - DefaultAPIHost = "localhost" - DefaultAPIPort = "10853" + defaultBinary = "pktvisord" + readinessBackoff = 10 + readinessTimeout = 10 + applyPolicyTimeout = 10 + removePolicyTimeout = 20 + versionTimeout = 2 + scrapeTimeout = 5 + tapsTimeout = 5 + defaultConfigPath = "/opt/orb/agent.yaml" + defaultAPIHost = "localhost" + defaultAPIPort = "10853" ) // AppInfo represents server application information @@ -57,9 +55,6 @@ type pktvisorBackend struct { cancelFunc context.CancelFunc ctx context.Context - // MQTT Config for OTEL MQTT Exporter - mqttConfig config.MQTTConfig - mqttClient *mqtt.Client metricsTopic string otlpMetricsTopic string @@ -75,8 +70,6 @@ type pktvisorBackend struct { // OpenTelemetry management otelReceiverHost string otelReceiverPort int - receiver receiver.Metrics - exporter exporter.Metrics } func (p *pktvisorBackend) getFreePort() (int, error) { @@ -88,7 +81,11 @@ func (p *pktvisorBackend) getFreePort() (int, error) { if err != nil { return 0, err } - defer l.Close() + defer func() { + if err := l.Close(); err != nil { + p.logger.Error("failed to close socket", zap.Error(err)) + } + }() return l.Addr().(*net.TCPAddr).Port, nil } @@ -234,9 +231,9 @@ func (p *pktvisorBackend) Start(ctx context.Context, cancelFunc context.CancelFu p.logger.Info("pktvisor process started", zap.Int("pid", status.PID)) var readinessError error - for backoff := 0; backoff < ReadinessBackoff; backoff++ { + for backoff := 0; backoff < readinessBackoff; backoff++ { var appMetrics AppInfo - readinessError = p.request("metrics/app", &appMetrics, http.MethodGet, http.NoBody, "application/json", ReadinessTimeout) + readinessError = p.request("metrics/app", &appMetrics, http.MethodGet, http.NoBody, "application/json", readinessTimeout) if readinessError == nil { p.logger.Info("pktvisor readiness ok, got version ", zap.String("pktvisor_version", appMetrics.App.Version)) break @@ -259,7 +256,7 @@ func (p *pktvisorBackend) Start(ctx context.Context, cancelFunc context.CancelFu } func (p *pktvisorBackend) Stop(ctx context.Context) error { - p.logger.Info("routine call to stop pktvisor", zap.Any("routine", ctx.Value("routine"))) + p.logger.Info("routine call to stop pktvisor", zap.Any("routine", ctx.Value(config.ContextKey("routine")))) defer p.cancelFunc() err := p.proc.Stop() finalStatus := <-p.statusChan @@ -278,16 +275,16 @@ func (p *pktvisorBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo var prs bool if p.binary, prs = config["binary"].(string); !prs { - p.binary = DefaultBinary + p.binary = defaultBinary } if p.configFile, prs = config["config_file"].(string); !prs { - p.configFile = DefaultConfigPath + p.configFile = defaultConfigPath } if p.adminAPIHost, prs = config["api_host"].(string); !prs { - p.adminAPIHost = DefaultAPIHost + p.adminAPIHost = defaultAPIHost } if p.adminAPIPort, prs = config["api_port"].(string); !prs { - p.adminAPIPort = DefaultAPIPort + p.adminAPIPort = defaultAPIPort } p.agentTags = common.Otel.AgentTags @@ -308,7 +305,7 @@ func (p *pktvisorBackend) Configure(logger *zap.Logger, repo policies.PolicyRepo func (p *pktvisorBackend) GetCapabilities() (map[string]interface{}, error) { var taps interface{} - err := p.request("taps", &taps, http.MethodGet, http.NoBody, "application/json", TapsTimeout) + err := p.request("taps", &taps, http.MethodGet, http.NoBody, "application/json", tapsTimeout) if err != nil { return nil, err } @@ -327,7 +324,7 @@ func (p *pktvisorBackend) FullReset(ctx context.Context) error { } // for each policy, restart the scraper - backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, "routine", "pktvisor")) + backendCtx, cancelFunc := context.WithCancel(context.WithValue(ctx, config.ContextKey("routine"), "pktvisor")) // start it if err := p.Start(backendCtx, cancelFunc); err != nil { @@ -338,6 +335,7 @@ func (p *pktvisorBackend) FullReset(ctx context.Context) error { return nil } +// Register registers pktvisor backend func Register() bool { backend.Register("pktvisor", &pktvisorBackend{ adminAPIProtocol: "http", diff --git a/agent/backend/pktvisor/policy.go b/agent/backend/pktvisor/policy.go index c670b7c..58c203b 100644 --- a/agent/backend/pktvisor/policy.go +++ b/agent/backend/pktvisor/policy.go @@ -37,7 +37,7 @@ func (p *pktvisorBackend) ApplyPolicy(data policies.PolicyData, updatePolicy boo } var resp map[string]interface{} - err = p.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", ApplyPolicyTimeout) + err = p.request("policies", &resp, http.MethodPost, bytes.NewBuffer(policyYaml), "application/x-yaml", applyPolicyTimeout) if err != nil { p.logger.Warn("yaml policy application failure", zap.String("policy_id", data.ID), zap.ByteString("policy", policyYaml)) return err @@ -56,7 +56,7 @@ func (p *pktvisorBackend) RemovePolicy(data policies.PolicyData) error { } else { name = data.Name } - err := p.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout) + err := p.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", removePolicyTimeout) if err != nil { return err } diff --git a/agent/backend/pktvisor/utils.go b/agent/backend/pktvisor/utils.go index 391141b..fd62633 100644 --- a/agent/backend/pktvisor/utils.go +++ b/agent/backend/pktvisor/utils.go @@ -2,7 +2,6 @@ package pktvisor import ( "encoding/json" - "errors" "fmt" "io" "net/http" @@ -41,19 +40,25 @@ func (p *pktvisorBackend) request(url string, payload interface{}, method string return getErr } + defer func() { + if err := res.Body.Close(); err != nil { + p.logger.Error("failed to close response body", zap.Error(err)) + } + }() + if (res.StatusCode < 200) || (res.StatusCode > 299) { body, err := io.ReadAll(res.Body) if err != nil { - return errors.New(fmt.Sprintf("non 2xx HTTP error code from pktvisord, no or invalid body: %d", res.StatusCode)) + return fmt.Errorf("non 2xx HTTP error code from pktvisord, no or invalid body: %d", res.StatusCode) } if len(body) == 0 { - return errors.New(fmt.Sprintf("%d empty body", res.StatusCode)) + return fmt.Errorf("%d empty body", res.StatusCode) } else if body[0] == '{' { var jsonBody map[string]interface{} err := json.Unmarshal(body, &jsonBody) if err == nil { if errMsg, ok := jsonBody["error"]; ok { - return errors.New(fmt.Sprintf("%d %s", res.StatusCode, errMsg)) + return fmt.Errorf("%d %s", res.StatusCode, errMsg) } } } @@ -93,6 +98,6 @@ func (p *pktvisorBackend) getProcRunningStatus() (backend.RunningStatus, string, // also used for HTTP REST API readiness check func (p *pktvisorBackend) getAppInfo() (AppInfo, error) { var appInfo AppInfo - err := p.request("metrics/app", &appInfo, http.MethodGet, http.NoBody, "application/json", VersionTimeout) + err := p.request("metrics/app", &appInfo, http.MethodGet, http.NoBody, "application/json", versionTimeout) return appInfo, err } diff --git a/agent/backend/pktvisor/vars.go b/agent/backend/pktvisor/vars.go index c98ead6..df3db3d 100644 --- a/agent/backend/pktvisor/vars.go +++ b/agent/backend/pktvisor/vars.go @@ -4,6 +4,7 @@ import ( "github.com/spf13/viper" ) +// RegisterBackendSpecificVariables registers the backend specific variables for the pktvisor backend func RegisterBackendSpecificVariables(v *viper.Viper) { v.SetDefault("orb.backends.pktvisor.binary", "/usr/local/sbin/pktvisord") v.SetDefault("orb.backends.pktvisor.config_file", "/opt/orb/agent.yaml") diff --git a/agent/config/cloud.go b/agent/config/cloud.go index bf64885..894f0a5 100644 --- a/agent/config/cloud.go +++ b/agent/config/cloud.go @@ -19,7 +19,7 @@ import ( "go.uber.org/zap" ) -var _ ConfigManager = (*cloudConfigManager)(nil) +var _ Manager = (*cloudConfigManager)(nil) type cloudConfigManager struct { logger *zap.Logger @@ -150,7 +150,7 @@ func (cc *cloudConfigManager) autoProvision(apiAddress string, token string) (MQ } return MQTTConfig{ - Id: result.ID, + ID: result.ID, Key: result.Key, ChannelID: result.ChannelID, }, nil @@ -168,13 +168,13 @@ func (cc *cloudConfigManager) GetConfig() (MQTTConfig, error) { // this may change in the future mqtt := cc.config.MQTT - if len(mqtt.Id) > 0 && len(mqtt.Key) > 0 && len(mqtt.ChannelID) > 0 { + if len(mqtt.ID) > 0 && len(mqtt.Key) > 0 && len(mqtt.ChannelID) > 0 { cc.logger.Info("using explicitly specified cloud configuration", zap.String("address", mqtt.Address), - zap.String("id", mqtt.Id)) + zap.String("id", mqtt.ID)) return MQTTConfig{ Address: mqtt.Address, - Id: mqtt.Id, + ID: mqtt.ID, Key: mqtt.Key, ChannelID: mqtt.ChannelID, }, nil @@ -193,7 +193,7 @@ func (cc *cloudConfigManager) GetConfig() (MQTTConfig, error) { // see if we have an existing auto provisioned configuration saved locally q := `SELECT id, key, channel FROM cloud_config ORDER BY ts_created DESC LIMIT 1` dba := MQTTConfig{} - if err := cc.db.QueryRowx(q).Scan(&dba.Id, &dba.Key, &dba.ChannelID); err != nil { + if err := cc.db.QueryRowx(q).Scan(&dba.ID, &dba.Key, &dba.ChannelID); err != nil { if err != sql.ErrNoRows { return MQTTConfig{}, err } @@ -202,7 +202,7 @@ func (cc *cloudConfigManager) GetConfig() (MQTTConfig, error) { dba.Address = mqtt.Address cc.logger.Info("using previous auto provisioned cloud configuration loaded from local storage", zap.String("address", mqtt.Address), - zap.String("id", dba.Id)) + zap.String("id", dba.ID)) return dba, nil } @@ -219,17 +219,17 @@ func (cc *cloudConfigManager) GetConfig() (MQTTConfig, error) { result.Address = mqtt.Address cc.logger.Info("using auto provisioned cloud configuration", zap.String("address", mqtt.Address), - zap.String("id", result.Id)) + zap.String("id", result.ID)) result.Connect = true return result, nil } func (cc *cloudConfigManager) GetContext(ctx context.Context) context.Context { - if cc.config.MQTT.Id != "" { - ctx = context.WithValue(ctx, "agent_id", cc.config.MQTT.Id) + if cc.config.MQTT.ID != "" { + ctx = context.WithValue(ctx, ContextKey("agent_id"), cc.config.MQTT.ID) } else { - ctx = context.WithValue(ctx, "agent_id", "auto-provisioning-without-id") + ctx = context.WithValue(ctx, ContextKey("agent_id"), "auto-provisioning-without-id") } return ctx } diff --git a/agent/config/local.go b/agent/config/local.go index af8f5ab..8f0b2d7 100644 --- a/agent/config/local.go +++ b/agent/config/local.go @@ -6,7 +6,7 @@ import ( "go.uber.org/zap" ) -var _ ConfigManager = (*localConfigManager)(nil) +var _ Manager = (*localConfigManager)(nil) type localConfigManager struct { logger *zap.Logger diff --git a/agent/config/manager.go b/agent/config/manager.go index 6448540..4989835 100644 --- a/agent/config/manager.go +++ b/agent/config/manager.go @@ -6,14 +6,14 @@ import ( "go.uber.org/zap" ) -// ConfigManager is the interface for configuration manager -type ConfigManager interface { +// Manager is the interface for configuration manager +type Manager interface { GetConfig() (MQTTConfig, error) GetContext(ctx context.Context) context.Context } // New creates a new instance of ConfigManager based on the configuration -func New(logger *zap.Logger, c ManagerConfig) ConfigManager { +func New(logger *zap.Logger, c ManagerConfig) Manager { switch c.Active { case "local": return &localConfigManager{logger: logger, config: c.Backends.Local} diff --git a/agent/config/types.go b/agent/config/types.go index 762f1ee..87a10a0 100644 --- a/agent/config/types.go +++ b/agent/config/types.go @@ -1,5 +1,8 @@ package config +// ContextKey represents the key for the context +type ContextKey string + // APIConfig represents the configuration for the API connection type APIConfig struct { Address string `mapstructure:"address"` @@ -10,7 +13,7 @@ type APIConfig struct { type MQTTConfig struct { Connect bool `mapstructure:"connect"` Address string `mapstructure:"address"` - Id string `mapstructure:"id"` + ID string `mapstructure:"id"` Key string `mapstructure:"key"` ChannelID string `mapstructure:"channel_id"` } diff --git a/agent/heartbeats.go b/agent/heartbeats.go index d76fea1..a055dff 100644 --- a/agent/heartbeats.go +++ b/agent/heartbeats.go @@ -16,8 +16,8 @@ import ( // HeartbeatFreq how often to heartbeat const HeartbeatFreq = 50 * time.Second -// RestartTimeMin minimum time to wait between restarts -const RestartTimeMin = 5 * time.Minute +// RestartTime minimum time to wait between restarts +const RestartTime = 5 * time.Minute func (a *orbAgent) sendSingleHeartbeat(ctx context.Context, t time.Time, agentsState fleet.State) { if a.heartbeatsTopic == "" { @@ -46,7 +46,7 @@ func (a *orbAgent) sendSingleHeartbeat(ctx context.Context, t time.Time, agentsS } // status is not running so we have a current error besi.Error = a.backendState[name].LastError - if time.Now().Sub(be.GetStartTime()) >= RestartTimeMin { + if time.Since(be.GetStartTime()) >= RestartTime { a.logger.Info("attempting backend restart due to failed status during heartbeat") ctx = a.configManager.GetContext(ctx) err := a.RestartBackend(ctx, name, "failed during heartbeat") @@ -54,7 +54,7 @@ func (a *orbAgent) sendSingleHeartbeat(ctx context.Context, t time.Time, agentsS a.logger.Error("failed to restart backend", zap.Error(err), zap.String("backend", name)) } } else { - a.logger.Info("waiting to attempt backend restart due to failed status", zap.Duration("remaining_secs", RestartTimeMin-(time.Now().Sub(be.GetStartTime())))) + a.logger.Info("waiting to attempt backend restart due to failed status", zap.Duration("remaining_secs", RestartTime-(time.Since(be.GetStartTime())))) } } else { // status is Running so no current error @@ -133,7 +133,7 @@ func (a *orbAgent) sendSingleHeartbeat(ctx context.Context, t time.Time, agentsS } func (a *orbAgent) sendHeartbeats(ctx context.Context, cancelFunc context.CancelFunc) { - a.logger.Debug("start heartbeats routine", zap.Any("routine", ctx.Value("routine"))) + a.logger.Debug("start heartbeats routine", zap.Any("routine", ctx.Value(routineKey))) a.sendSingleHeartbeat(ctx, time.Now(), fleet.Online) defer func() { cancelFunc() diff --git a/agent/logging.go b/agent/logging.go index 7cebe2c..f751583 100644 --- a/agent/logging.go +++ b/agent/logging.go @@ -29,7 +29,7 @@ func (a *agentLoggerWarn) Println(v ...interface{}) { a.a.logger.Warn("WARN mqtt log", zap.Any("payload", v)) } -func (a *agentLoggerWarn) Printf(format string, v ...interface{}) { +func (a *agentLoggerWarn) Printf(_ string, v ...interface{}) { a.a.logger.Warn("WARN mqtt log", zap.Any("payload", v)) } @@ -37,7 +37,7 @@ func (a *agentLoggerDebug) Println(v ...interface{}) { a.a.logger.Debug("DEBUG mqtt log", zap.Any("payload", v)) } -func (a *agentLoggerDebug) Printf(format string, v ...interface{}) { +func (a *agentLoggerDebug) Printf(_ string, v ...interface{}) { a.a.logger.Debug("DEBUG mqtt log", zap.Any("payload", v)) } @@ -45,7 +45,7 @@ func (a *agentLoggerCritical) Println(v ...interface{}) { a.a.logger.Error("CRITICAL mqtt log", zap.Any("payload", v)) } -func (a *agentLoggerCritical) Printf(format string, v ...interface{}) { +func (a *agentLoggerCritical) Printf(_ string, v ...interface{}) { a.a.logger.Error("CRITICAL mqtt log", zap.Any("payload", v)) } @@ -53,6 +53,6 @@ func (a *agentLoggerError) Println(v ...interface{}) { a.a.logger.Error("ERROR mqtt log", zap.Any("payload", v)) } -func (a *agentLoggerError) Printf(format string, v ...interface{}) { +func (a *agentLoggerError) Printf(_ string, v ...interface{}) { a.a.logger.Error("ERROR mqtt log", zap.Any("payload", v)) } diff --git a/cmd/e2e_agent_test.go b/cmd/e2e_agent_test.go index 71c032c..c835c17 100644 --- a/cmd/e2e_agent_test.go +++ b/cmd/e2e_agent_test.go @@ -16,7 +16,6 @@ import ( "go.uber.org/zap/zapcore" "github.com/netboxlabs/orb-agent/agent" - "github.com/netboxlabs/orb-agent/agent/backend/pktvisor" "github.com/netboxlabs/orb-agent/agent/config" ) @@ -35,7 +34,7 @@ func Test_e2e_orbAgent_ConfigFile(t *testing.T) { } runCmd.Flags().StringSliceVarP(&cfgFiles, "config", "c", []string{}, "Path to config files (may be specified multiple times)") - runCmd.PersistentFlags().BoolVarP(&Debug, "debug", "d", false, "Enable verbose (debug level) output") + runCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "Enable verbose (debug level) output") rootCmd.AddCommand(runCmd) rootCmd.SetArgs([]string{"run", "-d", "-c", "/home/lpegoraro/workspace/orb/localconfig/config.yaml"}) @@ -45,11 +44,8 @@ func Test_e2e_orbAgent_ConfigFile(t *testing.T) { t.Fail() } - select { - case <-ctx.Done(): - cancelF() - return - } + <-ctx.Done() + cancelF() } func Test_main(t *testing.T) { @@ -67,21 +63,10 @@ func Test_main(t *testing.T) { cfg.OrbAgent.Debug.Enable = true - // include pktvisor backend by default if binary is at default location - _, err = os.Stat(pktvisor.DefaultBinary) - if err == nil && cfg.OrbAgent.Backends == nil { - cfg.OrbAgent.Backends = make(map[string]map[string]interface{}) - cfg.OrbAgent.Backends["pktvisor"] = make(map[string]interface{}) - cfg.OrbAgent.Backends["pktvisor"]["binary"] = pktvisor.DefaultBinary - if len(cfgFiles) > 0 { - cfg.OrbAgent.Backends["pktvisor"]["config_file"] = "/home/lpegoraro/workspace/orb/localconfig/config.yaml" - } - } - // logger var logger *zap.Logger atomicLevel := zap.NewAtomicLevel() - if Debug { + if debug { atomicLevel.SetLevel(zap.DebugLevel) } else { atomicLevel.SetLevel(zap.InfoLevel) @@ -107,7 +92,7 @@ func Test_main(t *testing.T) { // handle signals done := make(chan bool, 1) - rootCtx, cancelFunc := context.WithTimeout(context.WithValue(context.Background(), "routine", "mainRoutine"), 15*time.Minute) + rootCtx, cancelFunc := context.WithTimeout(context.WithValue(context.Background(), routineKey, "mainRoutine"), 15*time.Minute) go func() { sigs := make(chan os.Signal, 1) diff --git a/cmd/main.go b/cmd/main.go index 35eca6e..a591cc4 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -23,12 +23,13 @@ import ( ) const ( - defaultConfig = "/opt/orb/agent_default.yaml" + defaultConfig = "/opt/orb/agent_default.yaml" + routineKey config.ContextKey = "routine" ) var ( cfgFiles []string - Debug bool + debug bool ) func init() { @@ -38,11 +39,13 @@ func init() { networkdiscovery.Register() } +// Version prints the version of the agent func Version(_ *cobra.Command, _ []string) { fmt.Printf("orb-agent %s\n", version.GetBuildVersion()) os.Exit(0) } +// Run starts the agent func Run(_ *cobra.Command, _ []string) { initConfig() @@ -57,7 +60,7 @@ func Run(_ *cobra.Command, _ []string) { // logger var logger *zap.Logger atomicLevel := zap.NewAtomicLevel() - if Debug { + if debug { atomicLevel.SetLevel(zap.DebugLevel) } else { atomicLevel.SetLevel(zap.InfoLevel) @@ -74,7 +77,6 @@ func Run(_ *cobra.Command, _ []string) { _ = logger.Sync() }(logger) - _, err = os.Stat(pktvisor.DefaultBinary) logger.Info("backends loaded", zap.Any("backends", configData.OrbAgent.Backends)) configData.OrbAgent.ConfigFile = defaultConfig @@ -91,11 +93,11 @@ func Run(_ *cobra.Command, _ []string) { // handle signals done := make(chan bool, 1) - rootCtx, cancelFunc := context.WithCancel(context.WithValue(context.Background(), "routine", "mainRoutine")) + rootCtx, cancelFunc := context.WithCancel(context.WithValue(context.Background(), routineKey, "mainRoutine")) go func() { sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) select { case <-sigs: logger.Warn("stop signal received stopping agent") @@ -202,7 +204,7 @@ func main() { } runCmd.Flags().StringSliceVarP(&cfgFiles, "config", "c", []string{}, "Path to config files (may be specified multiple times)") - runCmd.PersistentFlags().BoolVarP(&Debug, "debug", "d", false, "Enable verbose (debug level) output") + runCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "Enable verbose (debug level) output") rootCmd.AddCommand(runCmd) rootCmd.AddCommand(versionCmd)