Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:reconnecting ws when closed #14

Merged
merged 1 commit into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {

var (
DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
MaxAttempts: 43200,
Interval: 2 * time.Second,
}
)

Expand All @@ -134,7 +134,7 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
}
ec, _ := exoclient.GetClient()

_, err := getOracleParamsWithMaxRetry(1, ec, logger)
_, err := getOracleParamsWithMaxRetry(DebugRetryConfig.MaxAttempts, ec, logger)
if err != nil {
logger.Error("failed to get oracle params", "error", err)
return
Expand Down
8 changes: 0 additions & 8 deletions exoclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDe
},
Proxy: http.ProxyFromEnvironment,
}
ec.logger.Info("establish ws connection")
ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{})
if err != nil {
return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err))
}
ec.wsClient.SetPongHandler(func(string) error {
return nil
})
}
return ec, nil
}
Expand Down
48 changes: 21 additions & 27 deletions exoclient/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ type subEvent string
type eventQuery string

const (
// subTypeNewBlock subType = "tm.event='NewBlock'"
// subTypeTxUpdatePrice subType = "tm.event='Tx' AND create_price.price_update='success'"
// subTypeTxNativeToken subType = "tm.event='Tx' AND create_price.native_token_update='update'"
subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}`
reconnectInterval = 3
maxRetry = 600
Expand All @@ -27,15 +24,9 @@ const (
)

var (
// subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'"))
subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock))
// subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'"))
subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock))
subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, eTxUpdatePrice))
// subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'"))
subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, eTxNativeToken))
// eNewBlock subEvent = "tm.event='NewBlock'"
// eTxUpdatePrice subEvent = "tm.event='Tx' AND create_price.price_update='success'"
// eTxNativeToken subEvent = "tm.event='Tx' AND create_price.native_token_update='update'"

events = map[subEvent]bool{
subNewBlock: true,
Expand All @@ -44,15 +35,16 @@ var (
}
)

func (ec exoClient) Subscribe() {
func (ec *exoClient) Subscribe() {
// set up a background routine to listen to 'stop' signal and restart all tasks
// we expect this rountine as a forever-running process unless failed more than maxretry times
// or failed to confirm all routines closed after timeout when reciving stop signal
go func() {
ec.logger.Info("start subscriber job with all tasks")
ec.startTasks()
defer ec.wsClient.Close()
for _ = range ec.wsStop {
for {
<-ec.wsStop
ec.logger.Info("ws connection closed, mark connection as inactive and waiting for all ws routines to complete stopping")
// mark ws connection as inactive to prevent further ws routine starting
ec.markWsInactive()
Expand All @@ -71,6 +63,7 @@ func (ec exoClient) Subscribe() {
time.Sleep(1 * time.Second)
}
}

ec.startTasks()
}
}()
Expand All @@ -84,13 +77,13 @@ func (ec exoClient) EventsCh() chan EventInf {
// 1. routine: send ping message
// 2. subscribe to events
// 3. routine: read events from ws connection
func (ec exoClient) startTasks() {
func (ec *exoClient) startTasks() {
// ws connection stopped, reset subscriber
//ec.logger.Info("establish ws connection")
// if err := ec.connectWs(maxRetry); err != nil {
// // continue
// panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err))
// }
ec.logger.Info("establish ws connection")
if err := ec.connectWs(maxRetry); err != nil {
// continue
panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err))
}
ec.markWsActive()
ec.logger.Info("subscribe to ws publish", "events", events)
if err := ec.sendAllSubscribeMsgs(maxRetry); err != nil {
Expand All @@ -107,12 +100,11 @@ func (ec exoClient) startTasks() {
ec.logger.Info("setuped all subscriber tasks successfully")
}

func (ec exoClient) connectWs(maxRetry int) error {
if len(ec.wsEndpoint) == 0 {
return errors.New("wsEndpoint not set in exoClient")
func (ec *exoClient) connectWs(maxRetry int) error {
if ec.wsDialer == nil {
return errors.New("wsDialer not set in exoClient")
}
var err error
// ec.wsClient.Close()
count := 0
for count < maxRetry {
if ec.wsClient, _, err = ec.wsDialer.Dial(ec.wsEndpoint, http.Header{}); err == nil {
Expand Down Expand Up @@ -140,10 +132,11 @@ func (ec *exoClient) StopWsRoutines() {
default:
close(ec.wsStop)
}

ec.wsLock.Unlock()
}

func (ec *exoClient) increaseWsRountines() (int, bool) {
func (ec *exoClient) increaseWsRoutines() (int, bool) {
// only increase active rountine count when the wsConnection is active
ec.wsLock.Lock()
defer ec.wsLock.Unlock()
Expand All @@ -157,12 +150,13 @@ func (ec *exoClient) increaseWsRountines() (int, bool) {
func (ec *exoClient) decreaseWsRountines() (int, bool) {
ec.wsLock.Lock()
defer ec.wsLock.Unlock()
if *ec.wsActive {
if ec.wsActiveRoutines != nil {
if (*ec.wsActiveRoutines)--; *ec.wsActiveRoutines < 0 {
*ec.wsActiveRoutines = 0
}
return *ec.wsActiveRoutines, true
}

return *ec.wsActiveRoutines, false
}

Expand Down Expand Up @@ -217,7 +211,7 @@ func (ec exoClient) sendAllSubscribeMsgs(maxRetry int) error {
}

func (ec exoClient) startPingRoutine() bool {
if _, ok := ec.increaseWsRountines(); !ok {
if _, ok := ec.increaseWsRoutines(); !ok {
// ws connection is not active
return ok
}
Expand Down Expand Up @@ -245,16 +239,16 @@ func (ec exoClient) startPingRoutine() bool {
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
); err != nil {
logger.Error("failed to write close message to publisher", "error", err)
return
}
return
}
}
}()
return true
}

func (ec exoClient) startReadRoutine() bool {
if _, ok := ec.increaseWsRountines(); !ok {
if _, ok := ec.increaseWsRoutines(); !ok {
// ws connection is not active
return ok
}
Expand Down
Loading