diff --git a/.gitignore b/.gitignore index 83b9e82..f5aee36 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ feed.sh config-*.yaml configs/ -node*.log +*.log pids build/ fetcher/beaconchain/oracle_env_beaconchain.yaml diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index 006bf8c..512fdaa 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -1,402 +1,204 @@ package cmd import ( - "strconv" + "errors" + "fmt" "strings" - "sync" "time" - oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/ExocoreNetwork/price-feeder/exoclient" "github.com/ExocoreNetwork/price-feeder/fetcher" - "github.com/ExocoreNetwork/price-feeder/fetcher/types" + "github.com/ExocoreNetwork/price-feeder/types" + + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + "github.com/ExocoreNetwork/price-feeder/fetcher/beaconchain" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - "google.golang.org/grpc" ) +type RetryConfig struct { + MaxAttempts int + Interval time.Duration +} + +// DefaultRetryConfig provides default retry settings +var DefaultRetryConfig = RetryConfig{ + MaxAttempts: 43200, // defaultMaxRetry + Interval: 2 * time.Second, +} + const ( statusOk = 0 privFile = "priv_validator_key.json" baseCurrency = "USDT" + + //feeder_tokenName_feederID + loggerTagPrefix = "feed_%s_%d" ) -var updateConfig sync.Mutex +// var updateConfig sync.Mutex // RunPriceFeeder runs price feeder to fetching price and feed to exocorechain func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) { + // init logger if logger = feedertypes.SetLogger(logger); logger == nil { panic("logger is not initialized") } - - // init fetcher, start fetchers to get prices from sources - f, err := fetcher.Init(conf.Sources, conf.Tokens, sourcesPath) - if err != nil { - logger.Error("failed to init fetcher", "error", err) + // init logger, fetchers, exocoreclient + if err := initComponents(logger, conf, standalone); err != nil { + logger.Error("failed to initialize components") panic(err) } - // start fetching on all supported sources and tokens - _ = f.StartAll() + // initComponents(logger, conf, standalone) - // init exoclient - cc, err := exoclient.Init(conf, mnemonic, privFile, standalone) - if err != nil { - logger.Error("failed to init exoclient", "error", err) - panic(err) + f, _ := fetcher.GetFetcher() + // start fetching on all supported sources and tokens + logger.Info("start fetching prices from all sources") + if err := f.Start(); err != nil { + panic(fmt.Sprintf("failed to start Fetcher, error:%v", err)) } - defer cc.Close() + + ecClient, _ := exoclient.GetClient() + defer ecClient.Close() // initialize oracle params by querying from exocore - oracleP, err := exoclient.GetParams(cc) - for err != nil { - // retry forever until be interrupted manually - logger.Error("Failed to get oracle params on start, retrying...", err) - time.Sleep(2 * time.Second) - oracleP, err = exoclient.GetParams(cc) + oracleP, err := getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger) + if err != nil { + panic(fmt.Sprintf("failed to get initial oracle params: %v", err)) } - // TODO: currently the getStakerInfos will return nil error when empty response, avoid infinite loop - // initialize staker's validator list for eth-native-restaking - nativeStakers := initializeNativeRestakingStakers(cc) - for nativeToken, stakerInfos := range nativeStakers { - f.ResetStakerValidatorsForAll(nativeToken, stakerInfos) - } + ecClient.Subscribe() - oracleParamsFeedingTokens := make(map[string]struct{}) - runningFeeders := make(map[int64]*feederInfo) - // remainingFeeders used to track token feeders that have been set in oracle params but with no configuration from price-feeder for price fetching, and if the lenght of map is bigger than 0, price-feeder will try continuously to reload the configure file until those feeders are able to work - remainningFeeders := make(map[string]*feederInfo) - // check all live feeders and set seperate routine to udpate prices + fsMap := NewFeederMap() + // we don't check empty tokenfeeders list + maxNonce := oracleP.MaxNonce for feederID, feeder := range oracleP.TokenFeeders { if feederID == 0 { - // feederID=0 is reserved - continue - } - var tokenName string - if strings.EqualFold(oracleP.Tokens[feeder.TokenID].Name, types.NativeTokenETH) { - tokenName = strings.ToLower(oracleP.Tokens[feeder.TokenID].Name) - } else { - tokenName = strings.ToLower(oracleP.Tokens[feeder.TokenID].Name + baseCurrency) - } - oracleParamsFeedingTokens[tokenName] = struct{}{} - decimal := oracleP.Tokens[feeder.TokenID].Decimal - fInfo := &feederInfo{ - params: &feederParams{ - startRoundID: feeder.StartRoundID, - startBlock: feeder.StartBaseBlock, - endBlock: feeder.EndBlock, - interval: feeder.Interval, - decimal: decimal, - tokenIDStr: strconv.FormatInt(int64(feeder.TokenID), 10), - feederID: int64(feederID), - tokenName: tokenName, - }, - } - remainningFeeders[tokenName] = fInfo - // TODO: refactor - if strings.EqualFold(tokenName, types.NativeTokenETH) { - // actually not usdt, we so need to do refactor for the mess - delete(remainningFeeders, tokenName) - trigger := make(chan eventRes, 3) - fInfo.updateCh = trigger - runningFeeders[int64(feederID)] = fInfo - // start a routine to update price for this feeder - go feedToken(fInfo, cc, f, conf) continue } - - // check if this feeder is supported by this price-feeder - for _, token := range conf.Tokens { - if strings.EqualFold(token, tokenName) { - delete(remainningFeeders, tokenName) - trigger := make(chan eventRes, 3) - fInfo.updateCh = trigger - runningFeeders[int64(feederID)] = fInfo - // start a routine to update price for this feeder - go feedToken(fInfo, cc, f, conf) - break + tokenName := strings.ToLower(oracleP.Tokens[feeder.TokenID].Name) + source := fetchertypes.Chainlink + if fetchertypes.IsNSTToken(tokenName) { + nstToken := fetchertypes.NSTToken(tokenName) + if source = fetchertypes.GetNSTSource(nstToken); len(source) == 0 { + panic(fmt.Sprintf("source of nst:%s is not set", tokenName)) } } + fsMap.Add(feeder, feederID, f, ecClient, source, tokenName, maxNonce, feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, tokenName, feederID))) } + feeders := fsMap.NewFeeders(logger) + feeders.Start() - newFeeder := make(chan *feederInfo) - // check config update for remaining tokenFeeders set in oracle params - if len(remainningFeeders) > 0 { - // we set a background routine to update config file until we got all remainning tokens configured in oracleParams - go reloadConfigToFetchNewTokens(remainningFeeders, newFeeder, cc, f) - } - - // subscribe newBlock to to trigger tx - res, _ := exoclient.Subscriber(conf.Exocore.Ws.Addr, conf.Exocore.Ws.Endpoint) - - for r := range res { - event := eventRes{} - var feederIDs []string - if len(r.Height) > 0 { - event.height, _ = strconv.ParseUint(r.Height, 10, 64) - } - if len(r.Gas) > 0 { - event.gas, _ = strconv.ParseInt(r.Gas, 10, 64) - } - if r.ParamsUpdate { - oracleP, err = exoclient.GetParams(cc) - for err != nil { - // retry forever until be interrupted manually - logger.Error("Failed to get oracle params when params update detected, retrying...", "error", err) - oracleP, err = exoclient.GetParams(cc) - time.Sleep(2 * time.Second) - } - // set newly added tokenfeeders in the running queue and reload the configure for them to run porperly - if remainningFeeders = updateCurrentFeedingTokens(oracleP, oracleParamsFeedingTokens); len(remainningFeeders) > 0 { - // reoload config for newly added token feeders - go reloadConfigToFetchNewTokens(remainningFeeders, newFeeder, cc, f) - } - } - - if len(r.FeederIDs) > 0 { - feederIDs = strings.Split(r.FeederIDs, "_") - } - - // this is an event that tells native token stakers update, we use 'ETH' temporary since we currently support eth-native-restaking only - if len(r.NativeETH) > 0 { - // TODO: we only support eth-native-restaking for now - if success := f.UpdateNativeTokenValidators(types.NativeTokenETH, r.NativeETH); !success { - stakerInfos, err := exoclient.GetStakerInfos(cc, types.NativeTokenETHAssetID) - for err != nil { - logger.Error("Failed to get stakerInfos, retrying...") - stakerInfos, err = exoclient.GetStakerInfos(cc, types.NativeTokenETHAssetID) - time.Sleep(2 * time.Second) + for event := range ecClient.EventsCh() { + switch e := event.(type) { + case *exoclient.EventNewBlock: + if paramsUpdate := e.ParamsUpdate(); paramsUpdate { + oracleP, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger) + if err != nil { + fmt.Printf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err) + return } - f.ResetStakerValidatorsForAll(types.NativeTokenETH, stakerInfos) + feeders.UpdateOracleParams(oracleP) + // TODO: add newly added tokenfeeders if exists } - } - - select { - case feeder := <-newFeeder: - runningFeeders[feeder.params.feederID] = feeder - default: - for _, fInfo := range runningFeeders { - triggerFeeders(r, fInfo, event, oracleP, feederIDs) + feeders.Trigger(e.Height(), e.FeederIDs()) + case *exoclient.EventUpdatePrice: + finalPrices := make([]*finalPrice, 0, len(e.Prices())) + for _, price := range e.Prices() { + feederIDList := oracleP.GetFeederIDsByTokenID(uint64(price.TokenID())) + l := len(feederIDList) + if l == 0 { + logger.Error("Failed to get feederIDs by tokenID when try to updata local price for feeders on event_updatePrice", "tokenID", price.TokenID()) + continue + } + feederID := feederIDList[l-1] + finalPrices = append(finalPrices, &finalPrice{ + feederID: int64(feederID), + price: price.Price(), + decimal: price.Decimal(), + roundID: price.RoundID(), + }) } - } - } -} - -// reloadConfigToFetchNewTokens reload config file for the remainning token feeders that are set in oracle params but not running properly for config missing -func reloadConfigToFetchNewTokens(remainningFeeders map[string]*feederInfo, newFeeder chan *feederInfo, cc *grpc.ClientConn, f *fetcher.Fetcher) { - logger := getLogger() - updateConfig.Lock() - length := len(remainningFeeders) - for length > 0 { - conf := feedertypes.ReloadConfig() - for tokenRemainning, fInfo := range remainningFeeders { - logger.Info("loading config for for token... ", "tokenName", fInfo.params.tokenName) - for _, token := range conf.Tokens { - if strings.EqualFold(token, tokenRemainning) { - delete(remainningFeeders, tokenRemainning) - length-- - trigger := make(chan eventRes, 3) - fInfo.updateCh = trigger - // TODO: currently support chainlink only (index=0) - f.AddTokenForSource(conf.Sources[0], tokenRemainning) - // start a routine to update price for this feeder - newFeeder <- fInfo - go feedToken(fInfo, cc, f, conf) - break + feeders.UpdatePrice(e.TxHeight(), finalPrices) + case *exoclient.EventUpdateNST: + // int conversion is safe + if updated := beaconchain.UpdateStakerValidators(int(e.StakerID()), e.ValidatorIndex(), uint64(e.Index()), e.Deposit()); !updated { + logger.Error("failed to update staker's validator list", "stakerID", e.StakerID(), "validatorIndex", e.ValidatorIndex, "deposit", e.Deposit(), "index", e.Index()) + // try to reset all validatorList + if err := ResetAllStakerValidators(ecClient, logger); err != nil { + logger.Error("failed to reset all staker's validators for native-restaking-eth") + // TODO: should we just clear all info to prevent further nst update } } } - time.Sleep(10 * time.Second) } - updateConfig.Unlock() } -// updateCurrentFeedingTokens will update current running tokenFeeders based on the params change from upstream, and it will add the newly added tokenFeeders into the running queue and return that list which will be handled by invoker to set them properly as running tokenFeeders -func updateCurrentFeedingTokens(oracleP oracleTypes.Params, currentFeedingTokens map[string]struct{}) map[string]*feederInfo { - remain := make(map[string]*feederInfo) - for feederID, feeder := range oracleP.TokenFeeders { - if feederID == 0 { - // feederID=0 is reserved - continue - } - tokenName := strings.ToLower(oracleP.Tokens[feeder.TokenID].Name + baseCurrency) - if _, ok := currentFeedingTokens[tokenName]; ok { - continue - } - decimal := oracleP.Tokens[feeder.TokenID].Decimal - fInfo := &feederInfo{ - params: &feederParams{ - startRoundID: feeder.StartRoundID, - startBlock: feeder.StartBaseBlock, - endBlock: feeder.EndBlock, - interval: feeder.Interval, - decimal: decimal, - tokenIDStr: strconv.FormatInt(int64(feeder.TokenID), 10), - feederID: int64(feederID), - tokenName: tokenName, - }, +// getOracleParamsWithMaxRetry, get oracle params with max retry +// blocked +func getOracleParamsWithMaxRetry(maxRetry int, ecClient exoclient.ExoClientInf, logger feedertypes.LoggerInf) (oracleP *oracletypes.Params, err error) { + if maxRetry <= 0 { + maxRetry = DefaultRetryConfig.MaxAttempts + } + for i := 0; i < maxRetry; i++ { + oracleP, err = ecClient.GetParams() + if err == nil { + return } - - remain[tokenName] = fInfo - currentFeedingTokens[tokenName] = struct{}{} + logger.Error("Failed to get oracle params, retrying...", "count", i, "max", maxRetry, "error", err) + time.Sleep(DefaultRetryConfig.Interval) } - return remain + return } -// feedToken will try to send create-price tx to update prices onto exocore chain when conditions reached including: tokenFeeder-running, inside-a-pricing-window, price-updated-since-previous-round -func feedToken(fInfo *feederInfo, cc *grpc.ClientConn, f *fetcher.Fetcher, conf feedertypes.Config) { - logger := getLogger() - pChan := make(chan *types.PriceInfo) - prevPrice := "" - prevDecimal := -1 - prevHeight := uint64(0) - tokenID, _ := strconv.ParseUint(fInfo.params.tokenIDStr, 10, 64) - - startBlock := fInfo.params.startBlock - endBlock := fInfo.params.endBlock - interval := fInfo.params.interval - decimal := int(fInfo.params.decimal) - feederID := uint64(fInfo.params.feederID) - - if p, err := exoclient.GetLatestPrice(cc, tokenID); err == nil { - prevPrice = p.Price - prevDecimal = int(p.Decimal) +func ResetAllStakerValidators(ec exoclient.ExoClientInf, logger feedertypes.LoggerInf) error { + stakerInfos, err := ec.GetStakerInfos(fetchertypes.GetNSTAssetID(fetchertypes.NativeTokenETH)) + if err != nil { + return fmt.Errorf("failed to get stakerInfos for native-restaking-eth, error:%w", err) } - - for t := range fInfo.updateCh { - // update Params if changed, paramsUpdate will be notified to corresponding feeder, not all - if params := t.params; params != nil { - startBlock = params.startBlock - endBlock = params.endBlock - interval = params.interval - decimal = int(params.decimal) - } - - // update latest price if changed - if len(t.price) > 0 { - prevPrice = t.price - prevDecimal = t.decimal - prevHeight = t.txHeight - // this is an tx event with height==0, so just don't submit any messages, tx will be triggered by newBlock event - continue - } else if t.priceUpdated && prevHeight < t.height { - // this is a newblock event and this case is: newBlock event arrived before tx event, (interval>=2*maxNonce, so interval must > 1, so we skip one block is safe) - // wait txEvent to update the price - continue - } - // check feeder status to feed price - logger.Info("Triggered by new Block", "block_height", t.height, "feederID", feederID, "start_base_block", fInfo.params.startBlock, "round_interval", fInfo.params.interval, "start_roundID", fInfo.params.startRoundID) - if t.height < startBlock { - // tx event will have zero height, just don't submit price - continue - } - if endBlock > 0 && t.height >= endBlock { - // TODO: notify corresponding token fetcher - return - } - delta := (t.height - startBlock) % interval - roundID := (t.height-startBlock)/interval + fInfo.params.startRoundID - if delta < 3 { - //TODO: for v1 exocored, we do no restrictions on sources, so here we hardcode source information for nativetoken and normal token - source := conf.Sources[0] - if strings.EqualFold(fInfo.params.tokenName, types.NativeTokenETH) { - logger.Info("nstETH, use beaconchain instead of chainlink as source", "block_height", t.height, "feederID", feederID, "start_roundID", fInfo.params.startRoundID) - source = types.BeaconChain - } - // TODO: use source based on oracle-params - // f.GetLatestPriceFromSourceToken(conf.Sources[0], fInfo.params.tokenName, pChan) - f.GetLatestPriceFromSourceToken(source, fInfo.params.tokenName, pChan) - p := <-pChan - if p == nil { - continue - } - // TODO: this price should be compared with the current price from oracle, not from source - if prevDecimal > -1 && prevPrice == p.Price && prevDecimal == p.Decimal { - // if prevPrice not changed between different rounds, we don't submit any messages and the oracle module will use the price from former round to update next round. - logger.Info("price not changed, skip submitting price", "roundID", roundID, "feederID", feederID) - continue - } - if len(p.Price) == 0 { - logger.Info("price has not been fetched yet, skip submitting price", "roundID", roundID, "feederID", feederID) - continue - } - basedBlock := t.height - delta - - if !(fInfo.params.tokenName == types.NativeTokenETH) { - if p.Decimal > decimal { - p.Price = p.Price[:len(p.Price)-int(p.Decimal-decimal)] - p.Decimal = decimal - } else if p.Decimal < decimal { - p.Price = p.Price + strings.Repeat("0", decimal-p.Decimal) - p.Decimal = decimal - } - } - logger.Info("submit create-price tx", "price", p.Price, "decimal", p.Decimal, "tokeName", fInfo.params.tokenName, "block_height", t.height, "roundID", roundID, "feederID", feederID) - res := exoclient.SendTx(cc, feederID, basedBlock, p.Price, p.RoundID, p.Decimal, int32(delta)+1, t.gas) - txResponse := res.GetTxResponse() - if txResponse.Code == statusOk { - logger.Info("sendTx succeeded", "feederID", feederID) - } else { - logger.Error("sendTx failed", "feederID", feederID, "response_rawlog", txResponse.RawLog) - } + if len(stakerInfos) > 0 { + if err := beaconchain.ResetStakerValidators(stakerInfos, true); err != nil { + return fmt.Errorf("failed to set stakerInfs for native-restaking-eth, error:%w", err) } } + return nil } -// triggerFeeders will trigger tokenFeeder based on arrival events to check and send create-price tx to update prices onto exocore chain -func triggerFeeders(r exoclient.ReCh, fInfo *feederInfo, event eventRes, oracleP oracleTypes.Params, feederIDsPriceUpdated []string) { - eventCpy := event - if r.ParamsUpdate { - // check if this tokenFeeder's params has been changed - if update := fInfo.params.update(oracleP); update { - paramsCopy := *fInfo.params - eventCpy.params = ¶msCopy - } - } - for _, p := range r.Price { - parsedPrice := strings.Split(p, "_") - if fInfo.params.tokenIDStr == parsedPrice[0] { - if fInfo.latestPrice != strings.Join(parsedPrice[1:], "_") { - decimal := int64(0) - if l := len(parsedPrice); l > 4 { - // this is possible in nst case - eventCpy.price = strings.Join(parsedPrice[2:l-1], "_") - decimal, _ = strconv.ParseInt(parsedPrice[l-1], 10, 32) - } else { - eventCpy.price = parsedPrice[2] - decimal, _ = strconv.ParseInt(parsedPrice[3], 10, 32) - } - // eventCpy.price = parsedPrice[2] - // decimal, _ := strconv.ParseInt(parsedPrice[3], 10, 32) - eventCpy.decimal = int(decimal) - eventCpy.txHeight, _ = strconv.ParseUint(r.TxHeight, 10, 64) - eventCpy.roundID, _ = strconv.ParseUint(parsedPrice[1], 10, 64) - fInfo.latestPrice = strings.Join(parsedPrice[1:], "_") +// // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed +func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) error { + count := 0 + for count < DefaultRetryConfig.MaxAttempts { + count++ + // init fetcher, start fetchers to get prices from sources + err := fetcher.Init(conf.Tokens, sourcesPath) + if err != nil { + return fmt.Errorf("failed to init fetcher, error:%w", err) + } + + // init exoclient + err = exoclient.Init(conf, mnemonic, privFile, standalone) + if err != nil { + if errors.Is(err, feedertypes.ErrInitConnectionFail) { + logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err) + time.Sleep(DefaultRetryConfig.Interval) + continue } - break + return fmt.Errorf("failed to init exoclient, error;%w", err) } - } - for _, feederID := range feederIDsPriceUpdated { - if feederID == strconv.FormatInt(fInfo.params.feederID, 10) { - eventCpy.priceUpdated = true - } - } + ec, _ := exoclient.GetClient() - // notify corresponding feeder to update price - fInfo.updateCh <- eventCpy -} + _, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ec, logger) + if err != nil { + return fmt.Errorf("failed to get oracle params on start, error:%w", err) + } -// initializeNativeRestkingStakers initialize stakers' validator list since we wrap validator set into one single staker for each native-restaking-asset -func initializeNativeRestakingStakers(cc *grpc.ClientConn) map[string][]*oracleTypes.StakerInfo { - logger := getLogger() - ret := make(map[string][]*oracleTypes.StakerInfo) - for _, v := range types.NativeRestakings { - stakerInfos, err := exoclient.GetStakerInfos(cc, types.AssetIDMap[v[1]]) - for err != nil { - logger.Error("Failed to get stakerInfos, retrying...", "error", err) - stakerInfos, err = exoclient.GetStakerInfos(cc, types.NativeTokenETHAssetID) - time.Sleep(2 * time.Second) + // init native stakerlist for nstETH(beaconchain) + if err := ResetAllStakerValidators(ec, logger); err != nil { + return fmt.Errorf("failed in initialize nst:%w", err) } - ret[v[1]] = stakerInfos + + logger.Info("Initialization for price-feeder done") + break } - return ret + return nil } diff --git a/cmd/start.go b/cmd/start.go index e66bfcc..41cdf6d 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -24,6 +24,9 @@ and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications. This application is a tool to generate the needed files to quickly create a Cobra application.`, + // PreRun: func(cmd *cobra.Command, args []string) { + // + // }, Run: func(cmd *cobra.Command, args []string) { logger := feedertypes.NewLogger(zapcore.InfoLevel) // start fetcher to get prices from chainlink diff --git a/cmd/types.go b/cmd/types.go index aded1e0..494af3b 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -1,63 +1,455 @@ package cmd import ( + "errors" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" + types "github.com/ExocoreNetwork/price-feeder/types" + + sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) -// Define the types for the feeder -type feederParams struct { - startRoundID uint64 - startBlock uint64 - endBlock uint64 - interval uint64 - decimal int32 - tokenIDStr string - feederID int64 - tokenName string -} - -// Define the types for the event -type eventRes struct { - height uint64 - txHeight uint64 - gas int64 - price string - decimal int - roundID uint64 - params *feederParams - priceUpdated bool - nativeToken string -} - -// Define the types for the feederInfo +type priceFetcher interface { + GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) + AddTokenForSource(source, token string) bool +} +type priceSubmitter interface { + SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) +} + +type signInfo struct { + maxNonce int32 + roundID int64 + nonce int32 +} + +func (s *signInfo) getNextNonceAndUpdate(roundID int64) int32 { + if roundID < s.roundID { + return -1 + } else if roundID > s.roundID { + s.roundID = roundID + s.nonce = 1 + return 1 + } + if s.nonce = s.nonce + 1; s.nonce > s.maxNonce { + s.nonce = s.maxNonce + return -1 + } + return s.nonce +} +func (s *signInfo) revertNonce(roundID int64) { + if s.roundID == roundID && s.nonce > 0 { + s.nonce-- + } +} + +type triggerHeights struct { + commitHeight int64 + priceHeight int64 +} +type updatePrice struct { + txHeight int64 + price *fetchertypes.PriceInfo +} +type updateParamsReq struct { + params *oracletypes.Params + result chan *updateParamsRes +} +type updateParamsRes struct { +} + +type localPrice struct { + price fetchertypes.PriceInfo + height int64 +} + +// TODO: stop channel to close +type feeder struct { + logger feedertypes.LoggerInf + // TODO: currently only 1 source for each token, so we can just set it as a field here + source string + token string + tokenID uint64 + feederID int + // TODO: add check for rouleID, v1 can be skipped + // ruleID + startRoundID int64 + startBaseBlock int64 + interval int64 + endBlock int64 + + // maxNonce int32 + + fetcher priceFetcher + submitter priceSubmitter + lastPrice *localPrice + lastSent *signInfo + + priceCh chan *updatePrice + heightsCh chan *triggerHeights + paramsCh chan *updateParamsReq +} + type feederInfo struct { - params *feederParams - latestPrice string - updateCh chan eventRes + source string + token string + tokenID uint64 + feederID int + // TODO: add check for rouleID, v1 can be skipped + // ruleID + startRoundID int64 + startBaseBlock int64 + interval int64 + endBlock int64 + lastPrice localPrice + lastSent signInfo } -func (f *feederParams) update(p oracletypes.Params) (updated bool) { - tokenFeeder := p.TokenFeeders[f.feederID] - if tokenFeeder.StartBaseBlock != f.startBlock { - f.startBlock = tokenFeeder.StartBaseBlock - updated = true +func (f *feeder) Info() feederInfo { + return feederInfo{ + source: f.source, + token: f.token, + tokenID: f.tokenID, + feederID: f.feederID, + startRoundID: f.startRoundID, + startBaseBlock: f.startBaseBlock, + interval: f.interval, + endBlock: f.endBlock, + lastPrice: *f.lastPrice, + lastSent: *f.lastSent, } - if tokenFeeder.EndBlock != f.endBlock { - f.endBlock = tokenFeeder.EndBlock - updated = true +} + +// NewFeeder new a feeder struct from oracletypes' tokenfeeder +func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { + return &feeder{ + logger: logger, + source: source, + token: token, + tokenID: tf.TokenID, + feederID: feederID, + // these conversion a safe since the block height defined in cosmossdk is int64 + startRoundID: int64(tf.StartRoundID), + startBaseBlock: int64(tf.StartBaseBlock), + interval: int64(tf.Interval), + endBlock: int64(tf.EndBlock), + + // maxNonce: maxNonce, + + fetcher: fetcher, + submitter: submitter, + lastSent: &signInfo{ + maxNonce: maxNonce, + }, + lastPrice: &localPrice{}, + + priceCh: make(chan *updatePrice, 1), + heightsCh: make(chan *triggerHeights, 1), + paramsCh: make(chan *updateParamsReq, 1), } - if tokenFeeder.Interval != f.interval { - f.interval = tokenFeeder.Interval - updated = true +} + +func (f *feeder) start() { + go func() { + for { + select { + case h := <-f.heightsCh: + if h.priceHeight > f.lastPrice.height { + // the block event arrived early, wait for the price update evenst to update local price + break + } + baseBlock, roundID, delta, active := f.calculateRound(h.commitHeight) + if !active { + break + } + if delta < 3 { + f.logger.Info("trigger feeder", "height_commith", h.commitHeight, "height_price", h.priceHeight) + if price, err := f.fetcher.GetLatestPrice(f.source, f.token); err != nil { + f.logger.Error("failed to get latest price", "roundID", roundID, "delta", delta, "feeder", f.Info(), "error", err) + if errors.Is(err, feedertypes.ErrSourceTokenNotConfigured) { + f.logger.Error("add token from configure of source", "token", f.token, "source", f.source) + // blocked this feeder since no available fetcher_source_price working + if added := f.fetcher.AddTokenForSource(f.source, f.token); !added { + f.logger.Error("failed to complete adding token from configure, pleas check and update the config file of source if necessary", "token", f.token, "source", f.source) + } + } + } else { + if price.IsZero() { + f.logger.Info("got nil latest price, skip submitting price", "roundID", roundID, "delta", delta) + continue + } + if len(price.Price) >= 32 && price.EqualToBase64Price(f.lastPrice.price) { + f.logger.Info("didn't submit price due to price not changed", "roundID", roundID, "delta", delta, "price", price) + f.logger.Debug("got latsetprice equal to local cache", "feeder", f.Info()) + continue + } else if price.EqualPrice(f.lastPrice.price) { + f.logger.Info("didn't submit price due to price not changed", "roundID", roundID, "delta", delta, "price", price) + f.logger.Debug("got latsetprice equal to local cache", "feeder", f.Info()) + continue + } + if nonce := f.lastSent.getNextNonceAndUpdate(roundID); nonce < 0 { + f.logger.Error("failed to submit due to no available nonce", "roundID", roundID, "delta", delta, "feeder", f.Info()) + } else { + // f.logger.Info("send tx to submit price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) + res, err := f.submitter.SendTx(uint64(f.feederID), uint64(baseBlock), price, nonce) + if err != nil { + f.lastSent.revertNonce(roundID) + f.logger.Error("failed to send tx submitting price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "feeder", f.Info(), "error_feeder", err) + } + if txResponse := res.GetTxResponse(); txResponse.Code == statusOk { + f.logger.Info("sent tx to submit price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta) + } else { + f.lastSent.revertNonce(roundID) + f.logger.Error("failed to send tx submitting price", "price", price, "nonce", nonce, "baseBlock", baseBlock, "delta", delta, "feeder", f.Info(), "response_rawlog", txResponse.RawLog) + } + + } + } + } + case price := <-f.priceCh: + f.lastPrice.price = *(price.price) + // update latest height that price had been updated + f.lastPrice.height = price.txHeight + case req := <-f.paramsCh: + f.updateFeederParams(req.params) + req.result <- &updateParamsRes{} + } + } + }() +} + +// UpdateParams updates the feeder's params from oracle params, this method will block if the channel is full +// which means the update for params will must be delivered to the feeder's routine when this method is called +// blocked +func (f *feeder) updateParams(params *oracletypes.Params) chan *updateParamsRes { + // TODO update oracle parms + res := make(chan *updateParamsRes) + req := &updateParamsReq{params: params, result: res} + f.paramsCh <- req + return res +} + +// UpdatePrice will upate local price for feeder +// non-blocked +func (f *feeder) updatePrice(txHeight int64, price *fetchertypes.PriceInfo) { + // we dont't block this process when the channelis full, if this updating is skipped + // it will be update at next time when event arrived + select { + case f.priceCh <- &updatePrice{price: price, txHeight: txHeight}: + default: } - if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { - f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal - updated = true +} + +// Trigger notify the feeder that a new block height is committed +// non-blocked +func (f *feeder) trigger(commitHeight, priceHeight int64) { + // the channel got 1 buffer, so it should always been sent successfully + // and if not(the channel if full), we just skip this height and don't block + select { + case f.heightsCh <- &triggerHeights{commitHeight: commitHeight, priceHeight: priceHeight}: + default: + } +} + +func (f *feeder) updateFeederParams(p *oracletypes.Params) { + // TODO: update feeder's params + +} + +// TODO: stop feeder routine +// func (f *feeder) Stop() + +func (f *feeder) calculateRound(h int64) (baseBlock, roundID, delta int64, active bool) { + // endBlock itself is considered as active + if f.startBaseBlock > h || (f.endBlock > 0 && h > f.endBlock) { + return } + active = true + delta = (h - f.startBaseBlock) % f.interval + roundID = (h-f.startBaseBlock)/f.interval + f.startRoundID + baseBlock = h - delta return } -func getLogger() feedertypes.LoggerInf { - return feedertypes.GetLogger("") +type triggerReq struct { + height int64 + feederIDs map[int64]struct{} +} + +type finalPrice struct { + feederID int64 + price string + decimal int32 + roundID string +} +type updatePricesReq struct { + txHeight int64 + prices []*finalPrice +} + +type feederMap map[int]*feeder + +// NewFeederMap new a map feeder> +func NewFeederMap() feederMap { + return make(map[int]*feeder) +} +func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { + return &Feeders{ + logger: logger, + feederMap: fm, + // don't block on height increasing + trigger: make(chan *triggerReq, 1), + updatePrice: make(chan *updatePricesReq, 1), + // block on update-params delivery, no buffer + updateParams: make(chan *oracletypes.Params), + } +} + +// Add adds a new feeder with feederID into the map +func (fm feederMap) Add(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) { + fm[feederID] = &feeder{ + logger: logger, + source: source, + token: token, + tokenID: tf.TokenID, + feederID: feederID, + // these conversion a safe since the block height defined in cosmossdk is int64 + startRoundID: int64(tf.StartRoundID), + startBaseBlock: int64(tf.StartBaseBlock), + interval: int64(tf.Interval), + endBlock: int64(tf.EndBlock), + fetcher: fetcher, + submitter: submitter, + lastPrice: &localPrice{}, + lastSent: &signInfo{ + maxNonce: maxNonce, + }, + + priceCh: make(chan *updatePrice, 1), + heightsCh: make(chan *triggerHeights, 1), + paramsCh: make(chan *updateParamsReq, 1), + } +} + +type Feeders struct { + logger feedertypes.LoggerInf + feederMap map[int]*feeder + // TODO: feeder has sync management, so feeders could remove these channel + trigger chan *triggerReq + updatePrice chan *updatePricesReq + updateParams chan *oracletypes.Params + // updateNST chan *updateNSTReq +} + +// Start will start to listen the trigger(newHeight) and updatePrice events +// usd channels to avoid race condition on map +func (fs *Feeders) Start() { + for _, f := range fs.feederMap { + f.start() + } + go func() { + for { + select { + case params := <-fs.updateParams: + results := []chan *updateParamsRes{} + for _, f := range fs.feederMap { + res := f.updateParams(params) + results = append(results, res) + } + // wait for all feeders to complete updateing params + for _, res := range results { + <-res + } + // TODO: add newly added tokenfeeders if exists + + case t := <-fs.trigger: + // the order does not matter + for _, f := range fs.feederMap { + priceHeight := int64(0) + if _, ok := t.feederIDs[int64(f.feederID)]; ok { + priceHeight = t.height + } + f.trigger(t.height, priceHeight) + } + case req := <-fs.updatePrice: + for _, price := range req.prices { + // int conversion is safe + if feeder, ok := fs.feederMap[int(price.feederID)]; !ok { + fs.logger.Error("failed to get feeder by feederID when update price for feeders", "updatePriceReq", req) + continue + } else { + feeder.updatePrice(req.txHeight, &fetchertypes.PriceInfo{ + Price: price.price, + Decimal: price.decimal, + RoundID: price.roundID, + }) + } + } + } + } + }() +} + +// Trigger notify all feeders that a new block height is committed +// non-blocked +func (fs *Feeders) Trigger(height int64, feederIDs map[int64]struct{}) { + select { + case fs.trigger <- &triggerReq{height: height, feederIDs: feederIDs}: + default: + } +} + +// UpdatePrice will upate local price for all feeders +// non-blocked +func (fs *Feeders) UpdatePrice(txHeight int64, prices []*finalPrice) { + select { + case fs.updatePrice <- &updatePricesReq{txHeight: txHeight, prices: prices}: + default: + } +} + +// UpdateOracleParams updates all feeders' params from oracle params +// blocking until all feeders update their params +// when this method returned, it's guaranteed no further actions received wll be executed unless that all feeders have updated their params +func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { + fs.updateParams <- p +} + +// Define the types for the feeder +// type feederParams struct { +// startRoundID uint64 +// startBlock uint64 +// endBlock uint64 +// interval uint64 +// decimal int32 +// tokenIDStr string +// feederID int64 +// tokenName string +// } +// +// func (f *feederParams) update(p oracletypes.Params) (updated bool) { +// tokenFeeder := p.TokenFeeders[f.feederID] +// if tokenFeeder.StartBaseBlock != f.startBlock { +// f.startBlock = tokenFeeder.StartBaseBlock +// updated = true +// } +// if tokenFeeder.EndBlock != f.endBlock { +// f.endBlock = tokenFeeder.EndBlock +// updated = true +// } +// if tokenFeeder.Interval != f.interval { +// f.interval = tokenFeeder.Interval +// updated = true +// } +// if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { +// f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal +// updated = true +// } +// return +// } + +func getLogger() types.LoggerInf { + return types.GetLogger("") } diff --git a/exoclient/client.go b/exoclient/client.go new file mode 100644 index 0000000..7063fcc --- /dev/null +++ b/exoclient/client.go @@ -0,0 +1,130 @@ +package exoclient + +import ( + "fmt" + "net" + "net/http" + "net/url" + "sync" + + "cosmossdk.io/simapp/params" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" + "github.com/cosmos/cosmos-sdk/client" + cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" + "github.com/cosmos/cosmos-sdk/types/tx" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" + "github.com/gorilla/websocket" + "google.golang.org/grpc" +) + +var _ ExoClientInf = &exoClient{} + +// exoClient implements exoClientInf interface to serve as a grpc client to interact with eoxored grpc service +type exoClient struct { + logger feedertypes.LoggerInf + grpcConn *grpc.ClientConn + + // params for sign/send transactions + privKey cryptotypes.PrivKey + pubKey cryptotypes.PubKey + encCfg params.EncodingConfig + txCfg client.TxConfig + chainID string + + // client to broadcast transactions to eoxocred + txClient tx.ServiceClient + + // wsclient interact with exocored + wsClient *websocket.Conn + wsEndpoint string + wsDialer *websocket.Dialer + // wsStop channel used to signal ws close + wsStop chan struct{} + // wsStopRet chan struct{} + // wsActiveRoutines *atomic.Int32 + wsLock *sync.Mutex + wsActiveRoutines *int + wsActive *bool + // wsEventsCh chan EventRes + wsEventsCh chan EventInf + + // client to query from exocored + oracleClient oracletypes.QueryClient +} + +// NewExoClient creates a exocore-client used to do queries and send transactions to exocored +func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, privKey cryptotypes.PrivKey, encCfg params.EncodingConfig, chainID string) (*exoClient, error) { + ec := &exoClient{ + logger: logger, + privKey: privKey, + pubKey: privKey.PubKey(), + encCfg: encCfg, + txCfg: encCfg.TxConfig, + wsEndpoint: wsEndpoint, + wsActiveRoutines: new(int), + wsActive: new(bool), + wsLock: new(sync.Mutex), + wsStop: make(chan struct{}), + wsEventsCh: make(chan EventInf), + } + + var err error + ec.logger.Info("establish grpc connection") + ec.grpcConn, err = createGrpcConn(endpoint, encCfg) + if err != nil { + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create new Exoclient, endpoint:%s, error:%v", endpoint, err)) + } + + // setup txClient + ec.txClient = sdktx.NewServiceClient(ec.grpcConn) + // setup queryClient + ec.oracleClient = oracletypes.NewQueryClient(ec.grpcConn) + // setup wsClient + u, err := url.Parse(wsEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse wsEndpoint, wsEndpoint:%s, error:%w", wsEndpoint, err) + } + ec.wsDialer = &websocket.Dialer{ + NetDial: func(_, _ string) (net.Conn, error) { + return net.Dial("tcp", u.Host) + }, + Proxy: http.ProxyFromEnvironment, + } + ec.logger.Info("establish ws connection") + ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{}) + if err != nil { + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err)) + } + ec.wsClient.SetPongHandler(func(string) error { + return nil + }) + + return ec, nil +} + +func (ec *exoClient) Close() { + ec.CloseWs() + ec.CloseGRPC() +} + +// Close close grpc connection +func (ec *exoClient) CloseGRPC() { + ec.grpcConn.Close() +} + +func (ec *exoClient) CloseWs() { + if ec.wsClient == nil { + return + } + ec.StopWsRoutines() + ec.wsClient.Close() +} + +// GetClient returns defaultExoClient and a bool value to tell if that defaultExoClient has been initialized +func GetClient() (*exoClient, bool) { + if defaultExoClient == nil { + return nil, false + } + return defaultExoClient, true +} diff --git a/exoclient/grpc.go b/exoclient/grpc.go index 2634fd2..4d83456 100644 --- a/exoclient/grpc.go +++ b/exoclient/grpc.go @@ -1,22 +1,36 @@ package exoclient import ( + "context" + "fmt" + "time" + + "cosmossdk.io/simapp/params" "github.com/cosmos/cosmos-sdk/codec" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" ) // CreateGrpcConn creates an grpc connection to the target -func CreateGrpcConn(target string) (*grpc.ClientConn, error) { - grpcConn, err := grpc.Dial( +func createGrpcConn(target string, encCfg params.EncodingConfig) (conn *grpc.ClientConn, err error) { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + + grpcConn, err := grpc.DialContext( + ctx, target, // for internal usage, no need to set TSL grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.ForceCodec(codec.NewProtoCodec(encCfg.InterfaceRegistry).GRPCCodec())), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 150 * time.Second, + Timeout: 5 * time.Second, + PermitWithoutStream: true, + }), + grpc.WithBlock(), ) if err != nil { - logger.Error("failed to create grpc connect", "error", err) - return nil, err + return nil, fmt.Errorf("failed to create grpc connection, error:%w", err) } return grpcConn, nil diff --git a/exoclient/query.go b/exoclient/query.go index d312ab8..1267afe 100644 --- a/exoclient/query.go +++ b/exoclient/query.go @@ -2,28 +2,26 @@ package exoclient import ( "context" + "fmt" oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" - "google.golang.org/grpc" ) // GetParams queries oracle params -func GetParams(grpcConn *grpc.ClientConn) (oracleTypes.Params, error) { - oracleClient := oracleTypes.NewQueryClient(grpcConn) - paramsRes, err := oracleClient.Params(context.Background(), &oracleTypes.QueryParamsRequest{}) +func (ec exoClient) GetParams() (*oracleTypes.Params, error) { + paramsRes, err := ec.oracleClient.Params(context.Background(), &oracleTypes.QueryParamsRequest{}) if err != nil { - return oracleTypes.Params{}, err + return &oracleTypes.Params{}, fmt.Errorf("failed to query oracle params from oracleClient, error:%w", err) } + return ¶msRes.Params, nil - return paramsRes.Params, nil } // GetLatestPrice returns latest price of specific token -func GetLatestPrice(grpcConn *grpc.ClientConn, tokenID uint64) (oracleTypes.PriceTimeRound, error) { - oracleClient := oracleTypes.NewQueryClient(grpcConn) - priceRes, err := oracleClient.LatestPrice(context.Background(), &oracleTypes.QueryGetLatestPriceRequest{TokenId: tokenID}) +func (ec exoClient) GetLatestPrice(tokenID uint64) (oracleTypes.PriceTimeRound, error) { + priceRes, err := ec.oracleClient.LatestPrice(context.Background(), &oracleTypes.QueryGetLatestPriceRequest{TokenId: tokenID}) if err != nil { - return oracleTypes.PriceTimeRound{}, err + return oracleTypes.PriceTimeRound{}, fmt.Errorf("failed to get latest price from oracleClient, error:%w", err) } return priceRes.Price, nil @@ -31,21 +29,19 @@ func GetLatestPrice(grpcConn *grpc.ClientConn, tokenID uint64) (oracleTypes.Pric // TODO: pagination // GetStakerInfos get all stakerInfos for the assetID -func GetStakerInfos(grpcConn *grpc.ClientConn, assetID string) ([]*oracleTypes.StakerInfo, error) { - oracleClient := oracleTypes.NewQueryClient(grpcConn) - stakerInfoRes, err := oracleClient.StakerInfos(context.Background(), &oracleTypes.QueryStakerInfosRequest{AssetId: assetID}) +func (ec exoClient) GetStakerInfos(assetID string) ([]*oracleTypes.StakerInfo, error) { + stakerInfoRes, err := ec.oracleClient.StakerInfos(context.Background(), &oracleTypes.QueryStakerInfosRequest{AssetId: assetID}) if err != nil { - return []*oracleTypes.StakerInfo{}, err + return []*oracleTypes.StakerInfo{}, fmt.Errorf("failed to get stakerInfos from oracleClient, error:%w", err) } return stakerInfoRes.StakerInfos, nil } // GetStakerInfos get the stakerInfos corresponding to stakerAddr for the assetID -func GetStakerInfo(grpcConn *grpc.ClientConn, assetID, stakerAddr string) ([]*oracleTypes.StakerInfo, error) { - oracleClient := oracleTypes.NewQueryClient(grpcConn) - stakerInfoRes, err := oracleClient.StakerInfos(context.Background(), &oracleTypes.QueryStakerInfosRequest{AssetId: assetID}) +func (ec exoClient) GetStakerInfo(assetID, stakerAddr string) ([]*oracleTypes.StakerInfo, error) { + stakerInfoRes, err := ec.oracleClient.StakerInfos(context.Background(), &oracleTypes.QueryStakerInfosRequest{AssetId: assetID}) if err != nil { - return []*oracleTypes.StakerInfo{}, err + return []*oracleTypes.StakerInfo{}, fmt.Errorf("failed to get stakerInfo from oracleClient, error:%w", err) } return stakerInfoRes.StakerInfos, nil } diff --git a/exoclient/query_test.go b/exoclient/query_test.go index 10849e5..48824d3 100644 --- a/exoclient/query_test.go +++ b/exoclient/query_test.go @@ -7,7 +7,7 @@ import ( ) func TestQuery(t *testing.T) { - cc := CreateGrpcConn("127.0.0.1:9090") + cc, _, _ := CreateGrpcConn("127.0.0.1:9090") defer cc.Close() p, err := GetParams(cc) assert.NoError(t, err) diff --git a/exoclient/subscribe.go b/exoclient/subscribe.go index 81a440d..4ac86bf 100644 --- a/exoclient/subscribe.go +++ b/exoclient/subscribe.go @@ -2,265 +2,323 @@ package exoclient import ( "encoding/json" + "errors" "fmt" - "net" "net/http" - "net/url" "time" "github.com/gorilla/websocket" ) +type subEvent string +type eventQuery string + const ( - subTypeNewBlock = "tm.event='NewBlock'" - subTypeTxUpdatePrice = "tm.event='Tx' AND create_price.price_update='success'" - subTypeTxNativeToken = "tm.event='Tx' AND create_price.native_token_update='update'" - sub = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}` - reconnectInterval = 3 - maxRetry = 600 - success = "success" + // subTypeNewBlock subType = "tm.event='NewBlock'" + // subTypeTxUpdatePrice subType = "tm.event='Tx' AND create_price.price_update='success'" + // subTypeTxNativeToken subType = "tm.event='Tx' AND create_price.native_token_update='update'" + subStr = `{"jsonrpc":"2.0","method":"subscribe","id":0,"params":{"query":"%s"}}` + reconnectInterval = 3 + maxRetry = 600 + success = "success" + eNewBlock eventQuery = "tm.event='NewBlock'" + eTxUpdatePrice eventQuery = "tm.event='Tx' AND create_price.price_update='success'" + eTxNativeToken eventQuery = "tm.event='Tx' AND create_price.native_token_update='update'" ) var ( - conn *websocket.Conn - rHeader http.Header - host string - eventNewBlock = fmt.Sprintf(sub, subTypeNewBlock) - eventTxPrice = fmt.Sprintf(sub, subTypeTxUpdatePrice) - eventTxNativeToken = fmt.Sprintf(sub, subTypeTxNativeToken) + // subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='NewBlock'")) + subNewBlock subEvent = subEvent(fmt.Sprintf(subStr, eNewBlock)) + // subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.price_update='success'")) + subTxUpdatePrice subEvent = subEvent(fmt.Sprintf(subStr, eTxUpdatePrice)) + // subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, "tm.event='Tx' AND create_price.native_token_update='update'")) + subTxNativeToken subEvent = subEvent(fmt.Sprintf(subStr, eTxNativeToken)) + // eNewBlock subEvent = "tm.event='NewBlock'" + // eTxUpdatePrice subEvent = "tm.event='Tx' AND create_price.price_update='success'" + // eTxNativeToken subEvent = "tm.event='Tx' AND create_price.native_token_update='update'" - events = map[string]bool{ - eventNewBlock: true, - eventTxPrice: true, - eventTxNativeToken: true, + events = map[subEvent]bool{ + subNewBlock: true, + subTxUpdatePrice: true, + subTxNativeToken: true, } ) -type result struct { - Result struct { - Query string `json:"query"` - Data struct { - Value struct { - TxResult struct { - Height string `json:"height"` - } `json:"TxResult"` - Block struct { - Header struct { - Height string `json:"height"` - } `json:"header"` - } `json:"block"` - } `json:"value"` - } `json:"data"` - Events struct { - Fee []string `json:"fee_market.base_fee"` - ParamsUpdate []string `json:"create_price.params_update"` - FinalPrice []string `json:"create_price.final_price"` - PriceUpdate []string `json:"create_price.price_update"` - FeederID []string `json:"create_price.feeder_id"` - FeederIDs []string `json:"create_price.feeder_ids"` - NativeTokenUpdate []string `json:"create_price.native_token_update"` - NativeTokenChange []string `json:"create_price.native_token_change"` - } `json:"events"` - } `json:"result"` +func (ec exoClient) Subscribe() { + // set up a background routine to listen to 'stop' signal and restart all tasks + // we expect this rountine as a forever-running process unless failed more than maxretry times + // or failed to confirm all routines closed after timeout when reciving stop signal + go func() { + ec.logger.Info("start subscriber job with all tasks") + ec.startTasks() + defer ec.wsClient.Close() + for _ = range ec.wsStop { + ec.logger.Info("ws connection closed, mark connection as inactive and waiting for all ws routines to complete stopping") + // mark ws connection as inactive to prevent further ws routine starting + ec.markWsInactive() + timeout := time.NewTicker(60 * time.Second) + loop: + for { + select { + case <-timeout.C: + // this should not happen + panic("failed to complete closing all ws routines, timeout") + default: + if ec.isZeroWsRoutines() { + logger.Info("all running ws routnines stopped") + break loop + } + time.Sleep(1 * time.Second) + } + } + ec.startTasks() + } + }() } -type ReCh struct { - Height string - Gas string - ParamsUpdate bool - Price []string - FeederIDs string - TxHeight string - NativeETH string +func (ec exoClient) EventsCh() chan EventInf { + return ec.wsEventsCh } -// setup ws connection, and subscribe newblock events -func Subscriber(remoteAddr string, endpoint string) (ret chan ReCh, stop chan struct{}) { - // logger := getLogger() - u, err := url.Parse(remoteAddr) - if err != nil { - panic(err) - } - - dialer := &websocket.Dialer{ - NetDial: func(_, _ string) (net.Conn, error) { - return net.Dial("tcp", u.Host) - }, - Proxy: http.ProxyFromEnvironment, +// startTasks establishes the ws connection and +// 1. routine: send ping message +// 2. subscribe to events +// 3. routine: read events from ws connection +func (ec exoClient) startTasks() { + // ws connection stopped, reset subscriber + //ec.logger.Info("establish ws connection") + // if err := ec.connectWs(maxRetry); err != nil { + // // continue + // panic(fmt.Sprintf("failed to create ws connection after maxRetry:%d, error:%w", maxRetry, err)) + // } + ec.markWsActive() + ec.logger.Info("subscribe to ws publish", "events", events) + if err := ec.sendAllSubscribeMsgs(maxRetry); err != nil { + panic(fmt.Sprintf("failed to send subscribe messages after maxRetry:%d, error:%w", maxRetry, err)) } - rHeader = http.Header{} - host = u.Host - conn, _, err = dialer.Dial("ws://"+host+endpoint, rHeader) - if err != nil { - panic(fmt.Sprintf("dail ws failed, error:%s", err)) - } + // start routine to send ping messages + ec.logger.Info("start ws ping routine") + ec.startPingRoutine() - stop = make(chan struct{}) - stopInternal := make(chan struct{}) - ret = make(chan ReCh) + // start routine to read events response + ec.logger.Info("start ws read routine") + ec.startReadRoutine() + ec.logger.Info("setuped all subscriber tasks successfully") +} - // read routine reads events(newBlock) from websocket - go func() { - defer func() { - conn.Close() - }() - conn.SetPongHandler(func(string) error { +func (ec exoClient) connectWs(maxRetry int) error { + if len(ec.wsEndpoint) == 0 { + return errors.New("wsEndpoint not set in exoClient") + } + var err error + // ec.wsClient.Close() + count := 0 + for count < maxRetry { + if ec.wsClient, _, err = ec.wsDialer.Dial(ec.wsEndpoint, http.Header{}); err == nil { + ec.wsClient.SetPongHandler(func(string) error { + return nil + }) + ec.wsStop = make(chan struct{}) + ec.markWsActive() return nil - }) - for { - _, data, err := conn.ReadMessage() - if err != nil { - logger.Error("read from publisher failed", "error", err) - if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { - return - } - // close write routine - close(stopInternal) - // reconnect ws - attempt := 0 - for ; err != nil; conn, _, err = dialer.Dial("ws://"+host+endpoint, rHeader) { - logger.Error("failed to reconnect to publisher, retrying...", "error", err, "attempt_count", attempt) - time.Sleep(reconnectInterval * time.Second) - attempt++ - if attempt > maxRetry { - logger.Error("failed to reconnect to publisher after max retry") - return - } - } - logger.Info("reconnected to publisher successfully.") - conn.SetPongHandler(func(string) error { - return nil - }) - // rest stopInternal - stopInternal = make(chan struct{}) - // setup write routine to set ping messages - go writeRoutine(conn, stopInternal) - // resubscribe event - attempt = 0 - for attempt < maxRetry { - if err = conn.WriteMessage(websocket.TextMessage, []byte(eventNewBlock)); err == nil { - break - } - logger.Error("failed to subscribe to new_block event, retrying", "error", err, "attempt_cout", attempt) - time.Sleep(1 * time.Second) - attempt++ - } - if attempt == maxRetry { - logger.Error("fail to subscribe new_block event after max retry") - return - } - continue - } - var response result - err = json.Unmarshal(data, &response) - if err != nil { - logger.Error("failed to pase response from publisher, skip", "error", err) - continue - } - rec := ReCh{} - - switch response.Result.Query { - case subTypeNewBlock: - rec.Height = response.Result.Data.Value.Block.Header.Height - events := response.Result.Events - if len(events.Fee) > 0 { - rec.Gas = events.Fee[0] - } - if len(events.ParamsUpdate) > 0 { - rec.ParamsUpdate = true - } - // TODO: for oracle v1, this should not happen, since this event only emitted in tx, But if we add more modes to support final price generation in endblock, this would be necessaray. - if len(events.PriceUpdate) > 0 && events.PriceUpdate[0] == success { - rec.FeederIDs = events.FeederIDs[0] - } - ret <- rec - case subTypeTxUpdatePrice: - // as we filtered for price_udpate=success, this means price has been updated this block - events := response.Result.Events - rec.Price = events.FinalPrice - rec.TxHeight = response.Result.Data.Value.TxResult.Height - ret <- rec - case subTypeTxNativeToken: - // update validator list for staker - rec.NativeETH = response.Result.Events.NativeTokenChange[0] - default: - } - - select { - case <-stop: - return - default: - } } - }() + count++ + ec.logger.Info("connecting to ws endpoint", "endpoint", ec.wsEndpoint, "attempt_count", count, "error", err) + time.Sleep(reconnectInterval * time.Second) + } + return fmt.Errorf("failed to dial ws endpoint, endpoint:%s, error:%w", ec.wsEndpoint, err) +} - // write message to subscribe tx event for price update - if err = conn.WriteMessage(websocket.TextMessage, []byte(eventTxPrice)); err != nil { - panic("fail to subscribe tx event") +func (ec *exoClient) StopWsRoutines() { + ec.wsLock.Lock() + select { + case _, ok := <-ec.wsStop: + if ok { + close(ec.wsStop) + } + default: + close(ec.wsStop) } + ec.wsLock.Unlock() +} - // write message to subscribe tx event for native token validator list change - if err = conn.WriteMessage(websocket.TextMessage, []byte(eventTxPrice)); err != nil { - panic("fail to subscribe tx event") +func (ec *exoClient) increaseWsRountines() (int, bool) { + // only increase active rountine count when the wsConnection is active + ec.wsLock.Lock() + defer ec.wsLock.Unlock() + if *ec.wsActive { + (*ec.wsActiveRoutines)++ + return *ec.wsActiveRoutines, true } + return *ec.wsActiveRoutines, false +} - // write message to subscribe newBlock event - if err = conn.WriteMessage(websocket.TextMessage, []byte(eventNewBlock)); err != nil { - panic("fail to subscribe new_block event") +func (ec *exoClient) decreaseWsRountines() (int, bool) { + ec.wsLock.Lock() + defer ec.wsLock.Unlock() + if *ec.wsActive { + if (*ec.wsActiveRoutines)--; *ec.wsActiveRoutines < 0 { + *ec.wsActiveRoutines = 0 + } + return *ec.wsActiveRoutines, true } + return *ec.wsActiveRoutines, false +} - // write routine sends ping messages every 10 seconds - go writeRoutine(conn, stopInternal) - return +func (ec *exoClient) isZeroWsRoutines() bool { + ec.wsLock.Lock() + isZero := *ec.wsActiveRoutines == 0 + ec.wsLock.Unlock() + return isZero } -// writeRoutine sends ping messages every 10 second -func writeRoutine(conn *websocket.Conn, stop chan struct{}) { - // logger := getLogger() - ticker := time.NewTicker(10 * time.Second) - defer func() { - ticker.Stop() - }() +func (ec *exoClient) markWsActive() { + ec.wsLock.Lock() + *ec.wsActive = true + ec.wsLock.Unlock() - for { - select { - case <-ticker.C: - if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - panic(err) +} + +func (ec *exoClient) markWsInactive() { + ec.wsLock.Lock() + *ec.wsActive = false + ec.wsLock.Unlock() +} + +func (ec exoClient) sendAllSubscribeMsgs(maxRetry int) error { + // at least try for one time + if maxRetry < 1 { + maxRetry = 1 + } + // reset events for re-subscribing + resetEvents() + allSet := false + for maxRetry > 0 && !allSet { + maxRetry-- + allSet = true + for event, ok := range events { + if ok { + allSet = false } - case <-stop: - if err := conn.WriteMessage( - websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - ); err != nil { - logger.Error("failed to write close message to publisher", "error", err) - return + if err := ec.wsClient.WriteMessage(websocket.TextMessage, []byte(event)); err == nil { + events[event] = false + allSet = true + } else { + ec.logger.Error("failed to send subscribe", "message", event, "error", err) } } + time.Sleep(2 * time.Second) } + if !allSet { + return errors.New(fmt.Sprintf("failed to send all subscribe messages, events:%v", events)) + } + return nil } -func ResetEvents() { - for event, _ := range events { - events[event] = true +func (ec exoClient) startPingRoutine() bool { + if _, ok := ec.increaseWsRountines(); !ok { + // ws connection is not active + return ok } + go func() { + defer func() { + ec.decreaseWsRountines() + }() + ticker := time.NewTicker(10 * time.Second) + defer func() { + ticker.Stop() + }() + for { + select { + case <-ticker.C: + if err := ec.wsClient.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + logger.Error("failed to write ping message to ws connection, close ws connection, ", "error", err) + logger.Info("send signal to stop all running ws routines") + ec.StopWsRoutines() + return + } + case <-ec.wsStop: + logger.Info("close ws ping routine due to receiving close signal") + if err := ec.wsClient.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + ); err != nil { + logger.Error("failed to write close message to publisher", "error", err) + return + } + } + } + }() + return true } -func SubEvents(conn *websocket.Conn, retryCount int, retryInterval time.Duration) { - // logger := getLogger() - for event, ok := range events { - retryCount-- - if ok { - if err := conn.WriteMessage(websocket.TextMessage, []byte(event)); err == nil { - logger.Info("subscribes event succussfully", "event", event) - events[event] = false +func (ec exoClient) startReadRoutine() bool { + if _, ok := ec.increaseWsRountines(); !ok { + // ws connection is not active + return ok + } + go func() { + defer func() { + ec.decreaseWsRountines() + }() + for { + select { + case <-ec.wsStop: + ec.logger.Info("close ws read routine due to receive close signal") + return + default: + _, data, err := ec.wsClient.ReadMessage() + if err != nil { + ec.logger.Error("failed to read from ws publisher", "error", err) + if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + // TODO: retry ? + panic(fmt.Sprintf("Got unexpectedCloseError from ws connection, error:%v", err)) + } + logger.Info("send signal to stop all running ws routines") + // send signal to stop all running ws routines + ec.StopWsRoutines() + return + } + var response SubscribeResult + err = json.Unmarshal(data, &response) + if err != nil { + ec.logger.Error("failed to parse response from publisher, skip", "error", err) + continue + } + switch eventQuery(response.Result.Query) { + case eNewBlock: + event, err := response.GetEventNewBlock() + if err != nil { + ec.logger.Error("failed to get newBlock event from event-response", "response", response, "error", err) + } + ec.wsEventsCh <- event + case eTxUpdatePrice: + event, err := response.GetEventUpdatePrice() + if err != nil { + ec.logger.Error("failed to get updatePrice event from event-response", "response", response, "error", err) + break + } + ec.wsEventsCh <- event + case eTxNativeToken: + // update validator list for staker + event, err := response.GetEventUpdateNST() + if err != nil { + ec.logger.Error("failed to get nativeToken event from event-response", "response", response, "error", err) + break + } + ec.wsEventsCh <- event + default: + ec.logger.Error("failed to parse unknown event type", "response-data", string(data)) + } } } - if retryCount == 0 { - logger.Error("failed to subscribe to all events", "events", events) - panic("fail to subscribe events") - } - time.Sleep(retryInterval * time.Second) + }() + return true +} + +func resetEvents() { + for event, _ := range events { + events[event] = true } } diff --git a/exoclient/tx.go b/exoclient/tx.go index c95f020..305485f 100644 --- a/exoclient/tx.go +++ b/exoclient/tx.go @@ -5,89 +5,33 @@ import ( "fmt" "time" - oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" - cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - - "github.com/cosmos/cosmos-sdk/client" authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" - "github.com/cosmos/cosmos-sdk/types" sdk "github.com/cosmos/cosmos-sdk/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/types/tx/signing" - - "google.golang.org/grpc" ) -// signMsg signs the message with consensusskey -func signMsg(cc *grpc.ClientConn, gasPrice int64, msgs ...sdk.Msg) authsigning.Tx { - txBuilder := txCfg.NewTxBuilder() - _ = txBuilder.SetMsgs(msgs...) - txBuilder.SetGasLimit(blockMaxGas) - txBuilder.SetFeeAmount(sdk.Coins{types.NewInt64Coin(denom, 0)}) - - signMode := txCfg.SignModeHandler().DefaultMode() - - _ = txBuilder.SetSignatures(getSignature(nil, pubKey, signMode)) - - bytesToSign := getSignBytes(txCfg, txBuilder.GetTx(), chainID) - sigBytes, err := privKey.Sign(bytesToSign) - if err != nil { - panic(fmt.Sprintf("priv_sign fail %s", err.Error())) - } - _ = txBuilder.SetSignatures(getSignature(sigBytes, pubKey, signMode)) - return txBuilder.GetTx() -} - -// getSignBytes reteive the bytes from tx for signing -func getSignBytes(txCfg client.TxConfig, tx authsigning.Tx, cID string) []byte { - b, err := txCfg.SignModeHandler().GetSignBytes( - txCfg.SignModeHandler().DefaultMode(), - authsigning.SignerData{ - ChainID: cID, - }, - tx, - ) - if err != nil { - panic(fmt.Sprintf("Get bytesToSign fail, %s", err.Error())) - } - - return b -} - -// getSignature assembles a siging.SignatureV2 structure -func getSignature(s []byte, pub cryptotypes.PubKey, signMode signing.SignMode) signing.SignatureV2 { - sig := signing.SignatureV2{ - PubKey: pub, - Data: &signing.SingleSignatureData{ - SignMode: signMode, - Signature: s, - }, - } - - return sig -} - -// SendTx build a create-price message and broadcast to the exocoreChain through grpc connection -func SendTx(cc *grpc.ClientConn, feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32, gasPrice int64) *sdktx.BroadcastTxResponse { - if gasPrice == 0 { - gasPrice = defaultGasPrice - } - +// SendTx signs a create-price transaction and send it to exocored +// func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) { +func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) { // build create-price message - msg := oracleTypes.NewMsgCreatePrice( - sdk.AccAddress(pubKey.Address()).String(), + msg := oracletypes.NewMsgCreatePrice( + sdk.AccAddress(ec.pubKey.Address()).String(), feederID, - []*oracleTypes.PriceSource{ + []*oracletypes.PriceSource{ { SourceID: Chainlink, - Prices: []*oracleTypes.PriceTimeDetID{ + Prices: []*oracletypes.PriceTimeDetID{ { - Price: price, - Decimal: int32(decimal), - Timestamp: time.Now().UTC().Format(layout), - DetID: roundID, + Price: price.Price, + Decimal: price.Decimal, + Timestamp: time.Now().UTC().Format(feedertypes.TimeLayout), + DetID: price.RoundID, }, }, Desc: "", @@ -98,21 +42,20 @@ func SendTx(cc *grpc.ClientConn, feederID uint64, baseBlock uint64, price, round ) // sign the message with validator consensus-key configured - signedTx := signMsg(cc, gasPrice, msg) + signedTx, err := ec.signMsg(msg) + if err != nil { + return nil, fmt.Errorf("failed to sign message, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } // encode transaction to broadcast - txBytes, err := txCfg.TxEncoder()(signedTx) + txBytes, err := ec.txCfg.TxEncoder()(signedTx) if err != nil { - panic(err) + // this should not happen + return nil, fmt.Errorf("failed to encode singedTx, txBytes:%b, msg:%v, valConsAddr:%s, error:%w", txBytes, msg, sdk.ConsAddress(ec.pubKey.Address()), err) } - return broadcastTxBytes(cc, txBytes) -} - -// boradcastTxByBytes broadcasts the signed transaction -func broadcastTxBytes(cc *grpc.ClientConn, txBytes []byte) *sdktx.BroadcastTxResponse { - txClient := sdktx.NewServiceClient(cc) - ccRes, err := txClient.BroadcastTx( + // broadcast txBytes + res, err := ec.txClient.BroadcastTx( context.Background(), &sdktx.BroadcastTxRequest{ Mode: sdktx.BroadcastMode_BROADCAST_MODE_SYNC, @@ -120,7 +63,61 @@ func broadcastTxBytes(cc *grpc.ClientConn, txBytes []byte) *sdktx.BroadcastTxRes }, ) if err != nil { - panic(err) + return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } + return res, nil +} + +// signMsg signs the message with consensusskey +func (ec exoClient) signMsg(msgs ...sdk.Msg) (authsigning.Tx, error) { + txBuilder := ec.txCfg.NewTxBuilder() + _ = txBuilder.SetMsgs(msgs...) + txBuilder.SetGasLimit(blockMaxGas) + txBuilder.SetFeeAmount(sdk.Coins{sdk.NewInt64Coin(denom, 0)}) + + if err := txBuilder.SetSignatures(ec.getSignature(nil)); err != nil { + ec.logger.Error("failed to SetSignatures", "errro", err) + return nil, err + } + + bytesToSign, err := ec.getSignBytes(txBuilder.GetTx()) + if err != nil { + return nil, fmt.Errorf("failed to getSignBytes, error:%w", err) + } + sigBytes, err := ec.privKey.Sign(bytesToSign) + if err != nil { + ec.logger.Error("failed to sign txBytes", "error", err) + return nil, err + } + // _ = txBuilder.SetSignatures(getSignature(sigBytes, ec.pubKey, signMode)) + _ = txBuilder.SetSignatures(ec.getSignature(sigBytes)) + return txBuilder.GetTx(), nil +} + +// getSignBytes reteive the bytes from tx for signing +func (ec exoClient) getSignBytes(tx authsigning.Tx) ([]byte, error) { + b, err := ec.txCfg.SignModeHandler().GetSignBytes( + ec.txCfg.SignModeHandler().DefaultMode(), + authsigning.SignerData{ + ChainID: ec.chainID, + }, + tx, + ) + if err != nil { + return nil, fmt.Errorf("Get bytesToSign fail, %w", err) + } + + return b, nil +} + +// getSignature assembles a siging.SignatureV2 structure +func (ec exoClient) getSignature(sigBytes []byte) signing.SignatureV2 { + signature := signing.SignatureV2{ + PubKey: ec.pubKey, + Data: &signing.SingleSignatureData{ + SignMode: ec.txCfg.SignModeHandler().DefaultMode(), + Signature: sigBytes, + }, } - return ccRes + return signature } diff --git a/exoclient/types.go b/exoclient/types.go index 9d37608..11e0651 100644 --- a/exoclient/types.go +++ b/exoclient/types.go @@ -4,45 +4,359 @@ import ( cryptoed25519 "crypto/ed25519" "encoding/base64" "encoding/json" + "errors" "fmt" "os" "path" + "strconv" + "strings" - "cosmossdk.io/simapp/params" "github.com/ExocoreNetwork/exocore/app" cmdcfg "github.com/ExocoreNetwork/exocore/cmd/config" + oracleTypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keys/ed25519" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" sdk "github.com/cosmos/cosmos-sdk/types" + sdktx "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/go-bip39" "github.com/evmos/evmos/v16/encoding" - "google.golang.org/grpc" ) +type ExoClientInf interface { + // Query + GetParams() (*oracletypes.Params, error) + GetLatestPrice(tokenID uint64) (oracletypes.PriceTimeRound, error) + GetStakerInfos(assetID string) ([]*oracleTypes.StakerInfo, error) + GetStakerInfo(assetID, stakerAddr string) ([]*oracleTypes.StakerInfo, error) + + // Tx + SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) + + // Ws subscriber + Subscribe() +} + +type EventInf interface { + Type() EventType +} + +type EventNewBlock struct { + height int64 + gas string + paramsUpdate bool + feederIDs map[int64]struct{} +} + +func (s *SubscribeResult) GetEventNewBlock() (*EventNewBlock, error) { + height, ok := s.BlockHeight() + if !ok { + return nil, errors.New("failed to get height from event_newBlock response") + } + fee, ok := s.Fee() + if !ok { + return nil, errors.New("failed to get gas from event_newBlock response") + } + feederIDs, ok := s.FeederIDs() + if !ok { + return nil, errors.New("failed to get feederIDs from event_newBlock response") + } + + return &EventNewBlock{ + height: height, + gas: fee, + paramsUpdate: s.ParamsUpdate(), + feederIDs: feederIDs, + }, nil +} +func (e *EventNewBlock) Height() int64 { + return e.height +} +func (e *EventNewBlock) Gas() string { + return e.gas +} +func (e *EventNewBlock) ParamsUpdate() bool { + return e.paramsUpdate +} +func (e *EventNewBlock) FeederIDs() map[int64]struct{} { + return e.feederIDs +} +func (e *EventNewBlock) Type() EventType { + return ENewBlock +} + +type FinalPrice struct { + tokenID int64 + roundID string + price string + decimal int32 +} + +func (f *FinalPrice) TokenID() int64 { + return f.tokenID +} +func (f *FinalPrice) RoundID() string { + return f.roundID +} +func (f *FinalPrice) Price() string { + return f.price +} +func (f *FinalPrice) Decimal() int32 { + return f.decimal +} + +type EventUpdatePrice struct { + prices []*FinalPrice + txHeight int64 +} + +func (s *SubscribeResult) GetEventUpdatePrice() (*EventUpdatePrice, error) { + prices, ok := s.FinalPrice() + if !ok { + return nil, errors.New("failed to get finalPrice from event_txUpdatePrice response") + } + txHeight, ok := s.TxHeight() + if !ok { + return nil, errors.New("failed to get txHeight from event_txUpdatePrice response") + } + return &EventUpdatePrice{ + prices: prices, + txHeight: txHeight, + }, nil +} +func (e *EventUpdatePrice) Prices() []*FinalPrice { + return e.prices +} +func (e *EventUpdatePrice) TxHeight() int64 { + return e.txHeight +} +func (e *EventUpdatePrice) Type() EventType { + return EUpdatePrice +} + +// EventUpdateNST tells the detail about the beaconchain-validator change for a staker +type EventUpdateNST struct { + deposit bool + stakerID int64 + validatorIndex string + index int64 +} + +func (s *SubscribeResult) GetEventUpdateNST() (*EventUpdateNST, error) { + nstChange, ok := s.NSTChange() + if !ok { + return nil, errors.New("failed to get NativeTokenChange from event_txUpdateNST response") + } + parsed := strings.Split(nstChange, "_") + if len(parsed) != 4 { + return nil, fmt.Errorf("failed to parse nstChange: expected 4 parts but got %d, nstChange: %s", len(parsed), nstChange) + } + deposit := parsed[0] == "deposit" + stakerID, err := strconv.ParseInt(parsed[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse stakerID in nstChange from evetn_txUpdateNST response, error:%w", err) + } + index, err := strconv.ParseInt(parsed[3], 10, 64) + if err != nil { + return nil, fmt.Errorf("failed to parse beaconchain_sync_index in nstChange from event_txUpdateNST response, error:%w", err) + } + return &EventUpdateNST{ + deposit: deposit, + stakerID: stakerID, + validatorIndex: parsed[2], + index: index, + }, nil +} +func (e *EventUpdateNST) Deposit() bool { + return e.deposit +} +func (e *EventUpdateNST) StakerID() int64 { + return e.stakerID +} +func (e *EventUpdateNST) ValidatorIndex() string { + return e.validatorIndex +} +func (e *EventUpdateNST) Index() int64 { + return e.index +} +func (e *EventUpdateNST) Type() EventType { + return EUpdateNST +} + +type EventType int + +type EventRes struct { + Height string + Gas string + ParamsUpdate bool + Price []string + FeederIDs string + TxHeight string + NativeETH string + eventMessage interface{} + Type EventType +} + +type SubscribeResult struct { + Result struct { + Query string `json:"query"` + Data struct { + Value struct { + TxResult struct { + Height string `json:"height"` + } `json:"TxResult"` + Block struct { + Header struct { + Height string `json:"height"` + } `json:"header"` + } `json:"block"` + } `json:"value"` + } `json:"data"` + Events struct { + Fee []string `json:"fee_market.base_fee"` + ParamsUpdate []string `json:"create_price.params_update"` + FinalPrice []string `json:"create_price.final_price"` + PriceUpdate []string `json:"create_price.price_update"` + FeederID []string `json:"create_price.feeder_id"` + FeederIDs []string `json:"create_price.feeder_ids"` + NativeTokenUpdate []string `json:"create_price.native_token_update"` + NativeTokenChange []string `json:"create_price.native_token_change"` + } `json:"events"` + } `json:"result"` +} + +func (s *SubscribeResult) BlockHeight() (int64, bool) { + if h := s.Result.Data.Value.Block.Header.Height; len(h) > 0 { + height, err := strconv.ParseInt(h, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from height in SubscribeResult", "error", err, "height_str", h) + } + return height, true + } + return 0, false +} + +func (s *SubscribeResult) TxHeight() (int64, bool) { + if h := s.Result.Data.Value.TxResult.Height; len(h) > 0 { + height, err := strconv.ParseInt(h, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from txheight in SubscribeResult", "error", err, "height_str", h) + } + return height, true + } + return 0, false +} + +// FeederIDs will return (nil, true) when there's no feederIDs +func (s *SubscribeResult) FeederIDs() (feederIDs map[int64]struct{}, valid bool) { + events := s.Result.Events + if len(events.PriceUpdate) > 0 && events.PriceUpdate[0] == success { + if feederIDsStr := strings.Split(events.FeederIDs[0], "_"); len(feederIDsStr) > 0 { + feederIDs = make(map[int64]struct{}) + for _, feederIDStr := range feederIDsStr { + id, err := strconv.ParseInt(feederIDStr, 10, 64) + if err != nil { + logger.Error("failed to parse int64 from feederIDs in subscribeResult", "feederIDs", feederIDs) + feederIDs = nil + return + } + feederIDs[id] = struct{}{} + } + valid = true + } + + } + // we don't take it as a 'false' case when there's no feederIDs + valid = true + return +} + +func (s *SubscribeResult) FinalPrice() (prices []*FinalPrice, valid bool) { + if fps := s.Result.Events.FinalPrice; len(fps) > 0 { + prices = make([]*FinalPrice, 0, len(fps)) + for _, price := range fps { + parsed := strings.Split(price, "_") + if l := len(parsed); l > 4 { + // nsteth + parsed[2] = strings.Join(parsed[2:l-1], "_") + parsed[3] = parsed[l-1] + parsed = parsed[:4] + } + if len(parsed[2]) == 32 { + // make sure this base64 string is valid + if _, err := base64.StdEncoding.DecodeString(parsed[2]); err != nil { + logger.Error("failed to parse base64 encoded string when parse finalprice.price from SbuscribeResult", "parsed.price", parsed[2]) + return + } + } + tokenID, err := strconv.ParseInt(parsed[0], 10, 64) + if err != nil { + logger.Error("failed to parse finalprice.tokenID from SubscribeResult", "parsed.tokenID", parsed[0]) + prices = nil + return + } + decimal, err := strconv.ParseInt(parsed[3], 10, 32) + if err != nil { + logger.Error("failed to parse finalprice.decimal from SubscribeResult", "parsed.decimal", parsed[3]) + prices = nil + return + } + prices = append(prices, &FinalPrice{ + tokenID: tokenID, + roundID: parsed[1], + price: parsed[2], + // conversion is safe + decimal: int32(decimal), + }) + } + valid = true + } + return +} + +func (s *SubscribeResult) NSTChange() (string, bool) { + if len(s.Result.Events.NativeTokenChange[0]) == 0 { + return "", false + } + return s.Result.Events.NativeTokenChange[0], true +} + +func (s *SubscribeResult) ParamsUpdate() bool { + return len(s.Result.Events.ParamsUpdate) > 0 +} + +func (s *SubscribeResult) Fee() (string, bool) { + if len(s.Result.Events.Fee) == 0 { + return "", false + } + return s.Result.Events.Fee[0], true +} + const ( + // current version of 'Oracle' only support id=1(chainlink) as valid source Chainlink uint64 = 1 denom = "hua" - layout = "2006-01-02 15:04:05" ) -var ( - defaultGasPrice = int64(7) +const ( + ENewBlock EventType = iota + 1 + EUpdatePrice + EUpdateNST +) +var ( logger feedertypes.LoggerInf blockMaxGas uint64 - chainID string - encCfg params.EncodingConfig - txCfg client.TxConfig - privKey cryptotypes.PrivKey - pubKey cryptotypes.PubKey + defaultExoClient *exoClient ) // Init intialize the exoclient with configuration including consensuskey info, chainID -func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) (*grpc.ClientConn, error) { +// func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) (*grpc.ClientConn, func(), error) { +func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) error { if logger = feedertypes.GetLogger("exoclient"); logger == nil { panic("logger is not initialized") } @@ -59,6 +373,7 @@ func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) ( // if mnemonic is not set from flag, then check config file to find if there is mnemonic configured if len(mnemonic) == 0 && len(confSender.Mnemonic) > 0 { + logger.Info("set mnemonic from config", "mnemonic", confSender.Mnemonic) mnemonic = confSender.Mnemonic } @@ -66,49 +381,46 @@ func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) ( // load privatekey from local path file, err := os.Open(path.Join(confSender.Path, privFile)) if err != nil { - logger.Error("failed to load privatekey from local path", "path", privFile, "error", err) - return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to open consensuskey file, %v", err)) + // return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to open consensuskey file, %v", err)) + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to open consensuskey file, path:%s, error:%v", privFile, err)) } defer file.Close() var privKey feedertypes.PrivValidatorKey if err := json.NewDecoder(file).Decode(&privKey); err != nil { - logger.Error("failed to parse consensuskey from json file", "error", err) - return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse consensuskey from json file %v", err)) + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse consensuskey from json file, file path:%s, error:%v", privFile, err)) } + logger.Info("load privatekey from local file", "path", privFile) privBase64 = privKey.PrivKey.Value } else if !bip39.IsMnemonicValid(mnemonic) { - logger.Error("invalid mnemonic from config") - return nil, feedertypes.ErrInitFail.Wrap("invalid mnemonic from config") + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("invalid mnemonic:%s", mnemonic)) } - + var privKey cryptotypes.PrivKey if len(mnemonic) > 0 { privKey = ed25519.GenPrivKeyFromSecret([]byte(mnemonic)) } else { privBytes, err := base64.StdEncoding.DecodeString(privBase64) if err != nil { - logger.Error("failed to parse privateKey form base64", "base64_string", privBase64) - return nil, feedertypes.ErrInitFail.Wrap(err.Error()) + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse privatekey from base64_string:%s, error:%v", privBase64, err)) } //nolint:all privKey = &ed25519.PrivKey{ Key: cryptoed25519.PrivateKey(privBytes), } } - pubKey = privKey.PubKey() - encCfg = encoding.MakeConfig(app.ModuleBasics) - txCfg = encCfg.TxConfig + encCfg := encoding.MakeConfig(app.ModuleBasics) if len(confExocore.ChainID) == 0 { - logger.Error("chainID must be sepecified in config") - return nil, feedertypes.ErrInitFail.Wrap("ChainID must be specified in config") + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("ChainID must be specified in config")) } - chainID = confExocore.ChainID - conn, err := CreateGrpcConn(confExocore.Rpc) - if err != nil { - logger.Error("failed to crete grpc connect when initialize exoclient") - return conn, feedertypes.ErrInitFail.Wrap(err.Error()) + var err error + if defaultExoClient, err = NewExoClient(logger, confExocore.Rpc, confExocore.Ws, privKey, encCfg, confExocore.ChainID); err != nil { + if errors.Is(err, feedertypes.ErrInitConnectionFail) { + return err + } + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to NewExoClient, privKey:%v, chainID:%s, error:%v", privKey, confExocore.ChainID, err)) } - return conn, nil + + return nil } diff --git a/fetcher/beaconchain/beaconchain.go b/fetcher/beaconchain/beaconchain.go index ec9a48e..9f3240d 100644 --- a/fetcher/beaconchain/beaconchain.go +++ b/fetcher/beaconchain/beaconchain.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "math/big" "net/http" "net/url" "sort" @@ -15,7 +14,7 @@ import ( oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/ExocoreNetwork/price-feeder/fetcher/types" - "github.com/ethereum/go-ethereum/common/hexutil" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/imroc/biu" ) @@ -69,117 +68,46 @@ var ( // } // // stakerValidators = map[int]*validatorList{2: {0, validatorsTmp}} - stakerValidators = make(map[int]*validatorList) + // stakerValidators = make(map[int]*validatorList) + defaultStakerValidators = newStakerVList() // latest finalized epoch we've got balances summarized for stakers finalizedEpoch uint64 // latest stakerBalanceChanges, initialized as 0 change (256-0 of 1st parts means that all stakers have 32 efb) - latestChangesBytes = make([]byte, 32) + // latestChangesBytes = make([]byte, 32) + latestChangesBytes = types.NSTETHZeroChanges urlEndpoint *url.URL slotsPerEpoch uint64 ) -func ResetStakerValidators(stakerInfos []*oracletypes.StakerInfo) { - lock.Lock() - for _, sInfo := range stakerInfos { - index := uint64(0) - if l := len(sInfo.BalanceList); l > 0 { - index = sInfo.BalanceList[l-1].Index - } - // convert bytes into number of beaconchain validator index - validators := make([]string, 0, len(sInfo.ValidatorPubkeyList)) - for _, validatorPubkey := range sInfo.ValidatorPubkeyList { - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkeyNum := new(big.Int).SetBytes(validatorPubkeyBytes).String() - validators = append(validators, validatorPubkeyNum) - } - stakerValidators[int(sInfo.StakerIndex)] = &validatorList{ - index: index, - validators: validators, - } - } - lock.Unlock() +func ResetStakerValidators(stakerInfos []*oracletypes.StakerInfo, all bool) error { + return defaultStakerValidators.reset(stakerInfos, all) } - -// UpdateStakerValidators update staker's validators for deposit/withdraw events triggered by exocore -func UpdateStakerValidators(stakerIdx int, validatorPubkey string, deposit bool, index uint64) bool { - lock.Lock() - defer lock.Unlock() - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkey = new(big.Int).SetBytes(validatorPubkeyBytes).String() - // add a new valdiator for the staker +func UpdateStakerValidators(stakerIdx int, validatorIndexHex string, index uint64, deposit bool) bool { if deposit { - if vList, ok := stakerValidators[stakerIdx]; ok { - if vList.index+1 != index { - return false - } - vList.index++ - vList.validators = append(vList.validators, validatorPubkey) - } else { - stakerValidators[stakerIdx] = &validatorList{ - index: index, - validators: []string{validatorPubkey}, - } - } - } else { - // remove the existing validatorIndex for the corresponding staker - if vList, ok := stakerValidators[stakerIdx]; ok { - if vList.index+1 != index { - return false - } - vList.index++ - for idx, v := range vList.validators { - if v == validatorPubkey { - if len(vList.validators) == 1 { - delete(stakerValidators, stakerIdx) - break - } - vList.validators = append(vList.validators[:idx], vList.validators[idx+1:]...) - break - } - } - } + return defaultStakerValidators.addVIdx(stakerIdx, validatorIndexHex, index) } - return true + return defaultStakerValidators.removeVIdx(stakerIdx, validatorIndexHex, index) } -func ResetStakerValidatorsForAll(stakerInfos []*oracletypes.StakerInfo) { - lock.Lock() - stakerValidators = make(map[int]*validatorList) - for _, stakerInfo := range stakerInfos { - validators := make([]string, 0, len(stakerInfo.ValidatorPubkeyList)) - for _, validatorPubkey := range stakerInfo.ValidatorPubkeyList { - validatorPubkeyBytes, _ := hexutil.Decode(validatorPubkey) - validatorPubkeyNum := new(big.Int).SetBytes(validatorPubkeyBytes).String() - validators = append(validators, validatorPubkeyNum) - } - - index := uint64(0) - // TODO: this may not necessary, stakerInfo should have at least one entry in balanceList - if l := len(stakerInfo.BalanceList); l > 0 { - index = stakerInfo.BalanceList[l-1].Index - } - stakerValidators[int(stakerInfo.StakerIndex)] = &validatorList{ - index: index, - validators: validators, - } +func (s *source) fetch(token string) (*types.PriceInfo, error) { + // check epoch, when epoch updated, update effective-balance + if types.NSTToken(token) != types.NativeTokenETH { + return nil, feedertypes.ErrTokenNotSupported.Wrap(fmt.Sprintf("only support native-eth-restaking %s", types.NativeTokenETH)) } - lock.Unlock() -} -func Fetch(token string) (*types.PriceInfo, error) { - // check epoch, when epoch updated, update effective-balance - if token != types.NativeTokenETH { - logger.Error("only support native-eth-restaking", "expect", types.NativeTokenETH, "got", token) - return nil, errTokenNotSupported + stakerValidators := defaultStakerValidators.getStakerValidators() + if len(stakerValidators) == 0 { + // return zero price when there's no stakers + return &types.PriceInfo{}, nil } + // check if finalized epoch had been updated epoch, stateRoot, err := getFinalizedEpoch() if err != nil { - logger.Error("fail to get finalized epoch from beaconchain", "error", err) - return nil, err + return nil, fmt.Errorf("fail to get finalized epoch from beaconchain, error:%w", err) } // epoch not updated, just return without fetching since effective-balance has not changed @@ -191,9 +119,7 @@ func Fetch(token string) (*types.PriceInfo, error) { } stakerChanges := make([][]int, 0, len(stakerValidators)) - - lock.RLock() - logger.Info("fetch efb from beaconchain", "stakerList_length", len(stakerValidators)) + s.logger.Info("fetch efb from beaconchain", "stakerList_length", len(stakerValidators)) hasEFBChanged := false for stakerIdx, vList := range stakerValidators { stakerBalance := 0 @@ -206,8 +132,7 @@ func Fetch(token string) (*types.PriceInfo, error) { l -= 100 validatorBalances, err := getValidators(tmpValidatorPubkeys, stateRoot) if err != nil { - logger.Error("failed to get validators from beaconchain", "error", err) - return nil, err + return nil, fmt.Errorf("failed to get validators from beaconchain, error:%w", err) } for _, validatorBalance := range validatorBalances { stakerBalance += int(validatorBalance[1]) @@ -217,8 +142,7 @@ func Fetch(token string) (*types.PriceInfo, error) { // validatorBalances, err := GetValidators(validatorIdxs, epoch) validatorBalances, err := getValidators(vList.validators[i:], stateRoot) if err != nil { - logger.Error("failed to get validators from beaconchain", "error", err) - return nil, err + return nil, fmt.Errorf("failed to get validators from beaconchain, error:%w", err) } for _, validatorBalance := range validatorBalances { // this should be initialized from exocored @@ -229,19 +153,17 @@ func Fetch(token string) (*types.PriceInfo, error) { delta = maxChange } stakerChanges = append(stakerChanges, []int{stakerIdx, delta}) - logger.Info("fetched efb from beaconchain", "staker_index", stakerIdx, "balance_change", delta) + s.logger.Info("fetched efb from beaconchain", "staker_index", stakerIdx, "balance_change", delta) hasEFBChanged = true } } if !hasEFBChanged && len(stakerValidators) > 0 { - logger.Info("fetch efb from beaconchain, all efbs of validators remains to 32 without any change") + s.logger.Info("fetch efb from beaconchain, all efbs of validators remains to 32 without any change") } sort.Slice(stakerChanges, func(i, j int) bool { return stakerChanges[i][0] < stakerChanges[j][0] }) - lock.RUnlock() - finalizedEpoch = epoch latestChangesBytes = convertBalanceChangeToBytes(stakerChanges) @@ -252,6 +174,11 @@ func Fetch(token string) (*types.PriceInfo, error) { }, nil } +// reload does nothing since beaconchain source only used to update the balance change for nsteth +func (s *source) reload(token, cfgPath string) error { + return nil +} + func convertBalanceChangeToBytes(stakerChanges [][]int) []byte { if len(stakerChanges) == 0 { // lenght equals to 0 means that alls takers have efb of 32 with 0 changes diff --git a/fetcher/beaconchain/types.go b/fetcher/beaconchain/types.go index 9304957..80b3b3f 100644 --- a/fetcher/beaconchain/types.go +++ b/fetcher/beaconchain/types.go @@ -2,8 +2,9 @@ package beaconchain import ( "encoding/json" - "errors" + "fmt" "io" + "math/big" "net/http" "net/url" "os" @@ -11,21 +12,19 @@ import ( "strconv" "strings" + oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/ExocoreNetwork/price-feeder/fetcher/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/cometbft/cometbft/libs/sync" + "github.com/ethereum/go-ethereum/common/hexutil" "gopkg.in/yaml.v2" ) -// type stakerList struct { -// StakerAddrs []string -// } -// -// type validatorList struct { -// index uint64 -// validators []string -// } - +type source struct { + logger feedertypes.LoggerInf + *types.Source +} type config struct { URL string `yaml:"url"` NSTID string `yaml:"nstid"` @@ -37,49 +36,158 @@ type ResultConfig struct { } `json:"data"` } +type stakerVList struct { + locker *sync.RWMutex + sValidators map[int]*validatorList +} + +func newStakerVList() *stakerVList { + return &stakerVList{ + locker: new(sync.RWMutex), + sValidators: make(map[int]*validatorList), + } +} + +func (s *stakerVList) length() int { + if s == nil { + return 0 + } + s.locker.RLock() + l := len(s.sValidators) + s.locker.RUnlock() + return l +} + +func (s *stakerVList) getStakerValidators() map[int]*validatorList { + s.locker.RLock() + defer s.locker.RUnlock() + ret := make(map[int]*validatorList) + for stakerIdx, vList := range s.sValidators { + validators := make([]string, len(vList.validators)) + copy(validators, vList.validators) + ret[stakerIdx] = &validatorList{ + index: vList.index, + validators: validators, + } + } + return ret +} + +func (s *stakerVList) addVIdx(sIdx int, vIdx string, index uint64) bool { + s.locker.Lock() + defer s.locker.Unlock() + if vList, ok := s.sValidators[sIdx]; ok { + if vList.index+1 != index { + return false + } + vList.index++ + vList.validators = append(vList.validators, vIdx) + } else { + if index != 0 { + return false + } + s.sValidators[sIdx] = &validatorList{ + index: 0, + validators: []string{vIdx}, + } + } + return true +} +func (s *stakerVList) removeVIdx(sIdx int, vIdx string, index uint64) bool { + s.locker.RLock() + defer s.locker.RUnlock() + if vList, ok := s.sValidators[sIdx]; ok { + if vList.index+1 != index { + return false + } + vList.index++ + for idx, v := range vList.validators { + if v == vIdx { + if len(vList.validators) == 1 { + delete(s.sValidators, sIdx) + return true + } + vList.validators = append(vList.validators[:idx], vList.validators[idx+1:]...) + return true + } + } + } + return false +} + +func (s *stakerVList) reset(stakerInfos []*oracletypes.StakerInfo, all bool) error { + s.locker.Lock() + if all { + s.sValidators = make(map[int]*validatorList) + } + for _, stakerInfo := range stakerInfos { + validators := make([]string, 0, len(stakerInfo.ValidatorPubkeyList)) + for _, validatorIndexHex := range stakerInfo.ValidatorPubkeyList { + validatorIdx, err := convertHexToIntStr(validatorIndexHex) + if err != nil { + logger.Error("failed to convert validatorIndex from hex string to int", "validator-index-hex", validatorIndexHex) + return fmt.Errorf(fmt.Sprintf("failed to convert validatorIndex from hex string to int, validator-index-hex:%s", validatorIndexHex)) + } + validators = append(validators, validatorIdx) + } + + index := uint64(0) + // TODO: this may not necessary, stakerInfo should have at least one entry in balanceList + if l := len(stakerInfo.BalanceList); l > 0 { + index = stakerInfo.BalanceList[l-1].Index + } + s.sValidators[int(stakerInfo.StakerIndex)] = &validatorList{ + index: index, + validators: validators, + } + + } + s.locker.Unlock() + return nil +} + const ( envConf = "oracle_env_beaconchain.yaml" urlQuerySlotsPerEpoch = "eth/v1/config/spec" + hexPrefix = "0x" ) var ( - logger feedertypes.LoggerInf - lock sync.RWMutex - - // errors - errTokenNotSupported = errors.New("token not supported") + logger feedertypes.LoggerInf + defaultSource *source ) func init() { - types.InitFetchers[types.BeaconChain] = initBeaconchain + types.SourceInitializers[types.BeaconChain] = initBeaconchain } -func initBeaconchain(confPath string) error { - if logger = feedertypes.GetLogger("fetcher_beaconchain"); logger == nil { - panic("logger is not initialized") +func initBeaconchain(cfgPath string, l feedertypes.LoggerInf) (types.SourceInf, error) { + if logger = l; logger == nil { + if logger = feedertypes.GetLogger("fetcher_beaconchain"); logger == nil { + return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized") + } } - cfg, err := parseConfig(confPath) + // init from config file + cfg, err := parseConfig(cfgPath) if err != nil { - logger.Error("fail to parse config", "error", err, "path", confPath) - return feedertypes.ErrInitFail.Wrap(err.Error()) + // logger.Error("fail to parse config", "error", err, "path", cfgPath) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse config, error:%v", err)) } + // beaconchain endpoint url urlEndpoint, err = url.Parse(cfg.URL) if err != nil { - logger.Error("failed to parse beaconchain URL", "url", cfg.URL) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse url:%s, error:%v", cfg.URL, err)) } - // parse nstID by splitting it + // parse nstID by splitting it with "_'" nstID := strings.Split(cfg.NSTID, "_") if len(nstID) != 2 { - logger.Error("invalid nstID format, should be: x_y", "nstID", nstID) - return feedertypes.ErrInitFail.Wrap("invalid nstID format") + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("invalid nstID format, nstID:%s", nstID)) } - // the second element is the lzID of the chain - lzID, err := strconv.ParseUint(strings.TrimPrefix(nstID[1], "0x"), 16, 64) + // the second element is the lzID of the chain, trim possible prefix_0x + lzID, err := strconv.ParseUint(strings.TrimPrefix(nstID[1], hexPrefix), 16, 64) if err != nil { - logger.Error("failed to pase lzID from nstID", "got_nstID", nstID[1], "error", err) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse lzID:%s from nstID, error:%v", nstID[1], err)) } // set slotsPerEpoch @@ -90,29 +198,34 @@ func initBeaconchain(confPath string) error { u := urlEndpoint.JoinPath(urlQuerySlotsPerEpoch) res, err := http.Get(u.String()) if err != nil { - logger.Error("failed to get slotsPerEpoch from endpoint", "error", err, "url", u.String()) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to get slotsPerEpoch from endpoint:%s, error:%v", u.String(), err)) } result, err := io.ReadAll(res.Body) if err != nil { - logger.Error("failed to read response from slotsPerEpoch", "error", err) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to get slotsPerEpoch from endpoint:%s, error:%v", u.String(), err)) } var re ResultConfig if err = json.Unmarshal(result, &re); err != nil { - logger.Error("failed to parse response from slotsPerEpoch", "erro", err) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse response from slotsPerEpoch, error:%v", err)) } if slotsPerEpoch, err = strconv.ParseUint(re.Data.SlotsPerEpoch, 10, 64); err != nil { - logger.Error("failed to parse response_slotsPerEpoch", "got_res.data.slotsPerEpoch", re.Data.SlotsPerEpoch) - return feedertypes.ErrInitFail.Wrap(err.Error()) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse response_slotsPerEoch, got:%s, rror:%v", re.Data.SlotsPerEpoch, err)) } } - types.UpdateNativeAssetID(cfg.NSTID) - types.Fetchers[types.BeaconChain] = Fetch + // init first to get a fixed point for 'fetch' to refer to + defaultSource = &source{} + *defaultSource = source{ + logger: logger, + Source: types.NewSource(logger, types.BeaconChain, defaultSource.fetch, cfgPath, defaultSource.reload), + } - return nil + // initialize native-restaking stakers' beaconchain-validator list + + // update nst assetID to be consistent with exocored. for beaconchain it's about different lzID + types.SetNativeAssetID(fetchertypes.NativeTokenETH, cfg.NSTID) + + return defaultSource, nil } func parseConfig(confPath string) (config, error) { @@ -126,3 +239,12 @@ func parseConfig(confPath string) (config, error) { } return cfg, nil } + +func convertHexToIntStr(hexStr string) (string, error) { + vBytes, err := hexutil.Decode(hexStr) + if err != nil { + return "", err + } + return new(big.Int).SetBytes(vBytes).String(), nil + +} diff --git a/fetcher/chainlink/chainlink.go b/fetcher/chainlink/chainlink.go index 43a90e0..fccbbcf 100644 --- a/fetcher/chainlink/chainlink.go +++ b/fetcher/chainlink/chainlink.go @@ -8,72 +8,34 @@ import ( "strings" "time" - "github.com/ExocoreNetwork/price-feeder/fetcher/types" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" ) -var ( - clients = make(map[string]*ethclient.Client) - // token-> proxycontract - chainlinkProxy = newProxy() -) - -func Fetch(token string) (*types.PriceInfo, error) { - chainlinkPriceFeedProxy, ok := chainlinkProxy.get(token) +func (s *source) fetch(token string) (*fetchertypes.PriceInfo, error) { + chainlinkPriceFeedProxy, ok := s.chainlinkProxy.get(token) if !ok { - // reload config to add new token - // TODO: this is not concurrent safe, if multiple tokens are fetching conconrrently, the access to chainlinkProxy sould be synced - logger.Error("chainlinkPriceFeedProxy not found, try to reload form chainlink config file and skip this round of fetching price", "token", token) - go func() { - // TODO: limit maximum reloading simultaneously - err := errors.New("start reload") - success := false - var cfg Config - for err != nil { - if cfg, err = parseConfig(configPath); err != nil { - logger.Error("failed to parse config file of source chalink", "error", err, "config_path", configPath) - time.Sleep(10 * time.Second) - continue - } - for tName, address := range cfg.Tokens { - if token == strings.ToLower(tName) { - if err = chainlinkProxy.add(map[string]string{token: address}); err != nil { - logger.Error("failed to add chainlinkPriceFeedProxy, wait for 10 seconds and try again to reload chalink config file", "token", token, "error", err, "config_path", configPath) - time.Sleep(10 * time.Second) - } else { - success = true - logger.Info("scuccessed to add new chainlinkPriceFeedProxy, it will be active for next round fetching price", "token", token) - } - break - } - } - } - if !success { - logger.Error("failed to find info for chainlinkPriceFeedProxy in chainlink config file, please update that file and it will be reloaded on next round", "token", token, "config_path", configPath) - } - }() - return nil, fmt.Errorf("there's no active chainlinkPriceFeedProxy for token:%s", token) + return nil, feedertypes.ErrSourceTokenNotConfigured.Wrap(fmt.Sprintf("chainlinkProxy not configured for token: %s", token)) } roundData, err := chainlinkPriceFeedProxy.LatestRoundData(&bind.CallOpts{}) if err != nil { - logger.Error("failed to get LatestRoundData from chainlink", "token", token, "error", err) - return nil, err + return nil, fmt.Errorf("failed to get LatestRoundData of token:%s from chainlink, error:%w", token, err) } decimals, err := chainlinkPriceFeedProxy.Decimals(&bind.CallOpts{}) if err != nil { - logger.Error("failed to get decimal from chainlinkPriceFeedProxy", "token", token, "error", err) - return nil, err + return nil, fmt.Errorf("failed to get decimals, error:%w", err) } - return &types.PriceInfo{ + return &fetchertypes.PriceInfo{ Price: roundData.Answer.String(), - Decimal: int(decimals), - Timestamp: time.Now().String(), + Decimal: int32(decimals), + Timestamp: time.Now().UTC().Format(feedertypes.TimeLayout), RoundID: roundData.RoundId.String(), }, nil } @@ -101,20 +63,27 @@ func isContractAddress(addr string, client *ethclient.Client) bool { return len(bytecode) > 0 } -// func formatTime(timestamp *big.Int) time.Time { -// timestampInt64 := timestamp.Int64() -// if timestampInt64 == 0 { -// log.Fatalf("timestamp %v cannot be represented as int64", timestamp) -// } -// return time.Unix(timestampInt64, 0) -// } -// -// func divideBigInt(num1 *big.Int, num2 *big.Int) *big.Float { -// if num2.BitLen() == 0 { -// log.Fatal("cannot divide by zero.") -// } -// num1BigFloat := new(big.Float).SetInt(num1) -// num2BigFloat := new(big.Float).SetInt(num2) -// result := new(big.Float).Quo(num1BigFloat, num2BigFloat) -// return result -// } +func (s *source) reload(cfgPath string, token string) error { + cfg, err := parseConfig(cfgPath) + if err != nil { + return fmt.Errorf("failed to parse config file, error:%w", err) + } + // add new network from config file + for network, url := range cfg.URLs { + network = strings.ToLower(network) + if err := s.chainlinkProxy.addClient(network, url); err != nil { + return fmt.Errorf("failed to add ethClient for network:%s with url:%s, error:%w", network, url, err) + } + } + // add proxy for new token matches the required token if found + for tName, tContract := range cfg.Tokens { + tName = strings.ToLower(tName) + if strings.EqualFold(tName, token) { + if err := s.chainlinkProxy.addToken(map[string]string{tName: tContract}); err != nil { + s.logger.Error("failed to add proxy when do reload", "source", s.GetName(), "token", tName, "error", err) + } + return nil + } + } + return errors.New("token not found in reloaded config file") +} diff --git a/fetcher/chainlink/types.go b/fetcher/chainlink/types.go index 80bc174..f7c38cc 100644 --- a/fetcher/chainlink/types.go +++ b/fetcher/chainlink/types.go @@ -1,6 +1,7 @@ package chainlink import ( + "errors" "fmt" "os" "path" @@ -15,6 +16,13 @@ import ( "gopkg.in/yaml.v2" ) +type source struct { + logger feedertypes.LoggerInf + *types.Source + chainlinkProxy *proxy + clients map[string]*ethclient.Client +} + type Config struct { URLs map[string]string `yaml:"urls"` Tokens map[string]string `yaml:"tokens"` @@ -22,85 +30,108 @@ type Config struct { } type proxy struct { - lock sync.RWMutex + locker *sync.RWMutex + clients map[string]*ethclient.Client aggregators map[string]*aggregatorv3.AggregatorV3Interface } -const envConf = "oracle_env_chainlink.yaml" - func newProxy() *proxy { return &proxy{ + locker: new(sync.RWMutex), aggregators: make(map[string]*aggregatorv3.AggregatorV3Interface), + clients: make(map[string]*ethclient.Client), } } -func (p *proxy) add(tokens map[string]string) error { - p.lock.Lock() - defer p.lock.Unlock() +func (p *proxy) addToken(tokens map[string]string) error { + p.locker.Lock() + defer p.locker.Unlock() for token, address := range tokens { addrParsed := strings.Split(strings.TrimSpace(address), "_") - if ok := isContractAddress(addrParsed[0], clients[addrParsed[1]]); !ok { - logger.Error("address tried to be added as chainlink proxy is not a contract address", "address", addrParsed[0], "chain", addrParsed[1]) - return fmt.Errorf("address %s is not a contract address on chain:%s\n", addrParsed[0], addrParsed[1]) + if ok := isContractAddress(addrParsed[0], p.clients[addrParsed[1]]); !ok { + return fmt.Errorf("invalid contract: address=%s chain=%s", addrParsed[0], addrParsed[1]) } var err error - if p.aggregators[strings.ToLower(token)], err = aggregatorv3.NewAggregatorV3Interface(common.HexToAddress(addrParsed[0]), clients[addrParsed[1]]); err != nil { - logger.Error("failed to newAggregator of chainlink", "address", common.HexToAddress(addrParsed[0]), "chain", addrParsed[1], "error", err) - return err + if p.aggregators[strings.ToLower(token)], err = aggregatorv3.NewAggregatorV3Interface(common.HexToAddress(addrParsed[0]), p.clients[addrParsed[1]]); err != nil { + return fmt.Errorf("failed to create aggregator: address=%s chain=%s error=%w", + common.HexToAddress(addrParsed[0]), addrParsed[1], err) } } return nil } +// addClient adds an ethClient, it will skip if the network exists in current clients +// does not need to be guard by lock +func (p *proxy) addClient(network, url string) error { + // p.locker.Lock() + // defer p.locker.Unlock() + var err error + if _, ok := p.clients[network]; !ok { + if len(url) == 0 { + return errors.New("url is empty") + } + p.clients[network], err = ethclient.Dial(url) + } + return err +} + func (p *proxy) get(token string) (*aggregatorv3.AggregatorV3Interface, bool) { - p.lock.RLock() + p.locker.RLock() aggregator, ok := p.aggregators[token] - p.lock.RUnlock() + p.locker.RUnlock() return aggregator, ok } +const envConf = "oracle_env_chainlink.yaml" + var ( - logger feedertypes.LoggerInf - configPath string + defaultSource *source + logger feedertypes.LoggerInf ) func init() { - types.InitFetchers[types.Chainlink] = initChainlink + types.SourceInitializers[types.Chainlink] = initChainlink } -func initChainlink(confPath string) error { - if logger = feedertypes.GetLogger("fetcher_chainlink"); logger == nil { - return feedertypes.ErrInitFail.Wrap("logger is not initialized") +func initChainlink(cfgPath string, l feedertypes.LoggerInf) (types.SourceInf, error) { + if logger = l; logger == nil { + if logger = feedertypes.GetLogger("fetcher_chainlink"); logger == nil { + return nil, feedertypes.ErrInitFail.Wrap("logger is not initialized") + } } - configPath = confPath - cfg, err := parseConfig(configPath) + cfg, err := parseConfig(cfgPath) if err != nil { - logger.Error("failed to parse config file", "path", configPath, "error", err) - return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse config file, path;%s, error:%s", configPath, err)) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to parse config file, path;%s, error:%s", cfgPath, err)) + } + // init defaultSource + defaultSource = &source{ + logger: logger, + clients: make(map[string]*ethclient.Client), + chainlinkProxy: newProxy(), } + // add ethClients for network, url := range cfg.URLs { if len(url) == 0 { - logger.Error("rpcURL is empty. check the oracle_env_chainlink.yaml") - return feedertypes.ErrInitFail.Wrap("rpcURL from config is empty") + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("rpcURL from config is empty, config_file:%s", envConf)) } - clients[network], err = ethclient.Dial(url) + network = strings.ToLower(network) + err = defaultSource.chainlinkProxy.addClient(network, url) if err != nil { - logger.Error("failed to initialize ethClient", "url", url, "error", err) - return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("fail to initialize ethClient, url:%s, erro:%s", url, err)) + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("fail to initialize ethClient, url:%s, error:%s", url, err)) } } - if err = chainlinkProxy.add(cfg.Tokens); err != nil { - logger.Error("failed to add chainlinkPriceFeedProxy", "error", err, "tokens", cfg.Tokens) - return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to add chainlinkPriceFeedProxy, error:%v", err)) + // add proxy for tokens + if err = defaultSource.chainlinkProxy.addToken(cfg.Tokens); err != nil { + return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to add chainlinkPriceFeedProxy for token:%s, error:%v", cfg.Tokens, err)) } - types.Fetchers[types.Chainlink] = Fetch - return nil + defaultSource.Source = types.NewSource(logger, types.Chainlink, defaultSource.fetch, cfgPath, defaultSource.reload) + return defaultSource, nil } -func parseConfig(confPath string) (Config, error) { - yamlFile, err := os.Open(path.Join(confPath, envConf)) +func parseConfig(cfgPath string) (Config, error) { + yamlFile, err := os.Open(path.Join(cfgPath, envConf)) if err != nil { return Config{}, err } diff --git a/fetcher/fetcher.go b/fetcher/fetcher.go index 48076bf..ac5c57a 100644 --- a/fetcher/fetcher.go +++ b/fetcher/fetcher.go @@ -1,391 +1,248 @@ package fetcher import ( - "context" + "errors" "fmt" - "reflect" - "strconv" "strings" "sync" "time" + "unicode" - oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" - "github.com/ExocoreNetwork/price-feeder/fetcher/beaconchain" "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - "go.uber.org/atomic" ) -const defaultInterval = 30 * time.Second - -var ( - sourcesMap sync.Map - tokensMap sync.Map - logger feedertypes.LoggerInf +const ( + loggerTag = "fetcher" + loggerTagPrefix = "fetcher_%s" ) -// Init initializes the fetcher with sources and tokens -func Init(sourcesIn, tokensIn []string, sourcesPath string) (*Fetcher, error) { - if logger = feedertypes.GetLogger("fetcher"); logger == nil { - panic("logger is not initialized") - } - sourceIDs := make([]string, 0) - for _, tName := range tokensIn { - tName = strings.ToLower(tName) - tokensMap.Store(tName, &token{name: tName, active: true}) - } - - for _, sName := range sourcesIn { - s := &source{name: sName, tokens: &sync.Map{}, running: atomic.NewInt32(-1), stopCh: make(chan struct{}), stopResCh: make(chan struct{})} - - // init source's fetcher - reflect.ValueOf(types.InitFetchers[sName]).Call([]reflect.Value{reflect.ValueOf(sourcesPath)}) - s.fetch = reflect.ValueOf(types.Fetchers[sName]).Interface().(types.FType) - for _, tName := range tokensIn { - s.tokens.Store(strings.ToLower(tName), types.NewPriceSyc()) - } - sourcesMap.Store(sName, s) - sourceIDs = append(sourceIDs, sName) - } - - // set up for nativerestaking source - // ethereum-beaconchain-validator. source:beaconchain - for _, sourceAndToken := range types.NativeRestakings { - sName := sourceAndToken[0] - tName := sourceAndToken[1] - - tName = strings.ToLower(tName) - tokensMap.Store(tName, &token{name: tName, active: true}) - - s := &source{name: sName, tokens: &sync.Map{}, running: atomic.NewInt32(-1), stopCh: make(chan struct{}), stopResCh: make(chan struct{})} - - // init source's fetcher - if err := types.InitFetchers[sName](sourcesPath); err != nil { - logger.Error("failed to init fetcher", "soure_name", sName, "error", err) - return nil, feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to init fetcher: %s", sName)) - } - - s.fetch = types.Fetchers[sName] - - s.tokens.Store(tName, types.NewPriceSyc()) - - sourcesMap.Store(sName, s) - sourceIDs = append(sourceIDs, sName) +var ( + logger feedertypes.LoggerInf - } + defaultFetcher *Fetcher +) - return &Fetcher{ - sources: sourceIDs, - interval: defaultInterval, - newSource: make(chan struct { - name string - endpoint string - }), - // nativeTokenValidatorsUpdate: make(chan struct { - // tokenName string - // info string - // success chan bool - // }), - configSource: make(chan struct { - s string - t string - }), - getLatestPriceWithSourceToken: make(chan struct { - p chan *types.PriceInfo - s string - t string - }), - newTokenForSource: make(chan struct { - sourceName string - tokenName string - }), - }, nil +type Fetcher struct { + logger feedertypes.LoggerInf + locker *sync.Mutex + running bool + sources map[string]types.SourceInf + // source->map{token->price} + priceReadList map[string]map[string]*types.PriceSync + addSourceToken chan *addTokenForSourceReq + getLatestPrice chan *getLatestPriceReq + stop chan struct{} } - -// source is the data source to fetch token prices -type source struct { - lock sync.Mutex - running *atomic.Int32 - stopCh chan struct{} - stopResCh chan struct{} - // source name, should be unique - name string - tokens *sync.Map - // endpoint of the source to retreive the price data; eg. https://rpc.ankr.com/eth is used for chainlink's ethereum endpoint - // might vary for different sources - fetch types.FType +type addTokenForSourceReq struct { + source string + token string + result chan bool } -func NewSource() *source { - return &source{ - running: atomic.NewInt32(-1), - } +type getLatestPriceReq struct { + source string + token string + result chan *getLatestPriceRes } - -// set source's status as working -func (s *source) start(interval time.Duration) bool { - s.lock.Lock() - if s.running.Load() == -1 { - s.stopCh = make(chan struct{}) - s.stopResCh = make(chan struct{}) - s.running.Inc() - s.lock.Unlock() - s.Fetch(interval) - return true - } - return false +type getLatestPriceRes struct { + price types.PriceInfo + err error } -// set source 's status as not working -func (s *source) stop() bool { - s.lock.Lock() - defer s.lock.Unlock() - select { - case <-s.stopCh: - fmt.Println("closed already") - return false - default: - close(s.stopCh) - <-s.stopResCh - if !s.running.CompareAndSwap(0, -1) { - panic("running count should be zero when all token fetchers stopped") - } - return true - } +func newGetLatestPriceReq(source, token string) (*getLatestPriceReq, chan *getLatestPriceRes) { + res := make(chan *getLatestPriceRes, 1) + return &getLatestPriceReq{source: source, token: token, result: res}, res } -// AddToken not concurrency safe: stop->AddToken->start(all)(/startOne need lock/select to ensure concurrent safe -func (s *source) AddToken(name string) bool { - priceInfo := types.NewPriceSyc() - _, loaded := s.tokens.LoadOrStore(name, priceInfo) - if loaded { - return false - } - // fetching the new token - if tokenAny, found := tokensMap.Load(name); found && tokenAny.(*token).active { - s.lock.Lock() - s.running.Inc() - s.lock.Unlock() - go func(tName string) { - // TODO: set interval for different sources - tic := time.NewTicker(defaultInterval) - for { - select { - case <-tic.C: - price, err := s.fetch(tName) - prevPrice := priceInfo.GetInfo() - if err == nil && (prevPrice.Price != price.Price || prevPrice.Decimal != price.Decimal) { - priceInfo.UpdateInfo(price) - logger.Info("update token price", "token", tName, "price", price) - } - case <-s.stopCh: - if zero := s.running.Dec(); zero == 0 { - close(s.stopResCh) - } - return - } - } - }(name) +func NewFetcher(logger feedertypes.LoggerInf, sources map[string]types.SourceInf) *Fetcher { + return &Fetcher{ + logger: logger, + locker: new(sync.Mutex), + sources: sources, + priceReadList: make(map[string]map[string]*types.PriceSync), + addSourceToken: make(chan *addTokenForSourceReq, 5), + // getLatestPrice: make(chan *getLatestPriceReq), + getLatestPrice: make(chan *getLatestPriceReq, 5), + stop: make(chan struct{}), } - return true } -// Fetch token price from source -func (s *source) Fetch(interval time.Duration) { - s.tokens.Range(func(key, value any) bool { - tName := key.(string) - priceInfo := value.(*types.PriceSync) - if tokenAny, found := tokensMap.Load(tName); found && tokenAny.(*token).active { - s.lock.Lock() - s.running.Inc() - s.lock.Unlock() - go func(tName string) { - tic := time.NewTicker(interval) - for { - select { - case <-tic.C: - price, err := s.fetch(tName) - prevPrice := priceInfo.GetInfo() - if err == nil && (prevPrice.Price != price.Price || prevPrice.Decimal != price.Decimal) { - priceInfo.UpdateInfo(price) - logger.Info("update token price", "token", tName, "price", price) - } - case <-s.stopCh: - if zero := s.running.Dec(); zero == 0 { - close(s.stopResCh) - } - return - } - } - }(tName) - } - return true - }) -} - -type token struct { - name string // chain_token_address, _address is optional - // indicates if this token is still alive for price reporting - active bool - // endpoint of the token; eg. 0x5f4eC3Df9cbd43714FE2740f5E3616155c5b8419 is used for chainlink to identify specific token to fetch its price - // format might vary for different source to use; eg. for chainlink, this is used to tell the token's address on ethereum(when we use ethereume's contract) - //endpoint string +// AddTokenForSource adds token for existing source +// blocked waiting for the result to return +func (f *Fetcher) AddTokenForSource(source, token string) bool { + res := make(chan bool, 1) + f.addSourceToken <- &addTokenForSourceReq{ + source: source, + token: token, + result: res, + } + return <-res } -// Fetcher serves as the unique entry point to fetch token prices as background routine continuously -type Fetcher struct { - running atomic.Bool - // source ids frmo the sourceSet - sources []string - // set the interval of fetching price, this value is the same for all source/tokens - interval time.Duration - // add new source - newSource chan struct { - name string - endpoint string - } - newTokenForSource chan struct { - sourceName string - tokenName string - } - // withdraw/deposit_stakerIdx_validatorIndex - // nativeTokenValidatorsUpdate chan struct { - // tokenName string - // info string - // success chan bool - // } - // config source's token - configSource chan struct { - s string - t string - } - getLatestPriceWithSourceToken chan struct { - p chan *types.PriceInfo - s string - t string +// TODO:: +func (f *Fetcher) AddTokenForSourceUnBlocked(source, token string) { + res := make(chan bool) + f.addSourceToken <- &addTokenForSourceReq{ + source: source, + token: token, + result: res, } } -func (f *Fetcher) AddTokenForSource(sourceName string, tokenName string) { - f.newTokenForSource <- struct { - sourceName string - tokenName string - }{sourceName, tokenName} +// GetLatestPrice return the queried price for the token from specified source +// blocked waiting for the result to return +func (f *Fetcher) GetLatestPrice(source, token string) (types.PriceInfo, error) { + req, res := newGetLatestPriceReq(source, token) + f.getLatestPrice <- req + result := <-res + return result.price, result.err } -// StartAll runs the background routine to fetch prices -func (f *Fetcher) StartAll() context.CancelFunc { - if !f.running.CompareAndSwap(false, true) { - return nil +func (f *Fetcher) Start() error { + f.locker.Lock() + if f.running { + f.locker.Unlock() + return errors.New("failed to start fetcher which is already running") } - ctx, cancel := context.WithCancel(context.Background()) - for _, sName := range f.sources { - if sourceAny, ok := sourcesMap.Load(sName); ok { - // start fetcheing data from 'source', setup as a background routine - sourceAny.(*source).start(f.interval) - } + if len(f.sources) == 0 { + f.locker.Unlock() + return errors.New("failed to start fetcher with no sources set") + } + priceList := make(map[string]map[string]*types.PriceSync) + for sName, source := range f.sources { + f.logger.Info("start source", "source", sName) + prices := source.Start() + priceList[sName] = prices } + f.priceReadList = priceList + f.running = true + f.locker.Unlock() - // monitor routine for: 1. add new sources, 2. config tokens for existing source, 3. stop all running fetchers go func() { + const timeout = 5 * time.Second for { select { - case <-ctx.Done(): - // close all running sources - for _, sName := range f.sources { - if sourceAny, ok := sourcesMap.Load(sName); ok { - // safe the do multiple times/ on stopped sources, this process is synced(blocked until one source is stopped completely) - sourceAny.(*source).stop() - } - } - return - - // TODO: add a new source and start fetching its data - case <-f.newSource: - case t := <-f.newTokenForSource: - for _, sName := range f.sources { - if sName == t.sourceName { - // TODO: it's ok to add multiple times for the same token - tokensMap.Store(t.tokenName, &token{name: t.tokenName, active: true}) - if s, ok := sourcesMap.Load(t.sourceName); ok { - // add token for source, so this source is running already - s.(*source).AddToken(t.tokenName) + case req := <-f.addSourceToken: + timer := time.NewTimer(timeout) + select { + case <-timer.C: + req.result <- false + f.logger.Error("timeout while adding token", "source", req.source, "token", req.token) + default: + // it's safe to add one token multiple times + if source, ok := f.sources[req.source]; ok { + if res := source.AddTokenAndStart(req.token); res.Error() != nil { + // TODO: clean logs + f.logger.Error("failed to AddTokenAndStart", "source", source.GetName(), "token", req.token, "error", res.Error()) + req.result <- false + } else { + f.priceReadList[req.source][req.token] = res.Price() + req.result <- true } - break + } else { + // we don't support adding source dynamically + f.logger.Error("failed to add token for a nonexistent soruce", "source", req.source, "token", req.token) + req.result <- false } } - // add tokens for a existing source - case <-f.configSource: - // TODO: we currently don't handle the request like 'remove token', if we do that support, we should take care of the process in reading routine - } - } - }() - - // read routine, loop to serve for price quering - go func() { - // read cache, in this way, we don't need to lock every time for potential conflict with tokens update in source(like add one new token), only when we fail to found corresopnding token in this readList - // TODO: we currently don't have process for 'remove-token' from source, so the price will just not be updated, and we don't clear the priceInfo(it's no big deal since only the latest values are kept, and for reader, they will either ont quering this value any more or find out the timestamp not updated like forever) - readList := make(map[string]map[string]*types.PriceSync) - for ps := range f.getLatestPriceWithSourceToken { - s := readList[ps.s] - if s == nil { - if _, found := sourcesMap.Load(ps.s); found { - readList[ps.s] = make(map[string]*types.PriceSync) - s = readList[ps.s] - } else { - fmt.Println("source not exists") - ps.p <- nil - continue - } - } - - tPrice := s[ps.t] - if tPrice == nil { - if sourceAny, found := sourcesMap.Load(ps.s); found { - if p, found := sourceAny.(*source).tokens.Load(ps.t); found { - tPrice = p.(*types.PriceSync) - s[ps.t] = tPrice + case req := <-f.getLatestPrice: + timer := time.NewTimer(timeout) + select { + case <-timer.C: + req.result <- &getLatestPriceRes{ + price: types.PriceInfo{}, + err: fmt.Errorf("timeout while getting price for token %s from source %s", req.token, req.source), + } + default: + if s := f.priceReadList[req.source]; s == nil { + req.result <- &getLatestPriceRes{ + price: types.PriceInfo{}, + err: fmt.Errorf("failed to get price of token:%s from a nonexistent source:%s", req.token, req.source), + } + } else if price := s[req.token]; price == nil { + req.result <- &getLatestPriceRes{ + price: types.PriceInfo{}, + err: feedertypes.ErrSourceTokenNotConfigured.Wrap(fmt.Sprintf("failed to get price of token:%s from a nonexistent token from an existing source:%s", req.token, req.source)), + } } else { - if len(s) == 0 { - fmt.Println("source has no valid token being read, remove this source for reading", ps.s, ps.t) - delete(readList, ps.s) + req.result <- &getLatestPriceRes{ + price: price.Get(), + err: nil, } - continue } - } else { - fmt.Println("source not exists any more, remove this source for reading") - delete(readList, ps.s) - continue } + case <-f.stop: + f.locker.Lock() + for _, source := range f.sources { + source.Stop() + } + f.running = false + f.locker.Unlock() + return } - pRes := tPrice.GetInfo() - ps.p <- &pRes } }() - return cancel + return nil } -// GetLatestPriceFromSourceToken gets the latest price of a token from a source -func (f *Fetcher) GetLatestPriceFromSourceToken(source, token string, c chan *types.PriceInfo) { - f.getLatestPriceWithSourceToken <- struct { - p chan *types.PriceInfo - s string - t string - }{c, source, token} +func (f Fetcher) Stop() { + f.locker.Lock() + select { + case _, ok := <-f.stop: + if ok { + close(f.stop) + } + default: + close(f.stop) + } + f.locker.Unlock() } -// UpdateNativeTokenValidators updates validator list for stakers of native-restaking-token(client-chain) -func (f *Fetcher) UpdateNativeTokenValidators(tokenName, updateInfo string) bool { - parsedInfo := strings.Split(updateInfo, "_") - if len(parsedInfo) != 4 { - return false +// Init initializes the fetcher with sources and tokens +func Init(tokenSources []feedertypes.TokenSources, sourcesPath string) error { + if logger = feedertypes.GetLogger(loggerTag); logger == nil { + panic("logger is not initialized") + } + + sources := make(map[string]types.SourceInf) + sourceTokens := make(map[string][]string) + for _, ts := range tokenSources { + sNames := strings.Split(strings.Map(func(r rune) rune { + if unicode.IsSpace(r) { + return -1 + } + return r + }, ts.Sources), ",") + + var err error + // add sources with names + for _, sName := range sNames { + source := sources[sName] + // new a source if not exists + if source == nil { + l := feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, sName)) + source, err = types.SourceInitializers[sName](sourcesPath, l) + if err != nil { + return feedertypes.ErrInitFail.Wrap(fmt.Sprintf("failed to init source:%s, sources_config_path:%s, error:%v", sName, sourcesPath, err)) + } + sources[sName] = source + } + sourceTokens[sName] = append(sourceTokens[sName], ts.Token) + } + } + // setup tokens for sources + for sName, tokens := range sourceTokens { + sources[sName].InitTokens(tokens) } - stakerIdx, _ := strconv.ParseInt(parsedInfo[1], 10, 64) - validatorPubkey := parsedInfo[2] - validatorsSize, _ := strconv.ParseUint(parsedInfo[3], 10, 64) - return beaconchain.UpdateStakerValidators(int(stakerIdx), validatorPubkey, parsedInfo[0] == "deposit", validatorsSize) + + defaultFetcher = NewFetcher(logger, sources) + return nil } -func (f *Fetcher) ResetStakerValidatorsForAll(tokenName string, stakerInfos []*oracletypes.StakerInfo) { - beaconchain.ResetStakerValidatorsForAll(stakerInfos) +func GetFetcher() (*Fetcher, bool) { + if defaultFetcher == nil { + return nil, false + } + return defaultFetcher, true } diff --git a/fetcher/interface.go b/fetcher/interface.go deleted file mode 100644 index 38ff10f..0000000 --- a/fetcher/interface.go +++ /dev/null @@ -1,38 +0,0 @@ -package fetcher - -type F interface { - // start fetching all tokens' prices - StartAll() - // stop fetching all tokens' prices - StopAll() - // start fetching a specific token's price - Start() - // stop fetching a specific token's price - Stop() - // add a token into token set for price fetching - AddToken() - // remove a token from the token set - RemoveToken() - // TODO: add a specific source, could be user defined as a rpc server, this fetcher worked as a client to request price from that server // force add will replace existed living source if any - // RegisterSource() - // TODO: remove a specific source - // RemoveSource() - // config fetching interval(currently support the same interval for all tokens) - SetInterval() - // config source with supported tokens - ConfigSource(source string, tokens ...string) - - // GetAllprices retreive all alive token's prices from fetcher - GetAllLatestPrices() - // - GetLatestPricesAllSources(token string) - // - GetLatestPrice(srouce, token string) - // TODO: support history price ? not neccessary -} - -/* -this feeder tool only serves as offical usage -AddToken is called when params udpated some tokens -AddSource is a cmd command -*/ diff --git a/fetcher/types/types.go b/fetcher/types/types.go index 872973f..54944d2 100644 --- a/fetcher/types/types.go +++ b/fetcher/types/types.go @@ -1,77 +1,492 @@ package types import ( + "crypto/sha256" + "encoding/base64" + "errors" + "fmt" + "strings" "sync" + "sync/atomic" + "time" feedertypes "github.com/ExocoreNetwork/price-feeder/types" ) -const ( - Chainlink = "chainlink" - BeaconChain = "beaconchain" +type SourceInf interface { + // InitTokens used for initialization, it should only be called before 'Start' + // when the source is not 'running', this method vill overwrite the source's 'tokens' list + InitTokens(tokens []string) bool - NativeTokenETH = "nsteth" + // Start starts fetching prices of all tokens configured in the source. token->price + Start() map[string]*PriceSync - DefaultSlotsPerEpoch = uint64(32) -) + // AddTokenAndStart adds a new token to fetch price for a running source + AddTokenAndStart(token string) *addTokenRes -var ( - ChainToSlotsPerEpoch = map[uint64]uint64{ - 101: DefaultSlotsPerEpoch, - 40161: DefaultSlotsPerEpoch, - 40217: DefaultSlotsPerEpoch, - } - NativeTokenETHAssetID = "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee_0x65" - NativeRestakings = map[string][]string{ - "eth": {BeaconChain, NativeTokenETH}, - } + // GetName returns name of the source + GetName() string + // Status returns the status of all tokens configured in the source: running or not + Status() map[string]*tokenStatus - AssetIDMap = map[string]string{ - NativeTokenETH: NativeTokenETHAssetID, - } + // ReloadConfig reload the config file for the source + // ReloadConfigForToken(string) error - Logger feedertypes.LoggerInf -) + // Stop closes all running routine generated from the source + Stop() + + // TODO: add some interfaces to achieve more fine-grained management + // StopToken(token string) + // ReloadConfigAll() + // RestarAllToken() +} -type FType func(string) (*PriceInfo, error) +type SourceInitFunc func(cfgPath string, logger feedertypes.LoggerInf) (SourceInf, error) -// var Fetchers = make(map[string]func(string) (*PriceInfo, error)) -var Fetchers = make(map[string]FType) +type SourceFetchFunc func(token string) (*PriceInfo, error) -// TODO Init fetchers -var InitFetchers = make(map[string]func(string) error) +// SourceReloadConfigFunc reload source config file +// reload meant to be called during source running, the concurrency should be handled well +type SourceReloadConfigFunc func(config, token string) error + +type NativeRestakingInfo struct { + Chain string + TokenID string +} type PriceInfo struct { Price string - Decimal int + Decimal int32 Timestamp string RoundID string } type PriceSync struct { - Lock sync.RWMutex - Info *PriceInfo + lock *sync.RWMutex + info *PriceInfo +} + +type tokenInfo struct { + name string + price *PriceSync + active *atomic.Bool +} +type tokenStatus struct { + name string + price PriceInfo + active bool +} + +type addTokenReq struct { + tokenName string + result chan *addTokenRes } -func (ps *PriceSync) UpdateInfo(info *PriceInfo) { - ps.Lock.Lock() - *ps.Info = *info - ps.Lock.Unlock() +type addTokenRes struct { + price *PriceSync + err error } -func (ps *PriceSync) GetInfo() PriceInfo { - ps.Lock.RLock() - defer ps.Lock.RUnlock() - return *ps.Info +// IsZero is used to check if a PriceInfo has not been assigned, similar to a nil value in a pointer variable +func (p PriceInfo) IsZero() bool { + return len(p.Price) == 0 +} + +// Equal compare two PriceInfo ignoring the timestamp, roundID fields +func (p PriceInfo) EqualPrice(price PriceInfo) bool { + if p.Price == price.Price && + p.Decimal == price.Decimal { + return true + } + return false +} +func (p PriceInfo) EqualToBase64Price(price PriceInfo) bool { + if len(p.Price) < 32 { + return false + } + h := sha256.New() + h.Write([]byte(p.Price)) + p.Price = base64.StdEncoding.EncodeToString(h.Sum(nil)) + + if p.Price == price.Price && + p.Decimal == price.Decimal { + return true + } + return false } -func NewPriceSyc() *PriceSync { +func NewPriceSync() *PriceSync { return &PriceSync{ - Info: &PriceInfo{}, + lock: new(sync.RWMutex), + info: &PriceInfo{}, + } +} + +func (p *PriceSync) Get() PriceInfo { + p.lock.RLock() + price := *p.info + p.lock.RUnlock() + return price +} + +func (p *PriceSync) Update(price PriceInfo) (updated bool) { + p.lock.Lock() + if !price.EqualPrice(*p.info) { + *p.info = price + updated = true + } + p.lock.Unlock() + return +} + +func (p *PriceSync) Set(price PriceInfo) { + p.lock.Lock() + *p.info = price + p.lock.Unlock() +} + +func NewTokenInfo(name string, price *PriceSync) *tokenInfo { + return &tokenInfo{ + name: name, + price: price, + active: new(atomic.Bool), + } +} + +// GetPriceSync returns the price structure which has lock to make sure concurrency safe +func (t *tokenInfo) GetPriceSync() *PriceSync { + return t.price +} + +// GetPrice returns the price info +func (t *tokenInfo) GetPrice() PriceInfo { + return t.price.Get() +} + +// GetActive returns the active status +func (t *tokenInfo) GetActive() bool { + return t.active.Load() +} + +// SetActive set the active status +func (t *tokenInfo) SetActive(v bool) { + t.active.Store(v) +} + +func newAddTokenReq(tokenName string) (*addTokenReq, chan *addTokenRes) { + resCh := make(chan *addTokenRes, 1) + req := &addTokenReq{ + tokenName: tokenName, + result: resCh, + } + return req, resCh +} + +func (r *addTokenRes) Error() error { + return r.err +} +func (r *addTokenRes) Price() *PriceSync { + return r.price +} + +var _ SourceInf = &Source{} + +// Source is a common implementation of SourceInf +type Source struct { + logger feedertypes.LoggerInf + cfgPath string + running bool + priceList map[string]*PriceSync + name string + locker *sync.Mutex + stop chan struct{} + // 'fetch' interacts directly with data source + fetch SourceFetchFunc + reload SourceReloadConfigFunc + tokens map[string]*tokenInfo + activeTokenCount *atomic.Int32 + interval time.Duration + addToken chan *addTokenReq + // pendingTokensCount *atomic.Int32 + // pendingTokensLimit int32 + // used to trigger reloading source config + tokenNotConfigured chan string +} + +// NewSource returns a implementaion of sourceInf +// for sources they could utilitize this function to provide that 'SourceInf' by taking care of only the 'fetch' function +func NewSource(logger feedertypes.LoggerInf, name string, fetch SourceFetchFunc, cfgPath string, reload SourceReloadConfigFunc) *Source { + return &Source{ + logger: logger, + cfgPath: cfgPath, + name: name, + locker: new(sync.Mutex), + stop: make(chan struct{}), + tokens: make(map[string]*tokenInfo), + activeTokenCount: new(atomic.Int32), + priceList: make(map[string]*PriceSync), + interval: defaultInterval, + addToken: make(chan *addTokenReq, defaultPendingTokensLimit), + tokenNotConfigured: make(chan string, 1), + // pendingTokensCount: new(atomic.Int32), + // pendingTokensLimit: defaultPendingTokensLimit, + fetch: fetch, + reload: reload, + } +} + +// InitTokenNames adds the token names in the source's token list +// NOTE: call before start +func (s *Source) InitTokens(tokens []string) bool { + s.locker.Lock() + defer s.locker.Unlock() + if s.running { + s.logger.Info("failed to add a token to the running source") + return false + } + // reset all tokens + s.tokens = make(map[string]*tokenInfo) + for _, token := range tokens { + // we standardize all token names to lowercase + token = strings.ToLower(token) + s.tokens[token] = NewTokenInfo(token, NewPriceSync()) + } + return true +} + +// Start starts background routines to fetch all registered token for the source frequently +// and watch for 1. add token, 2.stop events +// TODO(leon): return error and existing map when running already +func (s *Source) Start() map[string]*PriceSync { + s.locker.Lock() + if s.running { + s.logger.Error("failed to start the source which is already running", "source", s.name) + s.locker.Unlock() + return nil + } + if len(s.tokens) == 0 { + s.logger.Error("failed to start the source which has no tokens set", "source", s.name) + s.locker.Unlock() + return nil + } + s.running = true + s.locker.Unlock() + ret := make(map[string]*PriceSync) + for tName, token := range s.tokens { + ret[tName] = token.GetPriceSync() + s.logger.Info("start fetching prices", "source", s.name, "token", token, "tokenName", token.name) + s.startFetchToken(token) + } + // main routine of source, listen to: + // addToken to add a new token for the source and start fetching that token's price + // tokenNotConfigured to reload the source's config file for required token + // stop closes the source routines and set runnign status to false + go func() { + for { + select { + case req := <-s.addToken: + price := NewPriceSync() + // check token existence and then add to token list & start if not exists + if token, ok := s.tokens[req.tokenName]; !ok { + token = NewTokenInfo(req.tokenName, price) + // s.tokens[req.tokenName] = NewTokenInfo(req.tokenName, price) + s.tokens[req.tokenName] = token + s.logger.Info("add a new token and start fetching price", "source", s.name, "token", req.tokenName) + s.startFetchToken(token) + } else { + s.logger.Info("didn't add duplicated token, return existing priceSync", "source", s.name, "token", req.tokenName) + price = token.GetPriceSync() + } + req.result <- &addTokenRes{ + price: price, + err: nil, + } + case tName := <-s.tokenNotConfigured: + if err := s.reloadConfigForToken(tName); err != nil { + s.logger.Error("failed to reload config for adding token", "source", s.name, "token", tName) + } + case <-s.stop: + s.logger.Info("exit listening rountine for addToken", "source", s.name) + // waiting for all token routines to exist + for s.activeTokenCount.Load() > 0 { + time.Sleep(1 * time.Second) + } + s.locker.Lock() + s.running = false + s.locker.Unlock() + return + } + } + }() + return ret +} + +// AddTokenAndStart adds token into a running source and start fetching that token +// return (nil, false) and skip adding this token when previously adding request is not handled +// if the token is already exist, it will that correspondin *priceSync +func (s *Source) AddTokenAndStart(token string) *addTokenRes { + s.locker.Lock() + defer s.locker.Unlock() + if !s.running { + return &addTokenRes{ + price: nil, + err: fmt.Errorf("didn't add token due to source:%s not running", s.name), + } + } + // we don't block the process when the channel is not available + // caller should handle the returned bool value properly + addReq, addResCh := newAddTokenReq(token) + select { + case s.addToken <- addReq: + return <-addResCh + default: + } + // TODO(leon): define an res-skipErr variable + return &addTokenRes{ + price: nil, + err: fmt.Errorf("didn't add token, too many pendings, limit:%d", defaultPendingTokensLimit), + } +} + +func (s *Source) Stop() { + s.logger.Info("stop source and close all running routines", "source", s.name) + s.locker.Lock() + // make it safe when closed more than one time + select { + case _, ok := <-s.stop: + if ok { + close(s.stop) + } + default: + close(s.stop) + } + s.locker.Unlock() +} + +func (s *Source) startFetchToken(token *tokenInfo) { + s.activeTokenCount.Add(1) + token.SetActive(true) + go func() { + defer func() { + token.SetActive(false) + s.activeTokenCount.Add(-1) + }() + tic := time.NewTicker(s.interval) + for { + select { + case <-s.stop: + s.logger.Info("exist fetching routine", "source", s.name, "token", token) + return + case <-tic.C: + if price, err := s.fetch(token.name); err != nil { + if errors.Is(err, feedertypes.ErrSourceTokenNotConfigured) { + s.logger.Info("token not config for source", "token", token.name) + s.tokenNotConfigured <- token.name + } else { + s.logger.Error("failed to fetch price", "source", s.name, "token", token.name, "error", err) + // TODO(leon): exist this routine after maximum fails ? + // s.tokens[token.name].active = false + // return + } + } else { + // update price + updated := token.price.Update(*price) + if updated { + s.logger.Info("updated price", "source", s.name, "token", token.name, "price", *price) + } + } + } + } + }() +} + +func (s *Source) reloadConfigForToken(token string) error { + if err := s.reload(s.cfgPath, token); err != nil { + return fmt.Errorf("failed to reload config file to from path:%s when adding token", s.cfgPath) + } + return nil +} + +func (s *Source) GetName() string { + return s.name +} + +func (s *Source) Status() map[string]*tokenStatus { + s.locker.Lock() + ret := make(map[string]*tokenStatus) + for tName, token := range s.tokens { + ret[tName] = &tokenStatus{ + name: tName, + price: token.price.Get(), + active: token.GetActive(), + } } + s.locker.Unlock() + return ret } -func UpdateNativeAssetID(nstID string) { - NativeTokenETHAssetID = nstID - AssetIDMap[NativeTokenETH] = NativeTokenETHAssetID +type NSTToken string + +// type NSTID string +// +// func (n NSTID) String() string { +// return string(n) +// } + +const ( + defaultPendingTokensLimit = 5 + // defaultInterval = 30 * time.Second + defaultInterval = 3 * time.Second + Chainlink = "chainlink" + BeaconChain = "beaconchain" + Solana = "solana" + + NativeTokenETH NSTToken = "nsteth" + NativeTokenSOL NSTToken = "nstsol" + + DefaultSlotsPerEpoch = uint64(32) +) + +var ( + NSTETHZeroChanges = make([]byte, 32) + // source -> initializers of source + SourceInitializers = make(map[string]SourceInitFunc) + ChainToSlotsPerEpoch = map[uint64]uint64{ + 101: DefaultSlotsPerEpoch, + 40161: DefaultSlotsPerEpoch, + 40217: DefaultSlotsPerEpoch, + } + + NSTTokens = map[NSTToken]struct{}{ + NativeTokenETH: struct{}{}, + NativeTokenSOL: struct{}{}, + } + NSTAssetIDMap = make(map[NSTToken]string) + NSTSourceMap = map[NSTToken]string{ + NativeTokenETH: BeaconChain, + NativeTokenSOL: Solana, + } + + Logger feedertypes.LoggerInf +) + +func SetNativeAssetID(nstToken NSTToken, assetID string) { + NSTAssetIDMap[nstToken] = assetID +} + +// GetNSTSource returns source name as string +func GetNSTSource(nstToken NSTToken) string { + return NSTSourceMap[nstToken] +} + +// GetNSTAssetID returns nst assetID as string +func GetNSTAssetID(nstToken NSTToken) string { + return NSTAssetIDMap[nstToken] +} + +func IsNSTToken(tokenName string) bool { + if _, ok := NSTTokens[NSTToken(tokenName)]; ok { + return true + } + return false } diff --git a/types/config_test.go b/types/config_test.go new file mode 100644 index 0000000..63093a1 --- /dev/null +++ b/types/config_test.go @@ -0,0 +1,19 @@ +package types + +import ( + "fmt" + "strings" + "testing" + "unicode" +) + +func TestConfig(t *testing.T) { + conf := InitConfig("./config-bak.yaml") + tmp := strings.Map(func(r rune) rune { + if unicode.IsSpace(r) { + return -1 + } + return r + }, conf.Tokens[2].Sources) + fmt.Println(strings.Split(tmp, ",")) +} diff --git a/types/types.go b/types/types.go index 41ee3c5..5b8548c 100644 --- a/types/types.go +++ b/types/types.go @@ -10,6 +10,21 @@ import ( "go.uber.org/zap/zapcore" ) +// TODO: define the interface of fetchertypes.PriceInfo, for fetcher,exocclient to referenced +// PriceInfoInf defines the core structure which has the value/data that is fetched from out-exocore-chain source +// and submit to exocore-chain +// type PriceInfoInf interface { +// SetPrice() +// SetDecimal() +// SetRoundID() +// SetTimeStamp() +// +// GetPrice() +// GetDecimal() +// GetRoundID() +// GetTimeStamp() +// } + type PrivValidatorKey struct { Address string `json:"address"` PrivKey struct { @@ -17,8 +32,29 @@ type PrivValidatorKey struct { } `json:"priv_key"` } +type TokenSources struct { + Token string `mapstructure:"token"` + Sources string `mapstructure:"sources"` +} + +type Config struct { + Tokens []TokenSources `mapstructure:"tokens"` + Sender struct { + Mnemonic string `mapstructure:"mnemonic"` + Path string `mapstructure:"path"` + } `mapstructure:"sender"` + Exocore struct { + ChainID string `mapstructure:"chainid"` + AppName string `mapstructure:"appname"` + Rpc string `mapstructure:"rpc"` + Ws string `mapstructure:"ws"` + } `mapstructure:"exocore"` +} + type LoggerInf log.Logger +const TimeLayout = "2006-01-02 15:04:05" + var logger log.Logger = NewLogger(zapcore.InfoLevel) type LoggerWrapper struct { @@ -46,6 +82,7 @@ func NewLogger(level zapcore.Level) *LoggerWrapper { config.EncoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05") config.Encoding = "console" config.Level = zap.NewAtomicLevelAt(level) + config.EncoderConfig.StacktraceKey = "" logger, _ := config.Build() return &LoggerWrapper{ logger.Sugar(), @@ -111,30 +148,15 @@ func (e *Err) Unwrap() error { return e.parent } -type Config struct { - Sources []string `mapstructure:"sources"` - Tokens []string `mapstructure:"tokens"` - Sender struct { - Mnemonic string `mapstructure:"mnemonic"` - Path string `mapstructure:"path"` - } `mapstructure:"sender"` - Exocore struct { - ChainID string `mapstructure:"chainid"` - AppName string `mapstructure:"appname"` - Rpc string `mapstructure:"rpc"` - Ws struct { - Addr string `mapstructure:"addr"` - Endpoint string `mapstructure:"endpoint"` - } `mapstructure:"ws"` - } `mapstructure:"exocore"` -} - var ( ConfigFile string SourcesConfigPath string v *viper.Viper - ErrInitFail = NewErr("Failed to initialization") + ErrInitFail = NewErr("failed to initialization") + ErrInitConnectionFail = NewErr("failed to establish a connection") + ErrSourceTokenNotConfigured = NewErr("token not configured") + ErrTokenNotSupported = NewErr("token not supported") ) // InitConfig will only read path cfgFile once, and for reload after InitConfig, should use ReloadConfig