diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index 512fdaa..8e06aa7 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -28,12 +28,11 @@ var DefaultRetryConfig = RetryConfig{ } const ( - statusOk = 0 - privFile = "priv_validator_key.json" - baseCurrency = "USDT" + statusOk = 0 + privFile = "priv_validator_key.json" //feeder_tokenName_feederID - loggerTagPrefix = "feed_%s_%d" + // loggerTagPrefix = "feed_%s_%d" ) // var updateConfig sync.Mutex @@ -68,7 +67,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo ecClient.Subscribe() - fsMap := NewFeederMap() + feeders := NewFeeders(feedertypes.GetLogger("feeders"), f, ecClient) // we don't check empty tokenfeeders list maxNonce := oracleP.MaxNonce for feederID, feeder := range oracleP.TokenFeeders { @@ -83,9 +82,8 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo panic(fmt.Sprintf("source of nst:%s is not set", tokenName)) } } - fsMap.Add(feeder, feederID, f, ecClient, source, tokenName, maxNonce, feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, tokenName, feederID))) + feeders.SetupFeeder(feeder, feederID, source, tokenName, maxNonce) } - feeders := fsMap.NewFeeders(logger) feeders.Start() for event := range ecClient.EventsCh() { @@ -94,7 +92,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo if paramsUpdate := e.ParamsUpdate(); paramsUpdate { oracleP, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger) if err != nil { - fmt.Printf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err) + logger.Error(fmt.Sprintf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err)) return } feeders.UpdateOracleParams(oracleP) diff --git a/cmd/types.go b/cmd/types.go index 494af3b..dc1b998 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -2,15 +2,19 @@ package cmd import ( "errors" + "fmt" + "strings" + "sync" oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - types "github.com/ExocoreNetwork/price-feeder/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) +const loggerTagPrefix = "feed_%s_%d" + type priceFetcher interface { GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) AddTokenForSource(source, token string) bool @@ -92,38 +96,45 @@ type feeder struct { paramsCh chan *updateParamsReq } -type feederInfo struct { - source string - token string - tokenID uint64 - feederID int +type FeederInfo struct { + Source string + Token string + TokenID uint64 + FeederID int // TODO: add check for rouleID, v1 can be skipped // ruleID - startRoundID int64 - startBaseBlock int64 - interval int64 - endBlock int64 - lastPrice localPrice - lastSent signInfo -} - -func (f *feeder) Info() feederInfo { - return feederInfo{ - source: f.source, - token: f.token, - tokenID: f.tokenID, - feederID: f.feederID, - startRoundID: f.startRoundID, - startBaseBlock: f.startBaseBlock, - interval: f.interval, - endBlock: f.endBlock, - lastPrice: *f.lastPrice, - lastSent: *f.lastSent, + StartRoundID int64 + StartBaseBlock int64 + Interval int64 + EndBlock int64 + LastPrice localPrice + LastSent signInfo +} + +func (f *feeder) Info() FeederInfo { + var lastPrice localPrice + var lastSent signInfo + if f.lastPrice != nil { + lastPrice = *f.lastPrice + } + if f.lastSent != nil { + lastSent = *f.lastSent + } + return FeederInfo{ + Source: f.source, + Token: f.token, + TokenID: f.tokenID, + FeederID: f.feederID, + StartRoundID: f.startRoundID, + StartBaseBlock: f.startBaseBlock, + Interval: f.interval, + EndBlock: f.endBlock, + LastPrice: lastPrice, + LastSent: lastSent, } } -// NewFeeder new a feeder struct from oracletypes' tokenfeeder -func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { +func newFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { return &feeder{ logger: logger, source: source, @@ -135,15 +146,12 @@ func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, startBaseBlock: int64(tf.StartBaseBlock), interval: int64(tf.Interval), endBlock: int64(tf.EndBlock), - - // maxNonce: maxNonce, - - fetcher: fetcher, - submitter: submitter, + fetcher: fetcher, + submitter: submitter, + lastPrice: &localPrice{}, lastSent: &signInfo{ maxNonce: maxNonce, }, - lastPrice: &localPrice{}, priceCh: make(chan *updatePrice, 1), heightsCh: make(chan *triggerHeights, 1), @@ -213,7 +221,10 @@ func (f *feeder) start() { // update latest height that price had been updated f.lastPrice.height = price.txHeight case req := <-f.paramsCh: - f.updateFeederParams(req.params) + if err := f.updateFeederParams(req.params); err != nil { + // This should not happen under this case. + f.logger.Error("failed to update params", "new params", req.params) + } req.result <- &updateParamsRes{} } } @@ -253,9 +264,25 @@ func (f *feeder) trigger(commitHeight, priceHeight int64) { } } -func (f *feeder) updateFeederParams(p *oracletypes.Params) { +func (f *feeder) updateFeederParams(p *oracletypes.Params) error { + if p == nil || len(p.TokenFeeders) < f.feederID+1 { + return errors.New("invalid oracle parmas") + } // TODO: update feeder's params - + tokenFeeder := p.TokenFeeders[f.feederID] + if f.endBlock != int64(tokenFeeder.EndBlock) { + f.endBlock = int64(tokenFeeder.EndBlock) + } + if f.startBaseBlock != int64(tokenFeeder.StartBaseBlock) { + f.startBaseBlock = int64(tokenFeeder.StartBaseBlock) + } + if f.interval != int64(tokenFeeder.Interval) { + f.interval = int64(tokenFeeder.Interval) + } + if p.MaxNonce > 0 { + f.lastSent.maxNonce = p.MaxNonce + } + return nil } // TODO: stop feeder routine @@ -289,63 +316,59 @@ type updatePricesReq struct { prices []*finalPrice } -type feederMap map[int]*feeder - -// NewFeederMap new a map feeder> -func NewFeederMap() feederMap { - return make(map[int]*feeder) +type Feeders struct { + locker *sync.Mutex + running bool + fetcher priceFetcher + submitter priceSubmitter + logger feedertypes.LoggerInf + feederMap map[int]*feeder + // TODO: feeder has sync management, so feeders could remove these channel + trigger chan *triggerReq + updatePrice chan *updatePricesReq + updateParams chan *oracletypes.Params + // updateNST chan *updateNSTReq } -func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { + +func NewFeeders(logger feedertypes.LoggerInf, fetcher priceFetcher, submitter priceSubmitter) *Feeders { return &Feeders{ + locker: new(sync.Mutex), logger: logger, - feederMap: fm, + fetcher: fetcher, + submitter: submitter, + feederMap: make(map[int]*feeder), + // feederMap: fm, // don't block on height increasing trigger: make(chan *triggerReq, 1), updatePrice: make(chan *updatePricesReq, 1), - // block on update-params delivery, no buffer - updateParams: make(chan *oracletypes.Params), + // it's safe to have a buffer to not block running feeders, + // since for running feeders, only endBlock is possible to be modified + updateParams: make(chan *oracletypes.Params, 1), } -} - -// Add adds a new feeder with feederID into the map -func (fm feederMap) Add(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) { - fm[feederID] = &feeder{ - logger: logger, - source: source, - token: token, - tokenID: tf.TokenID, - feederID: feederID, - // these conversion a safe since the block height defined in cosmossdk is int64 - startRoundID: int64(tf.StartRoundID), - startBaseBlock: int64(tf.StartBaseBlock), - interval: int64(tf.Interval), - endBlock: int64(tf.EndBlock), - fetcher: fetcher, - submitter: submitter, - lastPrice: &localPrice{}, - lastSent: &signInfo{ - maxNonce: maxNonce, - }, - priceCh: make(chan *updatePrice, 1), - heightsCh: make(chan *triggerHeights, 1), - paramsCh: make(chan *updateParamsReq, 1), - } } -type Feeders struct { - logger feedertypes.LoggerInf - feederMap map[int]*feeder - // TODO: feeder has sync management, so feeders could remove these channel - trigger chan *triggerReq - updatePrice chan *updatePricesReq - updateParams chan *oracletypes.Params - // updateNST chan *updateNSTReq +func (fs *Feeders) SetupFeeder(tf *oracletypes.TokenFeeder, feederID int, source string, token string, maxNonce int32) { + fs.locker.Lock() + defer fs.locker.Unlock() + if fs.running { + fs.logger.Error("failed to setup feeder for a running feeders, this should be called before feeders is started") + return + } + fs.feederMap[feederID] = newFeeder(tf, feederID, fs.fetcher, fs.submitter, source, token, maxNonce, fs.logger.With("feeder", fmt.Sprintf(loggerTagPrefix, token, feederID))) } // Start will start to listen the trigger(newHeight) and updatePrice events // usd channels to avoid race condition on map func (fs *Feeders) Start() { + fs.locker.Lock() + if fs.running { + fs.logger.Error("failed to start feeders since it's already running") + fs.locker.Unlock() + return + } + fs.running = true + fs.locker.Unlock() for _, f := range fs.feederMap { f.start() } @@ -354,16 +377,32 @@ func (fs *Feeders) Start() { select { case params := <-fs.updateParams: results := []chan *updateParamsRes{} + existingFeederIDs := make(map[int64]struct{}) for _, f := range fs.feederMap { res := f.updateParams(params) results = append(results, res) + existingFeederIDs[int64(f.feederID)] = struct{}{} } // wait for all feeders to complete updateing params for _, res := range results { <-res } - // TODO: add newly added tokenfeeders if exists - + for tfID, tf := range params.TokenFeeders { + if _, ok := existingFeederIDs[int64(tfID)]; !ok { + // create and start a new feeder + tokenName := strings.ToLower(params.Tokens[tf.TokenID].Name) + source := fetchertypes.Chainlink + if fetchertypes.IsNSTToken(tokenName) { + nstToken := fetchertypes.NSTToken(tokenName) + if source = fetchertypes.GetNSTSource(nstToken); len(source) == 0 { + fs.logger.Error("failed to add new feeder, source of nst token is not set", "token", tokenName) + } + } + feeder := newFeeder(tf, tfID, fs.fetcher, fs.submitter, source, tokenName, params.MaxNonce, fs.logger) + fs.feederMap[tfID] = feeder + feeder.start() + } + } case t := <-fs.trigger: // the order does not matter for _, f := range fs.feederMap { @@ -411,45 +450,15 @@ func (fs *Feeders) UpdatePrice(txHeight int64, prices []*finalPrice) { } // UpdateOracleParams updates all feeders' params from oracle params -// blocking until all feeders update their params -// when this method returned, it's guaranteed no further actions received wll be executed unless that all feeders have updated their params +// if the receiving channel is full, blocking until all updateParams are received by the channel func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { + if p == nil { + fs.logger.Error("received nil oracle params") + return + } + if len(p.TokenFeeders) == 0 { + fs.logger.Error("received empty token feeders") + return + } fs.updateParams <- p } - -// Define the types for the feeder -// type feederParams struct { -// startRoundID uint64 -// startBlock uint64 -// endBlock uint64 -// interval uint64 -// decimal int32 -// tokenIDStr string -// feederID int64 -// tokenName string -// } -// -// func (f *feederParams) update(p oracletypes.Params) (updated bool) { -// tokenFeeder := p.TokenFeeders[f.feederID] -// if tokenFeeder.StartBaseBlock != f.startBlock { -// f.startBlock = tokenFeeder.StartBaseBlock -// updated = true -// } -// if tokenFeeder.EndBlock != f.endBlock { -// f.endBlock = tokenFeeder.EndBlock -// updated = true -// } -// if tokenFeeder.Interval != f.interval { -// f.interval = tokenFeeder.Interval -// updated = true -// } -// if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { -// f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal -// updated = true -// } -// return -// } - -func getLogger() types.LoggerInf { - return types.GetLogger("") -} diff --git a/fetcher/chainlink/chainlink.go b/fetcher/chainlink/chainlink.go index fccbbcf..903affb 100644 --- a/fetcher/chainlink/chainlink.go +++ b/fetcher/chainlink/chainlink.go @@ -16,7 +16,12 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) +const baseCurrency = "usdt" + func (s *source) fetch(token string) (*fetchertypes.PriceInfo, error) { + if !strings.HasSuffix(token, baseCurrency) { + token += baseCurrency + } chainlinkPriceFeedProxy, ok := s.chainlinkProxy.get(token) if !ok { return nil, feedertypes.ErrSourceTokenNotConfigured.Wrap(fmt.Sprintf("chainlinkProxy not configured for token: %s", token))