diff --git a/flux/flux_create_sink.go b/flux/flux_create_sink.go index b00d5c9..fd14f6f 100644 --- a/flux/flux_create_sink.go +++ b/flux/flux_create_sink.go @@ -2,6 +2,7 @@ package flux import ( "context" + "sync" "sync/atomic" rs "github.com/jjeffcaii/reactor-go" @@ -14,17 +15,25 @@ type bufferedSink struct { n int32 draining int32 done bool + cond *sync.Cond } func (p *bufferedSink) Request(n int) { atomic.AddInt32(&(p.n), int32(n)) + p.drain() } func (p *bufferedSink) Cancel() { + // TODO: support cancel p.dispose() } func (p *bufferedSink) Complete() { + p.cond.L.Lock() + for atomic.LoadInt32(&(p.draining)) == 1 || p.q.size() > 0 { + p.cond.Wait() + } + p.cond.L.Unlock() p.s.OnComplete() p.dispose() } @@ -43,7 +52,12 @@ func (p *bufferedSink) drain() { if !atomic.CompareAndSwapInt32(&(p.draining), 0, 1) { return } - defer atomic.StoreInt32(&(p.draining), 0) + defer func() { + p.cond.L.Lock() + atomic.StoreInt32(&(p.draining), 0) + p.cond.Broadcast() + p.cond.L.Unlock() + }() for atomic.AddInt32(&(p.n), -1) > -1 { if p.done { return @@ -66,7 +80,8 @@ func (p *bufferedSink) dispose() { func newBufferedSink(s rs.Subscriber, cap int) *bufferedSink { return &bufferedSink{ - s: s, - q: newQueue(cap), + s: s, + q: newQueue(cap), + cond: sync.NewCond(&sync.Mutex{}), } } diff --git a/flux/flux_test.go b/flux/flux_test.go index 55c209d..259c458 100644 --- a/flux/flux_test.go +++ b/flux/flux_test.go @@ -251,3 +251,36 @@ func TestEmpty(t *testing.T) { }), ) } + +func TestCreateWithRequest(t *testing.T) { + + const totals = 20 + f := flux.Create(func(ctx context.Context, sink flux.Sink) { + for i := 0; i < totals; i++ { + sink.Next(i) + } + sink.Complete() + }) + + var processed int32 + + su := make(chan rs.Subscription, 1) + + sub := rs.NewSubscriber(rs.OnNext(func(v interface{}) { + log.Println("next:", v) + processed++ + }), rs.OnSubscribe(func(s rs.Subscription) { + su <- s + s.Request(1) + }), rs.OnComplete(func() { + log.Println("complete") + })) + + time.AfterFunc(100*time.Millisecond, func() { + s := <-su + s.Request(totals - 1) + }) + + f.SubscribeWith(context.Background(), sub) + assert.Equal(t, totals, int(processed), "bad processed num") +} diff --git a/flux/processor_test.go b/flux/processor_test.go index 8914d10..37378cb 100644 --- a/flux/processor_test.go +++ b/flux/processor_test.go @@ -1,4 +1,4 @@ -package flux +package flux_test import ( "context" @@ -7,10 +7,11 @@ import ( "time" rs "github.com/jjeffcaii/reactor-go" + "github.com/jjeffcaii/reactor-go/flux" ) func TestUnicastProcessor(t *testing.T) { - p := NewUnicastProcessor() + p := flux.NewUnicastProcessor() time.AfterFunc(100*time.Millisecond, func() { p.Next(1) @@ -40,3 +41,21 @@ func TestUnicastProcessor(t *testing.T) { })) <-done } + +func TestDoOnSubscribe(t *testing.T) { + pc := flux.NewUnicastProcessor() + time.AfterFunc(1*time.Second, func() { + pc.Next(111) + pc.Next(222) + pc.Complete() + }) + done := make(chan struct{}) + pc.DoFinally(func(s rs.SignalType) { + close(done) + }).DoOnSubscribe(func(su rs.Subscription) { + log.Println("doOnSubscribe") + }).BlockLast(context.Background()) + + <-done + +} diff --git a/flux/queue.go b/flux/queue.go index 7aabd8d..97a7901 100644 --- a/flux/queue.go +++ b/flux/queue.go @@ -6,12 +6,17 @@ type queue interface { io.Closer offer(interface{}) poll() (interface{}, bool) + size() int } type simpleQueue struct { c chan interface{} } +func (q simpleQueue) size() int { + return len(q.c) +} + func (q simpleQueue) Close() (err error) { close(q.c) return