Skip to content

Commit

Permalink
small refactor and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsannm committed May 25, 2024
1 parent 33112aa commit a9e59fa
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 4 deletions.
1 change: 1 addition & 0 deletions std/gateways/fastws/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (b *bundle) Start(_ context.Context, cfg kit.GatewayStartConfig) error {
opts := []gnet.Option{
gnet.WithMulticore(true),
gnet.WithReusePort(cfg.ReusePort),
gnet.WithReuseAddr(cfg.ReusePort),
}

err := gnet.Run(b.eh, b.listen, opts...)
Expand Down
4 changes: 2 additions & 2 deletions std/gateways/fastws/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (gw *gateway) OnClose(c gnet.Conn, _ error) (action gnet.Action) {
func (gw *gateway) OnTraffic(c gnet.Conn) gnet.Action {
wsc := gw.getConnWrap(c)
if wsc == nil {
gw.b.l.Debugf("did not find ws conn for connID(%d)", utils.TryCast[int64](c.Context()))
gw.b.l.Debugf("did not find ws conn for connID(%d)", utils.TryCast[uint64](c.Context()))

return gnet.Close
}
Expand All @@ -97,7 +97,7 @@ func (gw *gateway) OnTraffic(c gnet.Conn) gnet.Action {
_, err := sp.Upgrade(wsc.c)
if err != nil {
wsc.Close()
gw.b.l.Debugf("faild to upgrade websocket connID(%d): %v", utils.TryCast[int64](c.Context()), err)
gw.b.l.Debugf("faild to upgrade websocket connID(%d): %v", utils.TryCast[uint64](c.Context()), err)

return gnet.Close
}
Expand Down
76 changes: 74 additions & 2 deletions testenv/fastws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
func TestFastWS(t *testing.T) {
Convey("Kit with FastWS", t, func(c C) {
testCases := map[string]func(t *testing.T, opt fx.Option) func(c C){
"Edger Server With Huge Websocket Payload": fastwsWithHugePayload,
"Edge Server With Huge Websocket Payload": fastwsWithHugePayload,
//"Edge Server With Ping and Small Payload": fastwsWithPingAndSmallPayload,
//"Edge Server With Ping Only": fastwsWithPingOnly,
}
for title, fn := range testCases {
Convey(title,
Expand Down Expand Up @@ -89,7 +91,48 @@ func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) {

wsCtx := stub.New(
"localhost:8082",
//stub.WithLogger(&stdLogger{}),
// stub.WithLogger(common.NewStdLogger()),
).
Websocket(
stub.WithPredicateKey("cmd"),
stub.WithPingTime(time.Second),
)
c.So(wsCtx.Connect(ctx, "/"), ShouldBeNil)

for i := 0; i < 10; i++ {
req := &services.EchoRequest{Input: utils.RandomID(8192)}
res := &services.EchoResponse{}
err := wsCtx.BinaryMessage(
ctx, "echo", req, res,
func(ctx context.Context, msg kit.Message, hdr stub.Header, err error) {
c.So(err, ShouldBeNil)
c.So(msg.(*services.EchoResponse).Output, ShouldEqual, req.Input) //nolint:forcetypeassert
},
)
_ = err
//_, _ = c.Println("Error: ", err)
// c.So(err, ShouldBeNil)
time.Sleep(time.Second * 2)
}
}
}

func fastwsWithPingAndSmallPayload(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

return func(c C) {
Prepare(
t, c,
fx.Options(
opt,
),
)

time.Sleep(time.Second * 2)

wsCtx := stub.New(
"localhost:8082",
stub.WithLogger(common.NewStdLogger()),
).
Websocket(
stub.WithPredicateKey("cmd"),
Expand All @@ -112,3 +155,32 @@ func fastwsWithHugePayload(t *testing.T, opt fx.Option) func(c C) {
}
}
}

func fastwsWithPingOnly(t *testing.T, opt fx.Option) func(c C) {
ctx := context.Background()

return func(c C) {
Prepare(
t, c,
fx.Options(
opt,
),
)

time.Sleep(time.Second * 2)

wsCtx := stub.New(
"localhost:8082",
stub.WithLogger(common.NewStdLogger()),
).
Websocket(
stub.WithPredicateKey("cmd"),
stub.WithPingTime(time.Second),
)
c.So(wsCtx.Connect(ctx, "/"), ShouldBeNil)

_, _ = c.Println("waiting for 10sec ...")
time.Sleep(time.Second * 10)

}
}

0 comments on commit a9e59fa

Please sign in to comment.