diff --git a/internal/command/helpers.go b/internal/command/helpers.go index 87dc8b92e..ee9a52c75 100644 --- a/internal/command/helpers.go +++ b/internal/command/helpers.go @@ -6,8 +6,6 @@ import ( "os" "path/filepath" - "github.com/fatih/color" - "github.com/simplesurance/baur/v2/internal/command/term" "github.com/simplesurance/baur/v2/internal/format" "github.com/simplesurance/baur/v2/internal/log" @@ -217,10 +215,13 @@ func mustWriteRow(fmt format.Formatter, row ...interface{}) { exitOnErr(err) } -var errorPrefix = color.New(color.FgRed).Sprint("ERROR:") - func exitOnErrf(err error, format string, v ...interface{}) { - exitOnErr(err, fmt.Sprintf(format, v...)) + if err == nil { + return + } + + stderr.ErrPrintf(err, format, v...) + exitFunc(1) } func exitOnErr(err error, msg ...interface{}) { @@ -228,14 +229,7 @@ func exitOnErr(err error, msg ...interface{}) { return } - if len(msg) == 0 { - stderr.Println(errorPrefix, err) - exitFunc(1) - } - - wholeMsg := fmt.Sprint(msg...) - stderr.Printf("%s %s: %s\n", errorPrefix, wholeMsg, err) - + stderr.ErrPrintln(err, msg...) exitFunc(1) } diff --git a/internal/command/run.go b/internal/command/run.go index 78e8f16a3..0448de680 100644 --- a/internal/command/run.go +++ b/internal/command/run.go @@ -2,9 +2,11 @@ package command import ( "context" + "errors" "fmt" "math" "strings" + "sync" "time" "github.com/spf13/cobra" @@ -64,6 +66,12 @@ The following Environment Variables are supported: term.Highlight("DOCKER_CERT_PATH"), term.Highlight("DOCKER_TLS_VERIFY")) +var ( + statusStrSuccess = term.GreenHighlight("successful") + statusStrSkipped = term.YellowHighlight("skipped") + statusStrFailed = term.RedHighlight("failed") +) + func init() { rootCmd.AddCommand(&newRunCmd().Command) } @@ -72,12 +80,12 @@ type runCmd struct { cobra.Command // Cmdline parameters - skipUpload bool - force bool - inputStr []string - lookupInputStr string - taskRunners uint - showOutput bool + skipUpload bool + force bool + inputStr []string + lookupInputStr string + taskRunnerGoRoutines uint + showOutput bool // other fields storage storage.Storer @@ -88,6 +96,10 @@ type runCmd struct { uploadRoutinePool *routines.Pool taskRunnerRoutinePool *routines.Pool + taskRunner *baur.TaskRunner + + skipAllScheduledTaskRunsOnce sync.Once + errorHappened bool } func newRunCmd() *runCmd { @@ -110,7 +122,7 @@ func newRunCmd() *runCmd { "include a string as input, can be specified multiple times") cmd.Flags().StringVar(&cmd.lookupInputStr, "lookup-input-str", "", "if a run can not be found, try to find a run with this value as input-string") - cmd.Flags().UintVarP(&cmd.taskRunners, "parallel-runs", "p", 1, + cmd.Flags().UintVarP(&cmd.taskRunnerGoRoutines, "parallel-runs", "p", 1, "specifies the max. number of tasks to run in parallel") cmd.Flags().BoolVarP(&cmd.showOutput, "show-task-output", "o", false, "show the output of tasks, if disabled the output is only shown "+ @@ -123,7 +135,7 @@ func newRunCmd() *runCmd { func (c *runCmd) run(cmd *cobra.Command, args []string) { var err error - if c.taskRunners == 0 { + if c.taskRunnerGoRoutines == 0 { stderr.Printf("--parallel-runs must be greater than 0\n") exitFunc(1) } @@ -137,7 +149,8 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { defer c.storage.Close() c.uploadRoutinePool = routines.NewPool(1) // run 1 upload in parallel with builds - c.taskRunnerRoutinePool = routines.NewPool(c.taskRunners) + c.taskRunnerRoutinePool = routines.NewPool(c.taskRunnerGoRoutines) + c.taskRunner = baur.NewTaskRunner() c.dockerClient, err = docker.NewClient(log.StdLogger.Debugf) exitOnErr(err) @@ -184,13 +197,24 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { ptCopy := pt c.taskRunnerRoutinePool.Queue(func() { task := ptCopy.task - runResult := c.runTask(task) + runResult, err := c.runTask(task) + if err != nil { + // error is printed in runTask() + c.skipAllScheduledTaskRuns() + return + } outputs, err := baur.OutputsFromTask(c.dockerClient, task) - exitOnErrf(err, task.ID()) + if err != nil { + stderr.ErrPrintln(err, task.ID()) + c.skipAllScheduledTaskRuns() + return + } - if !outputsExist(task, outputs) { - exitFunc(1) + if !declaredOutputsExist(task, outputs) { + // error is printed in declaredOutputsExist() + c.skipAllScheduledTaskRuns() + return } if c.skipUpload { @@ -198,14 +222,18 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { } c.uploadRoutinePool.Queue(func() { - c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult) + err := c.uploadAndRecord(ctx, ptCopy.task, ptCopy.inputs, outputs, runResult) + if err != nil { + // error is printed in uploadAndRecord() + c.skipAllScheduledTaskRuns() + } }) }) } c.taskRunnerRoutinePool.Wait() - stdout.Println("all tasks executed, waiting for uploads to finish...") + stdout.Println("task execution finished, waiting for uploads to finish...") c.uploadRoutinePool.Wait() stdout.PrintSep() stdout.Printf("finished in: %s\n", @@ -213,19 +241,49 @@ func (c *runCmd) run(cmd *cobra.Command, args []string) { time.Since(startTime), ), ) + + if c.errorHappened { + exitFunc(1) + } } -func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { - result, err := baur.NewTaskRunner().Run(task) - exitOnErrf(err, "%s", task.ID()) +func (c *runCmd) skipAllScheduledTaskRuns() { + c.skipAllScheduledTaskRunsOnce.Do(func() { + c.taskRunner.SkipRuns(true) - if result.Result.ExitCode != 0 { - statusStr := term.RedHighlight("failed") + c.errorHappened = true + + stderr.Printf("%s, %s execution of queued task runs\n", + term.RedHighlight("terminating"), + term.YellowHighlight("skipping"), + ) + }) +} + +func (c *runCmd) runTask(task *baur.Task) (*baur.RunResult, error) { + result, err := c.taskRunner.Run(task) + if err != nil { + if errors.Is(err, baur.ErrTaskRunSkipped) { + stderr.Printf("%s: execution %s\n", + term.Highlight(task), + statusStrSkipped, + ) + return nil, err + } + stderr.Printf("%s: execution %s, error: %s\n", + term.Highlight(task), + statusStrFailed, + err, + ) + return nil, err + } + + if result.Result.ExitCode != 0 { if c.showOutput { stderr.Printf("%s: execution %s (%s), command exited with code %d\n", - task, - statusStr, + term.Highlight(task), + statusStrFailed, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), @@ -233,8 +291,8 @@ func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { ) } else { stderr.Printf("%s: execution %s (%s), command exited with code %d, output:\n%s\n", - task, - statusStr, + term.Highlight(task), + statusStrFailed, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), @@ -242,19 +300,17 @@ func (c *runCmd) runTask(task *baur.Task) *baur.RunResult { result.StrOutput()) } - exitFunc(1) + return nil, fmt.Errorf("execution failed with exit code %d", result.ExitCode) } - statusStr := term.GreenHighlight("successful") - stdout.TaskPrintf(task, "execution %s (%s)\n", - statusStr, + statusStrSuccess, term.FormatDuration( result.StopTime.Sub(result.StartTime), ), ) - return result + return result, nil } type pendingTask struct { @@ -268,7 +324,7 @@ func (c *runCmd) uploadAndRecord( inputs *baur.Inputs, outputs []baur.Output, runResult *baur.RunResult, -) { +) error { var uploadResults []*baur.UploadResult for _, output := range outputs { @@ -280,7 +336,11 @@ func (c *runCmd) uploadAndRecord( }, func(o baur.Output, result *baur.UploadResult) { size, err := o.SizeBytes() - exitOnErrf(err, "%s: %s:", task.ID(), output) + if err != nil { + stderr.ErrPrintf(err, "%s: %s", task, output) + c.skipAllScheduledTaskRuns() + return + } bps := uint64(math.Round(float64(size) / result.Stop.Sub(result.Start).Seconds())) @@ -292,17 +352,33 @@ func (c *runCmd) uploadAndRecord( uploadResults = append(uploadResults, result) }, ) - - exitOnErrf(err, "%s: %s", task.ID(), output) + if err != nil { + stderr.Printf("%s: %s: upload %s, %s\n", + term.Highlight(task), + output, + statusStrFailed, + err, + ) + return err + } } id, err := baur.StoreRun(ctx, c.storage, c.vcsState, task, inputs, runResult, uploadResults) - exitOnErrf(err, "%s", task.ID()) + if err != nil { + stderr.Printf("%s: recording build result in database %s, %s\n", + term.Highlight(task), + statusStrFailed, + err, + ) + return err + } stdout.TaskPrintf(task, "run stored in database with ID %s\n", term.Highlight(id)) + + return nil } -func outputsExist(task *baur.Task, outputs []baur.Output) bool { +func declaredOutputsExist(task *baur.Task, outputs []baur.Output) bool { allExist := true if len(outputs) == 0 { @@ -312,11 +388,17 @@ func outputsExist(task *baur.Task, outputs []baur.Output) bool { for _, output := range outputs { exists, err := output.Exists() - exitOnErrf(err, "%s:", task.ID()) + if err != nil { + stderr.ErrPrintf(err, task.ID()) + return false + } if exists { size, err := output.SizeBytes() - exitOnErrf(err, "%s:", task.ID()) + if err != nil { + stderr.ErrPrintln(err, task.ID()) + return false + } stdout.TaskPrintf(task, "created %s (size: %s)\n", output, term.FormatSize(size)) diff --git a/internal/command/term/streams.go b/internal/command/term/streams.go index 5b60f7a7e..667442c81 100644 --- a/internal/command/term/streams.go +++ b/internal/command/term/streams.go @@ -5,11 +5,15 @@ import ( "io" "sync" + "github.com/fatih/color" + "github.com/simplesurance/baur/v2/pkg/baur" ) const separator = "------------------------------------------------------------------------------" +var errorPrefix = color.New(color.FgRed).Sprint("ERROR:") + // Stream is a concurrency-safe output for term.messages. type Stream struct { stream io.Writer @@ -41,6 +45,24 @@ func (s *Stream) TaskPrintf(task *baur.Task, format string, a ...interface{}) { s.Printf(prefix+format, a...) } +// ErrPrintln prints an error with an optional message. +// The method prints the error in the format: errorPrefix msg: err +func (s *Stream) ErrPrintln(err error, msg ...interface{}) { + if len(msg) == 0 { + s.Println(errorPrefix, err) + return + } + + wholeMsg := fmt.Sprint(msg...) + s.Printf("%s %s: %s\n", errorPrefix, wholeMsg, err) +} + +// ErrPrintf prints an error with an optional printf-style message. +// The method prints the error in the format: errorPrefix msg: err +func (s *Stream) ErrPrintf(err error, format string, a ...interface{}) { + s.ErrPrintln(err, fmt.Sprintf(format, a...)) +} + // PrintSep prints a separator line func (s *Stream) PrintSep() { fmt.Fprintln(s.stream, separator) diff --git a/pkg/baur/taskrunner.go b/pkg/baur/taskrunner.go index ee5860494..53c929447 100644 --- a/pkg/baur/taskrunner.go +++ b/pkg/baur/taskrunner.go @@ -1,7 +1,9 @@ package baur import ( + "errors" "fmt" + "sync/atomic" "time" "github.com/fatih/color" @@ -9,8 +11,13 @@ import ( "github.com/simplesurance/baur/v2/internal/exec" ) +// ErrTaskRunSkipped is returned when a task run was skipped instead of executed. +var ErrTaskRunSkipped = errors.New("task run skipped") + // TaskRunner executes the command of a task. -type TaskRunner struct{} +type TaskRunner struct { + skipEnabled uint32 // must be accessed via atomic operations +} func NewTaskRunner() *TaskRunner { return &TaskRunner{} @@ -28,6 +35,10 @@ type RunResult struct { func (t *TaskRunner) Run(task *Task) (*RunResult, error) { startTime := time.Now() + if t.SkipRunsIsEnabled() { + return nil, ErrTaskRunSkipped + } + // TODO: rework exec, stream the output instead of storing all in memory execResult, err := exec.Command(task.Command[0], task.Command[1:]...). Directory(task.Directory). @@ -43,3 +54,21 @@ func (t *TaskRunner) Run(task *Task) (*RunResult, error) { StopTime: time.Now(), }, nil } + +func (t *TaskRunner) setSkipRuns(val uint32) { + atomic.StoreUint32(&t.skipEnabled, val) +} + +// SkipRuns can be enabled to skip all further executions of tasks by Run(). +func (t *TaskRunner) SkipRuns(enabled bool) { + if enabled { + t.setSkipRuns(1) + } else { + t.setSkipRuns(0) + } +} + +// SkipRunsIsEnabled returns true if SkipRuns is enabled. +func (t *TaskRunner) SkipRunsIsEnabled() bool { + return atomic.LoadUint32(&t.skipEnabled) == 1 +}