From b0eb57a281df431af3cf4e5f9627a4298678a308 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Tue, 31 Dec 2024 13:27:45 +0800 Subject: [PATCH] feat:reconnecting ws when closed --- cmd/feeder_debug.go | 6 +++--- exoclient/client.go | 8 ------- exoclient/subscribe.go | 48 ++++++++++++++++++------------------------ 3 files changed, 24 insertions(+), 38 deletions(-) diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index dde28fd..0cfe62c 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -106,8 +106,8 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { var ( DebugRetryConfig = RetryConfig{ - MaxAttempts: 10, - Interval: 3 * time.Second, + MaxAttempts: 43200, + Interval: 2 * time.Second, } ) @@ -134,7 +134,7 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn } ec, _ := exoclient.GetClient() - _, err := getOracleParamsWithMaxRetry(1, ec, logger) + _, err := getOracleParamsWithMaxRetry(DebugRetryConfig.MaxAttempts, ec, logger) if err != nil { logger.Error("failed to get oracle params", "error", err) return diff --git a/exoclient/client.go b/exoclient/client.go index 073dfe3..a6eb56e 100644 --- a/exoclient/client.go +++ b/exoclient/client.go @@ -105,14 +105,6 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDe }, Proxy: http.ProxyFromEnvironment, } - ec.logger.Info("establish ws connection") - ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{}) - if err != nil { - return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err)) - } - ec.wsClient.SetPongHandler(func(string) error { - return nil - }) } return ec, nil } diff --git a/exoclient/subscribe.go b/exoclient/subscribe.go index 4ac86bf..e3adf87 100644 --- a/exoclient/subscribe.go +++ b/exoclient/subscribe.go @@ -14,9 +14,6 @@ type subEvent string type eventQuery string const ( - // subTypeNewBlock subType = "tm.event='NewBlock'" - // subTypeTxUpdatePrice subType = "tm.event='Tx' AND create_price.price_update='success'" - // subTypeTxNativeToken subType = "tm.event='Tx' AND create_price.native_token_update='update'" subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}` reconnectInterval = 3 maxRetry = 600 @@ -27,15 +24,9 @@ const ( ) var ( - // subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'")) - subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock)) - // subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'")) + subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock)) subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, eTxUpdatePrice)) - // subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'")) subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, eTxNativeToken)) - // eNewBlock subEvent = "tm.event='NewBlock'" - // eTxUpdatePrice subEvent = "tm.event='Tx' AND create_price.price_update='success'" - // eTxNativeToken subEvent = "tm.event='Tx' AND create_price.native_token_update='update'" events = map[subEvent]bool{ subNewBlock: true, @@ -44,7 +35,7 @@ var ( } ) -func (ec exoClient) Subscribe() { +func (ec *exoClient) Subscribe() { // set up a background routine to listen to 'stop' signal and restart all tasks // we expect this rountine as a forever-running process unless failed more than maxretry times // or failed to confirm all routines closed after timeout when reciving stop signal @@ -52,7 +43,8 @@ func (ec exoClient) Subscribe() { ec.logger.Info("start subscriber job with all tasks") ec.startTasks() defer ec.wsClient.Close() - for _ = range ec.wsStop { + for { + <-ec.wsStop ec.logger.Info("ws connection closed, mark connection as inactive and waiting for all ws routines to complete stopping") // mark ws connection as inactive to prevent further ws routine starting ec.markWsInactive() @@ -71,6 +63,7 @@ func (ec exoClient) Subscribe() { time.Sleep(1 * time.Second) } } + ec.startTasks() } }() @@ -84,13 +77,13 @@ func (ec exoClient) EventsCh() chan EventInf { // 1. routine: send ping message // 2. subscribe to events // 3. routine: read events from ws connection -func (ec exoClient) startTasks() { +func (ec *exoClient) startTasks() { // ws connection stopped, reset subscriber - //ec.logger.Info("establish ws connection") - // if err := ec.connectWs(maxRetry); err != nil { - // // continue - // panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err)) - // } + ec.logger.Info("establish ws connection") + if err := ec.connectWs(maxRetry); err != nil { + // continue + panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err)) + } ec.markWsActive() ec.logger.Info("subscribe to ws publish", "events", events) if err := ec.sendAllSubscribeMsgs(maxRetry); err != nil { @@ -107,12 +100,11 @@ func (ec exoClient) startTasks() { ec.logger.Info("setuped all subscriber tasks successfully") } -func (ec exoClient) connectWs(maxRetry int) error { - if len(ec.wsEndpoint) == 0 { - return errors.New("wsEndpoint not set in exoClient") +func (ec *exoClient) connectWs(maxRetry int) error { + if ec.wsDialer == nil { + return errors.New("wsDialer not set in exoClient") } var err error - // ec.wsClient.Close() count := 0 for count < maxRetry { if ec.wsClient, _, err = ec.wsDialer.Dial(ec.wsEndpoint, http.Header{}); err == nil { @@ -140,10 +132,11 @@ func (ec *exoClient) StopWsRoutines() { default: close(ec.wsStop) } + ec.wsLock.Unlock() } -func (ec *exoClient) increaseWsRountines() (int, bool) { +func (ec *exoClient) increaseWsRoutines() (int, bool) { // only increase active rountine count when the wsConnection is active ec.wsLock.Lock() defer ec.wsLock.Unlock() @@ -157,12 +150,13 @@ func (ec *exoClient) increaseWsRountines() (int, bool) { func (ec *exoClient) decreaseWsRountines() (int, bool) { ec.wsLock.Lock() defer ec.wsLock.Unlock() - if *ec.wsActive { + if ec.wsActiveRoutines != nil { if (*ec.wsActiveRoutines)--; *ec.wsActiveRoutines < 0 { *ec.wsActiveRoutines = 0 } return *ec.wsActiveRoutines, true } + return *ec.wsActiveRoutines, false } @@ -217,7 +211,7 @@ func (ec exoClient) sendAllSubscribeMsgs(maxRetry int) error { } func (ec exoClient) startPingRoutine() bool { - if _, ok := ec.increaseWsRountines(); !ok { + if _, ok := ec.increaseWsRoutines(); !ok { // ws connection is not active return ok } @@ -245,8 +239,8 @@ func (ec exoClient) startPingRoutine() bool { websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), ); err != nil { logger.Error("failed to write close message to publisher", "error", err) - return } + return } } }() @@ -254,7 +248,7 @@ func (ec exoClient) startPingRoutine() bool { } func (ec exoClient) startReadRoutine() bool { - if _, ok := ec.increaseWsRountines(); !ok { + if _, ok := ec.increaseWsRoutines(); !ok { // ws connection is not active return ok }