Skip to content

Commit

Permalink
add Take and ToChan for flux.
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Aug 2, 2019
1 parent 6a70ec6 commit 6b654f7
Show file tree
Hide file tree
Showing 12 changed files with 655 additions and 515 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

> A golang implementation for [reactive-streams](https://www.reactive-streams.org/).
<br>🚧🚧🚧 ***IT IS UNDER ACTIVE DEVELOPMENT!!!***
<br>***[WARNING] DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
<br>⚠️⚠️⚠️ ***DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!***
## Install

Expand Down
56 changes: 29 additions & 27 deletions flux/flux.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,50 @@
package flux

import (
"context"
"context"

"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/scheduler"
)

type OverflowStrategy int8

const (
OverflowBuffer OverflowStrategy = iota
OverflowIgnore
OverflowError
OverflowDrop
OverflowLatest
OverflowBuffer OverflowStrategy = iota
OverflowIgnore
OverflowError
OverflowDrop
OverflowLatest
)

type FnSwitchOnFirst = func(s Signal, f Flux) Flux

type Flux interface {
rs.Publisher
Filter(rs.Predicate) Flux
Map(rs.Transformer) Flux
DoOnDiscard(rs.FnOnDiscard) Flux
DoOnNext(rs.FnOnNext) Flux
DoOnComplete(rs.FnOnComplete) Flux
DoOnError(rs.FnOnError) Flux
DoOnCancel(rs.FnOnCancel) Flux
DoOnRequest(rs.FnOnRequest) Flux
DoOnSubscribe(rs.FnOnSubscribe) Flux
DoFinally(rs.FnOnFinally) Flux
SwitchOnFirst(FnSwitchOnFirst) Flux
SubscribeOn(scheduler.Scheduler) Flux
BlockLast(context.Context) (interface{}, error)
rs.Publisher
Filter(rs.Predicate) Flux
Map(rs.Transformer) Flux
Take(n int) Flux
DoOnDiscard(rs.FnOnDiscard) Flux
DoOnNext(rs.FnOnNext) Flux
DoOnComplete(rs.FnOnComplete) Flux
DoOnError(rs.FnOnError) Flux
DoOnCancel(rs.FnOnCancel) Flux
DoOnRequest(rs.FnOnRequest) Flux
DoOnSubscribe(rs.FnOnSubscribe) Flux
DoFinally(rs.FnOnFinally) Flux
SwitchOnFirst(FnSwitchOnFirst) Flux
SubscribeOn(scheduler.Scheduler) Flux
BlockLast(context.Context) (interface{}, error)
ToChan(ctx context.Context, cap int) (c <-chan interface{}, e <-chan error)
}

type Sink interface {
Complete()
Error(error)
Next(interface{})
Complete()
Error(error)
Next(interface{})
}

type Processor interface {
Flux
Sink
Flux
Sink
}
11 changes: 4 additions & 7 deletions flux/flux_create_sink.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package flux

import (
"context"
"sync"
"sync/atomic"
"sync"
"sync/atomic"

rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
rs "github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
)

type bufferedSink struct {
ctx context.Context
s rs.Subscriber
q queue
n int32
Expand Down Expand Up @@ -84,7 +82,6 @@ func (p *bufferedSink) drain() {
func (p *bufferedSink) dispose() {
p.done = true
_ = p.q.Close()

}

func newBufferedSink(s rs.Subscriber, cap int) *bufferedSink {
Expand Down
Loading

0 comments on commit 6b654f7

Please sign in to comment.