Skip to content

Commit

Permalink
Merge pull request #39 from rsocket/bugfix/channel
Browse files Browse the repository at this point in the history
bugfix for REQUEST_CHANNEL.
  • Loading branch information
jjeffcaii authored Aug 6, 2019
2 parents 29e9b0b + 7817ff1 commit 7a98012
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 172 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ before_install:
script:
# - golint ./...
# - golangci-lint run ./...
- go test -race . -v
- go test -race -count=1 . -v
9 changes: 6 additions & 3 deletions cmd/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"fmt"
"log"
//_ "net/http/pprof"
"strconv"

"github.com/jjeffcaii/reactor-go/scheduler"
"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
"github.com/rsocket/rsocket-go/rx/mono"
//_ "net/http/pprof"
"strconv"
)

const ListenAt = "tcp://127.0.0.1:7878"
Expand All @@ -24,10 +25,12 @@ func main() {
// log.Println(http.ListenAndServe(":4444", nil))
//}()
//logger.SetLevel(logger.LevelDebug)
//go common.TraceByteBuffLeak(context.Background(), 10*time.Second)
err := rsocket.Receive().
//Fragment(65535).
//Resume().
OnStart(func() {
log.Println("server is listening:", ListenAt)
}).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket {
//log.Println("SETUP BEGIN:----------------")
//log.Println("maxLifeTime:", setup.MaxLifetime())
Expand Down
7 changes: 2 additions & 5 deletions cmd/echo/echo_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main_test
package main

import (
"bytes"
Expand All @@ -20,17 +20,14 @@ import (
"github.com/stretchr/testify/require"
)

const ListenAt = "tcp://127.0.0.1:7878"
//const ListenAt = "ws://127.0.0.1:7878/echo"

func TestClient_RequestResponse(t *testing.T) {
client, err := createClient(ListenAt)
require.NoError(t, err, "bad client")
defer func() {
_ = client.Close()
}()
wg := &sync.WaitGroup{}
n := 100 * 10000
n := 50 * 10000
wg.Add(n)
data := []byte(common.RandAlphanumeric(1024))

Expand Down
55 changes: 55 additions & 0 deletions cmd/foobar/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package main

import (
"context"
"fmt"

"github.com/rsocket/rsocket-go"
"github.com/rsocket/rsocket-go/logger"
"github.com/rsocket/rsocket-go/payload"
"github.com/rsocket/rsocket-go/rx"
"github.com/rsocket/rsocket-go/rx/flux"
)

var testData = "Hello World!"

func main() {
logger.SetLevel(logger.LevelDebug)
err := rsocket.Receive().
Fragment(128).
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket {
return rsocket.NewAbstractSocket(
rsocket.RequestChannel(func(inputs rx.Publisher) flux.Flux {
//var count int32
//countPointer := &count
receives := make(chan payload.Payload)

go func() {
var count int32
for range receives {
count++
}
fmt.Println("***** count:", count)
}()

inputs.(flux.Flux).DoFinally(func(s rx.SignalType) {
close(receives)
}).Subscribe(context.Background(), rx.OnNext(func(input payload.Payload) {
//fmt.Println("rcv from channel:", input)
receives <- input
}))

return flux.Create(func(ctx context.Context, s flux.Sink) {
for i := 0; i < 2; i++ {
s.Next(payload.NewString(testData, fmt.Sprintf("%d_from_server", i)))
}
s.Complete()
})
}),
)
}).
Transport("tcp://127.0.0.1:7878").
Serve(context.Background())
fmt.Println("SERVER STOPPED!!!!!")
panic(err)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/google/uuid v1.1.1
github.com/gorilla/websocket v1.4.0
github.com/jjeffcaii/reactor-go v0.0.14
github.com/jjeffcaii/reactor-go v0.0.16
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/urfave/cli v1.20.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/jjeffcaii/reactor-go v0.0.14 h1:bvAxdxB1LTADNy/m3giSokCFUtvOYaHmeM4jLmf/80w=
github.com/jjeffcaii/reactor-go v0.0.14/go.mod h1:yxYBt62huNjDF5+tuzzGhjHM/SCcscp6GeYPsNjU7eA=
github.com/jjeffcaii/reactor-go v0.0.16 h1:kZ7Jgur7b6C/8G5hT2wLGsDxLQo2Z2OyaggSo99qHUE=
github.com/jjeffcaii/reactor-go v0.0.16/go.mod h1:yxYBt62huNjDF5+tuzzGhjHM/SCcscp6GeYPsNjU7eA=
github.com/panjf2000/ants v1.0.0 h1:MZBsUt8W6ktQfhIswUZpw17IJlXY6ly2+U5b9jxwad4=
github.com/panjf2000/ants v1.0.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down
3 changes: 3 additions & 0 deletions internal/framing/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ type Frame interface {
io.WriterTo
// Header returns frame FrameHeader.
Header() FrameHeader
// Body returns body of frame.
Body() *common.ByteBuff
// Len returns length of frame.
Len() int
Expand All @@ -141,7 +142,9 @@ type Frame interface {
Bytes() []byte
// IsResumable returns true if frame supports resume.
IsResumable() bool
// Done marks current frame has been sent.
Done()
// DoneNotify notifies when frame done.
DoneNotify() <-chan struct{}
}

Expand Down
70 changes: 43 additions & 27 deletions internal/socket/duplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux) {
}).
DoOnRequest(func(n int) {
n32 := toU32N(n)

var newborn bool
select {
case <-rcvRequested:
Expand Down Expand Up @@ -330,7 +329,11 @@ func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux) {
// TODO: handle cancel or error
switch sig {
case rx.SignalComplete:
p.sendFrame(framing.NewFramePayload(sid, nil, nil, framing.FlagComplete))
complete := framing.NewFramePayload(sid, nil, nil, framing.FlagComplete)
p.sendFrame(complete)
<-complete.DoneNotify()
default:
panic(fmt.Errorf("unsupported sending channel signal: %s", sig))
}
}).
SubscribeOn(scheduler.Elastic()).
Expand Down Expand Up @@ -413,20 +416,20 @@ func (p *DuplexRSocket) respondRequestChannel(pl fragmentation.HeaderAndPayload)
}

sid := pl.Header().StreamID()
rcvProc := flux.CreateProcessor()

p.singleScheduler.Worker().Do(func() {
rcvProc.Next(pl)
})
receivingProcessor := flux.CreateProcessor()

rcvDone, sndDone := make(chan struct{}), make(chan struct{})
ch := make(chan struct{}, 2)

receiving := rcvProc.
receiving := receivingProcessor.
DoFinally(func(s rx.SignalType) {
close(rcvDone)
ch <- struct{}{}
<-ch
select {
case <-sndDone:
p.unregister(sid)
case _, ok := <-ch:
if ok {
close(ch)
p.unregister(sid)
}
default:
}
}).
Expand All @@ -436,8 +439,11 @@ func (p *DuplexRSocket) respondRequestChannel(pl fragmentation.HeaderAndPayload)
<-frameN.DoneNotify()
})

// TODO: if receiving == sending ???
p.singleScheduler.Worker().Do(func() {
receivingProcessor.Next(pl)
})

// TODO: if receiving == sending ???
sending, err := func() (flux flux.Flux, err error) {
defer func() {
err = tryRecover(recover())
Expand All @@ -454,32 +460,45 @@ func (p *DuplexRSocket) respondRequestChannel(pl fragmentation.HeaderAndPayload)
return nil
}

// Ensure registering message success before func end.
mustSub := make(chan struct{})

sub := rx.NewSubscriber(
rx.OnError(func(e error) {
p.writeError(sid, err)
}),
rx.OnComplete(func() {
p.sendFrame(framing.NewFramePayload(sid, nil, nil, framing.FlagComplete))
complete := framing.NewFramePayload(sid, nil, nil, framing.FlagComplete)
p.sendFrame(complete)
<-complete.DoneNotify()
}),
rx.OnSubscribe(func(s rx.Subscription) {
p.register(sid, resRC{rcv: rcvProc, snd: s})
p.register(sid, resRC{rcv: receivingProcessor, snd: s})
close(mustSub)
s.Request(initRequestN)
}),
p.toSender(sid, framing.FlagNext),
rx.OnNext(func(elem payload.Payload) {
p.sendPayload(sid, elem, framing.FlagNext)
}),
)

sending.
DoFinally(func(s rx.SignalType) {
close(sndDone)
ch <- struct{}{}
<-ch
select {
case <-rcvDone:
p.unregister(sid)
case _, ok := <-ch:
if ok {
close(ch)
p.unregister(sid)
}
default:
}
}).
SubscribeOn(scheduler.Elastic()).
SubscribeWith(context.Background(), sub)

<-mustSub
return nil
}

Expand Down Expand Up @@ -552,7 +571,9 @@ func (p *DuplexRSocket) respondRequestStream(receiving fragmentation.HeaderAndPa
}

sub := rx.NewSubscriber(
p.toSender(sid, framing.FlagNext),
rx.OnNext(func(elem payload.Payload) {
p.sendPayload(sid, elem, framing.FlagNext)
}),
rx.OnSubscribe(func(s rx.Subscription) {
p.register(sid, resRS{su: s})
s.Request(n32)
Expand Down Expand Up @@ -712,7 +733,7 @@ func (p *DuplexRSocket) onFramePayload(frame framing.Frame) error {
sid := h.StreamID()
v, ok := p.messages.Load(sid)
if !ok {
logger.Warnf("unoccupied Payload(id=%d), maybe it has been canceled", sid)
logger.Warnf("unoccupied Payload(id=%d), maybe it has been canceled(server=%T)\n", sid, p.sids)
return nil
}

Expand Down Expand Up @@ -781,12 +802,6 @@ func (p *DuplexRSocket) SetTransport(tp *transport.Transport) {
p.cond.L.Unlock()
}

func (p *DuplexRSocket) toSender(sid uint32, fg framing.FrameFlag) rx.SubscriberOption {
return rx.OnNext(func(elem payload.Payload) {
p.sendPayload(sid, elem, fg)
})
}

func (p *DuplexRSocket) sendFrame(f framing.Frame) {
defer func() {
if e := recover(); e != nil {
Expand Down Expand Up @@ -820,6 +835,7 @@ func (p *DuplexRSocket) sendPayload(
BaseFrame: framing.NewBaseFrame(h, body),
})
})
return
}

func (p *DuplexRSocket) drainWithKeepalive() (ok bool) {
Expand Down
1 change: 0 additions & 1 deletion internal/transport/connection_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func (p *tcpConn) Write(frame framing.Frame) (err error) {
}

func (p *tcpConn) Close() error {
_ = p.Flush()
return p.rawConn.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ServerTransport interface {
// Listen listens on the network address addr and handles requests on incoming connections.
// You can specify onReady handler, it'll be invoked when server begin listening.
// It always returns a non-nil error.
Listen(ctx context.Context) error
Listen(ctx context.Context, notifier chan<- struct{}) error
}

// Transport is RSocket transport which is used to carry RSocket frames.
Expand Down
3 changes: 2 additions & 1 deletion internal/transport/transport_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p *tcpServerTransport) Close() (err error) {
return
}

func (p *tcpServerTransport) Listen(ctx context.Context) (err error) {
func (p *tcpServerTransport) Listen(ctx context.Context, notifier chan<- struct{}) (err error) {
if p.tls == nil {
p.listener, err = net.Listen(p.network, p.addr)
if err != nil {
Expand All @@ -49,6 +49,7 @@ func (p *tcpServerTransport) Listen(ctx context.Context) (err error) {
return
}
}
notifier <- struct{}{}
return p.listen(ctx)
}

Expand Down
Loading

0 comments on commit 7a98012

Please sign in to comment.