From 12d7c1ba0fcada25a0d3467fdd2003335f12d6b4 Mon Sep 17 00:00:00 2001 From: Seth Vargo Date: Thu, 5 Jan 2023 10:07:16 -0500 Subject: [PATCH] Only release semaphore when acquire succeeds (#128) Fixes GH-127 --- internal/worker/worker.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/internal/worker/worker.go b/internal/worker/worker.go index d7d6f2b..db8e4e7 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -90,13 +90,13 @@ func (w *Worker[T]) Do(ctx context.Context, fn WorkFunc[T]) error { } if err := w.sem.Acquire(ctx, 1); err != nil { - return err + return fmt.Errorf("failed to execute job: %w", err) } // It's possible the worker was stopped while we were waiting for the // semaphore to acquire, but the worker is actually stopped. if w.isStopped() { - w.sem.Release(1) + defer w.sem.Release(1) return ErrStopped } @@ -127,8 +127,12 @@ func (w *Worker[T]) Wait(ctx context.Context) error { return ErrStopped } + if err := w.sem.Acquire(ctx, w.size); err != nil { + return fmt.Errorf("failed to wait for all jobs to finish: %w", err) + } defer w.sem.Release(w.size) - return w.sem.Acquire(ctx, w.size) + + return nil } // Done immediately stops the worker and prevents new work from being enqueued.