diff --git a/pkg/cerrors/cerrors.go b/pkg/cerrors/cerrors.go index 12077432..333a8527 100644 --- a/pkg/cerrors/cerrors.go +++ b/pkg/cerrors/cerrors.go @@ -10,6 +10,14 @@ import ( "strings" ) +// ErrAlreadyDone indicates that action already happened +type ErrAlreadyDone struct { +} + +func (e *ErrAlreadyDone) Error() string { + return "already done" +} + // ErrTestStepsNeverReturned indicates that one or multiple TestSteps // did not complete when the test terminated or when the pipeline // received a cancellation or pause signal @@ -44,6 +52,16 @@ func (e *ErrTestStepPaniced) Error() string { return fmt.Sprintf("test step %s paniced, trace: %q", e.StepName, e.StackTrace) } +// ErrTestStepReturnedNoTarget indicates that a test step returned nil Target +type ErrTestStepReturnedNoTarget struct { + StepName string +} + +// Error returns the error string associated with the error +func (e *ErrTestStepReturnedNoTarget) Error() string { + return fmt.Sprintf("test step %s returned nil result", e.StepName) +} + // ErrTestStepReturnedDuplicateResult indicates that a test step returned result // twice for the same target. type ErrTestStepReturnedDuplicateResult struct { diff --git a/pkg/runner/base_test_suite_test.go b/pkg/runner/base_test_suite_test.go new file mode 100644 index 00000000..b0569b6f --- /dev/null +++ b/pkg/runner/base_test_suite_test.go @@ -0,0 +1,61 @@ +package runner + +import ( + "encoding/json" + + "github.com/benbjohnson/clock" + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/pluginregistry" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" + "github.com/linuxboot/contest/plugins/targetlocker/inmemory" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type BaseTestSuite struct { + suite.Suite + + pluginRegistry *pluginregistry.PluginRegistry + internalStorage *MemoryStorageEngine +} + +func (s *BaseTestSuite) SetupTest() { + storageEngine, err := NewMemoryStorageEngine() + require.NoError(s.T(), err) + s.internalStorage = storageEngine + + target.SetLocker(inmemory.New(clock.New())) + + s.pluginRegistry = pluginregistry.NewPluginRegistry(xcontext.Background()) +} + +func (s *BaseTestSuite) TearDownTest() { + target.SetLocker(nil) +} + +func (s *BaseTestSuite) RegisterStateFullStep( + runFunction func( + ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, + ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error), + validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error) error { + + return s.pluginRegistry.RegisterTestStep(stateFullStepName, func() test.TestStep { + return &stateFullStep{ + runFunction: runFunction, + validateFunction: validateFunction, + } + }, nil) +} + +func (s *BaseTestSuite) NewStep(label, name string, params test.TestStepParameters) test.TestStepBundle { + td := test.TestStepDescriptor{ + Name: name, + Label: label, + Parameters: params, + } + sb, err := s.pluginRegistry.NewTestStepBundle(ctx, td) + require.NoError(s.T(), err) + return *sb +} diff --git a/pkg/runner/job_runner_test.go b/pkg/runner/job_runner_test.go index 32cb7630..15ecbfac 100644 --- a/pkg/runner/job_runner_test.go +++ b/pkg/runner/job_runner_test.go @@ -17,12 +17,10 @@ import ( "github.com/linuxboot/contest/pkg/event" "github.com/linuxboot/contest/pkg/event/testevent" "github.com/linuxboot/contest/pkg/job" - "github.com/linuxboot/contest/pkg/pluginregistry" "github.com/linuxboot/contest/pkg/storage" "github.com/linuxboot/contest/pkg/target" "github.com/linuxboot/contest/pkg/test" "github.com/linuxboot/contest/pkg/xcontext" - "github.com/linuxboot/contest/plugins/targetlocker/inmemory" "github.com/linuxboot/contest/plugins/targetmanagers/targetlist" "github.com/linuxboot/contest/plugins/teststeps" "github.com/linuxboot/contest/plugins/teststeps/echo" @@ -84,10 +82,7 @@ func (r *collectingReporter) FinalReport(ctx xcontext.Context, parameters interf } type JobRunnerSuite struct { - suite.Suite - - pluginRegistry *pluginregistry.PluginRegistry - internalStorage *MemoryStorageEngine + BaseTestSuite } func TestTestStepSuite(t *testing.T) { @@ -95,13 +90,8 @@ func TestTestStepSuite(t *testing.T) { } func (s *JobRunnerSuite) SetupTest() { - storageEngine, err := NewMemoryStorageEngine() - require.NoError(s.T(), err) - s.internalStorage = storageEngine + s.BaseTestSuite.SetupTest() - target.SetLocker(inmemory.New(clock.New())) - - s.pluginRegistry = pluginregistry.NewPluginRegistry(xcontext.Background()) for _, e := range []struct { name string factory test.TestStepFactory @@ -113,40 +103,11 @@ func (s *JobRunnerSuite) SetupTest() { } } -func (s *JobRunnerSuite) TearDownTest() { - target.SetLocker(nil) -} - -func (s *JobRunnerSuite) registerStateFullStep( - runFunction func( - ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, - ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error), - validateFunction func(ctx xcontext.Context, params test.TestStepParameters) error) error { - - return s.pluginRegistry.RegisterTestStep(stateFullStepName, func() test.TestStep { - return &stateFullStep{ - runFunction: runFunction, - validateFunction: validateFunction, - } - }, nil) -} - -func (s *JobRunnerSuite) newStep(label, name string, params test.TestStepParameters) test.TestStepBundle { - td := test.TestStepDescriptor{ - Name: name, - Label: label, - Parameters: params, - } - sb, err := s.pluginRegistry.NewTestStepBundle(ctx, td) - require.NoError(s.T(), err) - return *sb -} - func (s *JobRunnerSuite) TestSimpleJobStartFinish() { var mu sync.Mutex var resultTargets []*target.Target - require.NoError(s.T(), s.registerStateFullStep( + require.NoError(s.T(), s.RegisterStateFullStep( func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { assert.NotNil(s.T(), target) @@ -180,7 +141,7 @@ func (s *JobRunnerSuite) TestSimpleJobStartFinish() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("test_step_label", stateFullStepName, nil), + s.NewStep("test_step_label", stateFullStepName, nil), }, }, }, @@ -209,7 +170,7 @@ func (s *JobRunnerSuite) TestJobWithTestRetry() { var resultTargets []*target.Target var callsCount int - require.NoError(s.T(), s.registerStateFullStep( + require.NoError(s.T(), s.RegisterStateFullStep( func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { return teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { assert.NotNil(s.T(), target) @@ -259,11 +220,11 @@ func (s *JobRunnerSuite) TestJobWithTestRetry() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("echo1_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("echo1_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("hello")}, }), - s.newStep("test_step_label", stateFullStepName, nil), - s.newStep("echo2_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("test_step_label", stateFullStepName, nil), + s.NewStep("echo2_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("world")}, }), }, @@ -342,7 +303,7 @@ func (s *JobRunnerSuite) TestResumeStateBadJobId() { TargetManager: targetlist.New(), }, TestStepsBundles: []test.TestStepBundle{ - s.newStep("echo1_step_label", echo.Name, map[string][]test.Param{ + s.NewStep("echo1_step_label", echo.Name, map[string][]test.Param{ "text": {*test.NewParam("hello")}, }), }, diff --git a/pkg/runner/step_runner.go b/pkg/runner/step_runner.go new file mode 100644 index 00000000..25d78830 --- /dev/null +++ b/pkg/runner/step_runner.go @@ -0,0 +1,303 @@ +package runner + +import ( + "context" + "encoding/json" + "fmt" + "runtime/debug" + "sync" + "sync/atomic" + + "github.com/linuxboot/contest/pkg/cerrors" + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" +) + +type StepRunnerEvent struct { + // Target if set represents the target for which event was generated + Target *target.Target + // Err if Target is not nil refers to this Target result otherwise is execution error + Err error +} + +type StepResult struct { + Err error + ResumeState json.RawMessage +} + +type StepRunner struct { + mu sync.Mutex + + input chan *target.Target + stopOnce sync.Once + resultsChan chan<- StepRunnerEvent + runningLoopActive bool + finishedCh chan struct{} + + resultErr error + resultResumeState json.RawMessage + notifyStoppedOnce sync.Once +} + +func (sr *StepRunner) AddTarget(ctx xcontext.Context, tgt *target.Target) error { + if tgt == nil { + return fmt.Errorf("target should not be nil") + } + + select { + case sr.input <- tgt: + case <-ctx.Until(xcontext.ErrPaused): + return xcontext.ErrPaused + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func (sr *StepRunner) Run( + ctx xcontext.Context, + bundle test.TestStepBundle, + ev testevent.Emitter, + resumeState json.RawMessage, +) (<-chan StepRunnerEvent, error) { + sr.mu.Lock() + defer sr.mu.Unlock() + + if sr.resultsChan != nil { + return nil, &cerrors.ErrAlreadyDone{} + } + + finishedCh := make(chan struct{}) + sr.finishedCh = finishedCh + resultsChan := make(chan StepRunnerEvent, 1) + sr.resultsChan = resultsChan + + var activeLoopsCount int32 = 2 + finish := func() { + if atomic.AddInt32(&activeLoopsCount, -1) != 0 { + return + } + + sr.mu.Lock() + close(sr.finishedCh) + sr.finishedCh = nil + sr.mu.Unlock() + + // if an error occurred we already sent notification + sr.notifyStopped(nil) + close(sr.resultsChan) + ctx.Debugf("StepRunner finished") + } + + stepIn := sr.input + stepOut := make(chan test.TestStepResult) + go func() { + defer finish() + sr.runningLoop(ctx, stepIn, stepOut, bundle, ev, resumeState) + ctx.Debugf("Running loop finished") + }() + + go func() { + defer finish() + sr.readingLoop(ctx, stepOut, bundle.TestStepLabel) + ctx.Debugf("Reading loop finished") + }() + + return resultsChan, nil +} + +func (sr *StepRunner) Started() bool { + sr.mu.Lock() + defer sr.mu.Unlock() + + return sr.resultsChan != nil +} + +func (sr *StepRunner) Running() bool { + sr.mu.Lock() + defer sr.mu.Unlock() + + return sr.resultsChan != nil && sr.finishedCh != nil +} + +// WaitResults returns TestStep.Run() output +// It returns an error if and only if waiting was terminated by input ctx argument and returns ctx.Err() +func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, err error) { + sr.mu.Lock() + resultErr := sr.resultErr + resultResumeState := sr.resultResumeState + finishedCh := sr.finishedCh + sr.mu.Unlock() + + // StepRunner either finished with error or behaved incorrectly + // it makes no sense to wait while it finishes, return what we have + if resultErr != nil { + return StepResult{ + Err: resultErr, + ResumeState: resultResumeState, + }, nil + } + + if finishedCh != nil { + select { + case <-ctx.Done(): + return StepResult{}, ctx.Err() + case <-finishedCh: + } + } + + sr.mu.Lock() + defer sr.mu.Unlock() + return StepResult{ + Err: resultErr, + ResumeState: resultResumeState, + }, nil +} + +// Stop triggers TestStep to stop running by closing input channel +func (sr *StepRunner) Stop() { + sr.stopOnce.Do(func() { + close(sr.input) + }) +} + +func (sr *StepRunner) readingLoop( + ctx xcontext.Context, + stepOut chan test.TestStepResult, + testStepLabel string, +) { + reportedTargets := make(map[string]struct{}) + for { + select { + case res, ok := <-stepOut: + if !ok { + ctx.Debugf("Output channel closed") + + sr.mu.Lock() + if sr.runningLoopActive { + // This means that plugin closed its channels before leaving. + sr.setErrLocked(ctx, &cerrors.ErrTestStepClosedChannels{StepName: testStepLabel}) + } + sr.mu.Unlock() + return + } + + if res.Target == nil { + sr.setErr(ctx, &cerrors.ErrTestStepReturnedNoTarget{StepName: testStepLabel}) + return + } + ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID) + + _, found := reportedTargets[res.Target.ID] + reportedTargets[res.Target.ID] = struct{}{} + + if found { + sr.setErr(ctx, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID}) + return + } + + select { + case sr.resultsChan <- StepRunnerEvent{Target: res.Target, Err: res.Err}: + case <-ctx.Done(): + ctx.Debugf( + "reading loop detected context canceled, target '%s' with result: '%v' was not reported", + res.Target.ID, + res.Err, + ) + } + case <-ctx.Done(): + ctx.Debugf("reading loop detected context canceled") + return + } + } +} + +func (sr *StepRunner) runningLoop( + ctx xcontext.Context, + stepIn <-chan *target.Target, + stepOut chan test.TestStepResult, + bundle test.TestStepBundle, + ev testevent.Emitter, + resumeState json.RawMessage, +) { + defer func() { + sr.mu.Lock() + sr.runningLoopActive = false + sr.mu.Unlock() + + if recoverOccurred := safeCloseOutCh(stepOut); recoverOccurred { + sr.setErr(ctx, &cerrors.ErrTestStepClosedChannels{StepName: bundle.TestStepLabel}) + } + ctx.Debugf("output channel closed") + }() + + sr.mu.Lock() + sr.runningLoopActive = true + sr.mu.Unlock() + + resultResumeState, err := func() (json.RawMessage, error) { + defer func() { + if r := recover(); r != nil { + sr.setErr(ctx, &cerrors.ErrTestStepPaniced{ + StepName: bundle.TestStepLabel, + StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), + }) + } + }() + + inChannels := test.TestStepChannels{In: stepIn, Out: stepOut} + return bundle.TestStep.Run(ctx, inChannels, bundle.Parameters, ev, resumeState) + }() + ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState)) + + sr.mu.Lock() + sr.setErrLocked(ctx, err) + sr.resultResumeState = resultResumeState + sr.mu.Unlock() +} + +// setErr sets step runner error unless already set. +func (sr *StepRunner) setErr(ctx xcontext.Context, err error) { + sr.mu.Lock() + defer sr.mu.Unlock() + sr.setErrLocked(ctx, err) +} + +func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) { + if err == nil || sr.resultErr != nil { + return + } + ctx.Errorf("err: %v", err) + sr.resultErr = err + + // notifyStopped is a blocking operation: should release the lock + sr.mu.Unlock() + sr.notifyStopped(err) + sr.mu.Lock() +} + +func (sr *StepRunner) notifyStopped(err error) { + sr.notifyStoppedOnce.Do(func() { + sr.resultsChan <- StepRunnerEvent{Err: err} + }) +} + +func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) { + recoverOccurred = false + defer func() { + if r := recover(); r != nil { + recoverOccurred = true + } + }() + close(ch) + return +} + +// NewStepRunner creates a new StepRunner object +func NewStepRunner() *StepRunner { + return &StepRunner{ + input: make(chan *target.Target), + } +} diff --git a/pkg/runner/step_runner_test.go b/pkg/runner/step_runner_test.go new file mode 100644 index 00000000..70a5e46b --- /dev/null +++ b/pkg/runner/step_runner_test.go @@ -0,0 +1,100 @@ +package runner + +import ( + "encoding/json" + "fmt" + "sync" + "testing" + + "github.com/linuxboot/contest/pkg/event/testevent" + "github.com/linuxboot/contest/pkg/target" + "github.com/linuxboot/contest/pkg/test" + "github.com/linuxboot/contest/pkg/xcontext" + "github.com/linuxboot/contest/plugins/teststeps" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +func TestStepRunnerSuite(t *testing.T) { + suite.Run(t, new(StepRunnerSuite)) +} + +type StepRunnerSuite struct { + BaseTestSuite +} + +func (s *StepRunnerSuite) TestRunningStep() { + targetsReaction := map[string]error{ + "TSucc": nil, + "TFail": fmt.Errorf("oops"), + } + + var mu sync.Mutex + var obtainedTargets []target.Target + var obtainedResumeState json.RawMessage + + err := s.RegisterStateFullStep( + func(ctx xcontext.Context, ch test.TestStepChannels, params test.TestStepParameters, ev testevent.Emitter, resumeState json.RawMessage) (json.RawMessage, error) { + obtainedResumeState = resumeState + _, err := teststeps.ForEachTarget(stateFullStepName, ctx, ch, func(ctx xcontext.Context, target *target.Target) error { + require.NotNil(s.T(), target) + + mu.Lock() + defer mu.Unlock() + obtainedTargets = append(obtainedTargets, *target) + return targetsReaction[target.ID] + }) + if err != nil { + return nil, err + } + return json.RawMessage("{\"output\": true}"), nil + }, + nil, + ) + require.NoError(s.T(), err) + + stepRunner := NewStepRunner() + require.NotNil(s.T(), stepRunner) + + emitterFactory := NewTestStepEventsEmitterFactory(s.internalStorage.StorageEngineVault, 1, 1, testName, 0) + emitter := emitterFactory.New("test_step_label") + + inputResumeState := json.RawMessage("{\"some_input\": 42}") + resultChan, err := stepRunner.Run(ctx, s.NewStep("test_step_label", stateFullStepName, nil), emitter, inputResumeState) + require.NoError(s.T(), err) + require.NotNil(s.T(), resultChan) + + require.NoError(s.T(), stepRunner.AddTarget(ctx, tgt("TSucc"))) + ev, ok := <-resultChan + require.True(s.T(), ok) + require.Equal(s.T(), tgt("TSucc"), ev.Target) + require.NoError(s.T(), ev.Err) + + require.NoError(s.T(), stepRunner.AddTarget(ctx, tgt("TFail"))) + ev, ok = <-resultChan + require.True(s.T(), ok) + require.Equal(s.T(), tgt("TFail"), ev.Target) + require.Error(s.T(), ev.Err) + + stepRunner.Stop() + + ev, ok = <-resultChan + require.True(s.T(), ok) + require.Nil(s.T(), ev.Target) + require.NoError(s.T(), ev.Err) + + ev, ok = <-resultChan + require.False(s.T(), ok) + + closedCtx, cancel := xcontext.WithCancel(ctx) + cancel() + + // if step runner has results, it should return them even if input context is closed + res, err := stepRunner.WaitResults(closedCtx) + require.NoError(s.T(), err) + + require.Equal(s.T(), json.RawMessage("{\"output\": true}"), res.ResumeState) + require.NoError(s.T(), res.Err) + + require.Equal(s.T(), inputResumeState, obtainedResumeState) +} diff --git a/pkg/runner/test_runner.go b/pkg/runner/test_runner.go index 943d5aa1..20668205 100644 --- a/pkg/runner/test_runner.go +++ b/pkg/runner/test_runner.go @@ -6,9 +6,10 @@ package runner import ( + "context" "encoding/json" "fmt" - "runtime/debug" + "strconv" "sync" "time" @@ -47,8 +48,8 @@ type TestRunner struct { // One mutex to rule them all, used to serialize access to all the state above. // Could probably be split into several if necessary. - mu sync.Mutex - cond *sync.Cond // Used to notify the monitor about changes + mu sync.Mutex + monitorCond *sync.Cond // Used to notify the monitor about changes } // stepState contains state associated with one state of the pipeline: @@ -59,18 +60,12 @@ type stepState struct { stepIndex int // Index of this step in the pipeline. sb test.TestStepBundle // The test bundle. - // Channels used to communicate with the plugin. - inCh chan *target.Target - outCh chan test.TestStepResult - ev testevent.Emitter + ev testevent.Emitter + stepRunner *StepRunner + readingLoopRunning bool - tgtDone map[*target.Target]bool // Targets for which results have been received. - - resumeState json.RawMessage // Resume state passed to and returned by the Run method. - stepStarted bool // testStep.Run() has been invoked - stepRunning bool // testStep.Run() is currently running. - readerRunning bool // Result reader is running. - runErr error // Runner error, returned from Run() or an error condition detected by the reader. + resumeState json.RawMessage // Resume state passed to and returned by the Run method. + runErr error // Runner error, returned from Run() or an error condition detected by the reader. } // targetStepPhase denotes progression of a target through a step @@ -145,22 +140,24 @@ func (tr *TestRunner) Run( // Set up the pipeline for i, sb := range t.TestStepsBundles { stepCtx, stepCancel := xcontext.WithCancel(stepsCtx) + stepCtx = stepCtx.WithField("step_index", strconv.Itoa(i)) + stepCtx = stepCtx.WithField("step_label", sb.TestStepLabel) + var srs json.RawMessage if i < len(rs.StepResumeState) && string(rs.StepResumeState[i]) != "null" { srs = rs.StepResumeState[i] } - tr.steps = append(tr.steps, &stepState{ + ss := &stepState{ ctx: stepCtx, cancel: stepCancel, stepIndex: i, sb: sb, - inCh: make(chan *target.Target), - outCh: make(chan test.TestStepResult), ev: emitterFactory.New(sb.TestStepLabel), - tgtDone: make(map[*target.Target]bool), + stepRunner: NewStepRunner(), resumeState: srs, - }) + } // Step handlers will be started from target handlers as targets reach them. + tr.steps = append(tr.steps, ss) } // Set up the targets @@ -246,7 +243,8 @@ func (tr *TestRunner) Run( ctx.Debugf("- %d in flight, ok to resume? %t", numInFlightTargets, resumeOk) ctx.Debugf("step states:") for i, ss := range tr.steps { - ctx.Debugf(" %d %s %t %t %v %s", i, ss, ss.stepRunning, ss.readerRunning, ss.runErr, ss.resumeState) + ctx.Debugf(" %d %s %t %t %t %v %s", + i, ss, ss.stepRunner.Started(), ss.stepRunner.Running(), ss.readingLoopRunning, ss.runErr, ss.resumeState) } // Is there a useful error to report? @@ -290,101 +288,75 @@ func (tr *TestRunner) Run( func (tr *TestRunner) waitStepRunners(ctx xcontext.Context) error { ctx.Debugf("waiting for step runners to finish") - swch := make(chan struct{}) - go func() { - tr.mu.Lock() - defer tr.mu.Unlock() - for { - ok := true - for _, ss := range tr.steps { - // numRunning == 1 is also acceptable: we allow the Run() goroutine - // to continue in case of error, if the result processor decided - // to abandon its runner, there's nothing we can do. - switch { - case !ss.stepRunning && !ss.readerRunning: - // Done - case ss.stepRunning && ss.readerRunning: - // Still active - ok = false - case !ss.stepRunning && ss.readerRunning: - // Transient state, let it finish - ok = false - case ss.stepRunning && !ss.readerRunning: - // This is possible if plugin got stuck and result processor gave up on it. - // If so, it should have left an error. - if ss.runErr == nil { - ctx.Errorf("%s: result processor left runner with no error", ss) - // There's nothing we can do at this point, fall through. - } + + shutdownCtx, cancel := context.WithTimeout(ctx, tr.shutdownTimeout) + defer cancel() + + var stepsNeverReturned []string + for _, ss := range tr.steps { + if !ss.stepRunner.Started() { + continue + } + result, err := ss.stepRunner.WaitResults(shutdownCtx) + if err != nil { + result.Err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} + stepsNeverReturned = append(stepsNeverReturned, ss.sb.TestStepLabel) + // Cancel this step's context, this will help release the reader. + ss.cancel() + } else { + // reading loop can still be running, wait for it to finish + tr.mu.Lock() + for { + if !ss.readingLoopRunning { + break } + tr.monitorCond.Wait() } - if ok { - close(swch) - return - } - tr.cond.Wait() + tr.mu.Unlock() } - }() - var err error - select { - case <-swch: - ctx.Debugf("step runners finished") tr.mu.Lock() - defer tr.mu.Unlock() - err = tr.checkStepRunnersLocked() - case <-time.After(tr.shutdownTimeout): - ctx.Errorf("step runners failed to shut down correctly") - tr.mu.Lock() - defer tr.mu.Unlock() - // If there is a step with an error set, use that. - err = tr.checkStepRunnersLocked() - // If there isn't, enumerate ones that were still running at the time. - nrerr := &cerrors.ErrTestStepsNeverReturned{} - if err == nil { - err = nrerr - } - for _, ss := range tr.steps { - if ss.stepRunning { - ss.setErrLocked(&cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}}) - nrerr.StepNames = append(nrerr.StepNames, ss.sb.TestStepLabel) - // Cancel this step's context, this will help release the reader. - ss.cancel() - } - } + ss.resumeState = result.ResumeState + ss.setErrLocked(result.Err) + tr.mu.Unlock() } - // Emit step error events. + for _, ss := range tr.steps { - if ss.runErr != nil && ss.runErr != xcontext.ErrPaused && ss.runErr != xcontext.ErrCanceled { - if err := ss.emitEvent(ctx, EventTestError, nil, ss.runErr.Error()); err != nil { + tr.mu.Lock() + stepErr := ss.runErr + tr.mu.Unlock() + + if stepErr != nil && stepErr != xcontext.ErrPaused && stepErr != xcontext.ErrCanceled { + if err := ss.emitEvent(ctx, EventTestError, nil, stepErr.Error()); err != nil { ctx.Errorf("failed to emit event: %s", err) } } } - return err + + resultErr := tr.checkStepRunnersLocked() + if len(stepsNeverReturned) > 0 && resultErr == nil { + resultErr = &cerrors.ErrTestStepsNeverReturned{StepNames: stepsNeverReturned} + } + return resultErr } func (tr *TestRunner) injectTarget(ctx xcontext.Context, tgs *targetState, ss *stepState) error { - var err error ctx.Debugf("%s: injecting into %s", tgs, ss) - select { - case ss.inCh <- tgs.tgt: - // Injected successfully. - err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil) + + tgt := tgs.tgt + err := ss.stepRunner.AddTarget(ctx, tgt) + if err == nil { + if err = ss.emitEvent(ctx, target.EventTargetIn, tgs.tgt, nil); err != nil { + err = fmt.Errorf("failed to report target injection: %w", err) + } + tr.mu.Lock() - defer tr.mu.Unlock() // By the time we get here the target could have been processed and result posted already, hence the check. if tgs.CurPhase == targetStepPhaseBegin { tgs.CurPhase = targetStepPhaseRun } - if err != nil { - err = fmt.Errorf("failed to report target injection: %w", err) - } - case <-ctx.Until(xcontext.ErrPaused): - err = xcontext.ErrPaused - case <-ctx.Done(): - err = xcontext.ErrCanceled + tr.mu.Unlock() } - tr.cond.Signal() + tr.monitorCond.Signal() return err } @@ -399,13 +371,7 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, return xcontext.ErrPaused } - select { - case res, ok := <-resCh: - if !ok { - // Channel is closed when job is paused to make sure all results are processed. - ctx.Debugf("%s: result channel closed", tgs) - return xcontext.ErrPaused - } + processTargetResult := func(res error) error { ctx.Debugf("%s: result recd for %s", tgs, ss) var err error if res == nil { @@ -422,14 +388,31 @@ func (tr *TestRunner) awaitTargetResult(ctx xcontext.Context, tgs *targetState, } tgs.CurPhase = targetStepPhaseEnd tr.mu.Unlock() - tr.cond.Signal() + tr.monitorCond.Signal() return err + } + + select { + case res, ok := <-resCh: + if !ok { + // Channel is closed when job is paused to make sure all results are processed. + ctx.Debugf("%s: result channel closed", tgs) + return xcontext.ErrPaused + } + return processTargetResult(res) // Check for cancellation. // Notably we are not checking for the pause condition here: // when paused, we want to let all the injected targets to finish // and collect all the results they produce. If that doesn't happen, // step runner will close resCh on its way out and unblock us. case <-ctx.Done(): + // we might have a race-condition here when both events happen + // in this case we should prioritise result processing + select { + case res := <-resCh: + return processTargetResult(res) + default: + } tr.mu.Lock() ctx.Debugf("%s: canceled 2", tgs) tr.mu.Unlock() @@ -479,10 +462,9 @@ loop: } tr.mu.Unlock() // Make sure we have a step runner active. If not, start one. - tr.runStepIfNeeded(ss) + err := tr.runStepIfNeeded(ss) // Inject the target. - var err error - if inject { + if err == nil && inject { err = tr.injectTarget(ctx, tgs, ss) } // Await result. It will be communicated to us by the step runner @@ -492,12 +474,12 @@ loop: } tr.mu.Lock() if err != nil { - ss.ctx.Errorf("%s", err) + ctx.Errorf("%s", err) switch err { case xcontext.ErrPaused: - ss.ctx.Debugf("%s: paused 1", tgs) + ctx.Debugf("%s: paused 1", tgs) case xcontext.ErrCanceled: - ss.ctx.Debugf("%s: canceled 1", tgs) + ctx.Debugf("%s: canceled 1", tgs) default: ss.setErrLocked(err) } @@ -518,28 +500,64 @@ loop: tr.mu.Lock() ctx.Debugf("%s: target handler finished", tgs) tgs.handlerRunning = false - tr.cond.Signal() + tr.monitorCond.Signal() tr.mu.Unlock() } // runStepIfNeeded starts the step runner goroutine if not already running. -func (tr *TestRunner) runStepIfNeeded(ss *stepState) { +func (tr *TestRunner) runStepIfNeeded(ss *stepState) error { tr.mu.Lock() defer tr.mu.Unlock() - if ss.stepStarted { - return + + if ss.stepRunner.Started() { + return nil } - ss.stepStarted = true - ss.stepRunning = true - ss.readerRunning = true - go tr.stepRunner(ss) - go tr.stepReader(ss) -} -func (ss *stepState) setErr(mu sync.Locker, err error) { - mu.Lock() - defer mu.Unlock() - ss.setErrLocked(err) + resumeState := ss.resumeState + ss.resumeState = nil + + resultCh, err := ss.stepRunner.Run(ss.ctx, ss.sb, ss.ev, resumeState) + if err != nil { + return fmt.Errorf("failed to stert a step runner for '%s': %v", ss.sb.TestStepLabel, err) + } + + ss.readingLoopRunning = true + go func() { + defer func() { + tr.mu.Lock() + defer tr.mu.Unlock() + + ss.readingLoopRunning = false + tr.monitorCond.Signal() + }() + for { + select { + case stepResult, ok := <-resultCh: + if !ok { + ss.ctx.Debugf("step runner results channel was closed") + return + } + if stepResult.Target == nil { + ss.ctx.Errorf("step runner results an err: %v", err) + tr.mu.Lock() + ss.setErrLocked(stepResult.Err) + tr.mu.Unlock() + } else { + if err := tr.reportTargetResult(ss.ctx, ss, stepResult.Target, stepResult.Err); err != nil { + ss.ctx.Errorf("Reporting target result failed: %v", err) + tr.mu.Lock() + ss.setErrLocked(err) + tr.mu.Unlock() + } + } + tr.monitorCond.Signal() + case <-ss.ctx.Done(): + ss.ctx.Debugf("step context was cancelled") + return + } + } + }() + return nil } // setErrLocked sets step runner error unless already set. @@ -547,7 +565,7 @@ func (ss *stepState) setErrLocked(err error) { if err == nil || ss.runErr != nil { return } - ss.ctx.Errorf("err: %v", err) + ss.ctx.Errorf("setErrLocked: %v", err) ss.runErr = err } @@ -570,42 +588,8 @@ func (ss *stepState) emitEvent(ctx xcontext.Context, name event.Name, tgt *targe return ss.ev.Emit(ctx, errEv) } -// stepRunner runs a test pipeline's step (the Run() method). -func (tr *TestRunner) stepRunner(ss *stepState) { - ss.ctx.Debugf("%s: step runner active", ss) - defer func() { - if r := recover(); r != nil { - tr.mu.Lock() - ss.stepRunning = false - ss.setErrLocked(&cerrors.ErrTestStepPaniced{ - StepName: ss.sb.TestStepLabel, - StackTrace: fmt.Sprintf("%s / %s", r, debug.Stack()), - }) - tr.mu.Unlock() - tr.safeCloseOutCh(ss) - } - }() - tr.mu.Lock() - var runErr error - resumeState := ss.resumeState - ss.resumeState = nil - tr.mu.Unlock() - chans := test.TestStepChannels{In: ss.inCh, Out: ss.outCh} - resumeState, runErr = ss.sb.TestStep.Run(ss.ctx, chans, ss.sb.Parameters, ss.ev, resumeState) - ss.ctx.Debugf("%s: step runner finished %v, rs %s", ss, runErr, string(resumeState)) - tr.mu.Lock() - ss.stepRunning = false - ss.setErrLocked(runErr) - if runErr == xcontext.ErrPaused { - ss.resumeState = resumeState - } - tr.mu.Unlock() - // Signal to the result processor that no more will be coming. - tr.safeCloseOutCh(ss) -} - // reportTargetResult reports result of executing a step to the appropriate target handler. -func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res error) error { +func (tr *TestRunner) reportTargetResult(ctx xcontext.Context, ss *stepState, tgt *target.Target, res error) error { resCh, err := func() (chan error, error) { tr.mu.Lock() defer tr.mu.Unlock() @@ -616,13 +600,6 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res Target: tgt.ID, } } - if ss.tgtDone[tgt] { - return nil, &cerrors.ErrTestStepReturnedDuplicateResult{ - StepName: ss.sb.TestStepLabel, - Target: tgt.ID, - } - } - ss.tgtDone[tgt] = true // Begin is also allowed here because it may happen that we get a result before target handler updates phase. if tgs.CurStep != ss.stepIndex || (tgs.CurPhase != targetStepPhaseBegin && tgs.CurPhase != targetStepPhaseRun) { @@ -634,7 +611,7 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res if tgs.resCh == nil { // If canceled or paused, target handler may have left early. We don't care though. select { - case <-ss.ctx.Done(): + case <-ctx.Done(): return nil, xcontext.ErrCanceled default: // This should not happen, must be an internal error. @@ -642,12 +619,13 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res } } tgs.CurPhase = targetStepPhaseResultPending - ss.ctx.Debugf("%s: result for %s: %v", ss, tgs, res) + ctx.Debugf("%s: result for %s: %v", ss, tgs, res) return tgs.resCh, nil }() if err != nil { return err } + select { case resCh <- res: break @@ -657,56 +635,6 @@ func (tr *TestRunner) reportTargetResult(ss *stepState, tgt *target.Target, res return nil } -func (tr *TestRunner) safeCloseOutCh(ss *stepState) { - defer func() { - if r := recover(); r != nil { - ss.setErr(&tr.mu, &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel}) - } - }() - close(ss.outCh) -} - -// stepReader receives results from the step's output channel and forwards them to the appropriate target handlers. -func (tr *TestRunner) stepReader(ss *stepState) { - ss.ctx.Debugf("%s: step reader active", ss) - var err error - cancelCh := ss.ctx.Done() - var shutdownTimeoutCh <-chan time.Time -loop: - for { - select { - case res, ok := <-ss.outCh: - if !ok { - ss.ctx.Debugf("%s: out chan closed", ss) - tr.mu.Lock() - if ss.stepRunning { - // This means that plugin closed its channels before leaving. - err = &cerrors.ErrTestStepClosedChannels{StepName: ss.sb.TestStepLabel} - } - tr.mu.Unlock() - break loop - } - if err = tr.reportTargetResult(ss, res.Target, res.Err); err != nil { - break loop - } - case <-cancelCh: - ss.ctx.Debugf("%s: canceled 3, draining", ss) - // Allow some time to drain - cancelCh = nil - shutdownTimeoutCh = time.After(tr.shutdownTimeout) - case <-shutdownTimeoutCh: - err = &cerrors.ErrTestStepsNeverReturned{StepNames: []string{ss.sb.TestStepLabel}} - break loop - } - } - tr.mu.Lock() - defer tr.mu.Unlock() - ss.setErrLocked(err) - ss.readerRunning = false - ss.ctx.Debugf("%s: step reader finished, %t %t %v", ss, ss.stepRunning, ss.readerRunning, ss.runErr) - tr.cond.Signal() -} - // checkStepRunnersLocked checks if any step runner has encountered an error. func (tr *TestRunner) checkStepRunnersLocked() error { for i, ss := range tr.steps { @@ -767,12 +695,12 @@ stepLoop: } if !ok { // Wait for notification: as progress is being made, we get notified. - tr.cond.Wait() + tr.monitorCond.Wait() continue } // All targets ok, close the step's input channel. ctx.Debugf("monitor pass %d: %s: no more targets, closing input channel", pass, ss) - close(ss.inCh) + ss.stepRunner.Stop() step++ } // Wait for all the targets to finish. @@ -793,7 +721,7 @@ tgtLoop: // It's been paused, this is fine. continue } - if ss.stepStarted && !ss.readerRunning { + if ss.stepRunner.Started() && !ss.readingLoopRunning { // Target has been injected but step runner has exited without a valid reason, this target has been lost. runErr = &cerrors.ErrTestStepLostTargets{ StepName: ss.sb.TestStepLabel, @@ -810,7 +738,7 @@ tgtLoop: break } // Wait for notification: as progress is being made, we get notified. - tr.cond.Wait() + tr.monitorCond.Wait() } ctx.Debugf("monitor: finished, %v", runErr) return runErr @@ -818,9 +746,9 @@ tgtLoop: func NewTestRunnerWithTimeouts(shutdownTimeout time.Duration) *TestRunner { tr := &TestRunner{ - shutdownTimeout: shutdownTimeout, + shutdownTimeout: shutdownTimeout, } - tr.cond = sync.NewCond(&tr.mu) + tr.monitorCond = sync.NewCond(&tr.mu) return tr } diff --git a/pkg/runner/test_runner_test.go b/pkg/runner/test_runner_test.go index b27186f6..8f1962cc 100644 --- a/pkg/runner/test_runner_test.go +++ b/pkg/runner/test_runner_test.go @@ -73,7 +73,7 @@ type runRes struct { } type MemoryStorageEngine struct { - Storage storage.ResettableStorage + Storage storage.ResettableStorage StorageEngineVault *storage.SimpleEngineVault } @@ -89,7 +89,7 @@ func NewMemoryStorageEngine() (*MemoryStorageEngine, error) { } return &MemoryStorageEngine{ - Storage: ms, + Storage: ms, StorageEngineVault: storageEngineVault, }, nil } @@ -206,7 +206,7 @@ func (s *TestRunnerSuite) Test1Step1Success() { require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepFinishedEvent]} -`, s.internalStorage.GetStepEvents(testName,"")) +`, s.internalStorage.GetStepEvents(testName, "")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -218,7 +218,7 @@ func (s *TestRunnerSuite) Test1Step1Success() { // Simple case: one target, one step that blocks for a bit, success. // We block for longer than the shutdown timeout of the test runner. func (s *TestRunnerSuite) Test1StepLongerThanShutdown1Success() { - tr := NewTestRunnerWithTimeouts(100*time.Millisecond) + tr := NewTestRunnerWithTimeouts(100 * time.Millisecond) _, targetsResults, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second, []*target.Target{tgt("T1")}, []test.TestStepBundle{ @@ -239,7 +239,7 @@ func (s *TestRunnerSuite) Test1StepLongerThanShutdown1Success() { {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestFinishedEvent]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetOut]} -`, s.internalStorage.GetTargetEvents(testName,"T1")) +`, s.internalStorage.GetTargetEvents(testName, "T1")) } // Simple case: one target, one step, failure. @@ -258,7 +258,7 @@ func (s *TestRunnerSuite) Test1Step1Fail() { require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 1][(*Target)(nil) TestStepFinishedEvent]} -`, s.internalStorage.GetStepEvents(testName,"Step 1")) +`, s.internalStorage.GetStepEvents(testName, "Step 1")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -316,7 +316,7 @@ func (s *TestRunnerSuite) Test3StepsNotReachedStepNotRun() { {[1 1 SimpleTest 0 Step 2][(*Target)(nil) TestStepRunningEvent]} {[1 1 SimpleTest 0 Step 2][(*Target)(nil) TestStepFinishedEvent]} `, s.internalStorage.GetStepEvents(testName, "Step 2")) - require.Equal(s.T(), "\n\n", s.internalStorage.GetStepEvents(testName,"Step 3")) + require.Equal(s.T(), "\n\n", s.internalStorage.GetStepEvents(testName, "Step 3")) require.Equal(s.T(), ` {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TargetIn]} {[1 1 SimpleTest 0 Step 1][Target{ID: "T1"} TestStartedEvent]} @@ -338,7 +338,7 @@ func (s *TestRunnerSuite) Test3StepsNotReachedStepNotRun() { // A misbehaving step that fails to shut down properly after processing targets // and does not return. func (s *TestRunnerSuite) TestNoReturnStepWithCorrectTargetForwarding() { - tr := NewTestRunnerWithTimeouts(200*time.Millisecond) + tr := NewTestRunnerWithTimeouts(200 * time.Millisecond) ctx, cancel := xcontext.WithCancel(ctx) defer cancel() _, _, err := s.runWithTimeout(ctx, tr, nil, 1, 2*time.Second, @@ -363,7 +363,7 @@ func (s *TestRunnerSuite) TestStepPanics() { ) require.Error(s.T(), err) require.IsType(s.T(), &cerrors.ErrTestStepPaniced{}, err) - require.Equal(s.T(), "\n\n", s.internalStorage.GetTargetEvents(testName,"T1")) + require.Equal(s.T(), "\n\n", s.internalStorage.GetTargetEvents(testName, "T1")) require.Contains(s.T(), s.internalStorage.GetStepEvents(testName, "Step 1"), "step Step 1 paniced") } diff --git a/tests/integ/plugins/testrunner_test.go b/tests/integ/plugins/testrunner_test.go index df3ddfb1..dcf1d7bc 100644 --- a/tests/integ/plugins/testrunner_test.go +++ b/tests/integ/plugins/testrunner_test.go @@ -46,7 +46,7 @@ var ( var ( pluginRegistry *pluginregistry.PluginRegistry targets []*target.Target - successTimeout = 5 * time.Second + successTimeout = 20 * time.Second ) var testSteps = map[string]test.TestStepFactory{