diff --git a/cmd/foobar/main.go b/cmd/foobar/main.go deleted file mode 100644 index a022aca..0000000 --- a/cmd/foobar/main.go +++ /dev/null @@ -1,55 +0,0 @@ -package main - -import ( - "context" - "fmt" - - "github.com/rsocket/rsocket-go" - "github.com/rsocket/rsocket-go/logger" - "github.com/rsocket/rsocket-go/payload" - "github.com/rsocket/rsocket-go/rx" - "github.com/rsocket/rsocket-go/rx/flux" -) - -var testData = "Hello World!" - -func main() { - logger.SetLevel(logger.LevelDebug) - err := rsocket.Receive(). - Fragment(128). - Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) rsocket.RSocket { - return rsocket.NewAbstractSocket( - rsocket.RequestChannel(func(inputs rx.Publisher) flux.Flux { - //var count int32 - //countPointer := &count - receives := make(chan payload.Payload) - - go func() { - var count int32 - for range receives { - count++ - } - fmt.Println("***** count:", count) - }() - - inputs.(flux.Flux).DoFinally(func(s rx.SignalType) { - close(receives) - }).Subscribe(context.Background(), rx.OnNext(func(input payload.Payload) { - //fmt.Println("rcv from channel:", input) - receives <- input - })) - - return flux.Create(func(ctx context.Context, s flux.Sink) { - for i := 0; i < 2; i++ { - s.Next(payload.NewString(testData, fmt.Sprintf("%d_from_server", i))) - } - s.Complete() - }) - }), - ) - }). - Transport("tcp://127.0.0.1:7878"). - Serve(context.Background()) - fmt.Println("SERVER STOPPED!!!!!") - panic(err) -} diff --git a/internal/fragmentation/joiner_test.go b/internal/fragmentation/joiner_test.go index 8cd8624..ffb121f 100644 --- a/internal/fragmentation/joiner_test.go +++ b/internal/fragmentation/joiner_test.go @@ -12,7 +12,6 @@ func TestFragmentPayload(t *testing.T) { const totals = 10 const sid = uint32(1) fr := NewJoiner(framing.NewFramePayload(sid, []byte("(ROOT)"), []byte("(ROOT)"), framing.FlagFollow, framing.FlagMetadata)) - defer fr.Release() for i := 0; i < totals; i++ { data := fmt.Sprintf("(data%04d)", i) var frame *framing.FramePayload diff --git a/internal/fragmentation/splitter_test.go b/internal/fragmentation/splitter_test.go index 2149a61..e45725b 100644 --- a/internal/fragmentation/splitter_test.go +++ b/internal/fragmentation/splitter_test.go @@ -15,7 +15,6 @@ func TestSplitter_Split(t *testing.T) { joiner, err := split2joiner(mtu, data, metadata) assert.NoError(t, err, "split failed") - defer joiner.Release() m, ok := joiner.Metadata() assert.True(t, ok, "bad metadata") diff --git a/internal/framing/frame_test.go b/internal/framing/frame_test.go index 7e3fbd8..612ac6a 100644 --- a/internal/framing/frame_test.go +++ b/internal/framing/frame_test.go @@ -13,8 +13,6 @@ func TestDecode_Payload(t *testing.T) { //s := "000000012940000005776f726c6468656c6c6f" // go s := "00000001296000000966726f6d5f6a617661706f6e67" //java - - bs, err := hex.DecodeString(s) assert.NoError(t, err, "bad bytes") h := ParseFrameHeader(bs[:HeaderLen]) @@ -24,15 +22,5 @@ func TestDecode_Payload(t *testing.T) { pl := &FramePayload{ BaseFrame: NewBaseFrame(h, bf), } - defer pl.Release() log.Println(pl) - - - - - - - - - } diff --git a/internal/socket/duplex.go b/internal/socket/duplex.go index 71f600f..abc3c7a 100644 --- a/internal/socket/duplex.go +++ b/internal/socket/duplex.go @@ -835,7 +835,6 @@ func (p *DuplexRSocket) sendPayload( BaseFrame: framing.NewBaseFrame(h, body), }) }) - return } func (p *DuplexRSocket) drainWithKeepalive() (ok bool) { diff --git a/payload/payload.go b/payload/payload.go index 3aebaaa..53812c9 100644 --- a/payload/payload.go +++ b/payload/payload.go @@ -1,20 +1,12 @@ package payload import ( - "errors" - "fmt" "io/ioutil" - "os" "time" "github.com/rsocket/rsocket-go/internal/common" ) -var ( - errNotFile = errors.New("target is not file") - errTooLargePooledPayload = fmt.Sprintf("too large pooled payload: maximum size is %d", common.MaxUint24-3) -) - type ( // Payload is a stream message (upstream or downstream). // It contains data associated with a stream created by a previous request. @@ -82,17 +74,6 @@ func NewString(data, metadata string) Payload { // NewFile create a new payload from file. func NewFile(filename string, metadata []byte) (Payload, error) { - fi, err := os.Stat(filename) - if err != nil { - return nil, err - } - if fi.IsDir() { - return nil, errNotFile - } - // Check file size - //if fi.Size() > common.MaxUint24 { - // return nil, errTooLargeFile - //} bs, err := ioutil.ReadFile(filename) if err != nil { return nil, err