Skip to content

Commit

Permalink
add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Jul 25, 2019
1 parent ec825db commit 6a70ec6
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 45 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
[![GitHub Release](https://img.shields.io/github/release-pre/jjeffcaii/reactor-go.svg)](https://github.com/jjeffcaii/reactor-go/releases)

> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
<br>🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
<br>🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
<br>***[WARNING] DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
## Install
Expand Down Expand Up @@ -108,4 +108,3 @@ func Example() {
// next: #HELLO_0006
// next: #HELLO_0008
```

9 changes: 9 additions & 0 deletions flux/flux_create_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync/atomic"

rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
)

type bufferedSink struct {
Expand Down Expand Up @@ -39,11 +40,19 @@ func (p *bufferedSink) Complete() {
}

func (p *bufferedSink) Error(err error) {
if p.done {
hooks.Global().OnErrorDrop(err)
return
}
p.s.OnError(err)
p.dispose()
}

func (p *bufferedSink) Next(v interface{}) {
if p.done {
hooks.Global().OnNextDrop(v)
return
}
p.q.offer(v)
p.drain()
}
Expand Down
22 changes: 22 additions & 0 deletions flux/flux_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package flux

import (
"context"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal"
)

type fluxError struct {
e error
}

func (p fluxError) SubscribeWith(ctx context.Context, s rs.Subscriber) {
actual := internal.NewCoreSubscriber(ctx, s)
actual.OnSubscribe(internal.EmptySubscription)
actual.OnError(p.e)
}

func newFluxError(e error) fluxError {
return fluxError{e: e}
}
32 changes: 31 additions & 1 deletion flux/flux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package flux_test

import (
"context"
"errors"
"fmt"
"log"
"testing"
Expand Down Expand Up @@ -253,7 +254,6 @@ func TestEmpty(t *testing.T) {
}

func TestCreateWithRequest(t *testing.T) {

const totals = 20
f := flux.Create(func(ctx context.Context, sink flux.Sink) {
for i := 0; i < totals; i++ {
Expand Down Expand Up @@ -284,3 +284,33 @@ func TestCreateWithRequest(t *testing.T) {
f.SubscribeWith(context.Background(), sub)
assert.Equal(t, totals, int(processed), "bad processed num")
}

func TestError(t *testing.T) {
mockErr := errors.New("this is a mock error")
var sig rs.SignalType
var e1, e2 error
var requested int
flux.Error(mockErr).
DoFinally(func(s rs.SignalType) {
sig = s
}).
DoOnRequest(func(n int) {
requested = n
}).
DoOnError(func(e error) {
e1 = e
}).
Subscribe(
context.Background(),
rs.OnNext(func(v interface{}) {
assert.Fail(t, "should never run here")
}),
rs.OnError(func(e error) {
e2 = e
}),
)
assert.Equal(t, mockErr, e1, "bad doOnError")
assert.Equal(t, mockErr, e2, "bad onError")
assert.Equal(t, rs.SignalTypeError, sig, "bad signal")
assert.True(t, requested > 0, "no request")
}
4 changes: 3 additions & 1 deletion flux/misc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package flux

import "errors"
import (
"errors"
)

const (
statCancel = -1
Expand Down
18 changes: 0 additions & 18 deletions flux/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,3 @@ func TestUnicastProcessor(t *testing.T) {
}))
<-done
}

func TestDoOnSubscribe(t *testing.T) {
pc := flux.NewUnicastProcessor()
time.AfterFunc(1*time.Second, func() {
pc.Next(111)
pc.Next(222)
pc.Complete()
})
done := make(chan struct{})
pc.DoFinally(func(s rs.SignalType) {
close(done)
}).DoOnSubscribe(func(su rs.Subscription) {
log.Println("doOnSubscribe")
}).BlockLast(context.Background())

<-done

}
21 changes: 6 additions & 15 deletions flux/queue.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,23 @@
package flux

import "io"

type queue interface {
io.Closer
offer(interface{})
poll() (interface{}, bool)
size() int
}

type simpleQueue struct {
type queue struct {
c chan interface{}
}

func (q simpleQueue) size() int {
func (q queue) size() int {
return len(q.c)
}

func (q simpleQueue) Close() (err error) {
func (q queue) Close() (err error) {
close(q.c)
return
}

func (q simpleQueue) offer(v interface{}) {
func (q queue) offer(v interface{}) {
q.c <- v
}

func (q simpleQueue) poll() (v interface{}, ok bool) {
func (q queue) poll() (v interface{}, ok bool) {
select {
case v, ok = <-q.c:
return
Expand All @@ -36,7 +27,7 @@ func (q simpleQueue) poll() (v interface{}, ok bool) {
}

func newQueue(size int) queue {
return simpleQueue{
return queue{
c: make(chan interface{}, size),
}
}
5 changes: 1 addition & 4 deletions flux/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import (
var empty = just(nil)

func Error(e error) Flux {
// TODO: need implementation
return Create(func(ctx context.Context, sink Sink) {
sink.Error(e)
})
return wrap(newFluxError(e))
}

func Range(start, count int) Flux {
Expand Down
16 changes: 15 additions & 1 deletion internal/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ package internal
import (
"errors"
"fmt"

rs "github.com/jjeffcaii/reactor-go"
)

var ErrCallOnSubscribeDuplicated = errors.New("call OnSubscribe duplicated")
var (
ErrCallOnSubscribeDuplicated = errors.New("call OnSubscribe duplicated")
EmptySubscription rs.Subscription = &emptySubscription{}
)

func TryRecoverError(re interface{}) error {
if re == nil {
Expand All @@ -20,3 +25,12 @@ func TryRecoverError(re interface{}) error {
return fmt.Errorf("%s", e)
}
}

type emptySubscription struct {
}

func (emptySubscription) Request(n int) {
}

func (emptySubscription) Cancel() {
}
22 changes: 22 additions & 0 deletions mono/mono_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package mono

import (
"context"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal"
)

type monoError struct {
e error
}

func (p monoError) SubscribeWith(ctx context.Context, s rs.Subscriber) {
actual := internal.NewCoreSubscriber(ctx, s)
actual.OnSubscribe(internal.EmptySubscription)
actual.OnError(p.e)
}

func newMonoError(e error) monoError {
return monoError{e: e}
}
25 changes: 25 additions & 0 deletions mono/mono_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,28 @@ func BenchmarkCreate(b *testing.B) {
}
})
}

func TestError(t *testing.T) {
mockErr := errors.New("this is a mock error")
var sig rs.SignalType
var e1, e2 error
mono.Error(mockErr).
DoFinally(func(s rs.SignalType) {
sig = s
}).
DoOnError(func(e error) {
e1 = e
}).
Subscribe(
context.Background(),
rs.OnNext(func(v interface{}) {
assert.Fail(t, "should never run here")
}),
rs.OnError(func(e error) {
e2 = e
}),
)
assert.Equal(t, mockErr, e1, "bad doOnError")
assert.Equal(t, mockErr, e2, "bad onError")
assert.Equal(t, rs.SignalTypeError, sig, "bad signal")
}
1 change: 1 addition & 0 deletions mono/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
)


func TestProcessor(t *testing.T) {
p := mono.CreateProcessor()

Expand Down
6 changes: 5 additions & 1 deletion mono/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import (
var empty = wrap(newMonoJust(nil))
var errJustNilValue = errors.New("require non nil value")

func Error(e error) Mono {
return wrap(newMonoError(e))
}

func Empty() Mono {
return empty
}
Expand All @@ -29,7 +33,7 @@ func Just(v interface{}) Mono {
return wrap(newMonoJust(v))
}

func Create(gen func(context.Context, Sink)) Mono {
func Create(gen func(ctx context.Context, s Sink)) Mono {
return wrap(newMonoCreate(gen))
}

Expand Down
4 changes: 2 additions & 2 deletions reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ const (
)

type (
Predicate func(interface{}) bool
Transformer func(interface{}) interface{}
Predicate func(v interface{}) bool
Transformer func(v interface{}) interface{}

FnOnComplete = func()
FnOnNext = func(v interface{})
Expand Down

0 comments on commit 6a70ec6

Please sign in to comment.