diff --git a/README.md b/README.md
index e0a01b4..0898c39 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
-
***[WARNING] DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
+
⚠️⚠️⚠️ ***DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
## Install
diff --git a/flux/flux.go b/flux/flux.go
index 4974c61..7e95764 100644
--- a/flux/flux.go
+++ b/flux/flux.go
@@ -1,48 +1,50 @@
package flux
import (
- "context"
+ "context"
- "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/scheduler"
+ "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/scheduler"
)
type OverflowStrategy int8
const (
- OverflowBuffer OverflowStrategy = iota
- OverflowIgnore
- OverflowError
- OverflowDrop
- OverflowLatest
+ OverflowBuffer OverflowStrategy = iota
+ OverflowIgnore
+ OverflowError
+ OverflowDrop
+ OverflowLatest
)
type FnSwitchOnFirst = func(s Signal, f Flux) Flux
type Flux interface {
- rs.Publisher
- Filter(rs.Predicate) Flux
- Map(rs.Transformer) Flux
- DoOnDiscard(rs.FnOnDiscard) Flux
- DoOnNext(rs.FnOnNext) Flux
- DoOnComplete(rs.FnOnComplete) Flux
- DoOnError(rs.FnOnError) Flux
- DoOnCancel(rs.FnOnCancel) Flux
- DoOnRequest(rs.FnOnRequest) Flux
- DoOnSubscribe(rs.FnOnSubscribe) Flux
- DoFinally(rs.FnOnFinally) Flux
- SwitchOnFirst(FnSwitchOnFirst) Flux
- SubscribeOn(scheduler.Scheduler) Flux
- BlockLast(context.Context) (interface{}, error)
+ rs.Publisher
+ Filter(rs.Predicate) Flux
+ Map(rs.Transformer) Flux
+ Take(n int) Flux
+ DoOnDiscard(rs.FnOnDiscard) Flux
+ DoOnNext(rs.FnOnNext) Flux
+ DoOnComplete(rs.FnOnComplete) Flux
+ DoOnError(rs.FnOnError) Flux
+ DoOnCancel(rs.FnOnCancel) Flux
+ DoOnRequest(rs.FnOnRequest) Flux
+ DoOnSubscribe(rs.FnOnSubscribe) Flux
+ DoFinally(rs.FnOnFinally) Flux
+ SwitchOnFirst(FnSwitchOnFirst) Flux
+ SubscribeOn(scheduler.Scheduler) Flux
+ BlockLast(context.Context) (interface{}, error)
+ ToChan(ctx context.Context, cap int) (c <-chan interface{}, e <-chan error)
}
type Sink interface {
- Complete()
- Error(error)
- Next(interface{})
+ Complete()
+ Error(error)
+ Next(interface{})
}
type Processor interface {
- Flux
- Sink
+ Flux
+ Sink
}
diff --git a/flux/flux_create_sink.go b/flux/flux_create_sink.go
index 9534f3c..b89abc5 100644
--- a/flux/flux_create_sink.go
+++ b/flux/flux_create_sink.go
@@ -1,16 +1,14 @@
package flux
import (
- "context"
- "sync"
- "sync/atomic"
+ "sync"
+ "sync/atomic"
- rs "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/hooks"
+ rs "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/hooks"
)
type bufferedSink struct {
- ctx context.Context
s rs.Subscriber
q queue
n int32
@@ -84,7 +82,6 @@ func (p *bufferedSink) drain() {
func (p *bufferedSink) dispose() {
p.done = true
_ = p.q.Close()
-
}
func newBufferedSink(s rs.Subscriber, cap int) *bufferedSink {
diff --git a/flux/flux_test.go b/flux/flux_test.go
index d33864c..d0fd328 100644
--- a/flux/flux_test.go
+++ b/flux/flux_test.go
@@ -1,316 +1,332 @@
package flux_test
import (
- "context"
- "errors"
- "fmt"
- "log"
- "testing"
- "time"
+ "context"
+ "errors"
+ "fmt"
+ "log"
+ "testing"
+ "time"
- "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/flux"
- "github.com/jjeffcaii/reactor-go/scheduler"
- "github.com/stretchr/testify/assert"
+ "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/flux"
+ "github.com/jjeffcaii/reactor-go/scheduler"
+ "github.com/stretchr/testify/assert"
)
func Example() {
- gen := func(ctx context.Context, sink flux.Sink) {
- for i := 0; i < 10; i++ {
- v := i
- sink.Next(v)
- }
- sink.Complete()
- }
- done := make(chan struct{})
+ gen := func(ctx context.Context, sink flux.Sink) {
+ for i := 0; i < 10; i++ {
+ v := i
+ sink.Next(v)
+ }
+ sink.Complete()
+ }
+ done := make(chan struct{})
- var su rs.Subscription
- flux.Create(gen).
- Filter(func(i interface{}) bool {
- return i.(int)%2 == 0
- }).
- Map(func(i interface{}) interface{} {
- return fmt.Sprintf("#HELLO_%04d", i.(int))
- }).
- SubscribeOn(scheduler.Elastic()).
- Subscribe(context.Background(),
- rs.OnSubscribe(func(s rs.Subscription) {
- su = s
- s.Request(1)
- }),
- rs.OnNext(func(v interface{}) {
- fmt.Println("next:", v)
- su.Request(1)
- }),
- rs.OnComplete(func() {
- close(done)
- }),
- )
- <-done
+ var su rs.Subscription
+ flux.Create(gen).
+ Filter(func(i interface{}) bool {
+ return i.(int)%2 == 0
+ }).
+ Map(func(i interface{}) interface{} {
+ return fmt.Sprintf("#HELLO_%04d", i.(int))
+ }).
+ SubscribeOn(scheduler.Elastic()).
+ Subscribe(context.Background(),
+ rs.OnSubscribe(func(s rs.Subscription) {
+ su = s
+ s.Request(1)
+ }),
+ rs.OnNext(func(v interface{}) {
+ fmt.Println("next:", v)
+ su.Request(1)
+ }),
+ rs.OnComplete(func() {
+ close(done)
+ }),
+ )
+ <-done
}
var testData = []int{1, 2, 3, 4, 5}
func TestSuite(t *testing.T) {
- var inputs []interface{}
- for _, value := range testData {
- inputs = append(inputs, value)
- }
- all := make(map[string]flux.Flux)
- all["just"] = flux.Just(inputs...)
- all["create"] = flux.Create(func(ctx context.Context, sink flux.Sink) {
- for _, it := range testData {
- sink.Next(it)
- }
- sink.Complete()
- })
- //all["unicast"] = nil
- all["range"] = flux.Range(1, 5)
- for k, v := range all {
- gen := func() flux.Flux {
- if k == "unicast" {
- vv := flux.NewUnicastProcessor()
- go func() {
- for _, it := range testData {
- vv.Next(it)
- }
- vv.Complete()
- }()
- time.Sleep(100 * time.Millisecond)
- return vv
- }
- return v
- }
- t.Run(fmt.Sprintf("%s_Request", k), func(t *testing.T) {
- testRequest(gen(), t)
- })
- t.Run(fmt.Sprintf("%s_Peek", k), func(t *testing.T) {
- testPeek(gen(), t)
- })
- t.Run(fmt.Sprintf("%s_Discard", k), func(t *testing.T) {
- testDiscard(gen(), t)
- })
- t.Run(fmt.Sprintf("%s_FilterRequest", k), func(t *testing.T) {
- testFilterRequest(gen(), t)
- })
- t.Run(fmt.Sprintf("%s_BlockLast", k), func(t *testing.T) {
- testBlockLast(gen(), t)
- })
- //t.Run(fmt.Sprintf("%s_DoSubscribeOn", k), func(t *testing.T) {
- // testDoOnSubscribe(gen(), t)
- //})
- }
+ var inputs []interface{}
+ for _, value := range testData {
+ inputs = append(inputs, value)
+ }
+ all := make(map[string]flux.Flux)
+ all["just"] = flux.Just(inputs...)
+ all["create"] = flux.Create(func(ctx context.Context, sink flux.Sink) {
+ for _, it := range testData {
+ sink.Next(it)
+ }
+ sink.Complete()
+ })
+ //all["unicast"] = nil
+ all["range"] = flux.Range(1, 5)
+ for k, v := range all {
+ gen := func() flux.Flux {
+ if k == "unicast" {
+ vv := flux.NewUnicastProcessor()
+ go func() {
+ for _, it := range testData {
+ vv.Next(it)
+ }
+ vv.Complete()
+ }()
+ time.Sleep(100 * time.Millisecond)
+ return vv
+ }
+ return v
+ }
+ t.Run(fmt.Sprintf("%s_Request", k), func(t *testing.T) {
+ testRequest(gen(), t)
+ })
+ t.Run(fmt.Sprintf("%s_Peek", k), func(t *testing.T) {
+ testPeek(gen(), t)
+ })
+ t.Run(fmt.Sprintf("%s_Discard", k), func(t *testing.T) {
+ testDiscard(gen(), t)
+ })
+ t.Run(fmt.Sprintf("%s_FilterRequest", k), func(t *testing.T) {
+ testFilterRequest(gen(), t)
+ })
+ t.Run(fmt.Sprintf("%s_BlockLast", k), func(t *testing.T) {
+ testBlockLast(gen(), t)
+ })
+ //t.Run(fmt.Sprintf("%s_DoSubscribeOn", k), func(t *testing.T) {
+ // testDoOnSubscribe(gen(), t)
+ //})
+ }
}
func testDoOnSubscribe(f flux.Flux, t *testing.T) {
- var su rs.Subscription
- var got int
- f.
- DoOnSubscribe(func(s rs.Subscription) {
- su = s
- su.Request(1)
- }).
- DoOnNext(func(v interface{}) {
- log.Println("next:", v)
- got++
- su.Request(1)
- }).
- Subscribe(context.Background())
- assert.Equal(t, len(testData), got, "bad amount")
+ var su rs.Subscription
+ var got int
+ f.
+ DoOnSubscribe(func(s rs.Subscription) {
+ su = s
+ su.Request(1)
+ }).
+ DoOnNext(func(v interface{}) {
+ log.Println("next:", v)
+ got++
+ su.Request(1)
+ }).
+ Subscribe(context.Background())
+ assert.Equal(t, len(testData), got, "bad amount")
}
func testBlockLast(f flux.Flux, t *testing.T) {
- last, err := f.BlockLast(context.Background())
- assert.NoError(t, err, "block last failed")
- assert.Equal(t, testData[len(testData)-1], last, "value doesn't match")
+ last, err := f.BlockLast(context.Background())
+ assert.NoError(t, err, "block last failed")
+ assert.Equal(t, testData[len(testData)-1], last, "value doesn't match")
}
func testFilterRequest(f flux.Flux, t *testing.T) {
- var s rs.Subscription
- var totals, discards, nexts, requests, filter int
- done := make(chan struct{})
- f.
- DoFinally(func(s rs.SignalType) {
- assert.Equal(t, rs.SignalTypeComplete, s, "bad signal")
- close(done)
- }).
- Filter(func(i interface{}) (ok bool) {
- totals++
- ok = i.(int)&1 == 0
- if ok {
- filter++
- }
- return
- }).
- DoOnDiscard(func(v interface{}) {
- discards++
- }).
- DoOnNext(func(v interface{}) {
- nexts++
- s.Request(1)
- }).
- DoOnRequest(func(n int) {
- requests++
- }).
- Subscribe(context.Background(), rs.OnSubscribe(func(su rs.Subscription) {
- s = su
- s.Request(1)
- }))
- <-done
- assert.Equal(t, totals, discards+nexts, "bad discards+nexts")
- assert.Equal(t, filter, nexts, "bad nexts")
- assert.Equal(t, nexts+1, requests, "bad requests")
+ var s rs.Subscription
+ var totals, discards, nexts, requests, filter int
+ done := make(chan struct{})
+ f.
+ DoFinally(func(s rs.SignalType) {
+ assert.Equal(t, rs.SignalTypeComplete, s, "bad signal")
+ close(done)
+ }).
+ Filter(func(i interface{}) (ok bool) {
+ totals++
+ ok = i.(int)&1 == 0
+ if ok {
+ filter++
+ }
+ return
+ }).
+ DoOnDiscard(func(v interface{}) {
+ discards++
+ }).
+ DoOnNext(func(v interface{}) {
+ nexts++
+ s.Request(1)
+ }).
+ DoOnRequest(func(n int) {
+ requests++
+ }).
+ Subscribe(context.Background(), rs.OnSubscribe(func(su rs.Subscription) {
+ s = su
+ s.Request(1)
+ }))
+ <-done
+ assert.Equal(t, totals, discards+nexts, "bad discards+nexts")
+ assert.Equal(t, filter, nexts, "bad nexts")
+ assert.Equal(t, nexts+1, requests, "bad requests")
}
func testDiscard(f flux.Flux, t *testing.T) {
- var next, next2, discard []int
- done := make(chan struct{})
- f.
- DoFinally(func(s rs.SignalType) {
- close(done)
- }).
- DoOnNext(func(v interface{}) {
- next = append(next, v.(int))
- }).
- Filter(func(i interface{}) bool {
- return i.(int) > 3
- }).
- DoOnNext(func(v interface{}) {
- next2 = append(next2, v.(int))
- }).
- DoOnDiscard(func(i interface{}) {
- discard = append(discard, i.(int))
- }).
- Subscribe(context.Background())
- <-done
- assert.Equal(t, testData, next, "bad next")
- assert.Equal(t, len(next), len(next2)+len(discard), "bad amount")
- assert.Equal(t, []int{4, 5}, next2, "bad next2")
- assert.Equal(t, []int{1, 2, 3}, discard, "bad discard")
+ var next, next2, discard []int
+ done := make(chan struct{})
+ f.
+ DoFinally(func(s rs.SignalType) {
+ close(done)
+ }).
+ DoOnNext(func(v interface{}) {
+ next = append(next, v.(int))
+ }).
+ Filter(func(i interface{}) bool {
+ return i.(int) > 3
+ }).
+ DoOnNext(func(v interface{}) {
+ next2 = append(next2, v.(int))
+ }).
+ DoOnDiscard(func(i interface{}) {
+ discard = append(discard, i.(int))
+ }).
+ Subscribe(context.Background())
+ <-done
+ assert.Equal(t, testData, next, "bad next")
+ assert.Equal(t, len(next), len(next2)+len(discard), "bad amount")
+ assert.Equal(t, []int{4, 5}, next2, "bad next2")
+ assert.Equal(t, []int{1, 2, 3}, discard, "bad discard")
}
func testPeek(f flux.Flux, t *testing.T) {
- var complete int
- var a, b []int
- var requests int
- var ss rs.Subscription
- done := make(chan struct{})
- f.
- DoOnNext(func(v interface{}) {
- a = append(a, v.(int))
- }).
- DoOnRequest(func(n int) {
- requests++
- }).
- DoOnComplete(func() {
- complete++
- }).
- DoFinally(func(s rs.SignalType) {
- close(done)
- }).
- Subscribe(context.Background(), rs.OnSubscribe(func(su rs.Subscription) {
- ss = su
- ss.Request(1)
- }), rs.OnNext(func(v interface{}) {
- b = append(b, v.(int))
- ss.Request(1)
- }))
- <-done
- assert.Equal(t, b, a, "values doesn't match")
- assert.Equal(t, len(a)+1, requests, "bad requests")
- assert.Equal(t, 1, complete, "bad complete")
+ var complete int
+ var a, b []int
+ var requests int
+ var ss rs.Subscription
+ done := make(chan struct{})
+ f.
+ DoOnNext(func(v interface{}) {
+ a = append(a, v.(int))
+ }).
+ DoOnRequest(func(n int) {
+ requests++
+ }).
+ DoOnComplete(func() {
+ complete++
+ }).
+ DoFinally(func(s rs.SignalType) {
+ close(done)
+ }).
+ Subscribe(context.Background(), rs.OnSubscribe(func(su rs.Subscription) {
+ ss = su
+ ss.Request(1)
+ }), rs.OnNext(func(v interface{}) {
+ b = append(b, v.(int))
+ ss.Request(1)
+ }))
+ <-done
+ assert.Equal(t, b, a, "values doesn't match")
+ assert.Equal(t, len(a)+1, requests, "bad requests")
+ assert.Equal(t, 1, complete, "bad complete")
}
func testRequest(f flux.Flux, t *testing.T) {
- var nexts []int
- var su rs.Subscription
- done := make(chan struct{})
- f.
- DoFinally(func(s rs.SignalType) {
- close(done)
- }).
- SubscribeOn(scheduler.Elastic()).
- Subscribe(context.Background(), rs.OnSubscribe(func(s rs.Subscription) {
- su = s
- su.Request(1)
- }), rs.OnNext(func(v interface{}) {
- nexts = append(nexts, v.(int))
- su.Request(1)
- }))
- <-done
- assert.Equal(t, testData, nexts, "bad results")
+ var nexts []int
+ var su rs.Subscription
+ done := make(chan struct{})
+ f.
+ DoFinally(func(s rs.SignalType) {
+ close(done)
+ }).
+ SubscribeOn(scheduler.Elastic()).
+ Subscribe(context.Background(), rs.OnSubscribe(func(s rs.Subscription) {
+ su = s
+ su.Request(1)
+ }), rs.OnNext(func(v interface{}) {
+ nexts = append(nexts, v.(int))
+ su.Request(1)
+ }))
+ <-done
+ assert.Equal(t, testData, nexts, "bad results")
}
func TestEmpty(t *testing.T) {
- flux.Just().Subscribe(
- context.Background(),
- rs.OnNext(func(v interface{}) {
- log.Println("next:", v)
- }),
- rs.OnComplete(func() {
- log.Println("complete")
- }),
- )
+ flux.Just().Subscribe(
+ context.Background(),
+ rs.OnNext(func(v interface{}) {
+ log.Println("next:", v)
+ }),
+ rs.OnComplete(func() {
+ log.Println("complete")
+ }),
+ )
}
func TestCreateWithRequest(t *testing.T) {
- const totals = 20
- f := flux.Create(func(ctx context.Context, sink flux.Sink) {
- for i := 0; i < totals; i++ {
- sink.Next(i)
- }
- sink.Complete()
- })
+ const totals = 20
+ f := flux.Create(func(ctx context.Context, sink flux.Sink) {
+ for i := 0; i < totals; i++ {
+ sink.Next(i)
+ }
+ sink.Complete()
+ })
- var processed int32
+ var processed int32
- su := make(chan rs.Subscription, 1)
+ su := make(chan rs.Subscription, 1)
- sub := rs.NewSubscriber(rs.OnNext(func(v interface{}) {
- log.Println("next:", v)
- processed++
- }), rs.OnSubscribe(func(s rs.Subscription) {
- su <- s
- s.Request(1)
- }), rs.OnComplete(func() {
- log.Println("complete")
- }))
+ sub := rs.NewSubscriber(rs.OnNext(func(v interface{}) {
+ log.Println("next:", v)
+ processed++
+ }), rs.OnSubscribe(func(s rs.Subscription) {
+ su <- s
+ s.Request(1)
+ }), rs.OnComplete(func() {
+ log.Println("complete")
+ }))
- time.AfterFunc(100*time.Millisecond, func() {
- s := <-su
- s.Request(totals - 1)
- })
+ time.AfterFunc(100*time.Millisecond, func() {
+ s := <-su
+ s.Request(totals - 1)
+ })
- f.SubscribeWith(context.Background(), sub)
- assert.Equal(t, totals, int(processed), "bad processed num")
+ f.SubscribeWith(context.Background(), sub)
+ assert.Equal(t, totals, int(processed), "bad processed num")
+}
+
+func TestToChan(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+ defer cancel()
+ ch, err := flux.Interval(1*time.Second).ToChan(ctx, 1)
+L:
+ for {
+ select {
+ case v := <-ch:
+ fmt.Println("v:", v)
+ case e := <-err:
+ fmt.Println("err:", e)
+ break L
+ }
+ }
}
func TestError(t *testing.T) {
- mockErr := errors.New("this is a mock error")
- var sig rs.SignalType
- var e1, e2 error
- var requested int
- flux.Error(mockErr).
- DoFinally(func(s rs.SignalType) {
- sig = s
- }).
- DoOnRequest(func(n int) {
- requested = n
- }).
- DoOnError(func(e error) {
- e1 = e
- }).
- Subscribe(
- context.Background(),
- rs.OnNext(func(v interface{}) {
- assert.Fail(t, "should never run here")
- }),
- rs.OnError(func(e error) {
- e2 = e
- }),
- )
- assert.Equal(t, mockErr, e1, "bad doOnError")
- assert.Equal(t, mockErr, e2, "bad onError")
- assert.Equal(t, rs.SignalTypeError, sig, "bad signal")
- assert.True(t, requested > 0, "no request")
+ mockErr := errors.New("this is a mock error")
+ var sig rs.SignalType
+ var e1, e2 error
+ var requested int
+ flux.Error(mockErr).
+ DoFinally(func(s rs.SignalType) {
+ sig = s
+ }).
+ DoOnRequest(func(n int) {
+ requested = n
+ }).
+ DoOnError(func(e error) {
+ e1 = e
+ }).
+ Subscribe(
+ context.Background(),
+ rs.OnNext(func(v interface{}) {
+ assert.Fail(t, "should never run here")
+ }),
+ rs.OnError(func(e error) {
+ e2 = e
+ }),
+ )
+ assert.Equal(t, mockErr, e1, "bad doOnError")
+ assert.Equal(t, mockErr, e2, "bad onError")
+ assert.Equal(t, rs.SignalTypeError, sig, "bad signal")
+ assert.True(t, requested > 0, "no request")
}
diff --git a/flux/op_take.go b/flux/op_take.go
new file mode 100644
index 0000000..2885f14
--- /dev/null
+++ b/flux/op_take.go
@@ -0,0 +1,80 @@
+package flux
+
+import (
+ "context"
+ "sync/atomic"
+
+ "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/hooks"
+ "github.com/jjeffcaii/reactor-go/internal"
+)
+
+type fluxTake struct {
+ source rs.RawPublisher
+ n int
+}
+
+func (p *fluxTake) SubscribeWith(ctx context.Context, s rs.Subscriber) {
+ actual := internal.ExtractRawSubscriber(s)
+ take := newTakeSubscriber(actual, int64(p.n))
+ p.source.SubscribeWith(ctx, internal.NewCoreSubscriber(ctx, take))
+}
+
+type takeSubscriber struct {
+ actual rs.Subscriber
+ remaining int64
+ stat int32
+ su rs.Subscription
+}
+
+func (t *takeSubscriber) OnError(e error) {
+ if !atomic.CompareAndSwapInt32(&(t.stat), 0, statError) {
+ hooks.Global().OnErrorDrop(e)
+ return
+ }
+ t.actual.OnError(e)
+}
+
+func (t *takeSubscriber) OnNext(v interface{}) {
+ remaining := atomic.AddInt64(&(t.remaining), -1)
+ // if no remaining or stat is not default value.
+ if remaining < 0 || atomic.LoadInt32(&(t.stat)) != 0 {
+ hooks.Global().OnNextDrop(v)
+ return
+ }
+ t.actual.OnNext(v)
+ if remaining > 0 {
+ return
+ }
+ t.su.Cancel()
+ t.OnComplete()
+}
+
+func (t *takeSubscriber) OnSubscribe(su rs.Subscription) {
+ if atomic.LoadInt64(&(t.remaining)) < 1 {
+ su.Cancel()
+ return
+ }
+ t.su = su
+ t.actual.OnSubscribe(su)
+}
+
+func (t *takeSubscriber) OnComplete() {
+ if atomic.CompareAndSwapInt32(&(t.stat), 0, statComplete) {
+ t.actual.OnComplete()
+ }
+}
+
+func newTakeSubscriber(actual rs.Subscriber, n int64) *takeSubscriber {
+ return &takeSubscriber{
+ actual: actual,
+ remaining: n,
+ }
+}
+
+func newFluxTake(source rs.RawPublisher, n int) *fluxTake {
+ return &fluxTake{
+ source: source,
+ n: n,
+ }
+}
diff --git a/flux/op_take_test.go b/flux/op_take_test.go
new file mode 100644
index 0000000..f3aff85
--- /dev/null
+++ b/flux/op_take_test.go
@@ -0,0 +1,32 @@
+package flux_test
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/jjeffcaii/reactor-go/flux"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestTake(t *testing.T) {
+ var amount int
+ var completed bool
+ var cancelled bool
+ _, err := flux.Interval(10 * time.Millisecond).
+ Take(3).
+ DoOnCancel(func() {
+ cancelled = true
+ }).
+ DoOnComplete(func() {
+ completed = true
+ }).
+ DoOnNext(func(v interface{}) {
+ amount++
+ }).
+ BlockLast(context.Background())
+ assert.NoError(t, err, "block last failed")
+ assert.Equal(t, 3, amount, "bad amount ")
+ assert.True(t, completed, "bad completed")
+ assert.False(t, cancelled, "bad cancelled")
+}
diff --git a/flux/wrapper.go b/flux/wrapper.go
index 6d079f1..750e411 100644
--- a/flux/wrapper.go
+++ b/flux/wrapper.go
@@ -1,113 +1,143 @@
package flux
import (
- "context"
- "errors"
+ "context"
+ "errors"
- rs "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/hooks"
- "github.com/jjeffcaii/reactor-go/scheduler"
+ rs "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/hooks"
+ "github.com/jjeffcaii/reactor-go/scheduler"
)
type wrapper struct {
- rs.RawPublisher
+ rs.RawPublisher
}
func (p wrapper) Subscribe(ctx context.Context, options ...rs.SubscriberOption) {
- p.SubscribeWith(ctx, rs.NewSubscriber(options...))
+ p.SubscribeWith(ctx, rs.NewSubscriber(options...))
+}
+
+func (p wrapper) Take(n int) Flux {
+ return wrap(newFluxTake(p.RawPublisher, n))
}
func (p wrapper) Filter(f rs.Predicate) Flux {
- return wrap(newFluxFilter(p.RawPublisher, f))
+ return wrap(newFluxFilter(p.RawPublisher, f))
}
func (p wrapper) Map(t rs.Transformer) Flux {
- return wrap(newFluxMap(p.RawPublisher, t))
+ return wrap(newFluxMap(p.RawPublisher, t))
}
func (p wrapper) SubscribeOn(sc scheduler.Scheduler) Flux {
- return wrap(newFluxSubscribeOn(p.RawPublisher, sc))
+ return wrap(newFluxSubscribeOn(p.RawPublisher, sc))
}
func (p wrapper) DoOnNext(fn rs.FnOnNext) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekNext(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekNext(fn)))
}
func (p wrapper) DoOnComplete(fn rs.FnOnComplete) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekComplete(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekComplete(fn)))
}
func (p wrapper) DoOnRequest(fn rs.FnOnRequest) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekRequest(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekRequest(fn)))
}
func (p wrapper) DoOnDiscard(fn rs.FnOnDiscard) Flux {
- return wrap(newFluxContext(p.RawPublisher, withContextDiscard(fn)))
+ return wrap(newFluxContext(p.RawPublisher, withContextDiscard(fn)))
}
func (p wrapper) DoOnCancel(fn rs.FnOnCancel) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekCancel(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekCancel(fn)))
}
func (p wrapper) DoOnError(fn rs.FnOnError) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekError(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekError(fn)))
}
func (p wrapper) DoFinally(fn rs.FnOnFinally) Flux {
- return wrap(newFluxFinally(p.RawPublisher, fn))
+ return wrap(newFluxFinally(p.RawPublisher, fn))
}
func (p wrapper) DoOnSubscribe(fn rs.FnOnSubscribe) Flux {
- return wrap(newFluxPeek(p.RawPublisher, peekSubscribe(fn)))
+ return wrap(newFluxPeek(p.RawPublisher, peekSubscribe(fn)))
}
func (p wrapper) SwitchOnFirst(fn FnSwitchOnFirst) Flux {
- return wrap(newFluxSwitchOnFirst(p.RawPublisher, fn))
+ return wrap(newFluxSwitchOnFirst(p.RawPublisher, fn))
+}
+
+func (p wrapper) ToChan(ctx context.Context, cap int) (<-chan interface{}, <-chan error) {
+ if cap < 1 {
+ cap = 1
+ }
+ ch := make(chan interface{}, cap)
+ err := make(chan error, 1)
+ p.
+ DoFinally(func(s rs.SignalType) {
+ if s == rs.SignalTypeCancel {
+ err <- rs.ErrSubscribeCancelled
+ }
+ close(ch)
+ close(err)
+ }).
+ SubscribeOn(scheduler.Elastic()).
+ Subscribe(ctx,
+ rs.OnNext(func(v interface{}) {
+ ch <- v
+ }),
+ rs.OnError(func(e error) {
+ err <- e
+ }),
+ )
+ return ch, err
}
func (p wrapper) BlockLast(ctx context.Context) (last interface{}, err error) {
- done := make(chan struct{})
- p.
- DoFinally(func(s rs.SignalType) {
- close(done)
- }).
- DoOnCancel(func() {
- err = rs.ErrSubscribeCancelled
- }).
- Subscribe(ctx,
- rs.OnNext(func(v interface{}) {
- if old := last; old != nil {
- hooks.Global().OnNextDrop(old)
- }
- last = v
- }),
- rs.OnError(func(e error) {
- err = e
- }),
- )
- <-done
- return
+ done := make(chan struct{})
+ p.
+ DoFinally(func(s rs.SignalType) {
+ close(done)
+ }).
+ DoOnCancel(func() {
+ err = rs.ErrSubscribeCancelled
+ }).
+ Subscribe(ctx,
+ rs.OnNext(func(v interface{}) {
+ if old := last; old != nil {
+ hooks.Global().OnNextDrop(old)
+ }
+ last = v
+ }),
+ rs.OnError(func(e error) {
+ err = e
+ }),
+ )
+ <-done
+ return
}
func (p wrapper) Complete() {
- p.mustProcessor().Complete()
+ p.mustProcessor().Complete()
}
func (p wrapper) Error(e error) {
- p.mustProcessor().Error(e)
+ p.mustProcessor().Error(e)
}
func (p wrapper) Next(v interface{}) {
- p.mustProcessor().Next(v)
+ p.mustProcessor().Next(v)
}
func (p wrapper) mustProcessor() rawProcessor {
- v, ok := p.RawPublisher.(rawProcessor)
- if !ok {
- panic(errors.New("require a processor"))
- }
- return v
+ v, ok := p.RawPublisher.(rawProcessor)
+ if !ok {
+ panic(errors.New("require a processor"))
+ }
+ return v
}
func wrap(r rs.RawPublisher) wrapper {
- return wrapper{r}
+ return wrapper{r}
}
diff --git a/mono/op_flatmap.go b/mono/op_flatmap.go
index 12d83ed..d96193e 100644
--- a/mono/op_flatmap.go
+++ b/mono/op_flatmap.go
@@ -1,115 +1,111 @@
package mono
import (
- "context"
- "sync/atomic"
+ "context"
+ "sync/atomic"
- "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/internal"
+ "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/internal"
)
const (
- statCancel = -1
- statError = -2
- statComplete = 2
+ statCancel = -1
+ statError = -2
+ statComplete = 2
)
type innerFlatMapSubscriber struct {
- parent *flatMapSubscriber
+ parent *flatMapSubscriber
}
func (p *innerFlatMapSubscriber) OnError(err error) {
- if atomic.CompareAndSwapInt32(&(p.parent.stat), 0, statError) {
- p.parent.actual.OnError(err)
- }
+ if atomic.CompareAndSwapInt32(&(p.parent.stat), 0, statError) {
+ p.parent.actual.OnError(err)
+ }
}
func (p *innerFlatMapSubscriber) OnNext(v interface{}) {
- if atomic.LoadInt32(&(p.parent.stat)) != 0 {
- return
- }
- p.parent.actual.OnNext(v)
- p.OnComplete()
+ if atomic.LoadInt32(&(p.parent.stat)) != 0 {
+ return
+ }
+ p.parent.actual.OnNext(v)
+ p.OnComplete()
}
func (p *innerFlatMapSubscriber) OnSubscribe(s rs.Subscription) {
- s.Request(rs.RequestInfinite)
+ s.Request(rs.RequestInfinite)
}
func (p *innerFlatMapSubscriber) OnComplete() {
- if atomic.CompareAndSwapInt32(&(p.parent.stat), 0, statComplete) {
- p.parent.actual.OnComplete()
- }
+ if atomic.CompareAndSwapInt32(&(p.parent.stat), 0, statComplete) {
+ p.parent.actual.OnComplete()
+ }
}
type flatMapSubscriber struct {
- actual rs.Subscriber
- mapper flatMapper
- stat int32
- ctx context.Context
+ actual rs.Subscriber
+ mapper flatMapper
+ stat int32
+ ctx context.Context
}
func (p *flatMapSubscriber) Request(n int) {
}
func (p *flatMapSubscriber) Cancel() {
- atomic.CompareAndSwapInt32(&(p.stat), 0, statCancel)
+ atomic.CompareAndSwapInt32(&(p.stat), 0, statCancel)
}
func (p *flatMapSubscriber) OnComplete() {
- if atomic.LoadInt32(&(p.stat)) == statComplete {
- p.actual.OnComplete()
- }
+ if atomic.LoadInt32(&(p.stat)) == statComplete {
+ p.actual.OnComplete()
+ }
}
func (p *flatMapSubscriber) OnError(err error) {
- if atomic.CompareAndSwapInt32(&(p.stat), 0, statError) {
- p.actual.OnError(err)
- }
+ if atomic.CompareAndSwapInt32(&(p.stat), 0, statError) {
+ p.actual.OnError(err)
+ }
}
func (p *flatMapSubscriber) OnNext(v interface{}) {
- if atomic.LoadInt32(&(p.stat)) != 0 {
- return
- }
- m := p.mapper(v)
- inner := &innerFlatMapSubscriber{
- parent: p,
- }
- m.SubscribeWith(p.ctx, inner)
+ if atomic.LoadInt32(&(p.stat)) != 0 {
+ return
+ }
+ m := p.mapper(v)
+ inner := &innerFlatMapSubscriber{
+ parent: p,
+ }
+ m.SubscribeWith(p.ctx, inner)
}
func (p *flatMapSubscriber) OnSubscribe(s rs.Subscription) {
- s.Request(rs.RequestInfinite)
-}
-
-func (p *flatMapSubscriber) isDone() bool {
- return atomic.LoadInt32(&(p.stat)) != 0
+ s.Request(rs.RequestInfinite)
}
func newFlatMapSubscriber(ctx context.Context, actual rs.Subscriber, mapper flatMapper) *flatMapSubscriber {
- return &flatMapSubscriber{
- ctx: ctx,
- actual: actual,
- mapper: mapper,
- }
+ return &flatMapSubscriber{
+ ctx: ctx,
+ actual: actual,
+ mapper: mapper,
+ }
}
type monoFlatMap struct {
- source rs.RawPublisher
- mapper flatMapper
+ source rs.RawPublisher
+ mapper flatMapper
}
func (m *monoFlatMap) SubscribeWith(ctx context.Context, actual rs.Subscriber) {
- actual = internal.ExtractRawSubscriber(actual)
- s := newFlatMapSubscriber(ctx, actual, m.mapper)
- actual.OnSubscribe(s)
- m.source.SubscribeWith(ctx, s)
+ actual = internal.ExtractRawSubscriber(actual)
+ s := newFlatMapSubscriber(ctx, actual, m.mapper)
+ actual.OnSubscribe(s)
+ m.source.SubscribeWith(ctx, s)
}
func newMonoFlatMap(source rs.RawPublisher, mapper flatMapper) *monoFlatMap {
- return &monoFlatMap{
- source: source,
- mapper: mapper,
- }
+ return &monoFlatMap{
+ source: source,
+ mapper: mapper,
+ }
}
diff --git a/mono/op_peek.go b/mono/op_peek.go
index 4419ca9..b9314ab 100644
--- a/mono/op_peek.go
+++ b/mono/op_peek.go
@@ -128,6 +128,7 @@ func peekCancel(fn rs.FnOnCancel) monoPeekOption {
peek.onCancelCall = fn
}
}
+
func peekRequest(fn rs.FnOnRequest) monoPeekOption {
return func(peek *monoPeek) {
peek.onRequestCall = fn
diff --git a/mono/processor_test.go b/mono/processor_test.go
index 5392ea9..a88d811 100644
--- a/mono/processor_test.go
+++ b/mono/processor_test.go
@@ -1,55 +1,55 @@
package mono_test
import (
- "context"
- "testing"
- "time"
+ "context"
+ "testing"
+ "time"
- "github.com/jjeffcaii/reactor-go"
- "github.com/jjeffcaii/reactor-go/mono"
- "github.com/stretchr/testify/assert"
+ "github.com/jjeffcaii/reactor-go"
+ "github.com/jjeffcaii/reactor-go/mono"
+ "github.com/stretchr/testify/assert"
)
-
func TestProcessor(t *testing.T) {
- p := mono.CreateProcessor()
-
- time.AfterFunc(100*time.Millisecond, func() {
- p.Success(333)
- })
-
- v, err := p.
- Map(func(i interface{}) interface{} {
- return i.(int) * 2
- }).
- Block(context.Background())
- assert.NoError(t, err, "block failed")
- assert.Equal(t, 666, v.(int), "bad result")
-
- var actual int
- p.
- DoOnNext(func(v interface{}) {
- actual = v.(int)
- }).
- Subscribe(context.Background())
- assert.Equal(t, 333, actual, "bad result")
+ p := mono.CreateProcessor()
+
+ time.AfterFunc(100*time.Millisecond, func() {
+ p.Success(333)
+ })
+
+ v, err := p.
+ Map(func(i interface{}) interface{} {
+ return i.(int) * 2
+ }).
+ Block(context.Background())
+ assert.NoError(t, err, "block failed")
+ assert.Equal(t, 666, v.(int), "bad result")
+
+ var actual int
+ p.
+ DoOnNext(func(v interface{}) {
+ actual = v.(int)
+ }).
+ Subscribe(context.Background())
+ assert.Equal(t, 333, actual, "bad result")
}
func TestProcessor_Context(t *testing.T) {
- p := mono.CreateProcessor()
- ctx, _ := context.WithTimeout(context.Background(), 100*time.Millisecond)
- time.AfterFunc(300*time.Millisecond, func() {
- p.Success(77778888)
- })
- done := make(chan struct{})
- p.
- DoOnError(func(e error) {
- assert.Equal(t, rs.ErrSubscribeCancelled, e, "bad error")
- }).
- DoFinally(func(signal rs.SignalType) {
- close(done)
- assert.Equal(t, rs.SignalTypeError, signal)
- }).
- Subscribe(ctx)
- <-done
+ p := mono.CreateProcessor()
+ ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+ defer cancel()
+ time.AfterFunc(300*time.Millisecond, func() {
+ p.Success(77778888)
+ })
+ done := make(chan struct{})
+ p.
+ DoOnError(func(e error) {
+ assert.Equal(t, rs.ErrSubscribeCancelled, e, "bad error")
+ }).
+ DoFinally(func(signal rs.SignalType) {
+ close(done)
+ assert.Equal(t, rs.SignalTypeError, signal)
+ }).
+ Subscribe(ctx)
+ <-done
}
diff --git a/scheduler/parallel.go b/scheduler/parallel.go
index 31d710c..3548a54 100644
--- a/scheduler/parallel.go
+++ b/scheduler/parallel.go
@@ -1,62 +1,55 @@
package scheduler
import (
- "runtime"
- "sync"
+ "runtime"
+ "sync"
)
var parallel Scheduler
func init() {
- parallel = NewParallel(runtime.NumCPU() * 2)
+ parallel = NewParallel(runtime.NumCPU() * 2)
}
type parallelScheduler struct {
- jobs chan Job
- n int
- once sync.Once
+ jobs chan Job
+ n int
+ once sync.Once
}
func (p *parallelScheduler) Close() error {
- close(p.jobs)
- return nil
+ close(p.jobs)
+ return nil
}
func (p *parallelScheduler) Do(j Job) {
- p.jobs <- j
+ p.jobs <- j
}
func (p *parallelScheduler) start() {
- for i := 0; i < p.n; i++ {
- go func() {
- L:
- for {
- select {
- case j, ok := <-p.jobs:
- if !ok {
- break L
- }
- j()
- }
- }
- }()
- }
+ for i := 0; i < p.n; i++ {
+ go func() {
+ for j := range p.jobs {
+ j()
+ }
+ }()
+ }
}
func (p *parallelScheduler) Worker() Worker {
- p.once.Do(func() {
- p.start()
- })
- return p
+ p.once.Do(func() {
+ p.start()
+ })
+ return p
}
func NewParallel(n int) Scheduler {
- return ¶llelScheduler{
- jobs: make(chan Job),
- n: n,
- }
+ return ¶llelScheduler{
+ jobs: make(chan Job),
+ n: n,
+ }
}
func Parallel() Scheduler {
- return parallel
+ return parallel
}
diff --git a/scheduler/single.go b/scheduler/single.go
index 2acf507..ee551da 100644
--- a/scheduler/single.go
+++ b/scheduler/single.go
@@ -3,54 +3,47 @@ package scheduler
import "sync/atomic"
var (
- single Scheduler
- defaultSingleJobs = 1000
+ single Scheduler
+ defaultSingleJobs = 1000
)
func init() {
- single = NewSingle(defaultSingleJobs)
+ single = NewSingle(defaultSingleJobs)
}
func NewSingle(cap int) Scheduler {
- return &singleScheduler{
- jobs: make(chan Job, cap),
- }
+ return &singleScheduler{
+ jobs: make(chan Job, cap),
+ }
}
type singleScheduler struct {
- jobs chan Job
- started int64
+ jobs chan Job
+ started int64
}
func (p *singleScheduler) Close() (err error) {
- close(p.jobs)
- return
+ close(p.jobs)
+ return
}
func (p *singleScheduler) start() {
-L:
- for {
- select {
- case j, ok := <-p.jobs:
- if !ok {
- break L
- }
- j()
- }
- }
+ for j := range p.jobs {
+ j()
+ }
}
func (p *singleScheduler) Do(j Job) {
- p.jobs <- j
+ p.jobs <- j
}
func (p *singleScheduler) Worker() Worker {
- if atomic.AddInt64(&(p.started), 1) == 1 {
- go p.start()
- }
- return p
+ if atomic.AddInt64(&(p.started), 1) == 1 {
+ go p.start()
+ }
+ return p
}
func Single() Scheduler {
- return single
+ return single
}