From b11b1dc6b47dda9fd3fc4cfdc01574e146995c6b Mon Sep 17 00:00:00 2001 From: leonz789 Date: Wed, 25 Dec 2024 10:15:41 +0800 Subject: [PATCH] refactor feeder_tool, beaconchain_fetcher --- .gitignore | 2 +- cmd/feeder_tool.go | 521 ++++++++--------------------- cmd/types.go | 429 ++++++++++++++++-------- exoclient/client.go | 23 +- exoclient/grpc.go | 11 +- exoclient/query.go | 6 +- exoclient/subscribe.go | 93 ++--- exoclient/tx.go | 18 +- exoclient/types.go | 270 ++++++++++++++- fetcher/beaconchain/beaconchain.go | 116 ++----- fetcher/beaconchain/types.go | 150 +++++++-- fetcher/chainlink/chainlink.go | 12 +- fetcher/chainlink/types.go | 8 +- fetcher/fetcher.go | 471 +++++--------------------- fetcher/types/types.go | 117 +++++-- types/types.go | 47 ++- 16 files changed, 1117 insertions(+), 1177 deletions(-) diff --git a/.gitignore b/.gitignore index 83b9e82..f5aee36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ feed.sh config-*.yaml configs/ -node*.log +*.log pids build/ fetcher/beaconchain/oracle_env_beaconchain.yaml diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index 933b920..0addcc5 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -1,6 +1,7 @@ package cmd import ( + "errors" "fmt" "strings" "sync" @@ -10,21 +11,24 @@ import ( "github.com/ExocoreNetwork/price-feeder/fetcher" "github.com/ExocoreNetwork/price-feeder/types" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + "github.com/ExocoreNetwork/price-feeder/fetcher/beaconchain" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - // cmdcfg "github.com/ExocoreNetwork/exocore/cmd/config" - // sdk "github.com/cosmos/cosmos-sdk/types" ) const ( - statusOk = 0 - privFile = "priv_validator_key.json" - baseCurrency = "USDT" + statusOk = 0 + privFile = "priv_validator_key.json" + baseCurrency = "USDT" + defaultMaxRetry = 43200 + retryInterval = 2 * time.Second + + //feeder_tokenName_feederID + loggerTagPrefix = "feed_%s_%d" ) -var updateConfig sync.Mutex - -const nstToken = "nsteth" +// var updateConfig sync.Mutex // RunPriceFeeder runs price feeder to fetching price and feed to exocorechain func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) { @@ -33,7 +37,9 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo panic("logger is not initialized") } // init logger, fetchers, exocoreclient - initComponents(logger, conf, standalone) + once := new(sync.Once) + once.Do(func() { initComponents(logger, conf, standalone) }) + // initComponents(logger, conf, standalone) f, _ := fetcher.GetFetcher() // start fetching on all supported sources and tokens @@ -52,405 +58,142 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo time.Sleep(2 * time.Second) oracleP, err = ecClient.GetParams() } + ecClient.Subscribe() - // we don't handle empty tokenfeeders list - fsMap := make(map[int]*Feeder) + fsMap := NewFeederMap() + // we don't check empty tokenfeeders list + maxNonce := oracleP.MaxNonce for feederID, feeder := range oracleP.TokenFeeders { + if feederID == 0 { + continue + } tokenName := strings.ToLower(oracleP.Tokens[feeder.TokenID].Name) source := fetchertypes.Chainlink - if strings.EqualFold(tokenName, nstToken) { - source = fetchertypes.BeaconChain + if fetchertypes.IsNSTToken(tokenName) { + nstToken := fetchertypes.NSTToken(tokenName) + if source = fetchertypes.GetNSTSource(nstToken); len(source) == 0 { + panic(fmt.Sprintf("source of nst:%s is not set", tokenName)) + } } - fsMap[feederID] = NewFeeder(feeder, feederID, f, ecClient, source, tokenName, 0, feedertypes.GetLogger(fmt.Sprintf("feed_%s", tokenName))) + fsMap.Add(feeder, feederID, f, ecClient, source, tokenName, maxNonce, feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, tokenName, feederID))) } - - feeders := NewFeeders(fsMap) + feeders := fsMap.NewFeeders(logger) feeders.Start() for event := range ecClient.EventsCh() { - switch event.Type { - case exoclient.NewBlock: - case exoclient.UpdatePrice: - case exoclient.UpdateNST: + switch e := event.(type) { + case *exoclient.EventNewBlock: + if paramsUpdate := e.ParamsUpdate(); paramsUpdate { + oracleP, err = getOracleParamsWithMaxRetry(defaultMaxRetry, ecClient, logger) + if err != nil { + panic(fmt.Sprintf("Failed to get oracle params with maxRetry when params update detected, error:%v", err)) + } + feeders.UpdateOracleParams(oracleP) + // TODO: add newly added tokenfeeders if exists + } + feeders.Trigger(e.Height(), e.FeederIDs()) + case *exoclient.EventUpdatePrice: + finalPrices := make([]*finalPrice, 0, len(e.Prices())) + for _, price := range e.Prices() { + feederIDList := oracleP.GetFeederIDsByTokenID(uint64(price.TokenID())) + l := len(feederIDList) + if l == 0 { + logger.Error("Failed to get feederIDs by tokenID when try to updata local price for feeders on event_updatePrice", "tokenID", price.TokenID()) + continue + } + feederID := feederIDList[l-1] + finalPrices = append(finalPrices, &finalPrice{ + feederID: int64(feederID), + price: price.Price(), + decimal: price.Decimal(), + roundID: price.RoundID(), + }) + } + feeders.UpdatePrice(e.TxHeight(), finalPrices) + case *exoclient.EventUpdateNST: + // int conversion is safe + if updated := beaconchain.UpdateStakerValidators(int(e.StakerID()), e.ValidatorIndex(), uint64(e.Index()), e.Deposit()); !updated { + logger.Error("failed to update staker's validator list", "stakerID", e.StakerID(), "validatorIndex", e.ValidatorIndex, "deposit", e.Deposit(), "index", e.Index()) + // try to reset all validatorList + if err := ResetAllStakerValidators(ecClient, logger); err != nil { + logger.Error("failed to reset all staker's validators for native-restaking-eth") + // TODO: should we just clear all info to prevent further nst update + } + } } } - // TODO: currently the getStakerInfos will return nil error when empty response, avoid infinite loop - // initialize staker's validator list for eth-native-restaking - // nativeStakers := initializeNativeRestakingStakers(cc) - // for nativeToken, stakerInfos := range nativeStakers { - // f.ResetStakerValidatorsForAll(nativeToken, stakerInfos) - // } - - // oracleParamsFeedingTokens := make(map[string]struct{}) - // runningFeeders := make(map[int64]*feederInfo) - // // remainingFeeders used to track token feeders that have been set in oracle params but with no configuration from price-feeder for price fetching, and if the lenght of map is bigger than 0, price-feeder will try continuously to reload the configure file until those feeders are able to work - // remainningFeeders := make(map[string]*feederInfo) - // // check all live feeders and set seperate routine to udpate prices - // for feederID, feeder := range oracleP.TokenFeeders { - // if feederID == 0 { - // // feederID=0 is reserved - // continue - // } - // var tokenName string - // if strings.EqualFold(oracleP.Tokens[feeder.TokenID].Name, fetchertypes.NativeTokenETH) { - // tokenName = strings.ToLower(oracleP.Tokens[feeder.TokenID].Name) - // } else { - // tokenName = strings.ToLower(oracleP.Tokens[feeder.TokenID].Name + baseCurrency) - // } - // oracleParamsFeedingTokens[tokenName] = struct{}{} - // decimal := oracleP.Tokens[feeder.TokenID].Decimal - // fInfo := &feederInfo{ - // params: &feederParams{ - // startRoundID: feeder.StartRoundID, - // startBlock: feeder.StartBaseBlock, - // endBlock: feeder.EndBlock, - // interval: feeder.Interval, - // decimal: decimal, - // tokenIDStr: strconv.FormatInt(int64(feeder.TokenID), 10), - // feederID: int64(feederID), - // tokenName: tokenName, - // }, - // } - // remainningFeeders[tokenName] = fInfo - // // TODO: refactor - // if strings.EqualFold(tokenName, fetchertypes.NativeTokenETH) { - // // actually not usdt, we so need to do refactor for the mess - // delete(remainningFeeders, tokenName) - // trigger := make(chan eventRes, 3) - // fInfo.updateCh = trigger - // runningFeeders[int64(feederID)] = fInfo - // // start a routine to update price for this feeder - // go feedToken(fInfo, cc, f, conf) - // continue - // } - - // // check if this feeder is supported by this price-feeder - // for _, token := range conf.Tokens { - // if strings.EqualFold(token, tokenName) { - // delete(remainningFeeders, tokenName) - // trigger := make(chan eventRes, 3) - // fInfo.updateCh = trigger - // runningFeeders[int64(feederID)] = fInfo - // // start a routine to update price for this feeder - // go feedToken(fInfo, cc, f, conf) - // break - // } - // } - // } - - // newFeeder := make(chan *feederInfo) - // // check config update for remaining tokenFeeders set in oracle params - // if len(remainningFeeders) > 0 { - // // we set a background routine to update config file until we got all remainning tokens configured in oracleParams - // go reloadConfigToFetchNewTokens(remainningFeeders, newFeeder, cc, f) - // } - - // // subscribe newBlock to to trigger tx - // res, _ := exoclient.Subscriber(conf.Exocore.Ws.Addr, conf.Exocore.Ws.Endpoint) - - // for r := range res { - // event := eventRes{} - // var feederIDs []string - // if len(r.Height) > 0 { - // event.height, _ = strconv.ParseUint(r.Height, 10, 64) - // } - // if len(r.Gas) > 0 { - // event.gas, _ = strconv.ParseInt(r.Gas, 10, 64) - // } - // if r.ParamsUpdate { - // oracleP, err = exoclient.GetParams(cc) - // for err != nil { - // // retry forever until be interrupted manually - // logger.Error("Failed to get oracle params when params update detected, retrying...", "error", err) - // oracleP, err = exoclient.GetParams(cc) - // time.Sleep(2 * time.Second) - // } - // // set newly added tokenfeeders in the running queue and reload the configure for them to run porperly - // if remainningFeeders = updateCurrentFeedingTokens(oracleP, oracleParamsFeedingTokens); len(remainningFeeders) > 0 { - // // reoload config for newly added token feeders - // go reloadConfigToFetchNewTokens(remainningFeeders, newFeeder, cc, f) - // } - // } - - // if len(r.FeederIDs) > 0 { - // feederIDs = strings.Split(r.FeederIDs, "_") - // } - - // // this is an event that tells native token stakers update, we use 'ETH' temporary since we currently support eth-native-restaking only - // if len(r.NativeETH) > 0 { - // // TODO: we only support eth-native-restaking for now - // if success := f.UpdateNativeTokenValidators(fetchertypes.NativeTokenETH, r.NativeETH); !success { - // stakerInfos, err := exoclient.GetStakerInfos(cc, fetchertypes.NativeTokenETHAssetID) - // for err != nil { - // logger.Error("Failed to get stakerInfos, retrying...") - // stakerInfos, err = exoclient.GetStakerInfos(cc, fetchertypes.NativeTokenETHAssetID) - // time.Sleep(2 * time.Second) - // } - // f.ResetStakerValidatorsForAll(fetchertypes.NativeTokenETH, stakerInfos) - // } - // } - - // select { - // case feeder := <-newFeeder: - // runningFeeders[feeder.params.feederID] = feeder - // default: - // for _, fInfo := range runningFeeders { - // triggerFeeders(r, fInfo, event, oracleP, feederIDs) - // } - // } - // } } -// reloadConfigToFetchNewTokens reload config file for the remainning token feeders that are set in oracle params but not running properly for config missing -// func reloadConfigToFetchNewTokens(remainningFeeders map[string]*feederInfo, newFeeder chan *feederInfo, cc *grpc.ClientConn, f *fetcher.Fetcher) { -// logger := getLogger() -// updateConfig.Lock() -// length := len(remainningFeeders) -// for length > 0 { -// conf := types.ReloadConfig() -// for tokenRemainning, fInfo := range remainningFeeders { -// logger.Info("loading config for for token... ", "tokenName", fInfo.params.tokenName) -// for _, token := range conf.Tokens { -// if strings.EqualFold(token, tokenRemainning) { -// delete(remainningFeeders, tokenRemainning) -// length-- -// trigger := make(chan eventRes, 3) -// fInfo.updateCh = trigger -// // TODO: currently support chainlink only (index=0) -// f.AddTokenForSource(conf.Sources[0], tokenRemainning) -// // start a routine to update price for this feeder -// newFeeder <- fInfo -// go feedToken(fInfo, cc, f, conf) -// break -// } -// } -// } -// time.Sleep(10 * time.Second) -// } -// updateConfig.Unlock() -// } - -// updateCurrentFeedingTokens will update current running tokenFeeders based on the params change from upstream, and it will add the newly added tokenFeeders into the running queue and return that list which will be handled by invoker to set them properly as running tokenFeeders -// func updateCurrentFeedingTokens(oracleP oracletypes.Params, currentFeedingTokens map[string]struct{}) map[string]*feederInfo { -// remain := make(map[string]*feederInfo) -// for feederID, feeder := range oracleP.TokenFeeders { -// if feederID == 0 { -// // feederID=0 is reserved -// continue -// } -// tokenName := strings.ToLower(oracleP.Tokens[feeder.TokenID].Name + baseCurrency) -// if _, ok := currentFeedingTokens[tokenName]; ok { -// continue -// } -// decimal := oracleP.Tokens[feeder.TokenID].Decimal -// fInfo := &feederInfo{ -// params: &feederParams{ -// startRoundID: feeder.StartRoundID, -// startBlock: feeder.StartBaseBlock, -// endBlock: feeder.EndBlock, -// interval: feeder.Interval, -// decimal: decimal, -// tokenIDStr: strconv.FormatInt(int64(feeder.TokenID), 10), -// feederID: int64(feederID), -// tokenName: tokenName, -// }, -// } -// -// remain[tokenName] = fInfo -// currentFeedingTokens[tokenName] = struct{}{} -// } -// return remain -// } - -// feedToken will try to send create-price tx to update prices onto exocore chain when conditions reached including: tokenFeeder-running, inside-a-pricing-window, price-updated-since-previous-round -// -// func feedToken(fInfo *feederInfo, f *fetcher.Fetcher, conf types.Config) { -// logger := getLogger() -// pChan := make(chan *fetchertypes.PriceInfo) -// prevPrice := "" -// prevDecimal := -1 -// prevHeight := uint64(0) -// tokenID, _ := strconv.ParseUint(fInfo.params.tokenIDStr, 10, 64) -// -// startBlock := fInfo.params.startBlock -// endBlock := fInfo.params.endBlock -// interval := fInfo.params.interval -// decimal := int(fInfo.params.decimal) -// feederID := uint64(fInfo.params.feederID) -// -// if p, err := exoclient.GetLatestPrice(cc, tokenID); err == nil { -// prevPrice = p.Price -// prevDecimal = int(p.Decimal) -// } -// -// for t := range fInfo.updateCh { -// // update Params if changed, paramsUpdate will be notified to corresponding feeder, not all -// if params := t.params; params != nil { -// startBlock = params.startBlock -// endBlock = params.endBlock -// interval = params.interval -// decimal = int(params.decimal) -// } -// -// // update latest price if changed -// if len(t.price) > 0 { -// prevPrice = t.price -// prevDecimal = t.decimal -// prevHeight = t.txHeight -// // this is an tx event with height==0, so just don't submit any messages, tx will be triggered by newBlock event -// continue -// } else if t.priceUpdated && prevHeight < t.height { -// // this is a newblock event and this case is: newBlock event arrived before tx event, (interval>=2*maxNonce, so interval must > 1, so we skip one block is safe) -// // wait txEvent to update the price -// continue -// } -// // check feeder status to feed price -// logger.Info("Triggered by new Block", "block_height", t.height, "feederID", feederID, "start_base_block", fInfo.params.startBlock, "round_interval", fInfo.params.interval, "start_roundID", fInfo.params.startRoundID) -// if t.height < startBlock { -// // tx event will have zero height, just don't submit price -// continue -// } -// if endBlock > 0 && t.height >= endBlock { -// // TODO: notify corresponding token fetcher -// return -// } -// delta := (t.height - startBlock) % interval -// roundID := (t.height-startBlock)/interval + fInfo.params.startRoundID -// if delta < 3 { -// //TODO: for v1 exocored, we do no restrictions on sources, so here we hardcode source information for nativetoken and normal token -// source := conf.Sources[0] -// if strings.EqualFold(fInfo.params.tokenName, fetchertypes.NativeTokenETH) { -// logger.Info("nstETH, use beaconchain instead of chainlink as source", "block_height", t.height, "feederID", feederID, "start_roundID", fInfo.params.startRoundID) -// source = fetchertypes.BeaconChain -// } -// // TODO: use source based on oracle-params -// // f.GetLatestPriceFromSourceToken(conf.Sources[0], fInfo.params.tokenName, pChan) -// f.GetLatestPriceFromSourceToken(source, fInfo.params.tokenName, pChan) -// p := <-pChan -// if p == nil { -// continue -// } -// // TODO: this price should be compared with the current price from oracle, not from source -// if prevDecimal > -1 && prevPrice == p.Price && prevDecimal == p.Decimal { -// // if prevPrice not changed between different rounds, we don't submit any messages and the oracle module will use the price from former round to update next round. -// logger.Info("price not changed, skip submitting price", "roundID", roundID, "feederID", feederID) -// continue -// } -// if len(p.Price) == 0 { -// logger.Info("price has not been fetched yet, skip submitting price", "roundID", roundID, "feederID", feederID) -// continue -// } -// basedBlock := t.height - delta -// -// if !(fInfo.params.tokenName == fetchertypes.NativeTokenETH) { -// if p.Decimal > decimal { -// p.Price = p.Price[:len(p.Price)-int(p.Decimal-decimal)] -// p.Decimal = decimal -// } else if p.Decimal < decimal { -// p.Price = p.Price + strings.Repeat("0", decimal-p.Decimal) -// p.Decimal = decimal -// } -// } -// logger.Info("submit create-price tx", "price", p.Price, "decimal", p.Decimal, "tokeName", fInfo.params.tokenName, "block_height", t.height, "roundID", roundID, "feederID", feederID) -// res := exoclient.SendTx(cc, feederID, basedBlock, p.Price, p.RoundID, p.Decimal, int32(delta)+1, t.gas) -// txResponse := res.GetTxResponse() -// if txResponse.Code == statusOk { -// logger.Info("sendTx succeeded", "feederID", feederID) -// } else { -// logger.Error("sendTx failed", "feederID", feederID, "response_rawlog", txResponse.RawLog) -// } -// } -// } -// } -// -// // triggerFeeders will trigger tokenFeeder based on arrival events to check and send create-price tx to update prices onto exocore chain -// -// func triggerFeeders(r exoclient.ReCh, fInfo *feederInfo, event eventRes, oracleP oracletypes.Params, feederIDsPriceUpdated []string) { -// eventCpy := event -// if r.ParamsUpdate { -// // check if this tokenFeeder's params has been changed -// if update := fInfo.params.update(oracleP); update { -// paramsCopy := *fInfo.params -// eventCpy.params = ¶msCopy -// } -// } -// for _, p := range r.Price { -// parsedPrice := strings.Split(p, "_") -// if fInfo.params.tokenIDStr == parsedPrice[0] { -// if fInfo.latestPrice != strings.Join(parsedPrice[1:], "_") { -// decimal := int64(0) -// if l := len(parsedPrice); l > 4 { -// // this is possible in nst case -// eventCpy.price = strings.Join(parsedPrice[2:l-1], "_") -// decimal, _ = strconv.ParseInt(parsedPrice[l-1], 10, 32) -// } else { -// eventCpy.price = parsedPrice[2] -// decimal, _ = strconv.ParseInt(parsedPrice[3], 10, 32) -// } -// // eventCpy.price = parsedPrice[2] -// // decimal, _ := strconv.ParseInt(parsedPrice[3], 10, 32) -// eventCpy.decimal = int(decimal) -// eventCpy.txHeight, _ = strconv.ParseUint(r.TxHeight, 10, 64) -// eventCpy.roundID, _ = strconv.ParseUint(parsedPrice[1], 10, 64) -// fInfo.latestPrice = strings.Join(parsedPrice[1:], "_") -// } -// break -// } -// } -// -// for _, feederID := range feederIDsPriceUpdated { -// if feederID == strconv.FormatInt(fInfo.params.feederID, 10) { -// eventCpy.priceUpdated = true -// } -// } -// -// // notify corresponding feeder to update price -// fInfo.updateCh <- eventCpy -// } -// -// // initializeNativeRestkingStakers initialize stakers' validator list since we wrap validator set into one single staker for each native-restaking-asset -// // func initializeNativeRestakingStakers(cc *grpc.ClientConn) map[string][]*oracleTypes.StakerInfo { -// // logger := getLogger() -// // ret := make(map[string][]*oracleTypes.StakerInfo) -// // for _, v := range types.NativeRestakings { -// // stakerInfos, err := exoclient.GetStakerInfos(cc, types.AssetIDMap[v[1]]) -// // for err != nil { -// // logger.Error("Failed to get stakerInfos, retrying...", "error", err) -// // stakerInfos, err = exoclient.GetStakerInfos(cc, types.NativeTokenETHAssetID) -// // time.Sleep(2 * time.Second) -// // } -// // ret[v[1]] = stakerInfos -// // } -// // return ret -// // } -// -// // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed -func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) { - // init fetcher, start fetchers to get prices from sources - err := fetcher.Init(conf.Tokens, sourcesPath) - if err != nil { - logger.Error("failed to init fetcher", "error", err) - panic(err) +// getOracleParamsWithMaxRetry, get oracle params with max retry +// blocked +func getOracleParamsWithMaxRetry(maxRetry int, ecClient exoclient.ExoClientInf, logger feedertypes.LoggerInf) (oracleP *oracletypes.Params, err error) { + if maxRetry <= 0 { + maxRetry = defaultMaxRetry } + for i := 0; i < maxRetry; i++ { + oracleP, err = ecClient.GetParams() + if err == nil { + return + } + logger.Error("Failed to get oracle params, retrying...", "count", i, "max", maxRetry, "error", err) + time.Sleep(retryInterval) + } + return +} - // init exoclient - err = exoclient.Init(conf, mnemonic, privFile, standalone) +func ResetAllStakerValidators(ec exoclient.ExoClientInf, logger feedertypes.LoggerInf) error { + stakerInfos, err := ec.GetStakerInfos(fetchertypes.GetNSTAssetID(fetchertypes.NativeTokenETH)) if err != nil { - logger.Error("failed to init exoclient", "error", err) - panic(err) + return fmt.Errorf("failed to get stakerInfos for native-restaking-eth, error:%w", err) } + if len(stakerInfos) > 0 { + f, _ := fetcher.GetFetcher() + if err := f.InitNSTStakerValidators(stakerInfos); err != nil { + return fmt.Errorf("failed to set stakerInfs for native-restaking-eth, error:%w", err) + } + } + return nil +} - // // start fetching on all supported sources and tokens - // logger.Info("start fetching prices from all sources") - // _ = f.StartAll() +// // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed +func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) { + count := 0 + for count < defaultMaxRetry { + count++ + // init fetcher, start fetchers to get prices from sources + err := fetcher.Init(conf.Tokens, sourcesPath) + if err != nil { + logger.Error("failed to init fetcher", "error", err) + panic(err) + } - ec, _ := exoclient.GetClient() - _, err = ec.GetParams() - for err != nil { - // retry forever until be interrupted manually - logger.Info("failed to get oracle params on start, retrying...", "error", err) - time.Sleep(2 * time.Second) + // init exoclient + err = exoclient.Init(conf, mnemonic, privFile, standalone) + if err != nil { + if errors.Is(err, feedertypes.ErrInitConnectionFail) { + logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", defaultMaxRetry, "error", err) + time.Sleep(retryInterval) + continue + } + logger.Error("failed to init exoclient", "error", err) + panic(err) + } + + ec, _ := exoclient.GetClient() _, err = ec.GetParams() + for err != nil { + // retry forever until be interrupted manually + logger.Info("failed to get oracle params on start, retrying...", "error", err) + time.Sleep(2 * time.Second) + _, err = ec.GetParams() + } + + // init native stakerlist for nstETH(beaconchain) + if err := ResetAllStakerValidators(ec, logger); err != nil { + panic(fmt.Sprintf("failed in initialize nst:%v", err)) + } + + logger.Info("Initialization for price-feeder done") + break } - logger.Info("Initialization for price-feeder done") } diff --git a/cmd/types.go b/cmd/types.go index 058f24f..9031eaa 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -1,6 +1,8 @@ package cmd import ( + "errors" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" @@ -11,9 +13,10 @@ import ( type priceFetcher interface { GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) + AddTokenForSource(source, token string) bool } type priceSubmitter interface { - SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) + SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) } type signInfo struct { @@ -42,7 +45,28 @@ func (s *signInfo) revertNonce(roundID int64) { } } -type Feeder struct { +type triggerHeights struct { + commitHeight int64 + priceHeight int64 +} +type updatePrice struct { + txHeight int64 + price *fetchertypes.PriceInfo +} +type updateParamsReq struct { + params *oracletypes.Params + result chan *updateParamsRes +} +type updateParamsRes struct { +} + +type localPrice struct { + price fetchertypes.PriceInfo + height int64 +} + +// TODO: stop channel to close +type feeder struct { logger feedertypes.LoggerInf // TODO: currently only 1 source for each token, so we can just set it as a field here source string @@ -56,22 +80,51 @@ type Feeder struct { interval int64 endBlock int64 - maxNonce int32 + // maxNonce int32 - fetcher priceFetcher - submitter priceSubmitter - price fetchertypes.PriceInfo - latestSent *signInfo + fetcher priceFetcher + submitter priceSubmitter + lastPrice *localPrice + lastSent *signInfo - priceCh chan *fetchertypes.PriceInfo - heightCh chan int64 - // TODO: a sending routine - // chan struct{basedBlock, priceInfo, nonce, gas} + priceCh chan *updatePrice + heightsCh chan *triggerHeights + paramsCh chan *updateParamsReq +} + +type feederInfo struct { + source string + token string + tokenID uint64 + feederID int + // TODO: add check for rouleID, v1 can be skipped + // ruleID + startRoundID int64 + startBaseBlock int64 + interval int64 + endBlock int64 + lastPrice localPrice + lastSent signInfo +} + +func (f *feeder) Info() feederInfo { + return feederInfo{ + source: f.source, + token: f.token, + tokenID: f.tokenID, + feederID: f.feederID, + startRoundID: f.startRoundID, + startBaseBlock: f.startBaseBlock, + interval: f.interval, + endBlock: f.endBlock, + lastPrice: *f.lastPrice, + lastSent: *f.lastSent, + } } // NewFeeder new a feeder struct from oracletypes' tokenfeeder -func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *Feeder { - return &Feeder{ +func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { + return &feeder{ logger: logger, source: source, token: token, @@ -83,220 +136,316 @@ func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, interval: int64(tf.Interval), endBlock: int64(tf.EndBlock), - maxNonce: maxNonce, + // maxNonce: maxNonce, - fetcher: fetcher, - submitter: submitter, - price: fetchertypes.PriceInfo{}, - latestSent: &signInfo{}, + fetcher: fetcher, + submitter: submitter, + lastSent: &signInfo{ + maxNonce: maxNonce, + }, + lastPrice: &localPrice{}, - priceCh: make(chan *fetchertypes.PriceInfo, 1), - heightCh: make(chan int64, 1), + priceCh: make(chan *updatePrice, 1), + heightsCh: make(chan *triggerHeights, 1), + paramsCh: make(chan *updateParamsReq, 1), } } -// func (f *feeder) Start() (chan int64, chan *fetchertypes.PriceInfo) { -func (f *Feeder) Start() { - // trigger := make(chan int64, 1) - // update := make(chan *fetchertypes.PriceInfo) +func (f *feeder) start() { go func() { for { select { - case h := <-f.heightCh: - baseBlock, roundID, delta := f.calculateRound(h) + case h := <-f.heightsCh: + if h.priceHeight > f.lastPrice.height { + // the block event arrived early, wait for the price update evenst to update local price + break + } + baseBlock, roundID, delta, active := f.calculateRound(h.commitHeight) + if !active { + // f.logger.Info("didn't submit price for feeder due the the feeder expired", "height", h.commitHeight, "endBlock", f.endBlock) + break + } if delta < 3 { - if price, err := f.fetcher.getLatestPrice(f.source, f.token); err != nil { - f.logger.Error("failed to get latest price", "source", f.source, "token", f.token, "roundID", roundID, "delta", delta, "error", err) + f.logger.Info("trigger feeder", "height_commith", h.commitHeight, "height_price", h.priceHeight) + if price, err := f.fetcher.GetLatestPrice(f.source, f.token); err != nil { + f.logger.Error("failed to get latest price", "roundID", roundID, "delta", delta, "feeder", f.Info(), "error", err) + if errors.Is(err, feedertypes.ErrSrouceTokenNotConfigured) { + f.logger.Error("add token from configure of source", "token", f.token, "source", f.source) + // blocked this feeder since no available fetcher_source_price working + if added := f.fetcher.AddTokenForSource(f.source, f.token); !added { + f.logger.Error("failed to add token from configure, pleas update the config file of source", "token", f.token, "source", f.source) + } + } } else { if price.IsZero() { - f.logger.Info("got nil latest price, skip submitting price", "source", f.source, "token", f.token, "roundID", roundID, "delta", delta) + f.logger.Info("got nil latest price, skip submitting price", "roundID", roundID, "delta", delta) continue } - if price.Equal(f.price) { - f.logger.Info("didn't submit price due to price not changed", "source", f.source, "token", f.token, "roundID", roundID, "delta", delta) - f.logger.Debug("got latsetprice equal to local cache", "price", f.price) + if price.EqualPrice(f.lastPrice.price) { + f.logger.Info("didn't submit price due to price not changed", "roundID", roundID, "delta", delta, "price", price) + f.logger.Debug("got latsetprice equal to local cache", "feeder", f.Info()) + continue } - if nonce := f.latestSent.getNextNonceAndUpdate(roundID); nonce < 0 { - f.logger.Info("didn't submit due to no available nonce", "roundID", roundID, "delta", delta) + if nonce := f.lastSent.getNextNonceAndUpdate(roundID); nonce < 0 { + f.logger.Error("failed to submit due to no available nonce", "roundID", roundID, "delta", delta, "feeder", f.Info()) } else { - f.logger.Info("send tx to submit price", "feederID", f.feederID, "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) - res, err := f.submitter.SendTx(uint64(f.feederID), uint64(baseBlock), price.Price, price.RoundID, price.Decimal, nonce) + // f.logger.Info("send tx to submit price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) + res, err := f.submitter.SendTx(uint64(f.feederID), uint64(baseBlock), price, nonce) if err != nil { - f.logger.Info("failed to send tx submitting price", "feederID", f.feederID, "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "error_feeder", err) + f.lastSent.revertNonce(roundID) + f.logger.Error("failed to send tx submitting price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "feeder", f.Info(), "error_feeder", err) } if txResponse := res.GetTxResponse(); txResponse.Code == statusOk { - f.logger.Info("sent tx to submit price", "feederID", f.feederID, "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) + f.logger.Info("sent tx to submit price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) } else { - f.logger.Error("failed to send tx submitting price", "feederID", f.feederID, "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "response_rawlog", txResponse.RawLog) + f.lastSent.revertNonce(roundID) + f.logger.Error("failed to send tx submitting price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "feeder", f.Info(), "response_rawlog", txResponse.RawLog) } } } } case price := <-f.priceCh: - f.price = *price + f.lastPrice.price = *(price.price) + // update latest height that price had been updated + f.lastPrice.height = price.txHeight + case req := <-f.paramsCh: + f.updateFeederParams(req.params) + req.result <- &updateParamsRes{} } } }() - // return nil, nil +} + +// UpdateParams updates the feeder's params from oracle params, this method will block if the channel is full +// which means the update for params will must be delivered to the feeder's routine when this method is called +// blocked +func (f *feeder) updateParams(params *oracletypes.Params) chan *updateParamsRes { + // TODO update oracle parms + res := make(chan *updateParamsRes) + req := &updateParamsReq{params: params, result: res} + f.paramsCh <- req + return res } // UpdatePrice will upate local price for feeder // non-blocked -func (f *Feeder) UpdatePrice(price *fetchertypes.PriceInfo) { +func (f *feeder) updatePrice(txHeight int64, price *fetchertypes.PriceInfo) { // we dont't block this process when the channelis full, if this updating is skipped // it will be update at next time when event arrived select { - case f.priceCh <- price: + case f.priceCh <- &updatePrice{price: price, txHeight: txHeight}: default: } } -func (f *Feeder) Trigger(height int64) { + +// Trigger notify the feeder that a new block height is committed +// non-blocked +func (f *feeder) trigger(commitHeight, priceHeight int64) { // the channel got 1 buffer, so it should always been sent successfully // and if not(the channel if full), we just skip this height and don't block select { - case f.heightCh <- height: + case f.heightsCh <- &triggerHeights{commitHeight: commitHeight, priceHeight: priceHeight}: default: } } +func (f *feeder) updateFeederParams(p *oracletypes.Params) { + // TODO: update feeder's params + +} + // TODO: stop feeder routine // func (f *feeder) Stop() -func (f *Feeder) calculateRound(h int64) (baseBlock, roundID, delta int64) { +func (f *feeder) calculateRound(h int64) (baseBlock, roundID, delta int64, active bool) { + // endBlock itself is considered as active + if f.startBaseBlock > h || (f.endBlock > 0 && h > f.endBlock) { + return + } + active = true delta = (h - f.startBaseBlock) % f.interval roundID = (h-f.startBaseBlock)/f.interval + f.startRoundID baseBlock = h - delta return } -type updatePriceReq struct { - price *fetchertypes.PriceInfo - feederID int +type triggerReq struct { + height int64 + feederIDs map[int64]struct{} } -type updateNSTReq struct { - feederID int +type finalPrice struct { + feederID int64 + price string + decimal int32 + roundID string } - -type Feeders struct { - feederMap map[int]*Feeder - // TODO: feeder has sync management, so feeders could remove these channel - trigger chan int64 - updatePrice chan *updatePriceReq - updateNST chan *updateNSTReq +type updatePricesReq struct { + txHeight int64 + prices []*finalPrice } -// type updatePriceReq struct { -// price *fetchertypes.PriceInfo -// feederID int64 -// } -// -// type updateNSTReq struct { -// feederID int64 -// } +type feederMap map[int]*feeder -func NewFeeders(feederMap map[int]*Feeder) *Feeders { +// NewFeederMap new a map feeder> +func NewFeederMap() feederMap { + return make(map[int]*feeder) +} +func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { return &Feeders{ - feederMap: feederMap, + logger: logger, + feederMap: fm, // don't block on height increasing - trigger: make(chan int64, 1), - updatePrice: make(chan *updatePriceReq), - updateNST: make(chan *updateNSTReq), + trigger: make(chan *triggerReq, 1), + updatePrice: make(chan *updatePricesReq, 1), + // block on update-params delivery, no buffer + updateParams: make(chan *oracletypes.Params), } } -// TODO: remove channels +// Add adds a new feeder with feederID into the map +func (fm feederMap) Add(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) { + fm[feederID] = &feeder{ + logger: logger, + source: source, + token: token, + tokenID: tf.TokenID, + feederID: feederID, + // these conversion a safe since the block height defined in cosmossdk is int64 + startRoundID: int64(tf.StartRoundID), + startBaseBlock: int64(tf.StartBaseBlock), + interval: int64(tf.Interval), + endBlock: int64(tf.EndBlock), + fetcher: fetcher, + submitter: submitter, + lastPrice: &localPrice{}, + lastSent: &signInfo{ + maxNonce: maxNonce, + }, + + priceCh: make(chan *updatePrice, 1), + heightsCh: make(chan *triggerHeights, 1), + paramsCh: make(chan *updateParamsReq, 1), + } +} + +type Feeders struct { + logger feedertypes.LoggerInf + feederMap map[int]*feeder + // TODO: feeder has sync management, so feeders could remove these channel + trigger chan *triggerReq + updatePrice chan *updatePricesReq + updateParams chan *oracletypes.Params + // updateNST chan *updateNSTReq +} + +// Start will start to listen the trigger(newHeight) and updatePrice events +// usd channels to avoid race condition on map func (fs *Feeders) Start() { - // TODO: buffer_1 ? - // trigger := make(chan int64) - // updatePrice := make(chan *updatePriceReq) - // updateNST := make(chan *updateNSTReq) for _, f := range fs.feederMap { - f.Start() + f.start() } go func() { for { select { - case height := <-fs.trigger: + case params := <-fs.updateParams: + results := []chan *updateParamsRes{} + for _, f := range fs.feederMap { + res := f.updateParams(params) + results = append(results, res) + } + // wait for all feeders to complete updateing params + for _, res := range results { + <-res + } + // TODO: add newly added tokenfeeders if exists + + case t := <-fs.trigger: // the order does not matter for _, f := range fs.feederMap { - f.Trigger(height) + priceHeight := int64(0) + if _, ok := t.feederIDs[int64(f.feederID)]; ok { + priceHeight = t.height + } + f.trigger(t.height, priceHeight) } case req := <-fs.updatePrice: - fs.feederMap[req.feederID].UpdatePrice(req.price) - case nstInfo := <-fs.updateNST: - // TODO: update staker's validatorList - _ = nstInfo + for _, price := range req.prices { + // int conversion is safe + if feeder, ok := fs.feederMap[int(price.feederID)]; !ok { + fs.logger.Error("failed to get feeder by feederID when update price for feeders", "updatePriceReq", req) + continue + } else { + feeder.updatePrice(req.txHeight, &fetchertypes.PriceInfo{ + Price: price.price, + Decimal: price.decimal, + RoundID: price.roundID, + }) + } + } } } }() } -func (fs *Feeders) Trigger() { - +// Trigger notify all feeders that a new block height is committed +// non-blocked +func (fs *Feeders) Trigger(height int64, feederIDs map[int64]struct{}) { + select { + case fs.trigger <- &triggerReq{height: height, feederIDs: feederIDs}: + default: + } } -func (fs *Feeders) UpdatePrice() { - +// UpdatePrice will upate local price for all feeders +// non-blocked +func (fs *Feeders) UpdatePrice(txHeight int64, prices []*finalPrice) { + select { + case fs.updatePrice <- &updatePricesReq{txHeight: txHeight, prices: prices}: + default: + } } -func (fs *Feeders) UpdateNST() { - +// UpdateOracleParams updates all feeders' params from oracle params +// blocking until all feeders update their params +// when this method returned, it's guaranteed no further actions received wll be executed unless that all feeders have updated their params +func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { + fs.updateParams <- p } // Define the types for the feeder -type feederParams struct { - startRoundID uint64 - startBlock uint64 - endBlock uint64 - interval uint64 - decimal int32 - tokenIDStr string - feederID int64 - tokenName string -} - -// Define the types for the event -type eventRes struct { - height uint64 - txHeight uint64 - gas int64 - price string - decimal int - roundID uint64 - params *feederParams - priceUpdated bool - nativeToken string -} - -// Define the types for the feederInfo -type feederInfo struct { - params *feederParams - latestPrice string - updateCh chan eventRes -} - -func (f *feederParams) update(p oracletypes.Params) (updated bool) { - tokenFeeder := p.TokenFeeders[f.feederID] - if tokenFeeder.StartBaseBlock != f.startBlock { - f.startBlock = tokenFeeder.StartBaseBlock - updated = true - } - if tokenFeeder.EndBlock != f.endBlock { - f.endBlock = tokenFeeder.EndBlock - updated = true - } - if tokenFeeder.Interval != f.interval { - f.interval = tokenFeeder.Interval - updated = true - } - if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { - f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal - updated = true - } - return -} +// type feederParams struct { +// startRoundID uint64 +// startBlock uint64 +// endBlock uint64 +// interval uint64 +// decimal int32 +// tokenIDStr string +// feederID int64 +// tokenName string +// } +// +// func (f *feederParams) update(p oracletypes.Params) (updated bool) { +// tokenFeeder := p.TokenFeeders[f.feederID] +// if tokenFeeder.StartBaseBlock != f.startBlock { +// f.startBlock = tokenFeeder.StartBaseBlock +// updated = true +// } +// if tokenFeeder.EndBlock != f.endBlock { +// f.endBlock = tokenFeeder.EndBlock +// updated = true +// } +// if tokenFeeder.Interval != f.interval { +// f.interval = tokenFeeder.Interval +// updated = true +// } +// if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { +// f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal +// updated = true +// } +// return +// } func getLogger() types.LoggerInf { return types.GetLogger("") diff --git a/exoclient/client.go b/exoclient/client.go index d4df225..7063fcc 100644 --- a/exoclient/client.go +++ b/exoclient/client.go @@ -18,14 +18,12 @@ import ( "google.golang.org/grpc" ) -var _ exoClientInf = &exoClient{} +var _ ExoClientInf = &exoClient{} // exoClient implements exoClientInf interface to serve as a grpc client to interact with eoxored grpc service type exoClient struct { logger feedertypes.LoggerInf grpcConn *grpc.ClientConn - // cancel used to cancel grpc connection - cancel func() // params for sign/send transactions privKey cryptotypes.PrivKey @@ -48,7 +46,8 @@ type exoClient struct { wsLock *sync.Mutex wsActiveRoutines *int wsActive *bool - wsEventsCh chan EventRes + // wsEventsCh chan EventRes + wsEventsCh chan EventInf // client to query from exocored oracleClient oracletypes.QueryClient @@ -67,13 +66,14 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, pri wsActive: new(bool), wsLock: new(sync.Mutex), wsStop: make(chan struct{}), - wsEventsCh: make(chan EventRes), + wsEventsCh: make(chan EventInf), } var err error - ec.grpcConn, ec.cancel, err = createGrpcConn(endpoint, encCfg) + ec.logger.Info("establish grpc connection") + ec.grpcConn, err = createGrpcConn(endpoint, encCfg) if err != nil { - return nil, fmt.Errorf("failed to create new Exoclient, endpoint:%s, error:%w", err) + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create new Exoclient, endpoint:%s, error:%v", endpoint, err)) } // setup txClient @@ -91,9 +91,10 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, pri }, Proxy: http.ProxyFromEnvironment, } + ec.logger.Info("establish ws connection") ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{}) if err != nil { - return nil, fmt.Errorf("failed to create ws connection, error:%w", err) + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err)) } ec.wsClient.SetPongHandler(func(string) error { return nil @@ -109,16 +110,14 @@ func (ec *exoClient) Close() { // Close close grpc connection func (ec *exoClient) CloseGRPC() { - if ec.cancel == nil { - return - } - ec.cancel() + ec.grpcConn.Close() } func (ec *exoClient) CloseWs() { if ec.wsClient == nil { return } + ec.StopWsRoutines() ec.wsClient.Close() } diff --git a/exoclient/grpc.go b/exoclient/grpc.go index 919f7e8..4d83456 100644 --- a/exoclient/grpc.go +++ b/exoclient/grpc.go @@ -13,8 +13,8 @@ import ( ) // CreateGrpcConn creates an grpc connection to the target -func createGrpcConn(target string, encCfg params.EncodingConfig) (conn *grpc.ClientConn, cancelFunc func(), err error) { - ctx, cancel := context.WithCancel(context.Background()) +func createGrpcConn(target string, encCfg params.EncodingConfig) (conn *grpc.ClientConn, err error) { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) grpcConn, err := grpc.DialContext( ctx, @@ -23,14 +23,15 @@ func createGrpcConn(target string, encCfg params.EncodingConfig) (conn *grpc.Cli grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(encCfg.InterfaceRegistry).GRPCCodec())), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 30 * time.Second, + Time: 150 * time.Second, Timeout: 5 * time.Second, PermitWithoutStream: true, }), + grpc.WithBlock(), ) if err != nil { - return nil, nil, fmt.Errorf("failed to create grpc connection, error:%w", err) + return nil, fmt.Errorf("failed to create grpc connection, error:%w", err) } - return grpcConn, cancel, nil + return grpcConn, nil } diff --git a/exoclient/query.go b/exoclient/query.go index b77d894..1267afe 100644 --- a/exoclient/query.go +++ b/exoclient/query.go @@ -8,12 +8,12 @@ import ( ) // GetParams queries oracle params -func (ec exoClient) GetParams() (oracleTypes.Params, error) { +func (ec exoClient) GetParams() (*oracleTypes.Params, error) { paramsRes, err := ec.oracleClient.Params(context.Background(), &oracleTypes.QueryParamsRequest{}) if err != nil { - return oracleTypes.Params{}, fmt.Errorf("failed to query oracle params from oracleClient, error:%w", err) + return &oracleTypes.Params{}, fmt.Errorf("failed to query oracle params from oracleClient, error:%w", err) } - return paramsRes.Params, nil + return ¶msRes.Params, nil } diff --git a/exoclient/subscribe.go b/exoclient/subscribe.go index 344a11b..4ac86bf 100644 --- a/exoclient/subscribe.go +++ b/exoclient/subscribe.go @@ -11,21 +11,31 @@ import ( ) type subEvent string +type eventQuery string const ( // subTypeNewBlock subType = "tm.event='NewBlock'" // subTypeTxUpdatePrice subType = "tm.event='Tx' AND create_price.price_update='success'" // subTypeTxNativeToken subType = "tm.event='Tx' AND create_price.native_token_update='update'" - subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}` - reconnectInterval = 3 - maxRetry = 600 - success = "success" + subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}` + reconnectInterval = 3 + maxRetry = 600 + success = "success" + eNewBlock eventQuery = "tm.event='NewBlock'" + eTxUpdatePrice eventQuery = "tm.event='Tx' AND create_price.price_update='success'" + eTxNativeToken eventQuery = "tm.event='Tx' AND create_price.native_token_update='update'" ) var ( - subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'")) - subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'")) - subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'")) + // subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'")) + subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock)) + // subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'")) + subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, eTxUpdatePrice)) + // subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'")) + subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, eTxNativeToken)) + // eNewBlock subEvent = "tm.event='NewBlock'" + // eTxUpdatePrice subEvent = "tm.event='Tx' AND create_price.price_update='success'" + // eTxNativeToken subEvent = "tm.event='Tx' AND create_price.native_token_update='update'" events = map[subEvent]bool{ subNewBlock: true, @@ -66,7 +76,7 @@ func (ec exoClient) Subscribe() { }() } -func (ec exoClient) EventsCh() chan EventRes { +func (ec exoClient) EventsCh() chan EventInf { return ec.wsEventsCh } @@ -76,12 +86,12 @@ func (ec exoClient) EventsCh() chan EventRes { // 3. routine: read events from ws connection func (ec exoClient) startTasks() { // ws connection stopped, reset subscriber - ec.logger.Info("establish ws connection") - if err := ec.connectWs(maxRetry); err != nil { - // continue - panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err)) - } - // ec.markWsActive() + //ec.logger.Info("establish ws connection") + // if err := ec.connectWs(maxRetry); err != nil { + // // continue + // panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err)) + // } + ec.markWsActive() ec.logger.Info("subscribe to ws publish", "events", events) if err := ec.sendAllSubscribeMsgs(maxRetry); err != nil { panic(fmt.Sprintf("failed to send subscribe messages after maxRetry:%d, error:%w", maxRetry, err)) @@ -102,7 +112,7 @@ func (ec exoClient) connectWs(maxRetry int) error { return errors.New("wsEndpoint not set in exoClient") } var err error - ec.wsClient.Close() + // ec.wsClient.Close() count := 0 for count < maxRetry { if ec.wsClient, _, err = ec.wsDialer.Dial(ec.wsEndpoint, http.Header{}); err == nil { @@ -120,7 +130,7 @@ func (ec exoClient) connectWs(maxRetry int) error { return fmt.Errorf("failed to dial ws endpoint, endpoint:%s, error:%w", ec.wsEndpoint, err) } -func (ec exoClient) closeWs() { +func (ec *exoClient) StopWsRoutines() { ec.wsLock.Lock() select { case _, ok := <-ec.wsStop: @@ -193,10 +203,12 @@ func (ec exoClient) sendAllSubscribeMsgs(maxRetry int) error { } if err := ec.wsClient.WriteMessage(websocket.TextMessage, []byte(event)); err == nil { events[event] = false + allSet = true } else { ec.logger.Error("failed to send subscribe", "message", event, "error", err) } } + time.Sleep(2 * time.Second) } if !allSet { return errors.New(fmt.Sprintf("failed to send all subscribe messages, events:%v", events)) @@ -223,7 +235,7 @@ func (ec exoClient) startPingRoutine() bool { if err := ec.wsClient.WriteMessage(websocket.PingMessage, []byte{}); err != nil { logger.Error("failed to write ping message to ws connection, close ws connection, ", "error", err) logger.Info("send signal to stop all running ws routines") - ec.closeWs() + ec.StopWsRoutines() return } case <-ec.wsStop: @@ -265,42 +277,39 @@ func (ec exoClient) startReadRoutine() bool { } logger.Info("send signal to stop all running ws routines") // send signal to stop all running ws routines - ec.closeWs() + ec.StopWsRoutines() return } var response SubscribeResult err = json.Unmarshal(data, &response) if err != nil { - ec.logger.Error("failed to pase response from publisher, skip", "error", err) + ec.logger.Error("failed to parse response from publisher, skip", "error", err) continue } - received := EventRes{} - - switch subEvent(response.Result.Query) { - case subNewBlock: - received.Height = response.Result.Data.Value.Block.Header.Height - events := response.Result.Events - if len(events.Fee) > 0 { - received.Gas = events.Fee[0] + switch eventQuery(response.Result.Query) { + case eNewBlock: + event, err := response.GetEventNewBlock() + if err != nil { + ec.logger.Error("failed to get newBlock event from event-response", "response", response, "error", err) } - if len(events.ParamsUpdate) > 0 { - received.ParamsUpdate = true + ec.wsEventsCh <- event + case eTxUpdatePrice: + event, err := response.GetEventUpdatePrice() + if err != nil { + ec.logger.Error("failed to get updatePrice event from event-response", "response", response, "error", err) + break } - // TODO: for oracle v1, this should not happen, since this event only emitted in tx, But if we add more modes to support final price generation in endblock, this would be necessaray. - if len(events.PriceUpdate) > 0 && events.PriceUpdate[0] == success { - received.FeederIDs = events.FeederIDs[0] - } - ec.wsEventsCh <- received - case subTxUpdatePrice: - // as we filtered for price_udpate=success, this means price has been updated this block - events := response.Result.Events - received.Price = events.FinalPrice - received.TxHeight = response.Result.Data.Value.TxResult.Height - ec.wsEventsCh <- received - case subTxNativeToken: + ec.wsEventsCh <- event + case eTxNativeToken: // update validator list for staker - received.NativeETH = response.Result.Events.NativeTokenChange[0] + event, err := response.GetEventUpdateNST() + if err != nil { + ec.logger.Error("failed to get nativeToken event from event-response", "response", response, "error", err) + break + } + ec.wsEventsCh <- event default: + ec.logger.Error("failed to parse unknown event type", "response-data", string(data)) } } } diff --git a/exoclient/tx.go b/exoclient/tx.go index cfd55ae..c0bc89f 100644 --- a/exoclient/tx.go +++ b/exoclient/tx.go @@ -5,7 +5,8 @@ import ( "fmt" "time" - oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" @@ -16,20 +17,21 @@ import ( ) // SendTx signs a create-price transaction and send it to exocored -func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) { +// func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) { +func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) { // build create-price message - msg := oracleTypes.NewMsgCreatePrice( + msg := oracletypes.NewMsgCreatePrice( sdk.AccAddress(ec.pubKey.Address()).String(), feederID, - []*oracleTypes.PriceSource{ + []*oracletypes.PriceSource{ { SourceID: Chainlink, - Prices: []*oracleTypes.PriceTimeDetID{ + Prices: []*oracletypes.PriceTimeDetID{ { - Price: price, - Decimal: int32(decimal), + Price: price.Price, + Decimal: price.Decimal, Timestamp: time.Now().UTC().Format(layout), - DetID: roundID, + DetID: price.RoundID, }, }, Desc: "", diff --git a/exoclient/types.go b/exoclient/types.go index e65e09d..d43790e 100644 --- a/exoclient/types.go +++ b/exoclient/types.go @@ -4,14 +4,18 @@ import ( cryptoed25519 "crypto/ed25519" "encoding/base64" "encoding/json" + "errors" "fmt" "os" "path" + "strconv" + "strings" "github.com/ExocoreNetwork/exocore/app" cmdcfg "github.com/ExocoreNetwork/exocore/cmd/config" oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" @@ -21,20 +25,171 @@ import ( "github.com/evmos/evmos/v16/encoding" ) -type exoClientInf interface { +type ExoClientInf interface { // Query - GetParams() (oracletypes.Params, error) + GetParams() (*oracletypes.Params, error) GetLatestPrice(tokenID uint64) (oracletypes.PriceTimeRound, error) GetStakerInfos(assetID string) ([]*oracleTypes.StakerInfo, error) GetStakerInfo(assetID, stakerAddr string) ([]*oracleTypes.StakerInfo, error) // Tx - SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) + SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) // Ws subscriber Subscribe() } +type EventInf interface { + Type() EventType +} + +type EventNewBlock struct { + height int64 + gas string + paramsUpdate bool + feederIDs map[int64]struct{} +} + +func (s *SubscribeResult) GetEventNewBlock() (*EventNewBlock, error) { + height, ok := s.BlockHeight() + if !ok { + return nil, errors.New("failed to get height from event_newBlock response") + } + fee, ok := s.Fee() + if !ok { + return nil, errors.New("failed to get gas from event_newBlock response") + } + feederIDs, ok := s.FeederIDs() + if !ok { + return nil, errors.New("failed to get feederIDs from event_newBlock response") + } + + return &EventNewBlock{ + height: height, + gas: fee, + paramsUpdate: s.ParamsUpdate(), + feederIDs: feederIDs, + }, nil +} +func (e *EventNewBlock) Height() int64 { + return e.height +} +func (e *EventNewBlock) Gas() string { + return e.gas +} +func (e *EventNewBlock) ParamsUpdate() bool { + return e.paramsUpdate +} +func (e *EventNewBlock) FeederIDs() map[int64]struct{} { + return e.feederIDs +} +func (e *EventNewBlock) Type() EventType { + return ENewBlock +} + +type FinalPrice struct { + tokenID int64 + roundID string + price string + decimal int32 +} + +func (f *FinalPrice) TokenID() int64 { + return f.tokenID +} +func (f *FinalPrice) RoundID() string { + return f.roundID +} +func (f *FinalPrice) Price() string { + return f.price +} +func (f *FinalPrice) Decimal() int32 { + return f.decimal +} + +type EventUpdatePrice struct { + prices []*FinalPrice + txHeight int64 +} + +func (s *SubscribeResult) GetEventUpdatePrice() (*EventUpdatePrice, error) { + prices, ok := s.FinalPrice() + if !ok { + return nil, errors.New("failed to get finalPrice from event_txUpdatePrice response") + } + txHeight, ok := s.TxHeight() + if !ok { + return nil, errors.New("failed to get txHeight from event_txUpdatePrice response") + } + return &EventUpdatePrice{ + prices: prices, + txHeight: txHeight, + }, nil +} +func (e *EventUpdatePrice) Prices() []*FinalPrice { + return e.prices +} +func (e *EventUpdatePrice) TxHeight() int64 { + return e.txHeight +} +func (e *EventUpdatePrice) Type() EventType { + return EUpdatePrice +} + +// EventUpdateNST tells the detail about the beaconchain-validator change for a staker +type EventUpdateNST struct { + deposit bool + stakerID int64 + validatorIndex string + index int64 +} + +func (s *SubscribeResult) GetEventUpdateNST() (*EventUpdateNST, error) { + nstChange, ok := s.NSTChange() + if !ok { + return nil, errors.New("failed to get NativeTokenChange from event_txUpdateNST response") + } + parsed := strings.Split(nstChange, "_") + if len(parsed) != 4 { + return nil, fmt.Errorf("failed to parse nstChange due to length is not expected 4 from event_txUPdatedNST response", "nstChange_str", nstChange) + } + deposit := parsed[0] == "deposit" + stakerID, err := strconv.ParseInt(parsed[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse stakerID in nstChange from evetn_txUpdateNST response, error:%w", err) + } + // validatorIndex, err := strconv.ParseInt(strings.TrimPrefix(parsed[2], "0x"), 16, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse beaconchain_valdiatorIndex in nstChange from event_txUpdateNST response, error:%w", err) + } + index, err := strconv.ParseInt(parsed[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse beaconchain_sync_index in nstChange from event_txUpdateNST response, error:%w", err) + } + return &EventUpdateNST{ + deposit: deposit, + stakerID: stakerID, + // validatorIndex: validatorIndex, + validatorIndex: parsed[2], + index: index, + }, nil +} +func (e *EventUpdateNST) Deposit() bool { + return e.deposit +} +func (e *EventUpdateNST) StakerID() int64 { + return e.stakerID +} +func (e *EventUpdateNST) ValidatorIndex() string { + return e.validatorIndex +} +func (e *EventUpdateNST) Index() int64 { + return e.index +} +func (e *EventUpdateNST) Type() EventType { + return EUpdateNST +} + type EventType int type EventRes struct { @@ -45,6 +200,7 @@ type EventRes struct { FeederIDs string TxHeight string NativeETH string + eventMessage interface{} Type EventType } @@ -76,6 +232,105 @@ type SubscribeResult struct { } `json:"result"` } +func (s *SubscribeResult) BlockHeight() (int64, bool) { + if h := s.Result.Data.Value.Block.Header.Height; len(h) > 0 { + height, err := strconv.ParseInt(h, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from height in SubscribeResult", "error", err, "height_str", h) + } + return height, true + } + return 0, false +} + +func (s *SubscribeResult) TxHeight() (int64, bool) { + if h := s.Result.Data.Value.TxResult.Height; len(h) > 0 { + height, err := strconv.ParseInt(h, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from txheight in SubscribeResult", "error", err, "height_str", h) + } + return height, true + } + return 0, false +} + +// FeederIDs will return (nil, true) when there's no feederIDs +func (s *SubscribeResult) FeederIDs() (feederIDs map[int64]struct{}, valid bool) { + events := s.Result.Events + if len(events.PriceUpdate) > 0 && events.PriceUpdate[0] == success { + if feederIDsStr := strings.Split(events.FeederIDs[0], "_"); len(feederIDsStr) > 0 { + feederIDs = make(map[int64]struct{}) + for _, feederIDStr := range feederIDsStr { + id, err := strconv.ParseInt(feederIDStr, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from feederIDs in subscribeResult", "feederIDs", feederIDs) + feederIDs = nil + return + } + feederIDs[id] = struct{}{} + } + valid = true + } + + } + // we don't take it as a 'false' case when there's no feederIDs + valid = true + return +} + +func (s *SubscribeResult) FinalPrice() (prices []*FinalPrice, valid bool) { + if fps := s.Result.Events.FinalPrice; len(fps) > 0 { + prices = make([]*FinalPrice, 0, len(fps)) + for _, price := range fps { + parsed := strings.Split(price, "_") + if len(parsed) != 4 { + logger.Error("failed to parse finalprice from subscribeResult", "finalPrice", price) + prices = nil + return + } + tokenID, err := strconv.ParseInt(parsed[0], 10, 64) + if err != nil { + logger.Error("failed to parse finalprice.tokenID from SubscribeResult", "parsed.tokenID", parsed[0]) + prices = nil + return + } + decimal, err := strconv.ParseInt(parsed[3], 10, 32) + if err != nil { + logger.Error("failed to parse finalprice.decimal from SubscribeResult", "parsed.decimal", parsed[3]) + prices = nil + return + } + prices = append(prices, &FinalPrice{ + tokenID: tokenID, + roundID: parsed[1], + price: parsed[2], + // conversion is safe + decimal: int32(decimal), + }) + } + valid = true + } + return +} + +func (s *SubscribeResult) NSTChange() (string, bool) { + if len(s.Result.Events.NativeTokenChange[0]) == 0 { + return "", false + } + return s.Result.Events.NativeTokenChange[0], true +} + +func (s *SubscribeResult) ParamsUpdate() bool { + return len(s.Result.Events.ParamsUpdate) > 0 +} + +func (s *SubscribeResult) Fee() (string, bool) { + if len(s.Result.Events.Fee) == 0 { + return "", false + } + return s.Result.Events.Fee[0], true +} + const ( // current version of 'Oracle' only support id=1(chainlink) as valid source Chainlink uint64 = 1 @@ -84,9 +339,9 @@ const ( ) const ( - NewBlock EventType = iota + 1 - UpdatePrice - UpdateNST + ENewBlock EventType = iota + 1 + EUpdatePrice + EUpdateNST ) var ( @@ -159,6 +414,9 @@ func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) e var err error if defaultExoClient, err = NewExoClient(logger, confExocore.Rpc, confExocore.Ws, privKey, encCfg, confExocore.ChainID); err != nil { + if errors.Is(err, feedertypes.ErrInitConnectionFail) { + return err + } return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to NewExoClient, privKey:%v, chainID:%s, error:%v", privKey, confExocore.ChainID, err)) } diff --git a/fetcher/beaconchain/beaconchain.go b/fetcher/beaconchain/beaconchain.go index ec81aac..62f4424 100644 --- a/fetcher/beaconchain/beaconchain.go +++ b/fetcher/beaconchain/beaconchain.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "math/big" "net/http" "net/url" "sort" @@ -15,7 +14,7 @@ import ( oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/ExocoreNetwork/price-feeder/fetcher/types" - "github.com/ethereum/go-ethereum/common/hexutil" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/imroc/biu" ) @@ -69,7 +68,8 @@ var ( // } // // stakerValidators = map[int]*validatorList{2: {0, validatorsTmp}} - stakerValidators = make(map[int]*validatorList) + // stakerValidators = make(map[int]*validatorList) + defaultStakerValidators = newStakerVList() // latest finalized epoch we've got balances summarized for stakers finalizedEpoch uint64 @@ -81,100 +81,28 @@ var ( slotsPerEpoch uint64 ) -func ResetStakerValidators(stakerInfos []*oracletypes.StakerInfo) { - lock.Lock() - for _, sInfo := range stakerInfos { - index := uint64(0) - if l := len(sInfo.BalanceList); l > 0 { - index = sInfo.BalanceList[l-1].Index - } - // convert bytes into number of beaconchain validator index - validators := make([]string, 0, len(sInfo.ValidatorPubkeyList)) - for _, validatorPubkey := range sInfo.ValidatorPubkeyList { - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkeyNum := new(big.Int).SetBytes(validatorPubkeyBytes).String() - validators = append(validators, validatorPubkeyNum) - } - stakerValidators[int(sInfo.StakerIndex)] = &validatorList{ - index: index, - validators: validators, - } - } - lock.Unlock() +func ResetStakerValidators(stakerInfos []*oracletypes.StakerInfo, all bool) error { + return defaultStakerValidators.reset(stakerInfos, all) } - -// UpdateStakerValidators update staker's validators for deposit/withdraw events triggered by exocore -func UpdateStakerValidators(stakerIdx int, validatorPubkey string, deposit bool, index uint64) bool { - lock.Lock() - defer lock.Unlock() - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkey = new(big.Int).SetBytes(validatorPubkeyBytes).String() - // add a new valdiator for the staker +func UpdateStakerValidators(stakerIdx int, validatorIndexHex string, index uint64, deposit bool) bool { if deposit { - if vList, ok := stakerValidators[stakerIdx]; ok { - if vList.index+1 != index { - return false - } - vList.index++ - vList.validators = append(vList.validators, validatorPubkey) - } else { - stakerValidators[stakerIdx] = &validatorList{ - index: index, - validators: []string{validatorPubkey}, - } - } - } else { - // remove the existing validatorIndex for the corresponding staker - if vList, ok := stakerValidators[stakerIdx]; ok { - if vList.index+1 != index { - return false - } - vList.index++ - for idx, v := range vList.validators { - if v == validatorPubkey { - if len(vList.validators) == 1 { - delete(stakerValidators, stakerIdx) - break - } - vList.validators = append(vList.validators[:idx], vList.validators[idx+1:]...) - break - } - } - } + return defaultStakerValidators.addVIdx(stakerIdx, validatorIndexHex, index) } - return true -} - -func ResetStakerValidatorsForAll(stakerInfos []*oracletypes.StakerInfo) { - lock.Lock() - stakerValidators = make(map[int]*validatorList) - for _, stakerInfo := range stakerInfos { - validators := make([]string, 0, len(stakerInfo.ValidatorPubkeyList)) - for _, validatorPubkey := range stakerInfo.ValidatorPubkeyList { - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkeyNum := new(big.Int).SetBytes(validatorPubkeyBytes).String() - validators = append(validators, validatorPubkeyNum) - } - - index := uint64(0) - // TODO: this may not necessary, stakerInfo should have at least one entry in balanceList - if l := len(stakerInfo.BalanceList); l > 0 { - index = stakerInfo.BalanceList[l-1].Index - } - stakerValidators[int(stakerInfo.StakerIndex)] = &validatorList{ - index: index, - validators: validators, - } - } - lock.Unlock() + return defaultStakerValidators.removeVIdx(stakerIdx, validatorIndexHex, index) } func (s *source) fetch(token string) (*types.PriceInfo, error) { // check epoch, when epoch updated, update effective-balance - if token != types.NativeTokenETH { - logger.Error("only support native-eth-restaking", "expect", types.NativeTokenETH, "got", token) - return nil, errTokenNotSupported + if types.NSTToken(token) != types.NativeTokenETH { + return nil, feedertypes.ErrTokenNotSupported.Wrap(fmt.Sprintf("only support native-eth-restaking %s", types.NativeTokenETH)) + } + + stakerValidators := defaultStakerValidators.getStakerValidators() + if len(stakerValidators) == 0 { + // return zero price when there's no stakers + return &types.PriceInfo{}, nil } + // check if finalized epoch had been updated epoch, stateRoot, err := getFinalizedEpoch() if err != nil { @@ -190,9 +118,7 @@ func (s *source) fetch(token string) (*types.PriceInfo, error) { } stakerChanges := make([][]int, 0, len(stakerValidators)) - - lock.RLock() - logger.Info("fetch efb from beaconchain", "stakerList_length", len(stakerValidators)) + s.logger.Info("fetch efb from beaconchain", "stakerList_length", len(stakerValidators)) hasEFBChanged := false for stakerIdx, vList := range stakerValidators { stakerBalance := 0 @@ -226,19 +152,17 @@ func (s *source) fetch(token string) (*types.PriceInfo, error) { delta = maxChange } stakerChanges = append(stakerChanges, []int{stakerIdx, delta}) - logger.Info("fetched efb from beaconchain", "staker_index", stakerIdx, "balance_change", delta) + s.logger.Info("fetched efb from beaconchain", "staker_index", stakerIdx, "balance_change", delta) hasEFBChanged = true } } if !hasEFBChanged && len(stakerValidators) > 0 { - logger.Info("fetch efb from beaconchain, all efbs of validators remains to 32 without any change") + s.logger.Info("fetch efb from beaconchain, all efbs of validators remains to 32 without any change") } sort.Slice(stakerChanges, func(i, j int) bool { return stakerChanges[i][0] < stakerChanges[j][0] }) - lock.RUnlock() - finalizedEpoch = epoch latestChangesBytes = convertBalanceChangeToBytes(stakerChanges) diff --git a/fetcher/beaconchain/types.go b/fetcher/beaconchain/types.go index 3d21a9c..ff54bf3 100644 --- a/fetcher/beaconchain/types.go +++ b/fetcher/beaconchain/types.go @@ -2,9 +2,9 @@ package beaconchain import ( "encoding/json" - "errors" "fmt" "io" + "math/big" "net/http" "net/url" "os" @@ -12,21 +12,15 @@ import ( "strconv" "strings" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/ExocoreNetwork/price-feeder/fetcher/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/cometbft/cometbft/libs/sync" + "github.com/ethereum/go-ethereum/common/hexutil" "gopkg.in/yaml.v2" ) -// type stakerList struct { -// StakerAddrs []string -// } -// -// type validatorList struct { -// index uint64 -// validators []string -// } - type source struct { logger feedertypes.LoggerInf *types.Source @@ -42,6 +36,116 @@ type ResultConfig struct { } `json:"data"` } +type stakerVList struct { + locker *sync.RWMutex + sValidators map[int]*validatorList +} + +func newStakerVList() *stakerVList { + return &stakerVList{ + locker: new(sync.RWMutex), + sValidators: make(map[int]*validatorList), + } +} + +func (s *stakerVList) length() int { + if s == nil { + return 0 + } + s.locker.RLock() + l := len(s.sValidators) + s.locker.RUnlock() + return l +} + +func (s *stakerVList) getStakerValidators() map[int]*validatorList { + s.locker.RLock() + defer s.locker.RUnlock() + ret := make(map[int]*validatorList) + for stakerIdx, vList := range s.sValidators { + validators := make([]string, len(vList.validators)) + copy(validators, vList.validators) + ret[stakerIdx] = &validatorList{ + index: vList.index, + validators: validators, + } + } + return ret +} + +func (s *stakerVList) addVIdx(sIdx int, vIdx string, index uint64) bool { + s.locker.Lock() + defer s.locker.Unlock() + if vList, ok := s.sValidators[sIdx]; ok { + if vList.index+1 != index { + return false + } + vList.index++ + vList.validators = append(vList.validators, vIdx) + } else { + if index != 0 { + return false + } + s.sValidators[sIdx] = &validatorList{ + index: 0, + validators: []string{vIdx}, + } + } + return true +} +func (s *stakerVList) removeVIdx(sIdx int, vIdx string, index uint64) bool { + s.locker.RLock() + defer s.locker.RUnlock() + if vList, ok := s.sValidators[sIdx]; ok { + if vList.index+1 != index { + return false + } + vList.index++ + for idx, v := range vList.validators { + if v == vIdx { + if len(vList.validators) == 1 { + delete(s.sValidators, sIdx) + return true + } + vList.validators = append(vList.validators[:idx], vList.validators[idx+1:]...) + return true + } + } + } + return false +} + +func (s *stakerVList) reset(stakerInfos []*oracletypes.StakerInfo, all bool) error { + s.locker.Lock() + if all { + s.sValidators = make(map[int]*validatorList) + } + for _, stakerInfo := range stakerInfos { + validators := make([]string, 0, len(stakerInfo.ValidatorPubkeyList)) + for _, validatorIndexHex := range stakerInfo.ValidatorPubkeyList { + validatorIdx, err := convertHexToIntStr(validatorIndexHex) + if err != nil { + logger.Error("failed to convert validatorIndex from hex string to int", "validator-index-hex", validatorIndexHex) + return fmt.Errorf(fmt.Sprintf("failed to convert validatorIndex from hex string to int, validator-index-hex:%s", validatorIndexHex)) + } + validators = append(validators, validatorIdx) + } + + index := uint64(0) + // TODO: this may not necessary, stakerInfo should have at least one entry in balanceList + if l := len(stakerInfo.BalanceList); l > 0 { + index = stakerInfo.BalanceList[l-1].Index + } + s.sValidators[int(stakerInfo.StakerIndex)] = &validatorList{ + index: index, + validators: validators, + } + + } + s.locker.Unlock() + return nil +} + const ( envConf = "oracle_env_beaconchain.yaml" urlQuerySlotsPerEpoch = "eth/v1/config/spec" @@ -50,23 +154,19 @@ const ( var ( logger feedertypes.LoggerInf - lock sync.RWMutex defaultSource *source - - // errors - errTokenNotSupported = errors.New("token not supported") ) func init() { types.SourceInitializers[types.BeaconChain] = initBeaconchain } -func initBeaconchain(cfgPath string) (types.SourceInf, error) { - // init logger, panic immediately if logger has not been set properly - if logger = feedertypes.GetLogger("fetcher_beaconchain"); logger == nil { - panic("logger is not initialized") +func initBeaconchain(cfgPath string, l feedertypes.LoggerInf) (types.SourceInf, error) { + if logger = l; logger == nil { + if logger = feedertypes.GetLogger("fetcher_beaconchain"); logger == nil { + return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized") + } } - // init from config file cfg, err := parseConfig(cfgPath) if err != nil { @@ -121,8 +221,7 @@ func initBeaconchain(cfgPath string) (types.SourceInf, error) { // initialize native-restaking stakers' beaconchain-validator list // update nst assetID to be consistent with exocored. for beaconchain it's about different lzID - types.UpdateNativeAssetID(cfg.NSTID) - // types.Fetchers[types.BeaconChain] = Fetch + types.SetNativeAssetID(fetchertypes.NativeTokenETH, cfg.NSTID) return defaultSource, nil } @@ -138,3 +237,12 @@ func parseConfig(confPath string) (config, error) { } return cfg, nil } + +func convertHexToIntStr(hexStr string) (string, error) { + vBytes, err := hexutil.Decode(hexStr) + if err != nil { + return "", err + } + return new(big.Int).SetBytes(vBytes).String(), nil + +} diff --git a/fetcher/chainlink/chainlink.go b/fetcher/chainlink/chainlink.go index 46d2bfd..d1f34ef 100644 --- a/fetcher/chainlink/chainlink.go +++ b/fetcher/chainlink/chainlink.go @@ -8,7 +8,7 @@ import ( "strings" "time" - "github.com/ExocoreNetwork/price-feeder/fetcher/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -16,7 +16,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) -func (s *source) fetch(token string) (*types.PriceInfo, error) { +func (s *source) fetch(token string) (*fetchertypes.PriceInfo, error) { chainlinkPriceFeedProxy, ok := s.chainlinkProxy.get(token) if !ok { return nil, feedertypes.ErrSrouceTokenNotConfigured.Wrap(fmt.Sprintf("failed to get chainlinkProxy for token:%s for not set", token)) @@ -32,9 +32,9 @@ func (s *source) fetch(token string) (*types.PriceInfo, error) { return nil, fmt.Errorf("failed to get decimals, error:%w", err) } - return &types.PriceInfo{ + return &fetchertypes.PriceInfo{ Price: roundData.Answer.String(), - Decimal: int(decimals), + Decimal: int32(decimals), Timestamp: time.Now().String(), RoundID: roundData.RoundId.String(), }, nil @@ -79,7 +79,9 @@ func (s *source) reload(cfgPath string, token string) error { for tName, tContract := range cfg.Tokens { tName = strings.ToLower(tName) if strings.EqualFold(tName, token) { - s.chainlinkProxy.addToken(map[string]string{tName: tContract}) + if err := s.chainlinkProxy.addToken(map[string]string{tName: tContract}); err != nil { + s.logger.Error("failed to add proxy when do reload", "source", s.GetName(), "token", tName, "error", err) + } return nil } } diff --git a/fetcher/chainlink/types.go b/fetcher/chainlink/types.go index cb23aa5..dd7b470 100644 --- a/fetcher/chainlink/types.go +++ b/fetcher/chainlink/types.go @@ -93,9 +93,11 @@ func init() { types.SourceInitializers[types.Chainlink] = initChainlink } -func initChainlink(cfgPath string) (types.SourceInf, error) { - if logger = feedertypes.GetLogger("fetcher_chainlink"); logger == nil { - return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized") +func initChainlink(cfgPath string, l feedertypes.LoggerInf) (types.SourceInf, error) { + if logger = l; logger == nil { + if logger = feedertypes.GetLogger("fetcher_chainlink"); logger == nil { + return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized") + } } cfg, err := parseConfig(cfgPath) if err != nil { diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index 46a05f9..ca29a3e 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -7,44 +7,23 @@ import ( "sync" "unicode" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + "github.com/ExocoreNetwork/price-feeder/fetcher/beaconchain" "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" ) +const ( + loggerTag = "fetcher" + loggerTagPrefix = "fetcher_%s" +) + var ( - sourcesMap sync.Map - tokensMap sync.Map - logger feedertypes.LoggerInf + logger feedertypes.LoggerInf defaultFetcher *Fetcher ) -// defaultFetcher = &Fetcher{ -// sources: sourceIDs, -// interval: defaultInterval, -// newSource: make(chan struct { -// name string -// endpoint string -// }), -// // nativeTokenValidatorsUpdate: make(chan struct { -// // tokenName string -// // info string -// // success chan bool -// // }), -// configSource: make(chan struct { -// s string -// t string -// }), -// getLatestPriceWithSourceToken: make(chan struct { -// p chan *types.PriceInfo -// s string -// t string -// }), -// newTokenForSource: make(chan struct { -// sourceName string -// tokenName string -// }), -// } type Fetcher struct { logger feedertypes.LoggerInf locker *sync.Mutex @@ -53,17 +32,8 @@ type Fetcher struct { // source->map{token->price} priceReadList map[string]map[string]*types.PriceSync addSourceToken chan *addTokenForSourceReq - // addSourceToken chan struct { - // source string - // token string - // } getLatestPrice chan *getLatestPriceReq stop chan struct{} - // getLatestPrice chan struct { - // source string - // token string - // price chan types.PriceInfo - // } } type addTokenForSourceReq struct { source string @@ -82,7 +52,7 @@ type getLatestPriceRes struct { } func newGetLatestPriceReq(source, token string) (*getLatestPriceReq, chan *getLatestPriceRes) { - res := make(chan *getLatestPriceRes) + res := make(chan *getLatestPriceRes, 1) return &getLatestPriceReq{source: source, token: token, result: res}, res } @@ -92,8 +62,9 @@ func NewFetcher(logger feedertypes.LoggerInf, sources map[string]types.SourceInf locker: new(sync.Mutex), sources: sources, priceReadList: make(map[string]map[string]*types.PriceSync), - addSourceToken: make(chan *addTokenForSourceReq), - getLatestPrice: make(chan *getLatestPriceReq), + addSourceToken: make(chan *addTokenForSourceReq, 5), + // getLatestPrice: make(chan *getLatestPriceReq), + getLatestPrice: make(chan *getLatestPriceReq, 5), stop: make(chan struct{}), } } @@ -101,7 +72,7 @@ func NewFetcher(logger feedertypes.LoggerInf, sources map[string]types.SourceInf // AddTokenForSource adds token for existing source // blocked waiting for the result to return func (f *Fetcher) AddTokenForSource(source, token string) bool { - res := make(chan bool) + res := make(chan bool, 1) f.addSourceToken <- &addTokenForSourceReq{ source: source, token: token, @@ -110,16 +81,40 @@ func (f *Fetcher) AddTokenForSource(source, token string) bool { return <-res } +// TODO:: +func (f *Fetcher) AddTokenForSourceUnBlocked(source, token string) { + res := make(chan bool) + f.addSourceToken <- &addTokenForSourceReq{ + source: source, + token: token, + result: res, + } +} + +func (f *Fetcher) InitNSTStakerValidators(stakerInfos []*oracletypes.StakerInfo) error { + f.locker.Lock() + if f.running { + f.locker.Unlock() + return errors.New("failed to init staker's validatorList for nst on a running fetcher") + } + // TODO: refator this to avoid call beaconchain directly + if err := beaconchain.ResetStakerValidators(stakerInfos, true); err != nil { + return fmt.Errorf("failed to init staker's validaorList, error:%w", err) + } + f.locker.Unlock() + return nil +} + // GetLatestPrice return the queried price for the token from specified source // blocked waiting for the result to return -func (f Fetcher) GetLatestPrice(source, token string) (types.PriceInfo, error) { +func (f *Fetcher) GetLatestPrice(source, token string) (types.PriceInfo, error) { req, res := newGetLatestPriceReq(source, token) f.getLatestPrice <- req result := <-res return result.price, result.err } -func (f Fetcher) Start() error { +func (f *Fetcher) Start() error { f.locker.Lock() if f.running { f.locker.Unlock() @@ -140,47 +135,53 @@ func (f Fetcher) Start() error { f.locker.Unlock() go func() { - select { - case req := <-f.addSourceToken: - if source, ok := f.sources[req.source]; ok { - if res := source.AddTokenAndStart(req.token); res.Error() != nil { - // TODO: clean logs - f.logger.Error("failed to AddTokenAndStart", "source", source.GetName(), "token", req.token, "error", res.Error()) - req.result <- false + for { + + select { + case req := <-f.addSourceToken: + if source, ok := f.sources[req.source]; ok { + if res := source.AddTokenAndStart(req.token); res.Error() != nil { + // TODO: clean logs + f.logger.Error("failed to AddTokenAndStart", "source", source.GetName(), "token", req.token, "error", res.Error()) + req.result <- false + } else { + f.priceReadList[req.source][req.token] = res.Price() + req.result <- true + } } else { - f.priceReadList[req.source][req.token] = res.Price() - req.result <- true - } - } else { - // we don't support adding source dynamically - f.logger.Error("failed to add token for a nonexistent soruce", "source", req.source, "token", req.token) - req.result <- false - } - case req := <-f.getLatestPrice: - if s := f.priceReadList[req.source]; s == nil { - // f.logger.Error("failed to get price from a nonexistent source", "source", req.source, "token", req.token) - req.result <- &getLatestPriceRes{ - price: types.PriceInfo{}, - err: fmt.Errorf("failed to get price of token:%s from a nonexistent source:%s", req.token, req.source), + // we don't support adding source dynamically + f.logger.Error("failed to add token for a nonexistent soruce", "source", req.source, "token", req.token) + req.result <- false } - } else if price := s[req.token]; price == nil { - f.logger.Error("failed to get price of a nonexistent token from an existing source", "source", req.source, "token", req.token) - req.result <- &getLatestPriceRes{ - price: types.PriceInfo{}, - err: fmt.Errorf("failed to get price of token %s from a nonexistent token from an existing source", req.token, "source", req.source), + case req := <-f.getLatestPrice: + if s := f.priceReadList[req.source]; s == nil { + req.result <- &getLatestPriceRes{ + price: types.PriceInfo{}, + err: fmt.Errorf("failed to get price of token:%s from a nonexistent source:%s", req.token, req.source), + } + } else if price := s[req.token]; price == nil { + req.result <- &getLatestPriceRes{ + price: types.PriceInfo{}, + err: feedertypes.ErrSrouceTokenNotConfigured.Wrap(fmt.Sprintf("failed to get price of token:%s from a nonexistent token from an existing source:%s", req.token, req.source)), + // err: fmt.Errorf("failed to get price of token:%s from a nonexistent token from an existing source:%s", req.token, req.source), + } + } else { + req.result <- &getLatestPriceRes{ + price: price.Get(), + err: nil, + } } - } else { - req.result <- &getLatestPriceRes{ - price: price.Get(), - err: nil, + case <-f.stop: + f.locker.Lock() + // close all sources + for _, source := range f.sources { + source.Stop() } + f.running = false + f.locker.Unlock() + // TODO: stop all running sources + return } - case <-f.stop: - f.locker.Lock() - f.running = false - f.locker.Unlock() - // TODO: stop all running sources - return } }() return nil @@ -196,18 +197,17 @@ func (f Fetcher) Stop() { default: close(f.stop) } - //TODO: check and make sure all sources closed - f.running = false f.locker.Unlock() } // Init initializes the fetcher with sources and tokens func Init(tokenSources []feedertypes.TokenSources, sourcesPath string) error { - if logger = feedertypes.GetLogger("fetcher"); logger == nil { + if logger = feedertypes.GetLogger(loggerTag); logger == nil { panic("logger is not initialized") } sources := make(map[string]types.SourceInf) + sourceTokens := make(map[string][]string) for _, ts := range tokenSources { sNames := strings.Split(strings.Map(func(r rune) rune { if unicode.IsSpace(r) { @@ -217,24 +217,24 @@ func Init(tokenSources []feedertypes.TokenSources, sourcesPath string) error { }, ts.Sources), ",") var err error - sourceTokens := make(map[string][]string) // add sources with names for _, sName := range sNames { source := sources[sName] // new a source if not exists if source == nil { - source, err = types.SourceInitializers[sName](sourcesPath) + l := feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, sName)) + source, err = types.SourceInitializers[sName](sourcesPath, l) if err != nil { - return fmt.Errorf("failed to init source:%s, soources_config_path:%s, error:%w", sName, sourcesPath, err) + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to init source:%s, sources_config_path:%s, error:%v", sName, sourcesPath, err)) } sources[sName] = source } sourceTokens[sName] = append(sourceTokens[sName], ts.Token) } - // setup tokens for sources - for sName, tokens := range sourceTokens { - sources[sName].InitTokens(tokens) - } + } + // setup tokens for sources + for sName, tokens := range sourceTokens { + sources[sName].InitTokens(tokens) } defaultFetcher = NewFetcher(logger, sources) @@ -247,294 +247,3 @@ func GetFetcher() (*Fetcher, bool) { } return defaultFetcher, true } - -// source is the data source to fetch token prices -// type source struct { -// lock sync.Mutex -// running *atomic.Int32 -// stopCh chan struct{} -// stopResCh chan struct{} -// // source name, should be unique -// name string -// tokens *sync.Map -// // endpoint of the source to retreive the price data; eg. https://rpc.ankr.com/eth is used for chainlink's ethereum endpoint -// // might vary for different sources -// fetch types.FType -// } - -// func NewSource() *source { -// return &source{ -// running: atomic.NewInt32(-1), -// } -// } -// -// // set source's status as working -// func (s *source) start(interval time.Duration) bool { -// s.lock.Lock() -// if s.running.Load() == -1 { -// s.stopCh = make(chan struct{}) -// s.stopResCh = make(chan struct{}) -// s.running.Inc() -// s.lock.Unlock() -// s.Fetch(interval) -// return true -// } -// return false -// } -// -// // set source 's status as not working -// func (s *source) stop() bool { -// s.lock.Lock() -// defer s.lock.Unlock() -// select { -// case _, ok := <-s.stopCh: -// if ok { -// close(s.stopCh) -// } -// return false -// default: -// close(s.stopCh) -// <-s.stopResCh -// if !s.running.CompareAndSwap(0, -1) { -// panic("running count should be zero when all token fetchers stopped") -// } -// return true -// } -// } -// -// // AddToken not concurrency safe: stop->AddToken->start(all)(/startOne need lock/select to ensure concurrent safe -// func (s *source) AddToken(name string) bool { -// priceInfo := types.NewPriceSyc() -// _, loaded := s.tokens.LoadOrStore(name, priceInfo) -// if loaded { -// return false -// } -// // fetching the new token -// if tokenAny, found := tokensMap.Load(name); found && tokenAny.(*token).active { -// s.lock.Lock() -// s.running.Inc() -// s.lock.Unlock() -// go func(tName string) { -// // TODO: set interval for different sources -// tic := time.NewTicker(defaultInterval) -// for { -// select { -// case <-tic.C: -// price, err := s.fetch(tName) -// prevPrice := priceInfo.GetInfo() -// if err == nil && (prevPrice.Price != price.Price || prevPrice.Decimal != price.Decimal) { -// priceInfo.UpdateInfo(price) -// logger.Info("update token price", "token", tName, "price", price) -// } -// case <-s.stopCh: -// if zero := s.running.Dec(); zero == 0 { -// close(s.stopResCh) -// } -// return -// } -// } -// }(name) -// } -// return true -// } - -// Fetch token price from source -// func (s *source) Fetch(interval time.Duration) { -// s.tokens.Range(func(key, value any) bool { -// tName := key.(string) -// priceInfo := value.(*types.PriceSync) -// if tokenAny, found := tokensMap.Load(tName); found && tokenAny.(*token).active { -// s.lock.Lock() -// s.running.Inc() -// s.lock.Unlock() -// go func(tName string) { -// tic := time.NewTicker(interval) -// for { -// select { -// case <-tic.C: -// price, err := s.fetch(tName) -// prevPrice := priceInfo.GetInfo() -// if err == nil && (prevPrice.Price != price.Price || prevPrice.Decimal != price.Decimal) { -// priceInfo.UpdateInfo(price) -// logger.Info("update token price", "token", tName, "price", price) -// } -// case <-s.stopCh: -// if zero := s.running.Dec(); zero == 0 { -// close(s.stopResCh) -// } -// return -// } -// } -// }(tName) -// } -// return true -// }) -// } - -// type token struct { -// name string // chain_token_address, _address is optional -// // indicates if this token is still alive for price reporting -// active bool -// // endpoint of the token; eg. 0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419 is used for chainlink to identify specific token to fetch its price -// // format might vary for different source to use; eg. for chainlink, this is used to tell the token's address on ethereum(when we use ethereume's contract) -// //endpoint string -// } - -// Fetcher serves as the unique entry point to fetch token prices as background routine continuously -// type Fetcher struct { -// running atomic.Bool -// // source ids frmo the sourceSet -// sources []string -// // set the interval of fetching price, this value is the same for all source/tokens -// interval time.Duration -// // add new source -// newSource chan struct { -// name string -// endpoint string -// } -// newTokenForSource chan struct { -// sourceName string -// tokenName string -// } -// // withdraw/deposit_stakerIdx_validatorIndex -// // nativeTokenValidatorsUpdate chan struct { -// // tokenName string -// // info string -// // success chan bool -// // } -// // config source's token -// configSource chan struct { -// s string -// t string -// } -// getLatestPriceWithSourceToken chan struct { -// p chan *types.PriceInfo -// s string -// t string -// } -// } - -// func (f *Fetcher) AddTokenForSource(sourceName string, tokenName string) { -// f.newTokenForSource <- struct { -// sourceName string -// tokenName string -// }{sourceName, tokenName} -// } - -// StartAll runs the background routine to fetch prices -// func (f *Fetcher) StartAll() context.CancelFunc { -// if !f.running.CompareAndSwap(false, true) { -// return nil -// } -// ctx, cancel := context.WithCancel(context.Background()) -// for _, sName := range f.sources { -// if sourceAny, ok := sourcesMap.Load(sName); ok { -// // start fetcheing data from 'source', setup as a background routine -// logger.Info("start fetching prices", "source", sName, "interaval", f.interval) -// sourceAny.(*source).start(f.interval) -// } -// } -// -// // monitor routine for: 1. add new sources, 2. config tokens for existing source, 3. stop all running fetchers -// go func() { -// for { -// select { -// case <-ctx.Done(): -// // close all running sources -// for _, sName := range f.sources { -// if sourceAny, ok := sourcesMap.Load(sName); ok { -// // safe the do multiple times/ on stopped sources, this process is synced(blocked until one source is stopped completely) -// sourceAny.(*source).stop() -// } -// } -// return -// -// // TODO: add a new source and start fetching its data -// case <-f.newSource: -// case t := <-f.newTokenForSource: -// for _, sName := range f.sources { -// if sName == t.sourceName { -// // TODO: it's ok to add multiple times for the same token -// tokensMap.Store(t.tokenName, &token{name: t.tokenName, active: true}) -// if s, ok := sourcesMap.Load(t.sourceName); ok { -// // add token for source, so this source is running already -// s.(*source).AddToken(t.tokenName) -// } -// break -// } -// } -// // add tokens for a existing source -// case <-f.configSource: -// // TODO: we currently don't handle the request like 'remove token', if we do that support, we should take care of the process in reading routine -// } -// } -// }() -// -// // read routine, loop to serve for price quering -// go func() { -// // read cache, in this way, we don't need to lock every time for potential conflict with tokens update in source(like add one new token), only when we fail to found corresopnding token in this readList -// // TODO: we currently don't have process for 'remove-token' from source, so the price will just not be updated, and we don't clear the priceInfo(it's no big deal since only the latest values are kept, and for reader, they will either ont quering this value any more or find out the timestamp not updated like forever) -// readList := make(map[string]map[string]*types.PriceSync) -// for ps := range f.getLatestPriceWithSourceToken { -// s := readList[ps.s] -// if s == nil { -// if _, found := sourcesMap.Load(ps.s); found { -// readList[ps.s] = make(map[string]*types.PriceSync) -// s = readList[ps.s] -// } else { -// fmt.Println("source not exists") -// ps.p <- nil -// continue -// } -// } -// -// tPrice := s[ps.t] -// if tPrice == nil { -// if sourceAny, found := sourcesMap.Load(ps.s); found { -// if p, found := sourceAny.(*source).tokens.Load(ps.t); found { -// tPrice = p.(*types.PriceSync) -// s[ps.t] = tPrice -// } else { -// if len(s) == 0 { -// fmt.Println("source has no valid token being read, remove this source for reading", ps.s, ps.t) -// delete(readList, ps.s) -// } -// continue -// } -// } else { -// fmt.Println("source not exists any more, remove this source for reading") -// delete(readList, ps.s) -// continue -// } -// } -// pRes := tPrice.GetInfo() -// ps.p <- &pRes -// } -// }() -// return cancel -// } -// -// // GetLatestPriceFromSourceToken gets the latest price of a token from a source -// func (f *Fetcher) GetLatestPriceFromSourceToken(source, token string, c chan *types.PriceInfo) { -// f.getLatestPriceWithSourceToken <- struct { -// p chan *types.PriceInfo -// s string -// t string -// }{c, source, token} -// } -// -// // UpdateNativeTokenValidators updates validator list for stakers of native-restaking-token(client-chain) -// func (f *Fetcher) UpdateNativeTokenValidators(tokenName, updateInfo string) bool { -// parsedInfo := strings.Split(updateInfo, "_") -// if len(parsedInfo) != 4 { -// return false -// } -// stakerIdx, _ := strconv.ParseInt(parsedInfo[1], 10, 64) -// validatorPubkey := parsedInfo[2] -// validatorsSize, _ := strconv.ParseUint(parsedInfo[3], 10, 64) -// return beaconchain.UpdateStakerValidators(int(stakerIdx), validatorPubkey, parsedInfo[0] == "deposit", validatorsSize) -// } -// -// func (f *Fetcher) ResetStakerValidatorsForAll(tokenName string, stakerInfos []*oracletypes.StakerInfo) { -// beaconchain.ResetStakerValidatorsForAll(stakerInfos) -// } diff --git a/fetcher/types/types.go b/fetcher/types/types.go index b4b4f28..20387f3 100644 --- a/fetcher/types/types.go +++ b/fetcher/types/types.go @@ -39,7 +39,7 @@ type SourceInf interface { // RestarAllToken() } -type SourceInitFunc func(cfgPath string) (SourceInf, error) +type SourceInitFunc func(cfgPath string, logger feedertypes.LoggerInf) (SourceInf, error) type SourceFetchFunc func(token string) (*PriceInfo, error) @@ -54,7 +54,7 @@ type NativeRestakingInfo struct { type PriceInfo struct { Price string - Decimal int + Decimal int32 Timestamp string RoundID string } @@ -89,6 +89,8 @@ type addTokenRes struct { func (p PriceInfo) IsZero() bool { return len(p.Price) == 0 } + +// Equal compare two PriceInfo ignoring the timestamp field func (p PriceInfo) Equal(price PriceInfo) bool { if p.Price == price.Price && p.Decimal == price.Decimal && @@ -97,6 +99,13 @@ func (p PriceInfo) Equal(price PriceInfo) bool { } return false } +func (p PriceInfo) EqualPrice(price PriceInfo) bool { + if p.Price == price.Price && + p.Decimal == price.Decimal { + return true + } + return false +} func NewPriceSync() *PriceSync { return &PriceSync{ @@ -112,6 +121,16 @@ func (p *PriceSync) Get() PriceInfo { return price } +func (p *PriceSync) Update(price PriceInfo) (updated bool) { + p.lock.Lock() + if !price.EqualPrice(*p.info) { + *p.info = price + updated = true + } + p.lock.Unlock() + return +} + func (p *PriceSync) Set(price PriceInfo) { p.lock.Lock() *p.info = price @@ -190,16 +209,17 @@ type Source struct { // for sources they could utilitize this function to provide that 'SourceInf' by taking care of only the 'fetch' function func NewSource(logger feedertypes.LoggerInf, name string, fetch SourceFetchFunc, cfgPath string, reload SourceReloadConfigFunc) *Source { return &Source{ - logger: logger, - cfgPath: cfgPath, - name: name, - locker: new(sync.Mutex), - stop: make(chan struct{}), - tokens: make(map[string]*tokenInfo), - activeTokenCount: new(atomic.Int32), - priceList: make(map[string]*PriceSync), - interval: defaultInterval, - addToken: make(chan *addTokenReq, defaultPendingTokensLimit), + logger: logger, + cfgPath: cfgPath, + name: name, + locker: new(sync.Mutex), + stop: make(chan struct{}), + tokens: make(map[string]*tokenInfo), + activeTokenCount: new(atomic.Int32), + priceList: make(map[string]*PriceSync), + interval: defaultInterval, + addToken: make(chan *addTokenReq, defaultPendingTokensLimit), + tokenNotConfigured: make(chan string, 1), // pendingTokensCount: new(atomic.Int32), // pendingTokensLimit: defaultPendingTokensLimit, fetch: fetch, @@ -246,12 +266,12 @@ func (s *Source) Start() map[string]*PriceSync { ret := make(map[string]*PriceSync) for tName, token := range s.tokens { ret[tName] = token.GetPriceSync() - s.logger.Info("start fetching prices", "source", s.name, "token", token) + s.logger.Info("start fetching prices", "source", s.name, "token", token, "tokenName", token.name) s.startFetchToken(token) } // main routine of source, listen to: // addToken to add a new token for the source and start fetching that token's price - // tokenNotConfitured to reload the source's config file for required token + // tokenNotConfigured to reload the source's config file for required token // stop closes the source routines and set runnign status to false go func() { for { @@ -260,7 +280,9 @@ func (s *Source) Start() map[string]*PriceSync { price := NewPriceSync() // check token existence and then add to token list & start if not exists if token, ok := s.tokens[req.tokenName]; !ok { - s.tokens[req.tokenName] = NewTokenInfo(req.tokenName, price) + token = NewTokenInfo(req.tokenName, price) + // s.tokens[req.tokenName] = NewTokenInfo(req.tokenName, price) + s.tokens[req.tokenName] = token s.logger.Info("add a new token and start fetching price", "source", s.name, "token", req.tokenName) s.startFetchToken(token) } else { @@ -302,7 +324,7 @@ func (s *Source) AddTokenAndStart(token string) *addTokenRes { err: fmt.Errorf("didn't add token due to source:%s not running", s.name), } } - // we don't block the process when then channel is not available + // we don't block the process when the channel is not available // caller should handle the returned bool value properly addReq, addResCh := newAddTokenReq(token) select { @@ -329,7 +351,6 @@ func (s *Source) Stop() { default: close(s.stop) } - s.running = false s.locker.Unlock() } @@ -360,7 +381,10 @@ func (s *Source) startFetchToken(token *tokenInfo) { } } else { // update price - token.price.Set(*price) + updated := token.price.Update(*price) + if updated { + s.logger.Info("updated price", "source", s.name, "token", token.name, "price", *price) + } } } } @@ -392,20 +416,28 @@ func (s *Source) Status() map[string]*tokenStatus { return ret } +type NSTToken string + +// type NSTID string +// +// func (n NSTID) String() string { +// return string(n) +// } + const ( defaultPendingTokensLimit = 5 defaultInterval = 30 * time.Second Chainlink = "chainlink" BeaconChain = "beaconchain" + Solana = "solana" - NativeTokenETH = "nsteth" + NativeTokenETH NSTToken = "nsteth" + NativeTokenSOL NSTToken = "nstsol" DefaultSlotsPerEpoch = uint64(32) ) var ( - // source -> fetch method - // Fetchers = make(map[string]FetchFunc) // source -> initializers of source SourceInitializers = make(map[string]SourceInitFunc) ChainToSlotsPerEpoch = map[uint64]uint64{ @@ -414,25 +446,36 @@ var ( 40217: DefaultSlotsPerEpoch, } - NativeTokenETHAssetID = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee_0x65" - // NativeRestakings = map[string][]string{ - // "eth": {BeaconChain, NativeTokenETH}, - // } - // NativeRestakings = map[string]NativeRestakingInfo{ - // "eth": NativeRestakingInfo{ - // Chain: BeaconChain, - // TokenID: NativeTokenETH, - // }, - // } - - AssetIDMap = map[string]string{ - NativeTokenETH: NativeTokenETHAssetID, + NSTTokens = map[NSTToken]struct{}{ + NativeTokenETH: struct{}{}, + NativeTokenSOL: struct{}{}, + } + NSTAssetIDMap = make(map[NSTToken]string) + NSTSourceMap = map[NSTToken]string{ + NativeTokenETH: BeaconChain, + NativeTokenSOL: Solana, } Logger feedertypes.LoggerInf ) -func UpdateNativeAssetID(nstID string) { - NativeTokenETHAssetID = nstID - AssetIDMap[NativeTokenETH] = NativeTokenETHAssetID +func SetNativeAssetID(nstToken NSTToken, assetID string) { + NSTAssetIDMap[nstToken] = assetID +} + +// GetNSTSource returns source name as string +func GetNSTSource(nstToken NSTToken) string { + return NSTSourceMap[nstToken] +} + +// GetNSTAssetID returns nst assetID as string +func GetNSTAssetID(nstToken NSTToken) string { + return NSTAssetIDMap[nstToken] +} + +func IsNSTToken(tokenName string) bool { + if _, ok := NSTTokens[NSTToken(tokenName)]; ok { + return true + } + return false } diff --git a/types/types.go b/types/types.go index 8275c03..db5932f 100644 --- a/types/types.go +++ b/types/types.go @@ -10,6 +10,21 @@ import ( "go.uber.org/zap/zapcore" ) +// TODO: define the interface of fetchertypes.PriceInfo, for fetcher,exocclient to referenced +// PriceInfoInf defines the core structure which has the value/data that is fetched from out-exocore-chain source +// and submit to exocore-chain +// type PriceInfoInf interface { +// SetPrice() +// SetDecimal() +// SetRoundID() +// SetTimeStamp() +// +// GetPrice() +// GetDecimal() +// GetRoundID() +// GetTimeStamp() +// } + type PrivValidatorKey struct { Address string `json:"address"` PrivKey struct { @@ -33,10 +48,6 @@ type Config struct { AppName string `mapstructure:"appname"` Rpc string `mapstructure:"rpc"` Ws string `mapstructure:"ws"` - // Ws struct { - // Addr string `mapstructure:"addr"` - // Endpoint string `mapstructure:"endpoint"` - // } `mapstructure:"ws"` } `mapstructure:"exocore"` } @@ -69,6 +80,7 @@ func NewLogger(level zapcore.Level) *LoggerWrapper { config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") config.Encoding = "console" config.Level = zap.NewAtomicLevelAt(level) + config.EncoderConfig.StacktraceKey = "" logger, _ := config.Build() return &LoggerWrapper{ logger.Sugar(), @@ -134,36 +146,15 @@ func (e *Err) Unwrap() error { return e.parent } -// type Config struct { -// Tokens []TokenSources `mapstructure:"tokens"` -// // Tokens []struct { -// // Token string `mapstructure:"token"` -// // Sources string `mapstructure:"sources"` -// // } `mapstructure:"tokens"` -// // Sources []string `mapstructure:"sources"` -// // Tokens []string `mapstructure:"tokens"` -// Sender struct { -// Mnemonic string `mapstructure:"mnemonic"` -// Path string `mapstructure:"path"` -// } `mapstructure:"sender"` -// Exocore struct { -// ChainID string `mapstructure:"chainid"` -// AppName string `mapstructure:"appname"` -// Rpc string `mapstructure:"rpc"` -// Ws struct { -// Addr string `mapstructure:"addr"` -// Endpoint string `mapstructure:"endpoint"` -// } `mapstructure:"ws"` -// } `mapstructure:"exocore"` -// } - var ( ConfigFile string SourcesConfigPath string v *viper.Viper - ErrInitFail = NewErr("Failed to initialization") + ErrInitFail = NewErr("failed to initialization") + ErrInitConnectionFail = NewErr("failed to establish a connection") ErrSrouceTokenNotConfigured = NewErr("token not configured") + ErrTokenNotSupported = NewErr("token not supported") ) // InitConfig will only read path cfgFile once, and for reload after InitConfig, should use ReloadConfig