Skip to content

Commit

Permalink
bugfix: requestN for flux create.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Jul 23, 2019
1 parent 57c07f0 commit f497379
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 5 deletions.
21 changes: 18 additions & 3 deletions flux/flux_create_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flux

import (
"context"
"sync"
"sync/atomic"

rs "github.com/jjeffcaii/reactor-go"
Expand All @@ -14,17 +15,25 @@ type bufferedSink struct {
n int32
draining int32
done bool
cond *sync.Cond
}

func (p *bufferedSink) Request(n int) {
atomic.AddInt32(&(p.n), int32(n))
p.drain()
}

func (p *bufferedSink) Cancel() {
// TODO: support cancel
p.dispose()
}

func (p *bufferedSink) Complete() {
p.cond.L.Lock()
for atomic.LoadInt32(&(p.draining)) == 1 || p.q.size() > 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
p.s.OnComplete()
p.dispose()
}
Expand All @@ -43,7 +52,12 @@ func (p *bufferedSink) drain() {
if !atomic.CompareAndSwapInt32(&(p.draining), 0, 1) {
return
}
defer atomic.StoreInt32(&(p.draining), 0)
defer func() {
p.cond.L.Lock()
atomic.StoreInt32(&(p.draining), 0)
p.cond.Broadcast()
p.cond.L.Unlock()
}()
for atomic.AddInt32(&(p.n), -1) > -1 {
if p.done {
return
Expand All @@ -66,7 +80,8 @@ func (p *bufferedSink) dispose() {

func newBufferedSink(s rs.Subscriber, cap int) *bufferedSink {
return &bufferedSink{
s: s,
q: newQueue(cap),
s: s,
q: newQueue(cap),
cond: sync.NewCond(&sync.Mutex{}),
}
}
33 changes: 33 additions & 0 deletions flux/flux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,36 @@ func TestEmpty(t *testing.T) {
}),
)
}

func TestCreateWithRequest(t *testing.T) {

const totals = 20
f := flux.Create(func(ctx context.Context, sink flux.Sink) {
for i := 0; i < totals; i++ {
sink.Next(i)
}
sink.Complete()
})

var processed int32

su := make(chan rs.Subscription, 1)

sub := rs.NewSubscriber(rs.OnNext(func(v interface{}) {
log.Println("next:", v)
processed++
}), rs.OnSubscribe(func(s rs.Subscription) {
su <- s
s.Request(1)
}), rs.OnComplete(func() {
log.Println("complete")
}))

time.AfterFunc(100*time.Millisecond, func() {
s := <-su
s.Request(totals - 1)
})

f.SubscribeWith(context.Background(), sub)
assert.Equal(t, totals, int(processed), "bad processed num")
}
23 changes: 21 additions & 2 deletions flux/processor_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package flux
package flux_test

import (
"context"
Expand All @@ -7,10 +7,11 @@ import (
"time"

rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/flux"
)

func TestUnicastProcessor(t *testing.T) {
p := NewUnicastProcessor()
p := flux.NewUnicastProcessor()

time.AfterFunc(100*time.Millisecond, func() {
p.Next(1)
Expand Down Expand Up @@ -40,3 +41,21 @@ func TestUnicastProcessor(t *testing.T) {
}))
<-done
}

func TestDoOnSubscribe(t *testing.T) {
pc := flux.NewUnicastProcessor()
time.AfterFunc(1*time.Second, func() {
pc.Next(111)
pc.Next(222)
pc.Complete()
})
done := make(chan struct{})
pc.DoFinally(func(s rs.SignalType) {
close(done)
}).DoOnSubscribe(func(su rs.Subscription) {
log.Println("doOnSubscribe")
}).BlockLast(context.Background())

<-done

}
5 changes: 5 additions & 0 deletions flux/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ type queue interface {
io.Closer
offer(interface{})
poll() (interface{}, bool)
size() int
}

type simpleQueue struct {
c chan interface{}
}

func (q simpleQueue) size() int {
return len(q.c)
}

func (q simpleQueue) Close() (err error) {
close(q.c)
return
Expand Down

0 comments on commit f497379

Please sign in to comment.