Skip to content

Commit

Permalink
fix #43
Browse files Browse the repository at this point in the history
  • Loading branch information
jjeffcaii committed Nov 19, 2019
1 parent bd4541b commit 090c4e6
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 20 deletions.
10 changes: 8 additions & 2 deletions internal/framing/frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ type Frame interface {
// IsResumable returns true if frame supports resume.
IsResumable() bool
// Done marks current frame has been sent.
Done()
Done() (closed bool)
// DoneNotify notifies when frame done.
DoneNotify() <-chan struct{}
}
Expand All @@ -156,8 +156,14 @@ type BaseFrame struct {
}

// Done can be invoked when a frame has been been processed.
func (p *BaseFrame) Done() {
func (p *BaseFrame) Done() (closed bool) {
defer func() {
if e := recover(); e != nil {
closed = true
}
}()
close(p.done)
return
}

// DoneNotify notify when frame has been done.
Expand Down
7 changes: 3 additions & 4 deletions internal/socket/duplex.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,10 +984,9 @@ func (p *DuplexRSocket) drainOutBack() {
var out framing.Frame
for i := range p.outsPriority {
out = p.outsPriority[i]
if p.tp != nil {
if err := p.tp.Send(out, false); err != nil {
logger.Errorf("send frame failed: %v\n", err)
}
if err := p.tp.Send(out, false); err != nil {
out.Done()
logger.Errorf("send frame failed: %v\n", err)
}
}
if err := p.tp.Flush(); err != nil {
Expand Down
38 changes: 25 additions & 13 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type (
ServerTransportAcceptor = func(ctx context.Context, tp *Transport)
)

var errTransportClosed = errors.New("transport closed")

// ServerTransport is server-side RSocket transport.
type ServerTransport interface {
io.Closer
Expand Down Expand Up @@ -76,26 +78,36 @@ func (p *Transport) SetLifetime(lifetime time.Duration) {
}

// Send send a frame.
func (p *Transport) Send(frame framing.Frame, flush bool) error {
defer frame.Done()
if err := p.conn.Write(frame); err != nil {
return errors.Wrap(err, "send failed")
func (p *Transport) Send(frame framing.Frame, flush bool) (err error) {
defer func() {
// ensure frame done when send success.
if err == nil {
frame.Done()
}
}()
if p == nil || p.conn == nil {
err = errTransportClosed
return
}
if !flush {
return nil
err = p.conn.Write(frame)
if err != nil {
return
}
if err := p.conn.Flush(); err != nil {
return errors.Wrap(err, "flush failed")
if !flush {
return
}
return nil
err = p.conn.Flush()
return
}

// Flush flush all bytes in current connection.
func (p *Transport) Flush() error {
if err := p.conn.Flush(); err != nil {
return errors.Wrap(err, "flush failed")
func (p *Transport) Flush() (err error) {
if p == nil || p.conn == nil {
err = errTransportClosed
return
}
return nil
err = p.conn.Flush()
return
}

// Close close current transport.
Expand Down
2 changes: 1 addition & 1 deletion rx/rx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type (
FnOnNext = func(input payload.Payload)
// FnOnSubscribe is alias of function for signal when subscribe begin.
FnOnSubscribe = func(s Subscription)
// FnOnError is alias of function for signal when an error occured.
// FnOnError is alias of function for signal when an error occurred.
FnOnError = func(e error)
// FnOnCancel is alias of function for signal when subscription canceled.
FnOnCancel = func()
Expand Down

0 comments on commit 090c4e6

Please sign in to comment.