Skip to content

Commit

Permalink
Only release semaphore when acquire succeeds (#128)
Browse files Browse the repository at this point in the history
Fixes GH-127
  • Loading branch information
sethvargo authored Jan 5, 2023
1 parent 4ed9440 commit 12d7c1b
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (w *Worker[T]) Do(ctx context.Context, fn WorkFunc[T]) error {
}

if err := w.sem.Acquire(ctx, 1); err != nil {
return err
return fmt.Errorf("failed to execute job: %w", err)
}

// It's possible the worker was stopped while we were waiting for the
// semaphore to acquire, but the worker is actually stopped.
if w.isStopped() {
w.sem.Release(1)
defer w.sem.Release(1)
return ErrStopped
}

Expand Down Expand Up @@ -127,8 +127,12 @@ func (w *Worker[T]) Wait(ctx context.Context) error {
return ErrStopped
}

if err := w.sem.Acquire(ctx, w.size); err != nil {
return fmt.Errorf("failed to wait for all jobs to finish: %w", err)
}
defer w.sem.Release(w.size)
return w.sem.Acquire(ctx, w.size)

return nil
}

// Done immediately stops the worker and prevents new work from being enqueued.
Expand Down

0 comments on commit 12d7c1b

Please sign in to comment.