Skip to content

Commit

Permalink
Code style improvements of StepRunner
Browse files Browse the repository at this point in the history
Signed-off-by: Ilya <[email protected]>
  • Loading branch information
rihter007 committed Dec 14, 2021
1 parent 5efe6d8 commit 62d049c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
63 changes: 32 additions & 31 deletions pkg/runner/step_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,24 @@ type StepResult struct {
type StepRunner struct {
mu sync.Mutex

stepIn chan *target.Target
reportedTargets map[string]struct{}

started bool
runningLoopActive bool
input chan *target.Target
stopOnce sync.Once
resultsChan chan<- StepRunnerEvent
runningLoopActive bool
finishedCh chan struct{}

resultErr error
resultResumeState json.RawMessage
notifyStoppedOnce sync.Once
resultsChan chan<- StepRunnerEvent
}

func (sr *StepRunner) AddTarget(ctx xcontext.Context, tgt *target.Target) error {
if tgt == nil {
return fmt.Errorf("target should not be nil")
}

select {
case sr.stepIn <- tgt:
case sr.input <- tgt:
case <-ctx.Until(xcontext.ErrPaused):
return xcontext.ErrPaused
case <-ctx.Done():
Expand All @@ -61,20 +62,15 @@ func (sr *StepRunner) Run(
ev testevent.Emitter,
resumeState json.RawMessage,
) (<-chan StepRunnerEvent, error) {
err := func() error {
sr.mu.Lock()
defer sr.mu.Unlock()
if sr.started {
return &cerrors.ErrAlreadyDone{}
}
sr.started = true
return nil
}()
if err != nil {
return nil, err
sr.mu.Lock()
defer sr.mu.Unlock()

if sr.resultsChan != nil {
return nil, &cerrors.ErrAlreadyDone{}
}

sr.finishedCh = make(chan struct{})
finishedCh := make(chan struct{})
sr.finishedCh = finishedCh
resultsChan := make(chan StepRunnerEvent, 1)
sr.resultsChan = resultsChan

Expand All @@ -95,10 +91,11 @@ func (sr *StepRunner) Run(
ctx.Debugf("StepRunner finished")
}

stepIn := sr.input
stepOut := make(chan test.TestStepResult)
go func() {
defer finish()
sr.runningLoop(ctx, stepOut, bundle, ev, resumeState)
sr.runningLoop(ctx, stepIn, stepOut, bundle, ev, resumeState)
ctx.Debugf("Running loop finished")
}()

Expand All @@ -107,21 +104,22 @@ func (sr *StepRunner) Run(
sr.readingLoop(ctx, stepOut, bundle.TestStepLabel)
ctx.Debugf("Reading loop finished")
}()

return resultsChan, nil
}

func (sr *StepRunner) Started() bool {
sr.mu.Lock()
defer sr.mu.Unlock()

return sr.started
return sr.resultsChan != nil
}

func (sr *StepRunner) Running() bool {
sr.mu.Lock()
defer sr.mu.Unlock()

return sr.started && sr.finishedCh != nil
return sr.resultsChan != nil && sr.finishedCh != nil
}

// WaitResults returns TestStep.Run() output
Expand Down Expand Up @@ -161,7 +159,7 @@ func (sr *StepRunner) WaitResults(ctx context.Context) (stepResult StepResult, e
// Stop triggers TestStep to stop running by closing input channel
func (sr *StepRunner) Stop() {
sr.stopOnce.Do(func() {
close(sr.stepIn)
close(sr.input)
})
}

Expand All @@ -170,6 +168,7 @@ func (sr *StepRunner) readingLoop(
stepOut chan test.TestStepResult,
testStepLabel string,
) {
reportedTargets := make(map[string]struct{})
for {
select {
case res, ok := <-stepOut:
Expand All @@ -191,10 +190,8 @@ func (sr *StepRunner) readingLoop(
}
ctx.Infof("Obtained '%v' for target '%s'", res, res.Target.ID)

sr.mu.Lock()
_, found := sr.reportedTargets[res.Target.ID]
sr.reportedTargets[res.Target.ID] = struct{}{}
sr.mu.Unlock()
_, found := reportedTargets[res.Target.ID]
reportedTargets[res.Target.ID] = struct{}{}

if found {
sr.setErr(ctx, &cerrors.ErrTestStepReturnedDuplicateResult{StepName: testStepLabel, Target: res.Target.ID})
Expand All @@ -219,6 +216,7 @@ func (sr *StepRunner) readingLoop(

func (sr *StepRunner) runningLoop(
ctx xcontext.Context,
stepIn <-chan *target.Target,
stepOut chan test.TestStepResult,
bundle test.TestStepBundle,
ev testevent.Emitter,
Expand Down Expand Up @@ -249,7 +247,7 @@ func (sr *StepRunner) runningLoop(
}
}()

inChannels := test.TestStepChannels{In: sr.stepIn, Out: stepOut}
inChannels := test.TestStepChannels{In: stepIn, Out: stepOut}
return bundle.TestStep.Run(ctx, inChannels, bundle.Parameters, ev, resumeState)
}()
ctx.Debugf("TestStep finished '%v', rs %s", err, string(resultResumeState))
Expand All @@ -273,7 +271,11 @@ func (sr *StepRunner) setErrLocked(ctx xcontext.Context, err error) {
}
ctx.Errorf("err: %v", err)
sr.resultErr = err
sr.notifyStopped(sr.resultErr)

// notifyStopped is a blocking operation: should release the lock
sr.mu.Unlock()
sr.notifyStopped(err)
sr.mu.Lock()
}

func (sr *StepRunner) notifyStopped(err error) {
Expand All @@ -296,7 +298,6 @@ func safeCloseOutCh(ch chan test.TestStepResult) (recoverOccurred bool) {
// NewStepRunner creates a new StepRunner object
func NewStepRunner() *StepRunner {
return &StepRunner{
stepIn: make(chan *target.Target),
reportedTargets: make(map[string]struct{}),
input: make(chan *target.Target),
}
}
3 changes: 2 additions & 1 deletion pkg/runner/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ func (tr *TestRunner) runStepIfNeeded(ss *stepState) error {
return
}
if stepResult.Target == nil {
ss.ctx.Errorf("step runner results an err: %v", err)
tr.mu.Lock()
ss.setErrLocked(stepResult.Err)
tr.mu.Unlock()
Expand All @@ -564,7 +565,7 @@ func (ss *stepState) setErrLocked(err error) {
if err == nil || ss.runErr != nil {
return
}
ss.ctx.Errorf("err: %v", err)
ss.ctx.Errorf("setErrLocked: %v", err)
ss.runErr = err
}

Expand Down

0 comments on commit 62d049c

Please sign in to comment.