Skip to content

Commit

Permalink
Merge pull request #15 from ExocoreNetwork/develop
Browse files Browse the repository at this point in the history
Reconnection ws connection when it is closed
  • Loading branch information
leonz789 authored Dec 31, 2024
2 parents 7516de7 + 5afb53c commit 0c42dc8
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 38 deletions.
6 changes: 3 additions & 3 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {

var (
DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
MaxAttempts: 43200,
Interval: 2 * time.Second,
}
)

Expand All @@ -134,7 +134,7 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
}
ec, _ := exoclient.GetClient()

_, err := getOracleParamsWithMaxRetry(1, ec, logger)
_, err := getOracleParamsWithMaxRetry(DebugRetryConfig.MaxAttempts, ec, logger)
if err != nil {
logger.Error("failed to get oracle params", "error", err)
return
Expand Down
8 changes: 0 additions & 8 deletions exoclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDe
},
Proxy: http.ProxyFromEnvironment,
}
ec.logger.Info("establish ws connection")
ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{})
if err != nil {
return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err))
}
ec.wsClient.SetPongHandler(func(string) error {
return nil
})
}
return ec, nil
}
Expand Down
48 changes: 21 additions & 27 deletions exoclient/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ type subEvent string
type eventQuery string

const (
// subTypeNewBlock subType = "tm.event='NewBlock'"
// subTypeTxUpdatePrice subType = "tm.event='Tx' AND create_price.price_update='success'"
// subTypeTxNativeToken subType = "tm.event='Tx' AND create_price.native_token_update='update'"
subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}`
reconnectInterval = 3
maxRetry = 600
Expand All @@ -27,15 +24,9 @@ const (
)

var (
// subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'"))
subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock))
// subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'"))
subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock))
subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, eTxUpdatePrice))
// subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'"))
subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, eTxNativeToken))
// eNewBlock subEvent = "tm.event='NewBlock'"
// eTxUpdatePrice subEvent = "tm.event='Tx' AND create_price.price_update='success'"
// eTxNativeToken subEvent = "tm.event='Tx' AND create_price.native_token_update='update'"

events = map[subEvent]bool{
subNewBlock: true,
Expand All @@ -44,15 +35,16 @@ var (
}
)

func (ec exoClient) Subscribe() {
func (ec *exoClient) Subscribe() {
// set up a background routine to listen to 'stop' signal and restart all tasks
// we expect this rountine as a forever-running process unless failed more than maxretry times
// or failed to confirm all routines closed after timeout when reciving stop signal
go func() {
ec.logger.Info("start subscriber job with all tasks")
ec.startTasks()
defer ec.wsClient.Close()
for _ = range ec.wsStop {
for {
<-ec.wsStop
ec.logger.Info("ws connection closed, mark connection as inactive and waiting for all ws routines to complete stopping")
// mark ws connection as inactive to prevent further ws routine starting
ec.markWsInactive()
Expand All @@ -71,6 +63,7 @@ func (ec exoClient) Subscribe() {
time.Sleep(1 * time.Second)
}
}

ec.startTasks()
}
}()
Expand All @@ -84,13 +77,13 @@ func (ec exoClient) EventsCh() chan EventInf {
// 1. routine: send ping message
// 2. subscribe to events
// 3. routine: read events from ws connection
func (ec exoClient) startTasks() {
func (ec *exoClient) startTasks() {
// ws connection stopped, reset subscriber
//ec.logger.Info("establish ws connection")
// if err := ec.connectWs(maxRetry); err != nil {
// // continue
// panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err))
// }
ec.logger.Info("establish ws connection")
if err := ec.connectWs(maxRetry); err != nil {
// continue
panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err))
}
ec.markWsActive()
ec.logger.Info("subscribe to ws publish", "events", events)
if err := ec.sendAllSubscribeMsgs(maxRetry); err != nil {
Expand All @@ -107,12 +100,11 @@ func (ec exoClient) startTasks() {
ec.logger.Info("setuped all subscriber tasks successfully")
}

func (ec exoClient) connectWs(maxRetry int) error {
if len(ec.wsEndpoint) == 0 {
return errors.New("wsEndpoint not set in exoClient")
func (ec *exoClient) connectWs(maxRetry int) error {
if ec.wsDialer == nil {
return errors.New("wsDialer not set in exoClient")
}
var err error
// ec.wsClient.Close()
count := 0
for count < maxRetry {
if ec.wsClient, _, err = ec.wsDialer.Dial(ec.wsEndpoint, http.Header{}); err == nil {
Expand Down Expand Up @@ -140,10 +132,11 @@ func (ec *exoClient) StopWsRoutines() {
default:
close(ec.wsStop)
}

ec.wsLock.Unlock()
}

func (ec *exoClient) increaseWsRountines() (int, bool) {
func (ec *exoClient) increaseWsRoutines() (int, bool) {
// only increase active rountine count when the wsConnection is active
ec.wsLock.Lock()
defer ec.wsLock.Unlock()
Expand All @@ -157,12 +150,13 @@ func (ec *exoClient) increaseWsRountines() (int, bool) {
func (ec *exoClient) decreaseWsRountines() (int, bool) {
ec.wsLock.Lock()
defer ec.wsLock.Unlock()
if *ec.wsActive {
if ec.wsActiveRoutines != nil {
if (*ec.wsActiveRoutines)--; *ec.wsActiveRoutines < 0 {
*ec.wsActiveRoutines = 0
}
return *ec.wsActiveRoutines, true
}

return *ec.wsActiveRoutines, false
}

Expand Down Expand Up @@ -217,7 +211,7 @@ func (ec exoClient) sendAllSubscribeMsgs(maxRetry int) error {
}

func (ec exoClient) startPingRoutine() bool {
if _, ok := ec.increaseWsRountines(); !ok {
if _, ok := ec.increaseWsRoutines(); !ok {
// ws connection is not active
return ok
}
Expand Down Expand Up @@ -245,16 +239,16 @@ func (ec exoClient) startPingRoutine() bool {
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
); err != nil {
logger.Error("failed to write close message to publisher", "error", err)
return
}
return
}
}
}()
return true
}

func (ec exoClient) startReadRoutine() bool {
if _, ok := ec.increaseWsRountines(); !ok {
if _, ok := ec.increaseWsRoutines(); !ok {
// ws connection is not active
return ok
}
Expand Down

0 comments on commit 0c42dc8

Please sign in to comment.