diff --git a/internal/worker/worker.go b/internal/worker/worker.go index d7d6f2b..db8e4e7 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -90,13 +90,13 @@ func (w *Worker[T]) Do(ctx context.Context, fn WorkFunc[T]) error { } if err := w.sem.Acquire(ctx, 1); err != nil { - return err + return fmt.Errorf("failed to execute job: %w", err) } // It's possible the worker was stopped while we were waiting for the // semaphore to acquire, but the worker is actually stopped. if w.isStopped() { - w.sem.Release(1) + defer w.sem.Release(1) return ErrStopped } @@ -127,8 +127,12 @@ func (w *Worker[T]) Wait(ctx context.Context) error { return ErrStopped } + if err := w.sem.Acquire(ctx, w.size); err != nil { + return fmt.Errorf("failed to wait for all jobs to finish: %w", err) + } defer w.sem.Release(w.size) - return w.sem.Acquire(ctx, w.size) + + return nil } // Done immediately stops the worker and prevents new work from being enqueued.