diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..b55a0c3 --- /dev/null +++ b/codecov.yml @@ -0,0 +1,4 @@ +coverage: + range: 70..100 + round: down + precision: 2 diff --git a/mono/mono_timeout.go b/mono/mono_timeout.go index 54a8110..293bd99 100644 --- a/mono/mono_timeout.go +++ b/mono/mono_timeout.go @@ -2,6 +2,8 @@ package mono import ( "context" + "math" + "sync/atomic" "time" "github.com/jjeffcaii/reactor-go" @@ -13,6 +15,21 @@ type monoTimeout struct { timeout time.Duration } +func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout { + return &monoTimeout{ + source: source, + timeout: timeout, + } +} + +func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) { + m.source.SubscribeWith(ctx, &timeoutSubscriber{ + actual: s, + timeout: m.timeout, + done: make(chan struct{}), + }) +} + func (m *monoTimeout) Parent() reactor.RawPublisher { return m.source } @@ -21,33 +38,37 @@ type timeoutSubscriber struct { actual reactor.Subscriber timeout time.Duration done chan struct{} + closed int32 } func (t *timeoutSubscriber) OnComplete() { - select { - case <-t.done: - default: + if atomic.CompareAndSwapInt32(&t.closed, 0, math.MaxInt32) || atomic.CompareAndSwapInt32(&t.closed, 1, math.MaxInt32) { close(t.done) t.actual.OnComplete() } } func (t *timeoutSubscriber) OnError(err error) { - select { - case <-t.done: - hooks.Global().OnErrorDrop(err) - default: + if atomic.CompareAndSwapInt32(&t.closed, 0, -1) { close(t.done) t.actual.OnError(err) + return + } + + // item is emitted before error reach, should be processed as completed. + if atomic.CompareAndSwapInt32(&t.closed, 1, -1) { + close(t.done) + t.actual.OnComplete() } + + hooks.Global().OnErrorDrop(err) } func (t *timeoutSubscriber) OnNext(any reactor.Any) { - select { - case <-t.done: - hooks.Global().OnNextDrop(any) - default: + if atomic.CompareAndSwapInt32(&t.closed, 0, 1) { t.actual.OnNext(any) + } else { + hooks.Global().OnNextDrop(any) } } @@ -63,18 +84,3 @@ func (t *timeoutSubscriber) OnSubscribe(ctx context.Context, subscription reacto }() t.actual.OnSubscribe(ctx, subscription) } - -func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) { - m.source.SubscribeWith(ctx, &timeoutSubscriber{ - actual: s, - timeout: m.timeout, - done: make(chan struct{}), - }) -} - -func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout { - return &monoTimeout{ - source: source, - timeout: timeout, - } -}