Skip to content

Commit

Permalink
chore: merged from main
Browse files Browse the repository at this point in the history
Signed-off-by: Bruno Bressi <[email protected]>
  • Loading branch information
puffitos committed Jan 16, 2024
2 parents abcfd70 + d2de615 commit 8a102c1
Show file tree
Hide file tree
Showing 26 changed files with 306 additions and 199 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ gen

# Temporary directory
.tmp/*

98 changes: 49 additions & 49 deletions chart/README.md

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ func run() func(cmd *cobra.Command, args []string) error {
log.Info("Running sparrow")
if err = s.Run(ctx); err != nil {
err = fmt.Errorf("error while running sparrow: %w", err)
// by this time all shutdown routines should have been called
// so we can exit here
return err
}

Expand Down
6 changes: 3 additions & 3 deletions internal/helper/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestRetry(t *testing.T) {
if effectorFuncCallCounter > 1 {
return nil
}
return fmt.Errorf("Ups sth wrong")
return fmt.Errorf("ups sth wrong")
},
rc: RetryConfig{
Count: 2,
Expand All @@ -81,7 +81,7 @@ func TestRetry(t *testing.T) {
args: args{
effector: func(ctx context.Context) error {
effectorFuncCallCounter++
return fmt.Errorf("Ups sth wrong")
return fmt.Errorf("ups sth wrong")
},
rc: RetryConfig{
Count: 2,
Expand All @@ -98,7 +98,7 @@ func TestRetry(t *testing.T) {
effector: func(ctx context.Context) error {
effectorFuncCallCounter++
cancel()
return errors.New("Ups")
return errors.New("ups")
},
rc: RetryConfig{
Count: 2,
Expand Down
13 changes: 7 additions & 6 deletions internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,19 @@ func NewLogger(h ...slog.Handler) *slog.Logger {
if len(h) > 0 {
handler = h[0]
} else {
handler = slog.NewJSONHandler(os.Stderr, nil)
handler = slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
AddSource: true,
})
}
return slog.New(handler)
}

// NewContextWithLogger creates a new context based on the provided parent context.
// It embeds a logger into this new context, which is a child of the logger from the parent context.
// The child logger inherits settings from the parent and is grouped under the provided childName.
// It embeds a logger into this new context.
// It also returns a cancel function to cancel the new context.
func NewContextWithLogger(parent context.Context, childName string) (context.Context, context.CancelFunc) {
func NewContextWithLogger(parent context.Context) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(parent)
return IntoContext(ctx, FromContext(parent).WithGroup(childName)), cancel
return IntoContext(ctx, FromContext(parent)), cancel
}

// IntoContext embeds the provided slog.Logger into the given context and returns the modified context.
Expand All @@ -68,7 +69,7 @@ func FromContext(ctx context.Context) *slog.Logger {
return NewLogger()
}

// Take the logger from the context and add it to the request context
// Middleware takes the logger from the context and adds it to the request context
func Middleware(ctx context.Context) func(http.Handler) http.Handler {
log := FromContext(ctx)
return func(next http.Handler) http.Handler {
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/routingtree.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"sync"
)

// creates a simple routing tree, so checks can easily create and remove handlers
// RoutingTree creates a simple routing tree, so checks can easily create and remove handlers
// Maps the method to the path and the handler
type RoutingTree struct {
tree map[string]map[string]http.HandlerFunc
Expand Down
9 changes: 6 additions & 3 deletions pkg/checks/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type healthMetrics struct {

// Run starts the health check
func (h *Health) Run(ctx context.Context) error {
ctx, cancel := logger.NewContextWithLogger(ctx, "health")
ctx, cancel := logger.NewContextWithLogger(ctx)
defer cancel()
log := logger.FromContext(ctx)
log.Info("Starting healthcheck", "interval", h.config.Interval.String())
Expand Down Expand Up @@ -193,7 +193,7 @@ func (h *Health) GetMetricCollectors() []prometheus.Collector {
// check performs a health check using a retry function
// to get the health status for all targets
func (h *Health) check(ctx context.Context) map[string]string {
log := logger.FromContext(ctx).WithGroup("check")
log := logger.FromContext(ctx)
log.Debug("Checking health")
if len(h.config.Targets) == 0 {
log.Debug("No targets defined")
Expand Down Expand Up @@ -259,7 +259,10 @@ func getHealth(ctx context.Context, client *http.Client, url string) error {
return err
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
err := Body.Close()
if err != nil {
log.Error("Failed to close response body", "error", err.Error())
}
}(resp.Body)

if resp.StatusCode != http.StatusOK {
Expand Down
6 changes: 3 additions & 3 deletions pkg/checks/latency/latency.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type latencyMetrics struct {

// Run starts the latency check
func (l *Latency) Run(ctx context.Context) error {
ctx, cancel := logger.NewContextWithLogger(ctx, "latency")
ctx, cancel := logger.NewContextWithLogger(ctx)
defer cancel()
log := logger.FromContext(ctx)
log.Info("Starting latency check", "interval", l.config.Interval.String())
Expand Down Expand Up @@ -114,7 +114,7 @@ func (l *Latency) Run(ctx context.Context) error {
}

func (l *Latency) Startup(ctx context.Context, cResult chan<- specs.Result) error {
log := logger.FromContext(ctx).WithGroup("latency")
log := logger.FromContext(ctx)
log.Debug("Initializing latency check")

l.CResult = cResult
Expand Down Expand Up @@ -211,7 +211,7 @@ func (l *Latency) GetMetricCollectors() []prometheus.Collector {
// check performs a latency check using a retry function
// to get the latency to all targets
func (l *Latency) check(ctx context.Context) map[string]LatencyResult {
log := logger.FromContext(ctx).WithGroup("check")
log := logger.FromContext(ctx)
log.Debug("Checking latency")
if len(l.config.Targets) == 0 {
log.Debug("No targets defined")
Expand Down
2 changes: 1 addition & 1 deletion pkg/checks/oapi/oapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/getkin/kin-openapi/openapi3gen"
)

// Takes in check perfdata and returns an openapi3.SchemaRef of a result wrapping the perfData
// OpenapiFromPerfData takes in check perfdata and returns an openapi3.SchemaRef of a result wrapping the perfData
// this is a workaround, since the openapi3gen.NewSchemaRefForValue function does not work with any types
func OpenapiFromPerfData[T any](data T) (*openapi3.SchemaRef, error) {
checkSchema, err := openapi3gen.NewSchemaRefForValue(specs.Result{}, openapi3.Schemas{})
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package config

import (
"context"
"fmt"
"os"

"gopkg.in/yaml.v3"
Expand All @@ -41,22 +42,23 @@ func NewFileLoader(cfg *Config, cCfgChecks chan<- map[string]any) *FileLoader {
}
}

func (f *FileLoader) Run(ctx context.Context) {
log := logger.FromContext(ctx).WithGroup("FileLoader")
func (f *FileLoader) Run(ctx context.Context) error {
log := logger.FromContext(ctx)
log.Info("Reading config from file", "file", f.path)
// TODO refactor this to use fs.FS
b, err := os.ReadFile(f.path)
if err != nil {
log.Error("Failed to read config file", "path", f.path, "error", err)
panic("failed to read config file " + err.Error())
return fmt.Errorf("failed to read config file: %w", err)
}

var cfg RuntimeConfig

if err := yaml.Unmarshal(b, &cfg); err != nil {
log.Error("Failed to parse config file", "error", err)
panic("failed to parse config file: " + err.Error())
return fmt.Errorf("failed to parse config file: %w", err)
}

f.c <- cfg.Checks
return nil
}
8 changes: 7 additions & 1 deletion pkg/config/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ func TestFileLoader_Run(t *testing.T) {
path: tt.fields.path,
c: tt.fields.c,
}
go f.Run(*tt.args.ctx)
go func() {
err := f.Run(*tt.args.ctx)
if err != nil {
t.Errorf("Expected no error, got %v", err)
return
}
}()
(*tt.args.cancel)()

config := <-tt.fields.c
Expand Down
55 changes: 29 additions & 26 deletions pkg/config/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,54 +51,57 @@ func NewHttpLoader(cfg *Config, cCfgChecks chan<- map[string]any) *HttpLoader {
// The config is will be loaded periodically defined by the
// loader interval configuration. A failed request will be retried defined
// by the retry configuration
func (gl *HttpLoader) Run(ctx context.Context) {
ctx, cancel := logger.NewContextWithLogger(ctx, "httpLoader")
func (hl *HttpLoader) Run(ctx context.Context) error {
ctx, cancel := logger.NewContextWithLogger(ctx)
defer cancel()
log := logger.FromContext(ctx)

var runtimeCfg *RuntimeConfig
for {
getConfigRetry := helper.Retry(func(ctx context.Context) error {
var err error
runtimeCfg, err = gl.GetRuntimeConfig(ctx)
return err
}, gl.cfg.Loader.Http.RetryCfg)

if err := getConfigRetry(ctx); err != nil {
log.Error("Could not get remote runtime configuration", "error", err)
return
}

log.Info("Successfully got remote runtime configuration")
gl.cCfgChecks <- runtimeCfg.Checks

for {
select {
case <-ctx.Done():
return
case <-time.After(gl.cfg.Loader.Interval):
return ctx.Err()
case <-time.After(hl.cfg.Loader.Interval):
getConfigRetry := helper.Retry(func(ctx context.Context) error {
var err error
runtimeCfg, err = hl.GetRuntimeConfig(ctx)
return err
}, hl.cfg.Loader.Http.RetryCfg)

if err := getConfigRetry(ctx); err != nil {
log.Warn("Could not get remote runtime configuration", "error", err)
}

log.Info("Successfully got remote runtime configuration")
hl.cCfgChecks <- runtimeCfg.Checks
}
}
}

// GetRuntimeConfig gets the remote runtime configuration
func (gl *HttpLoader) GetRuntimeConfig(ctx context.Context) (*RuntimeConfig, error) {
log := logger.FromContext(ctx).With("url", gl.cfg.Loader.Http.Url)
func (hl *HttpLoader) GetRuntimeConfig(ctx context.Context) (*RuntimeConfig, error) {
log := logger.FromContext(ctx).With("url", hl.cfg.Loader.Http.Url)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, gl.cfg.Loader.Http.Url, http.NoBody)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, hl.cfg.Loader.Http.Url, http.NoBody)
if err != nil {
log.Error("Could not create http GET request", "error", err.Error())
return nil, err
}
if gl.cfg.Loader.Http.Token != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", gl.cfg.Loader.Http.Token))
if hl.cfg.Loader.Http.Token != "" {
req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", hl.cfg.Loader.Http.Token))
}

res, err := gl.client.Do(req)
res, err := hl.client.Do(req) //nolint:bodyclose
if err != nil {
log.Error("Http get request failed", "error", err.Error())
return nil, err
}
defer res.Body.Close()
defer func(Body io.ReadCloser) {
err = Body.Close()
if err != nil {
log.Error("Failed to close response body", "error", err.Error())
}
}(res.Body)

if res.StatusCode != http.StatusOK {
log.Error("Http get request failed", "status", res.Status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

type Loader interface {
Run(context.Context)
Run(context.Context) error
}

// NewLoader Get a new typed runtime configuration loader
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

// Validate validates the startup config
func (c *Config) Validate(ctx context.Context) error {
ctx, cancel := logger.NewContextWithLogger(ctx, "configValidation")
ctx, cancel := logger.NewContextWithLogger(ctx)
defer cancel()
log := logger.FromContext(ctx)

Expand Down
2 changes: 1 addition & 1 deletion pkg/config/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

func TestConfig_Validate(t *testing.T) {
ctx, cancel := logger.NewContextWithLogger(context.Background(), "validation-test")
ctx, cancel := logger.NewContextWithLogger(context.Background())
defer cancel()

type fields struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (i *InMemory) Get(check string) (specs.Result, bool) {
return *result, true
}

// Returns a copy of the map
// List returns a copy of the map
func (i *InMemory) List() map[string]specs.Result {
results := make(map[string]specs.Result)
i.data.Range(func(key, value any) bool {
Expand Down
43 changes: 21 additions & 22 deletions pkg/sparrow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ const (
readHeaderTimeout = time.Second * 5
)

var (
ErrServeApi = errors.New("failed to serve api")
ErrApiContext = errors.New("api context canceled")
ErrCreateOpenapiSchema = errors.New("failed to get schema for check")
)
var ErrCreateOpenapiSchema = errors.New("failed to get schema for check")

func (s *Sparrow) register(ctx context.Context) {
s.router.Use(logger.Middleware(ctx))
Expand Down Expand Up @@ -75,44 +71,47 @@ func (s *Sparrow) register(ctx context.Context) {
//
// Blocks until context is done
func (s *Sparrow) api(ctx context.Context) error {
log := logger.FromContext(ctx).WithGroup("api")
log := logger.FromContext(ctx)
cErr := make(chan error, 1)
s.register(ctx)

server := http.Server{Addr: s.cfg.Api.ListeningAddress, Handler: s.router, ReadHeaderTimeout: readHeaderTimeout}

// run http server in goroutine
go func(cErr chan error) {
defer close(cErr)
log.Info("Serving Api", "addr", s.cfg.Api.ListeningAddress)
if err := server.ListenAndServe(); err != nil {
if err := s.server.ListenAndServe(); err != nil {
log.Error("Failed to serve api", "error", err)
cErr <- err
}
}(cErr)

select {
case <-ctx.Done():
if ctx.Err() != nil {
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := server.Shutdown(shutdownCtx)
if err != nil {
log.Error("Failed to shutdown api server", "error", err)
}
log.Error("Api context canceled", "error", ctx.Err())
return fmt.Errorf("%w: %w", ErrApiContext, ctx.Err())
}
return fmt.Errorf("failed serving API: %w", ctx.Err())
case err := <-cErr:
if errors.Is(err, http.ErrServerClosed) || err == nil {
log.Info("Api server closed")
return nil
}
log.Error("failed to serve api", "error", err)
return fmt.Errorf("%w: %w", ErrServeApi, err)
log.Error("failed serving API", "error", err)
return fmt.Errorf("failed serving API: %w", err)
}
}

return nil
// shutdownAPI gracefully shuts down the api server
// Returns an error if an error is present in the context
// or if the server cannot be shut down
func (s *Sparrow) shutdownAPI(ctx context.Context) error {
errC := ctx.Err()
log := logger.FromContext(ctx)
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := s.server.Shutdown(shutdownCtx)
if err != nil {
log.Error("Failed to shutdown api server", "error", err)
return fmt.Errorf("failed shutting down API: %w", errors.Join(errC, err))
}
return errC
}

// okHandler returns a handler that will serve status ok
Expand Down
Loading

0 comments on commit 8a102c1

Please sign in to comment.