From 5aac5f8bd0b3594f290acc6bf38b5f9c3c3bd9fa Mon Sep 17 00:00:00 2001 From: leonz789 Date: Fri, 27 Dec 2024 14:09:28 +0800 Subject: [PATCH 01/10] set one buffer for updateparams channel --- cmd/types.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/types.go b/cmd/types.go index 494af3b..ea09abd 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -254,6 +254,9 @@ func (f *feeder) trigger(commitHeight, priceHeight int64) { } func (f *feeder) updateFeederParams(p *oracletypes.Params) { + if p == nil || len(p.TokenFeeders) == 0 { + return + } // TODO: update feeder's params } @@ -302,8 +305,9 @@ func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { // don't block on height increasing trigger: make(chan *triggerReq, 1), updatePrice: make(chan *updatePricesReq, 1), - // block on update-params delivery, no buffer - updateParams: make(chan *oracletypes.Params), + // it's safe to have a buffer to not block running feeders, + // since for running feeders, only endBlock is possible to be modified + updateParams: make(chan *oracletypes.Params, 1), } } @@ -411,8 +415,7 @@ func (fs *Feeders) UpdatePrice(txHeight int64, prices []*finalPrice) { } // UpdateOracleParams updates all feeders' params from oracle params -// blocking until all feeders update their params -// when this method returned, it's guaranteed no further actions received wll be executed unless that all feeders have updated their params +// if the receiving channel is full, blocking until all updateParams are received by the channel func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { fs.updateParams <- p } From 051df11ac38d59e45518d69a270bc8478152d9f1 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Fri, 27 Dec 2024 15:44:00 +0800 Subject: [PATCH 02/10] udpate feeder and setup new feeder when updateparams event received, refactor feeders methods --- cmd/feeder_tool.go | 14 ++- cmd/types.go | 220 ++++++++++++++++++++++----------------------- 2 files changed, 113 insertions(+), 121 deletions(-) diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index 512fdaa..8e06aa7 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -28,12 +28,11 @@ var DefaultRetryConfig = RetryConfig{ } const ( - statusOk = 0 - privFile = "priv_validator_key.json" - baseCurrency = "USDT" + statusOk = 0 + privFile = "priv_validator_key.json" //feeder_tokenName_feederID - loggerTagPrefix = "feed_%s_%d" + // loggerTagPrefix = "feed_%s_%d" ) // var updateConfig sync.Mutex @@ -68,7 +67,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo ecClient.Subscribe() - fsMap := NewFeederMap() + feeders := NewFeeders(feedertypes.GetLogger("feeders"), f, ecClient) // we don't check empty tokenfeeders list maxNonce := oracleP.MaxNonce for feederID, feeder := range oracleP.TokenFeeders { @@ -83,9 +82,8 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo panic(fmt.Sprintf("source of nst:%s is not set", tokenName)) } } - fsMap.Add(feeder, feederID, f, ecClient, source, tokenName, maxNonce, feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, tokenName, feederID))) + feeders.SetupFeeder(feeder, feederID, source, tokenName, maxNonce) } - feeders := fsMap.NewFeeders(logger) feeders.Start() for event := range ecClient.EventsCh() { @@ -94,7 +92,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo if paramsUpdate := e.ParamsUpdate(); paramsUpdate { oracleP, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger) if err != nil { - fmt.Printf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err) + logger.Error(fmt.Sprintf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err)) return } feeders.UpdateOracleParams(oracleP) diff --git a/cmd/types.go b/cmd/types.go index ea09abd..48c6d1a 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -2,6 +2,9 @@ package cmd import ( "errors" + "fmt" + "strings" + "sync" oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" @@ -11,6 +14,11 @@ import ( sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) +const ( + loggerTagPrefix = "feed_%s_%d" + baseCurrency = "USDT" +) + type priceFetcher interface { GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) AddTokenForSource(source, token string) bool @@ -92,38 +100,37 @@ type feeder struct { paramsCh chan *updateParamsReq } -type feederInfo struct { - source string - token string - tokenID uint64 - feederID int +type FeederInfo struct { + Source string + Token string + TokenID uint64 + FeederID int // TODO: add check for rouleID, v1 can be skipped // ruleID - startRoundID int64 - startBaseBlock int64 - interval int64 - endBlock int64 - lastPrice localPrice - lastSent signInfo -} - -func (f *feeder) Info() feederInfo { - return feederInfo{ - source: f.source, - token: f.token, - tokenID: f.tokenID, - feederID: f.feederID, - startRoundID: f.startRoundID, - startBaseBlock: f.startBaseBlock, - interval: f.interval, - endBlock: f.endBlock, - lastPrice: *f.lastPrice, - lastSent: *f.lastSent, + StartRoundID int64 + StartBaseBlock int64 + Interval int64 + EndBlock int64 + LastPrice localPrice + LastSent signInfo +} + +func (f *feeder) Info() FeederInfo { + return FeederInfo{ + Source: f.source, + Token: f.token, + TokenID: f.tokenID, + FeederID: f.feederID, + StartRoundID: f.startRoundID, + StartBaseBlock: f.startBaseBlock, + Interval: f.interval, + EndBlock: f.endBlock, + LastPrice: *f.lastPrice, + LastSent: *f.lastSent, } } -// NewFeeder new a feeder struct from oracletypes' tokenfeeder -func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { +func newFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder { return &feeder{ logger: logger, source: source, @@ -135,15 +142,12 @@ func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, startBaseBlock: int64(tf.StartBaseBlock), interval: int64(tf.Interval), endBlock: int64(tf.EndBlock), - - // maxNonce: maxNonce, - - fetcher: fetcher, - submitter: submitter, + fetcher: fetcher, + submitter: submitter, + lastPrice: &localPrice{}, lastSent: &signInfo{ maxNonce: maxNonce, }, - lastPrice: &localPrice{}, priceCh: make(chan *updatePrice, 1), heightsCh: make(chan *triggerHeights, 1), @@ -213,7 +217,9 @@ func (f *feeder) start() { // update latest height that price had been updated f.lastPrice.height = price.txHeight case req := <-f.paramsCh: - f.updateFeederParams(req.params) + if err := f.updateFeederParams(req.params); err != nil { + f.logger.Error("failed to update params", "new params", req.params) + } req.result <- &updateParamsRes{} } } @@ -253,12 +259,22 @@ func (f *feeder) trigger(commitHeight, priceHeight int64) { } } -func (f *feeder) updateFeederParams(p *oracletypes.Params) { - if p == nil || len(p.TokenFeeders) == 0 { - return +func (f *feeder) updateFeederParams(p *oracletypes.Params) error { + if p == nil || len(p.TokenFeeders) < f.feederID+1 { + return errors.New("invalid oracle parmas") } // TODO: update feeder's params - + tokenFeeder := p.TokenFeeders[f.feederID] + if f.endBlock != int64(tokenFeeder.EndBlock) { + f.endBlock = int64(tokenFeeder.EndBlock) + } + if f.startBaseBlock != int64(tokenFeeder.StartBaseBlock) { + f.startBaseBlock = int64(tokenFeeder.StartBaseBlock) + } + if p.MaxNonce > 0 { + f.lastSent.maxNonce = p.MaxNonce + } + return nil } // TODO: stop feeder routine @@ -292,16 +308,28 @@ type updatePricesReq struct { prices []*finalPrice } -type feederMap map[int]*feeder - -// NewFeederMap new a map feeder> -func NewFeederMap() feederMap { - return make(map[int]*feeder) +type Feeders struct { + locker *sync.Mutex + running bool + fetcher priceFetcher + submitter priceSubmitter + logger feedertypes.LoggerInf + feederMap map[int]*feeder + // TODO: feeder has sync management, so feeders could remove these channel + trigger chan *triggerReq + updatePrice chan *updatePricesReq + updateParams chan *oracletypes.Params + // updateNST chan *updateNSTReq } -func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { + +func NewFeeders(logger feedertypes.LoggerInf, fetcher priceFetcher, submitter priceSubmitter) *Feeders { return &Feeders{ + locker: new(sync.Mutex), logger: logger, - feederMap: fm, + fetcher: fetcher, + submitter: submitter, + feederMap: make(map[int]*feeder), + // feederMap: fm, // don't block on height increasing trigger: make(chan *triggerReq, 1), updatePrice: make(chan *updatePricesReq, 1), @@ -309,47 +337,30 @@ func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders { // since for running feeders, only endBlock is possible to be modified updateParams: make(chan *oracletypes.Params, 1), } -} -// Add adds a new feeder with feederID into the map -func (fm feederMap) Add(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) { - fm[feederID] = &feeder{ - logger: logger, - source: source, - token: token, - tokenID: tf.TokenID, - feederID: feederID, - // these conversion a safe since the block height defined in cosmossdk is int64 - startRoundID: int64(tf.StartRoundID), - startBaseBlock: int64(tf.StartBaseBlock), - interval: int64(tf.Interval), - endBlock: int64(tf.EndBlock), - fetcher: fetcher, - submitter: submitter, - lastPrice: &localPrice{}, - lastSent: &signInfo{ - maxNonce: maxNonce, - }, - - priceCh: make(chan *updatePrice, 1), - heightsCh: make(chan *triggerHeights, 1), - paramsCh: make(chan *updateParamsReq, 1), - } } -type Feeders struct { - logger feedertypes.LoggerInf - feederMap map[int]*feeder - // TODO: feeder has sync management, so feeders could remove these channel - trigger chan *triggerReq - updatePrice chan *updatePricesReq - updateParams chan *oracletypes.Params - // updateNST chan *updateNSTReq +func (fs *Feeders) SetupFeeder(tf *oracletypes.TokenFeeder, feederID int, source string, token string, maxNonce int32) { + fs.locker.Lock() + defer fs.locker.Unlock() + if fs.running { + fs.logger.Error("failed to setup feeder for a running feeders, this should be called before feeders is started") + return + } + fs.feederMap[feederID] = newFeeder(tf, feederID, fs.fetcher, fs.submitter, source, token, maxNonce, fs.logger.With("feeder", fmt.Sprintf(loggerTagPrefix, token, feederID))) } // Start will start to listen the trigger(newHeight) and updatePrice events // usd channels to avoid race condition on map func (fs *Feeders) Start() { + fs.locker.Lock() + if fs.running { + fs.logger.Error("failed to start feeders since it's already running") + fs.locker.Unlock() + return + } + fs.running = true + fs.locker.Unlock() for _, f := range fs.feederMap { f.start() } @@ -358,16 +369,32 @@ func (fs *Feeders) Start() { select { case params := <-fs.updateParams: results := []chan *updateParamsRes{} + existingFeederIDs := make(map[int64]struct{}) for _, f := range fs.feederMap { res := f.updateParams(params) results = append(results, res) + existingFeederIDs[int64(f.feederID)] = struct{}{} } // wait for all feeders to complete updateing params for _, res := range results { <-res } - // TODO: add newly added tokenfeeders if exists - + for tfID, tf := range params.TokenFeeders { + if _, ok := existingFeederIDs[int64(tfID)]; !ok { + // create and start a new feeder + tokenName := strings.ToLower(params.Tokens[tf.TokenID].Name) + source := fetchertypes.Chainlink + if fetchertypes.IsNSTToken(tokenName) { + nstToken := fetchertypes.NSTToken(tokenName) + if source = fetchertypes.GetNSTSource(nstToken); len(source) == 0 { + fs.logger.Error("failed to add new feeder, source of nst token is not set", "token", tokenName) + } + } + feeder := newFeeder(tf, tfID, fs.fetcher, fs.submitter, source, tokenName, params.MaxNonce, fs.logger) + fs.feederMap[tfID] = feeder + feeder.start() + } + } case t := <-fs.trigger: // the order does not matter for _, f := range fs.feederMap { @@ -420,39 +447,6 @@ func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { fs.updateParams <- p } -// Define the types for the feeder -// type feederParams struct { -// startRoundID uint64 -// startBlock uint64 -// endBlock uint64 -// interval uint64 -// decimal int32 -// tokenIDStr string -// feederID int64 -// tokenName string -// } -// -// func (f *feederParams) update(p oracletypes.Params) (updated bool) { -// tokenFeeder := p.TokenFeeders[f.feederID] -// if tokenFeeder.StartBaseBlock != f.startBlock { -// f.startBlock = tokenFeeder.StartBaseBlock -// updated = true -// } -// if tokenFeeder.EndBlock != f.endBlock { -// f.endBlock = tokenFeeder.EndBlock -// updated = true -// } -// if tokenFeeder.Interval != f.interval { -// f.interval = tokenFeeder.Interval -// updated = true -// } -// if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal { -// f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal -// updated = true -// } -// return -// } - func getLogger() types.LoggerInf { return types.GetLogger("") } From 1704075fdf5e8ccc6789a3001ee0827394cb7fc3 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Fri, 27 Dec 2024 15:49:20 +0800 Subject: [PATCH 03/10] add basecurrency when fetching price from chainlink --- cmd/types.go | 5 +---- fetcher/chainlink/chainlink.go | 5 +++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cmd/types.go b/cmd/types.go index 48c6d1a..5a7a7e9 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -14,10 +14,7 @@ import ( sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) -const ( - loggerTagPrefix = "feed_%s_%d" - baseCurrency = "USDT" -) +const loggerTagPrefix = "feed_%s_%d" type priceFetcher interface { GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) diff --git a/fetcher/chainlink/chainlink.go b/fetcher/chainlink/chainlink.go index fccbbcf..903affb 100644 --- a/fetcher/chainlink/chainlink.go +++ b/fetcher/chainlink/chainlink.go @@ -16,7 +16,12 @@ import ( "github.com/ethereum/go-ethereum/ethclient" ) +const baseCurrency = "usdt" + func (s *source) fetch(token string) (*fetchertypes.PriceInfo, error) { + if !strings.HasSuffix(token, baseCurrency) { + token += baseCurrency + } chainlinkPriceFeedProxy, ok := s.chainlinkProxy.get(token) if !ok { return nil, feedertypes.ErrSourceTokenNotConfigured.Wrap(fmt.Sprintf("chainlinkProxy not configured for token: %s", token)) From e578ca97bed09d2e30812fdd3eb06fe5e3af238a Mon Sep 17 00:00:00 2001 From: leonz789 Date: Fri, 27 Dec 2024 16:19:22 +0800 Subject: [PATCH 04/10] add comment, update interval for feeders that are not started yet --- cmd/types.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/types.go b/cmd/types.go index 5a7a7e9..019cdb0 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -215,6 +215,7 @@ func (f *feeder) start() { f.lastPrice.height = price.txHeight case req := <-f.paramsCh: if err := f.updateFeederParams(req.params); err != nil { + // This should not happen under this case. f.logger.Error("failed to update params", "new params", req.params) } req.result <- &updateParamsRes{} @@ -268,6 +269,9 @@ func (f *feeder) updateFeederParams(p *oracletypes.Params) error { if f.startBaseBlock != int64(tokenFeeder.StartBaseBlock) { f.startBaseBlock = int64(tokenFeeder.StartBaseBlock) } + if f.interval != int64(tokenFeeder.Interval) { + f.interval = int64(tokenFeeder.Interval) + } if p.MaxNonce > 0 { f.lastSent.maxNonce = p.MaxNonce } From a042619703d27e7781ed40e7d24f5616d9818331 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Fri, 27 Dec 2024 16:34:48 +0800 Subject: [PATCH 05/10] nil checks --- cmd/types.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/cmd/types.go b/cmd/types.go index 019cdb0..dc1b998 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -9,7 +9,6 @@ import ( oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" - types "github.com/ExocoreNetwork/price-feeder/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) @@ -113,6 +112,14 @@ type FeederInfo struct { } func (f *feeder) Info() FeederInfo { + var lastPrice localPrice + var lastSent signInfo + if f.lastPrice != nil { + lastPrice = *f.lastPrice + } + if f.lastSent != nil { + lastSent = *f.lastSent + } return FeederInfo{ Source: f.source, Token: f.token, @@ -122,8 +129,8 @@ func (f *feeder) Info() FeederInfo { StartBaseBlock: f.startBaseBlock, Interval: f.interval, EndBlock: f.endBlock, - LastPrice: *f.lastPrice, - LastSent: *f.lastSent, + LastPrice: lastPrice, + LastSent: lastSent, } } @@ -445,9 +452,13 @@ func (fs *Feeders) UpdatePrice(txHeight int64, prices []*finalPrice) { // UpdateOracleParams updates all feeders' params from oracle params // if the receiving channel is full, blocking until all updateParams are received by the channel func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) { + if p == nil { + fs.logger.Error("received nil oracle params") + return + } + if len(p.TokenFeeders) == 0 { + fs.logger.Error("received empty token feeders") + return + } fs.updateParams <- p } - -func getLogger() types.LoggerInf { - return types.GetLogger("") -} From 72d2c7f41c0c1a3757ab24e62c330e7c1097dd72 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Sat, 28 Dec 2024 18:13:43 +0800 Subject: [PATCH 06/10] set up debug tool listening to new blocks --- cmd/debug.go | 17 +++++++ cmd/feeder_debug.go | 117 ++++++++++++++++++++++++++++++++++++++++++++ cmd/feeder_tool.go | 5 ++ cmd/root.go | 5 +- exoclient/client.go | 15 ++++-- exoclient/tx.go | 88 ++++++++++++++++++++------------- exoclient/types.go | 2 +- types/types.go | 3 +- 8 files changed, 212 insertions(+), 40 deletions(-) create mode 100644 cmd/debug.go create mode 100644 cmd/feeder_debug.go diff --git a/cmd/debug.go b/cmd/debug.go new file mode 100644 index 0000000..fc0046a --- /dev/null +++ b/cmd/debug.go @@ -0,0 +1,17 @@ +package cmd + +import ( + feedertypes "github.com/ExocoreNetwork/price-feeder/types" + "github.com/spf13/cobra" + "go.uber.org/zap/zapcore" +) + +var debugCmd = &cobra.Command{ + Use: "debug", + Short: "", + Long: "", + Run: func(cmd *cobra.Command, args []string) { + logger := feedertypes.NewLogger(zapcore.DebugLevel) + DebugPriceFeeder(conf, logger, mnemonic, sourcesPath) + }, +} diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go new file mode 100644 index 0000000..286aa0a --- /dev/null +++ b/cmd/feeder_debug.go @@ -0,0 +1,117 @@ +package cmd + +import ( + "errors" + "fmt" + "time" + + "github.com/ExocoreNetwork/price-feeder/exoclient" + fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" +) + +type PriceJSON struct { + Price string `json:"price"` + DetID string `json:"det_id"` + Decimal int32 `json:"decimal"` + Timestamp string `json:"timestamp"` +} + +func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { + return fetchertypes.PriceInfo{ + Price: p.Price, + Decimal: p.Decimal, + RoundID: p.DetID, + Timestamp: p.Timestamp, + } +} + +type sendRes struct { + status statusCode + txHash string + details string +} + +type sendReq struct { + // on which block committed send this transaction, required + height int64 + baseBlock uint64 + // optional + roundID int64 + + feederID uint64 + price PriceJSON + // decimal int32 + // detID string + // timestamp string + nonce int32 + + result chan *sendRes +} + +type statusCode string + +const ( + overwrite statusCode = "overwrite the pending transaction" + success statusCode = "successfully sent the transaction" + fail statusCode = "failed to send transaction" +) + +var ( + sendCh = make(chan *sendReq) +) + +func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) { + // init logger + if logger = feedertypes.SetLogger(logger); logger == nil { + panic("logger is not initialized") + } + // init exoclient + // 1. subscribing for heights + // 2. sending create-price tx + count := 0 + for count < DebugRetryConfig.MaxAttempts { + if err := exoclient.Init(conf, mnemonic, privFile, true); err != nil { + if errors.Is(err, feedertypes.ErrInitConnectionFail) { + logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err) + time.Sleep(DebugRetryConfig.Interval) + continue + } + logger.Error("failed to init exoclient", "error", err) + return + } + break + } + ec, _ := exoclient.GetClient() + + _, err := getOracleParamsWithMaxRetry(1, ec, logger) + if err != nil { + logger.Error("failed to get oracle params", "error", err) + return + } + logger.Info("subscribe block heights...") + ecClient, _ := exoclient.GetClient() + defer ecClient.Close() + ecClient.Subscribe() + var pendingReq *sendReq + for { + select { + case event := <-ecClient.EventsCh(): + if e, ok := event.(*exoclient.EventNewBlock); ok { + logger.Info("new block commited", "height", e.Height()) + if pendingReq != nil { + if pendingReq.height <= e.Height() { + // send this transaction + res, err := ecClient.SendTxDebug(pendingReq.feederID, pendingReq.baseBlock, pendingReq.price.getPriceInfo(), pendingReq.nonce) + fmt.Println("debug--> result") + fmt.Println(res) + fmt.Println(err) + pendingReq = nil + } + } + } + case req := <-sendCh: + pendingReq = req + } + } +} diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index 8e06aa7..fd84c02 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -27,6 +27,11 @@ var DefaultRetryConfig = RetryConfig{ Interval: 2 * time.Second, } +var DebugRetryConfig = RetryConfig{ + MaxAttempts: 10, + Interval: 3 * time.Second, +} + const ( statusOk = 0 privFile = "priv_validator_key.json" diff --git a/cmd/root.go b/cmd/root.go index 240c881..b31a6ec 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -54,7 +54,10 @@ func init() { startCmd.Flags().StringVarP(&mnemonic, "mnemonic", "m", "", "mnemonic of consensus key") - rootCmd.AddCommand(startCmd) + rootCmd.AddCommand( + startCmd, + debugCmd, + ) } // initConfig reads in config file and ENV variables if set. diff --git a/exoclient/client.go b/exoclient/client.go index 7063fcc..55eb5cb 100644 --- a/exoclient/client.go +++ b/exoclient/client.go @@ -10,6 +10,7 @@ import ( "cosmossdk.io/simapp/params" oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" + rpchttp "github.com/cometbft/cometbft/rpc/client/http" "github.com/cosmos/cosmos-sdk/client" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" "github.com/cosmos/cosmos-sdk/types/tx" @@ -33,7 +34,8 @@ type exoClient struct { chainID string // client to broadcast transactions to eoxocred - txClient tx.ServiceClient + txClient tx.ServiceClient + txClientDebug *rpchttp.HTTP // wsclient interact with exocored wsClient *websocket.Conn @@ -54,7 +56,7 @@ type exoClient struct { } // NewExoClient creates a exocore-client used to do queries and send transactions to exocored -func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, privKey cryptotypes.PrivKey, encCfg params.EncodingConfig, chainID string) (*exoClient, error) { +func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDebug string, privKey cryptotypes.PrivKey, encCfg params.EncodingConfig, chainID string) (*exoClient, error) { ec := &exoClient{ logger: logger, privKey: privKey, @@ -78,6 +80,14 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, pri // setup txClient ec.txClient = sdktx.NewServiceClient(ec.grpcConn) + + if len(endpointDebug) > 0 { + ec.txClientDebug, err = client.NewClientFromNode(endpointDebug) + if err != nil { + return nil, fmt.Errorf("failed to create new client for debug, endponit:%s, error:%v", endpointDebug, err) + } + } + // ec.txClient = sdktx.NewServiceClient(ec.grpcConn) // setup queryClient ec.oracleClient = oracletypes.NewQueryClient(ec.grpcConn) // setup wsClient @@ -99,7 +109,6 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint string, pri ec.wsClient.SetPongHandler(func(string) error { return nil }) - return ec, nil } diff --git a/exoclient/tx.go b/exoclient/tx.go index 305485f..4837e5d 100644 --- a/exoclient/tx.go +++ b/exoclient/tx.go @@ -11,49 +11,18 @@ import ( authsigning "github.com/cosmos/cosmos-sdk/x/auth/signing" + coretypes "github.com/cometbft/cometbft/rpc/core/types" sdk "github.com/cosmos/cosmos-sdk/types" sdktx "github.com/cosmos/cosmos-sdk/types/tx" "github.com/cosmos/cosmos-sdk/types/tx/signing" ) // SendTx signs a create-price transaction and send it to exocored -// func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price, roundID string, decimal int, nonce int32) (*sdktx.BroadcastTxResponse, error) { func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*sdktx.BroadcastTxResponse, error) { - // build create-price message - msg := oracletypes.NewMsgCreatePrice( - sdk.AccAddress(ec.pubKey.Address()).String(), - feederID, - []*oracletypes.PriceSource{ - { - SourceID: Chainlink, - Prices: []*oracletypes.PriceTimeDetID{ - { - Price: price.Price, - Decimal: price.Decimal, - Timestamp: time.Now().UTC().Format(feedertypes.TimeLayout), - DetID: price.RoundID, - }, - }, - Desc: "", - }, - }, - baseBlock, - nonce, - ) - - // sign the message with validator consensus-key configured - signedTx, err := ec.signMsg(msg) - if err != nil { - return nil, fmt.Errorf("failed to sign message, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) - } - - // encode transaction to broadcast - txBytes, err := ec.txCfg.TxEncoder()(signedTx) + msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce) if err != nil { - // this should not happen - return nil, fmt.Errorf("failed to encode singedTx, txBytes:%b, msg:%v, valConsAddr:%s, error:%w", txBytes, msg, sdk.ConsAddress(ec.pubKey.Address()), err) + return nil, err } - // broadcast txBytes res, err := ec.txClient.BroadcastTx( context.Background(), @@ -68,6 +37,19 @@ func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes return res, nil } +func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) { + msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce) + if err != nil { + return nil, err + } + // broadcast txBytes + res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes) + if err != nil { + return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } + return res, nil +} + // signMsg signs the message with consensusskey func (ec exoClient) signMsg(msgs ...sdk.Msg) (authsigning.Tx, error) { txBuilder := ec.txCfg.NewTxBuilder() @@ -121,3 +103,41 @@ func (ec exoClient) getSignature(sigBytes []byte) signing.SignatureV2 { } return signature } + +func (ec exoClient) getSignedTxBytes(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*oracletypes.MsgCreatePrice, []byte, error) { + // build create-price message + msg := oracletypes.NewMsgCreatePrice( + sdk.AccAddress(ec.pubKey.Address()).String(), + feederID, + []*oracletypes.PriceSource{ + { + SourceID: Chainlink, + Prices: []*oracletypes.PriceTimeDetID{ + { + Price: price.Price, + Decimal: price.Decimal, + Timestamp: time.Now().UTC().Format(feedertypes.TimeLayout), + DetID: price.RoundID, + }, + }, + Desc: "", + }, + }, + baseBlock, + nonce, + ) + + // sign the message with validator consensus-key configured + signedTx, err := ec.signMsg(msg) + if err != nil { + return nil, nil, fmt.Errorf("failed to sign message, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } + + // encode transaction to broadcast + txBytes, err := ec.txCfg.TxEncoder()(signedTx) + if err != nil { + // this should not happen + return nil, nil, fmt.Errorf("failed to encode singedTx, txBytes:%b, msg:%v, valConsAddr:%s, error:%w", txBytes, msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } + return msg, txBytes, nil +} diff --git a/exoclient/types.go b/exoclient/types.go index 11e0651..34738fa 100644 --- a/exoclient/types.go +++ b/exoclient/types.go @@ -415,7 +415,7 @@ func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) e } var err error - if defaultExoClient, err = NewExoClient(logger, confExocore.Rpc, confExocore.Ws, privKey, encCfg, confExocore.ChainID); err != nil { + if defaultExoClient, err = NewExoClient(logger, confExocore.Gprc, confExocore.Ws, conf.Exocore.Rpc, privKey, encCfg, confExocore.ChainID); err != nil { if errors.Is(err, feedertypes.ErrInitConnectionFail) { return err } diff --git a/types/types.go b/types/types.go index 5b8548c..f023c15 100644 --- a/types/types.go +++ b/types/types.go @@ -46,8 +46,9 @@ type Config struct { Exocore struct { ChainID string `mapstructure:"chainid"` AppName string `mapstructure:"appname"` - Rpc string `mapstructure:"rpc"` + Gprc string `mapstructure:"grpc"` Ws string `mapstructure:"ws"` + Rpc string `mspstructure:"rpc"` } `mapstructure:"exocore"` } From 129e5f8878e4dc764c144e3a876433e3cf5234aa Mon Sep 17 00:00:00 2001 From: leonz789 Date: Mon, 30 Dec 2024 08:25:50 +0800 Subject: [PATCH 07/10] feat: debug tools to send tx from cli --- cmd/debug.go | 95 +++++++- cmd/feeder_debug.go | 170 ++++++++++--- cmd/feeder_tool.go | 21 +- cmd/root.go | 36 ++- cmd/start.go | 15 +- cmd/types.go | 5 +- debugger/price_msg.go | 12 + debugger/service.pb.go | 411 ++++++++++++++++++++++++++++++++ debugger/service_grpc.pb.go | 109 +++++++++ exoclient/client.go | 65 ++--- exoclient/types.go | 4 +- external/feeder.go | 2 - proto/debugger/v1/service.proto | 32 +++ types/types.go | 15 +- 14 files changed, 866 insertions(+), 126 deletions(-) create mode 100644 debugger/price_msg.go create mode 100644 debugger/service.pb.go create mode 100644 debugger/service_grpc.pb.go create mode 100644 proto/debugger/v1/service.proto diff --git a/cmd/debug.go b/cmd/debug.go index fc0046a..9ca229f 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -1,17 +1,102 @@ package cmd import ( + "encoding/json" + "fmt" + + "github.com/ExocoreNetwork/price-feeder/debugger" feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/spf13/cobra" "go.uber.org/zap/zapcore" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +var ( + flagFeederID = "feederID" + flagHeight = "height" ) -var debugCmd = &cobra.Command{ +func init() { + debugStartCmd.PersistentFlags().Uint64(flagFeederID, 0, "feederID of the token") + debugStartCmd.PersistentFlags().Int64(flagHeight, 0, "committed block height after which the tx will be sent") + + debugStartCmd.AddCommand( + debugSendCmd, + debugSendImmCmd, + ) +} + +var debugStartCmd = &cobra.Command{ Use: "debug", - Short: "", - Long: "", - Run: func(cmd *cobra.Command, args []string) { + Short: "start listening to new blocks", + Long: "start listening to new blocks", + RunE: func(cmd *cobra.Command, args []string) error { logger := feedertypes.NewLogger(zapcore.DebugLevel) - DebugPriceFeeder(conf, logger, mnemonic, sourcesPath) + DebugPriceFeeder(feederConfig, logger, mnemonic, sourcesPath) + return nil + }, +} + +var debugSendCmd = &cobra.Command{ + Use: `send --feederID [feederID] --height [height] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`, + Short: "send a create-price message to exocored", + Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID) + if err != nil { + return err + } + height, err := cmd.Parent().PersistentFlags().GetInt64(flagHeight) + if err != nil { + return err + } + msgStr := args[0] + msgPrice := &debugger.PriceMsg{} + if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil { + return err + } + res, err := sendTx(feederID, height, msgPrice) + if err != nil { + return err + } + if len(res.Err) > 0 { + fmt.Println("") + } + printProto(res) + return nil }, } + +var debugSendImmCmd = &cobra.Command{ + Use: `send-imm --feederID [feederID] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`, + Short: "send a create-price message to exocored", + Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID) + if err != nil { + return err + } + msgStr := args[0] + msgPrice := &PriceJSON{} + if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil { + return err + } + res, err := sendTxImmediatly(feederID, msgPrice) + if err != nil { + return err + } + printProto(res) + return nil + }, +} + +func printProto(m proto.Message) { + marshaled, err := protojson.MarshalOptions{EmitUnpopulated: true, UseProtoNames: true}.Marshal(m) + if err != nil { + fmt.Printf("failed to print proto message, error:%v", err) + } + fmt.Println(string(marshaled)) +} diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index 286aa0a..fdf35c7 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -1,20 +1,50 @@ package cmd import ( + "context" "errors" "fmt" + "net" "time" + "github.com/ExocoreNetwork/price-feeder/debugger" "github.com/ExocoreNetwork/price-feeder/exoclient" fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" feedertypes "github.com/ExocoreNetwork/price-feeder/types" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) +type server struct { + sendCh chan *sendReq + debugger.UnimplementedPriceSubmitServiceServer +} + +func (s *server) SubmitPrice(ctx context.Context, req *debugger.SubmitPriceRequest) (*debugger.SubmitPriceResponse, error) { + result := make(chan *debugger.SubmitPriceResponse, 1) + s.sendCh <- &sendReq{ + height: req.Height, + feederID: req.FeederId, + price: req.Price, + result: result, + } + r := <-result + return r, nil +} + +func newServer(sendCh chan *sendReq) *server { + return &server{ + sendCh: sendCh, + } +} + type PriceJSON struct { Price string `json:"price"` DetID string `json:"det_id"` Decimal int32 `json:"decimal"` Timestamp string `json:"timestamp"` + Nonce int32 `json:"nonce"` + BaseBlock uint64 `json:"base_block"` } func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { @@ -27,26 +57,22 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { } type sendRes struct { - status statusCode - txHash string - details string + err error + checkTxSuccess bool + checkTxLog string + deliverTxSuccess bool + deliverTxLog string + txHash string + height int64 } type sendReq struct { // on which block committed send this transaction, required - height int64 - baseBlock uint64 - // optional - roundID int64 + height int64 feederID uint64 - price PriceJSON - // decimal int32 - // detID string - // timestamp string - nonce int32 - - result chan *sendRes + price *debugger.PriceMsg + result chan *debugger.SubmitPriceResponse } type statusCode string @@ -58,10 +84,14 @@ const ( ) var ( - sendCh = make(chan *sendReq) + sendCh = make(chan *sendReq) + DebugRetryConfig = RetryConfig{ + MaxAttempts: 10, + Interval: 3 * time.Second, + } ) -func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) { +func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) { // init logger if logger = feedertypes.SetLogger(logger); logger == nil { panic("logger is not initialized") @@ -71,7 +101,7 @@ func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mne // 2. sending create-price tx count := 0 for count < DebugRetryConfig.MaxAttempts { - if err := exoclient.Init(conf, mnemonic, privFile, true); err != nil { + if err := exoclient.Init(conf, mnemonic, privFile, false, true); err != nil { if errors.Is(err, feedertypes.ErrInitConnectionFail) { logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err) time.Sleep(DebugRetryConfig.Interval) @@ -93,25 +123,99 @@ func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mne ecClient, _ := exoclient.GetClient() defer ecClient.Close() ecClient.Subscribe() - var pendingReq *sendReq - for { - select { - case event := <-ecClient.EventsCh(): - if e, ok := event.(*exoclient.EventNewBlock); ok { - logger.Info("new block commited", "height", e.Height()) - if pendingReq != nil { - if pendingReq.height <= e.Height() { - // send this transaction - res, err := ecClient.SendTxDebug(pendingReq.feederID, pendingReq.baseBlock, pendingReq.price.getPriceInfo(), pendingReq.nonce) - fmt.Println("debug--> result") - fmt.Println(res) - fmt.Println(err) - pendingReq = nil + // var pendingReq *sendReq + pendingReqs := make(map[int64][]*sendReq) + sendCh := make(chan *sendReq, 10) + go func() { + for { + select { + case event := <-ecClient.EventsCh(): + if e, ok := event.(*exoclient.EventNewBlock); ok { + logger.Info("new block commited", "height", e.Height()) + for h, pendings := range pendingReqs { + if h <= e.Height() { + for i, req := range pendings { + res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce) + if err != nil { + logger.Error("failed to send tx", "error", err) + req.result <- &debugger.SubmitPriceResponse{Err: err.Error()} + } + req.result <- &debugger.SubmitPriceResponse{ + CheckTxSuccess: res.CheckTx.Code == 0, + CheckTxLog: res.CheckTx.Log, + DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0, + DeliverTxLog: res.DeliverTx.Log, + TxHash: res.Hash.String(), + Height: res.Height, + } + if i == len(pendings)-1 { + delete(pendingReqs, h) + } else { + pendingReqs[h] = append(pendings[:i], pendings[i+1:]...) + } + } + } } } + case req := <-sendCh: + logger.Info("add a new send request", "height", req.height) + if pendings, ok := pendingReqs[req.height]; ok { + pendingReqs[req.height] = append(pendings, req) + } else { + pendingReqs[req.height] = []*sendReq{req} + } } - case req := <-sendCh: - pendingReq = req } + }() + lis, err := net.Listen("tcp", ":50051") + if err != nil { + fmt.Printf("failed to listen: %v\r\n", err) + return + } + grpcServer := grpc.NewServer() + debugger.RegisterPriceSubmitServiceServer(grpcServer, newServer(sendCh)) + if err := grpcServer.Serve(lis); err != nil { + fmt.Printf("failed to serve:%v\r\n", err) + return + } +} + +func sendTx(feederID uint64, height int64, price *debugger.PriceMsg) (*debugger.SubmitPriceResponse, error) { + conn, err := grpc.Dial( + "localhost:50051", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) + if err != nil { + return nil, err + } + defer conn.Close() + c := debugger.NewPriceSubmitServiceClient(conn) + return c.SubmitPrice(context.Background(), &debugger.SubmitPriceRequest{ + Height: height, + FeederId: feederID, + Price: price, + }) +} + +func sendTxImmediatly(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) { + if err := exoclient.Init(feederConfig, mnemonic, privFile, true, true); err != nil { + return nil, fmt.Errorf("failed to init exoclient in txOnly mode for debug, error:%w", err) + } + ec, _ := exoclient.GetClient() + pInfo := price.getPriceInfo() + + res, err := ec.SendTxDebug(feederID, price.BaseBlock, pInfo, price.Nonce) + if err != nil { + return nil, err + } + protoRes := &debugger.SubmitPriceResponse{ + CheckTxSuccess: res.CheckTx.Code == 0, + CheckTxLog: res.CheckTx.Log, + DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0, + DeliverTxLog: res.DeliverTx.Log, + TxHash: res.Hash.String(), + Height: res.Height, } + return protoRes, nil } diff --git a/cmd/feeder_tool.go b/cmd/feeder_tool.go index fd84c02..ef0ac22 100644 --- a/cmd/feeder_tool.go +++ b/cmd/feeder_tool.go @@ -27,29 +27,16 @@ var DefaultRetryConfig = RetryConfig{ Interval: 2 * time.Second, } -var DebugRetryConfig = RetryConfig{ - MaxAttempts: 10, - Interval: 3 * time.Second, -} - -const ( - statusOk = 0 - privFile = "priv_validator_key.json" - - //feeder_tokenName_feederID - // loggerTagPrefix = "feed_%s_%d" -) - // var updateConfig sync.Mutex // RunPriceFeeder runs price feeder to fetching price and feed to exocorechain -func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) { +func RunPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) { // init logger if logger = feedertypes.SetLogger(logger); logger == nil { panic("logger is not initialized") } // init logger, fetchers, exocoreclient - if err := initComponents(logger, conf, standalone); err != nil { + if err := initComponents(logger, conf, sourcesPath, standalone); err != nil { logger.Error("failed to initialize components") panic(err) } @@ -167,7 +154,7 @@ func ResetAllStakerValidators(ec exoclient.ExoClientInf, logger feedertypes.Logg } // // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed -func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) error { +func initComponents(logger types.LoggerInf, conf *types.Config, sourcesPath string, standalone bool) error { count := 0 for count < DefaultRetryConfig.MaxAttempts { count++ @@ -178,7 +165,7 @@ func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) } // init exoclient - err = exoclient.Init(conf, mnemonic, privFile, standalone) + err = exoclient.Init(conf, mnemonic, privFile, false, standalone) if err != nil { if errors.Is(err, feedertypes.ErrInitConnectionFail) { logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err) diff --git a/cmd/root.go b/cmd/root.go index b31a6ec..cf60350 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,15 +5,16 @@ package cmd import ( "os" - "path" - "github.com/ExocoreNetwork/price-feeder/types" + feedertypes "github.com/ExocoreNetwork/price-feeder/types" "github.com/spf13/cobra" ) -var cfgFile string - -var sourcesPath string +var ( + cfgFile string + sourcesPath string + feederConfig *feedertypes.Config +) // rootCmd represents the base command when called without any subcommands var rootCmd = &cobra.Command{ @@ -28,6 +29,12 @@ to quickly create a Cobra application.`, // Uncomment the following line if your bare application // has an action associated with it: // Run: func(cmd *cobra.Command, args []string) { }, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + // load and parse config file + var err error + feederConfig, err = feedertypes.InitConfig(cfgFile) + return err + }, } // Execute adds all child commands to the root command and sets flags appropriately. @@ -40,7 +47,7 @@ func Execute() { } func init() { - cobra.OnInitialize(initConfig) + // cobra.OnInitialize(initConfig) // Here you will define your flags and configuration settings. // Cobra supports persistent flags, which, if defined here, @@ -56,21 +63,6 @@ func init() { rootCmd.AddCommand( startCmd, - debugCmd, + debugStartCmd, ) } - -// initConfig reads in config file and ENV variables if set. -func initConfig() { - if len(cfgFile) == 0 { - // Find home directory. - home, err := os.UserHomeDir() - cobra.CheckErr(err) - - cfgFile = path.Join(home, ".price-feeder") - - } - - types.ConfigFile = cfgFile - conf = types.InitConfig(cfgFile) -} diff --git a/cmd/start.go b/cmd/start.go index 41cdf6d..4623e87 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -9,10 +9,9 @@ import ( "go.uber.org/zap/zapcore" ) -var ( - mnemonic string - conf feedertypes.Config -) +const privFile = "priv_validator_key.json" + +var mnemonic string // startCmd represents the start command var startCmd = &cobra.Command{ @@ -24,12 +23,10 @@ and usage of using your command. For example: Cobra is a CLI library for Go that empowers applications. This application is a tool to generate the needed files to quickly create a Cobra application.`, - // PreRun: func(cmd *cobra.Command, args []string) { - // - // }, - Run: func(cmd *cobra.Command, args []string) { + RunE: func(cmd *cobra.Command, args []string) error { logger := feedertypes.NewLogger(zapcore.InfoLevel) // start fetcher to get prices from chainlink - RunPriceFeeder(conf, logger, mnemonic, sourcesPath, true) + RunPriceFeeder(feederConfig, logger, mnemonic, sourcesPath, true) + return nil }, } diff --git a/cmd/types.go b/cmd/types.go index dc1b998..153e2a0 100644 --- a/cmd/types.go +++ b/cmd/types.go @@ -13,7 +13,10 @@ import ( sdktx "github.com/cosmos/cosmos-sdk/types/tx" ) -const loggerTagPrefix = "feed_%s_%d" +const ( + loggerTagPrefix = "feed_%s_%d" + statusOk = 0 +) type priceFetcher interface { GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error) diff --git a/debugger/price_msg.go b/debugger/price_msg.go new file mode 100644 index 0000000..89d7ec8 --- /dev/null +++ b/debugger/price_msg.go @@ -0,0 +1,12 @@ +package debugger + +import fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types" + +func (p *PriceMsg) GetPriceInfo() fetchertypes.PriceInfo { + return fetchertypes.PriceInfo{ + Price: p.Price, + Decimal: p.Decimal, + RoundID: p.DetId, + Timestamp: p.Timestamp, + } +} diff --git a/debugger/service.pb.go b/debugger/service.pb.go new file mode 100644 index 0000000..ceeb8b6 --- /dev/null +++ b/debugger/service.pb.go @@ -0,0 +1,411 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.2 +// protoc v4.23.4 +// source: service.proto + +package debugger + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type PriceMsg struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Price string `protobuf:"bytes,1,opt,name=price,proto3" json:"price,omitempty"` + DetId string `protobuf:"bytes,2,opt,name=det_id,json=detId,proto3" json:"det_id,omitempty"` + Decimal int32 `protobuf:"varint,3,opt,name=decimal,proto3" json:"decimal,omitempty"` + Timestamp string `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Nonce int32 `protobuf:"varint,5,opt,name=nonce,proto3" json:"nonce,omitempty"` + BaseBlock uint64 `protobuf:"varint,6,opt,name=base_block,json=baseBlock,proto3" json:"base_block,omitempty"` +} + +func (x *PriceMsg) Reset() { + *x = PriceMsg{} + if protoimpl.UnsafeEnabled { + mi := &file_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PriceMsg) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PriceMsg) ProtoMessage() {} + +func (x *PriceMsg) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PriceMsg.ProtoReflect.Descriptor instead. +func (*PriceMsg) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{0} +} + +func (x *PriceMsg) GetPrice() string { + if x != nil { + return x.Price + } + return "" +} + +func (x *PriceMsg) GetDetId() string { + if x != nil { + return x.DetId + } + return "" +} + +func (x *PriceMsg) GetDecimal() int32 { + if x != nil { + return x.Decimal + } + return 0 +} + +func (x *PriceMsg) GetTimestamp() string { + if x != nil { + return x.Timestamp + } + return "" +} + +func (x *PriceMsg) GetNonce() int32 { + if x != nil { + return x.Nonce + } + return 0 +} + +func (x *PriceMsg) GetBaseBlock() uint64 { + if x != nil { + return x.BaseBlock + } + return 0 +} + +type SubmitPriceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Height int64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + FeederId uint64 `protobuf:"varint,2,opt,name=feeder_id,json=feederId,proto3" json:"feeder_id,omitempty"` + Price *PriceMsg `protobuf:"bytes,3,opt,name=price,proto3" json:"price,omitempty"` +} + +func (x *SubmitPriceRequest) Reset() { + *x = SubmitPriceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubmitPriceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubmitPriceRequest) ProtoMessage() {} + +func (x *SubmitPriceRequest) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubmitPriceRequest.ProtoReflect.Descriptor instead. +func (*SubmitPriceRequest) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{1} +} + +func (x *SubmitPriceRequest) GetHeight() int64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *SubmitPriceRequest) GetFeederId() uint64 { + if x != nil { + return x.FeederId + } + return 0 +} + +func (x *SubmitPriceRequest) GetPrice() *PriceMsg { + if x != nil { + return x.Price + } + return nil +} + +type SubmitPriceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CheckTxSuccess bool `protobuf:"varint,1,opt,name=check_tx_success,json=checkTxSuccess,proto3" json:"check_tx_success,omitempty"` + DeliverTxSuccess bool `protobuf:"varint,2,opt,name=deliver_tx_success,json=deliverTxSuccess,proto3" json:"deliver_tx_success,omitempty"` + CheckTxLog string `protobuf:"bytes,3,opt,name=check_tx_log,json=checkTxLog,proto3" json:"check_tx_log,omitempty"` + DeliverTxLog string `protobuf:"bytes,4,opt,name=deliver_tx_log,json=deliverTxLog,proto3" json:"deliver_tx_log,omitempty"` + Height int64 `protobuf:"varint,5,opt,name=height,proto3" json:"height,omitempty"` + TxHash string `protobuf:"bytes,6,opt,name=tx_hash,json=txHash,proto3" json:"tx_hash,omitempty"` + Err string `protobuf:"bytes,7,opt,name=err,proto3" json:"err,omitempty"` +} + +func (x *SubmitPriceResponse) Reset() { + *x = SubmitPriceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SubmitPriceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SubmitPriceResponse) ProtoMessage() {} + +func (x *SubmitPriceResponse) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SubmitPriceResponse.ProtoReflect.Descriptor instead. +func (*SubmitPriceResponse) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{2} +} + +func (x *SubmitPriceResponse) GetCheckTxSuccess() bool { + if x != nil { + return x.CheckTxSuccess + } + return false +} + +func (x *SubmitPriceResponse) GetDeliverTxSuccess() bool { + if x != nil { + return x.DeliverTxSuccess + } + return false +} + +func (x *SubmitPriceResponse) GetCheckTxLog() string { + if x != nil { + return x.CheckTxLog + } + return "" +} + +func (x *SubmitPriceResponse) GetDeliverTxLog() string { + if x != nil { + return x.DeliverTxLog + } + return "" +} + +func (x *SubmitPriceResponse) GetHeight() int64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *SubmitPriceResponse) GetTxHash() string { + if x != nil { + return x.TxHash + } + return "" +} + +func (x *SubmitPriceResponse) GetErr() string { + if x != nil { + return x.Err + } + return "" +} + +var File_service_proto protoreflect.FileDescriptor + +var file_service_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x11, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x64, 0x65, 0x62, 0x75, 0x67, 0x67, 0x65, 0x72, 0x2e, + 0x76, 0x31, 0x22, 0xa4, 0x01, 0x0a, 0x08, 0x50, 0x72, 0x69, 0x63, 0x65, 0x4d, 0x73, 0x67, 0x12, + 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x70, 0x72, 0x69, 0x63, 0x65, 0x12, 0x15, 0x0a, 0x06, 0x64, 0x65, 0x74, 0x5f, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x64, 0x65, 0x74, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, + 0x64, 0x65, 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x64, + 0x65, 0x63, 0x69, 0x6d, 0x61, 0x6c, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x05, 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x62, 0x61, + 0x73, 0x65, 0x5f, 0x62, 0x6c, 0x6f, 0x63, 0x6b, 0x18, 0x06, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, + 0x62, 0x61, 0x73, 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x7c, 0x0a, 0x12, 0x53, 0x75, 0x62, + 0x6d, 0x69, 0x74, 0x50, 0x72, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x66, 0x65, 0x65, 0x64, 0x65, + 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x66, 0x65, 0x65, 0x64, + 0x65, 0x72, 0x49, 0x64, 0x12, 0x31, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x64, 0x65, 0x62, 0x75, + 0x67, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x72, 0x69, 0x63, 0x65, 0x4d, 0x73, 0x67, + 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x22, 0xf8, 0x01, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x6d, + 0x69, 0x74, 0x50, 0x72, 0x69, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x28, 0x0a, 0x10, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x5f, 0x74, 0x78, 0x5f, 0x73, 0x75, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x63, 0x68, 0x65, 0x63, 0x6b, + 0x54, 0x78, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x64, 0x65, 0x6c, + 0x69, 0x76, 0x65, 0x72, 0x5f, 0x74, 0x78, 0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x10, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x54, 0x78, + 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x20, 0x0a, 0x0c, 0x63, 0x68, 0x65, 0x63, 0x6b, + 0x5f, 0x74, 0x78, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, + 0x68, 0x65, 0x63, 0x6b, 0x54, 0x78, 0x4c, 0x6f, 0x67, 0x12, 0x24, 0x0a, 0x0e, 0x64, 0x65, 0x6c, + 0x69, 0x76, 0x65, 0x72, 0x5f, 0x74, 0x78, 0x5f, 0x6c, 0x6f, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x54, 0x78, 0x4c, 0x6f, 0x67, 0x12, + 0x16, 0x0a, 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x06, 0x68, 0x65, 0x69, 0x67, 0x68, 0x74, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x78, 0x5f, 0x68, 0x61, + 0x73, 0x68, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x78, 0x48, 0x61, 0x73, 0x68, + 0x12, 0x10, 0x0a, 0x03, 0x65, 0x72, 0x72, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x65, + 0x72, 0x72, 0x32, 0x72, 0x0a, 0x12, 0x50, 0x72, 0x69, 0x63, 0x65, 0x53, 0x75, 0x62, 0x6d, 0x69, + 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x5c, 0x0a, 0x0b, 0x53, 0x75, 0x62, 0x6d, + 0x69, 0x74, 0x50, 0x72, 0x69, 0x63, 0x65, 0x12, 0x25, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x64, 0x65, 0x62, 0x75, 0x67, 0x67, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x6d, + 0x69, 0x74, 0x50, 0x72, 0x69, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x64, 0x65, 0x62, 0x75, 0x67, 0x67, 0x65, 0x72, 0x2e, + 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x6d, 0x69, 0x74, 0x50, 0x72, 0x69, 0x63, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x31, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x45, 0x78, 0x6f, 0x63, 0x6f, 0x72, 0x65, 0x4e, 0x65, 0x74, 0x77, + 0x6f, 0x72, 0x6b, 0x2f, 0x70, 0x72, 0x69, 0x63, 0x65, 0x2d, 0x66, 0x65, 0x65, 0x64, 0x65, 0x72, + 0x2f, 0x64, 0x65, 0x62, 0x75, 0x67, 0x67, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_service_proto_rawDescOnce sync.Once + file_service_proto_rawDescData = file_service_proto_rawDesc +) + +func file_service_proto_rawDescGZIP() []byte { + file_service_proto_rawDescOnce.Do(func() { + file_service_proto_rawDescData = protoimpl.X.CompressGZIP(file_service_proto_rawDescData) + }) + return file_service_proto_rawDescData +} + +var file_service_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_service_proto_goTypes = []any{ + (*PriceMsg)(nil), // 0: proto.debugger.v1.PriceMsg + (*SubmitPriceRequest)(nil), // 1: proto.debugger.v1.SubmitPriceRequest + (*SubmitPriceResponse)(nil), // 2: proto.debugger.v1.SubmitPriceResponse +} +var file_service_proto_depIdxs = []int32{ + 0, // 0: proto.debugger.v1.SubmitPriceRequest.price:type_name -> proto.debugger.v1.PriceMsg + 1, // 1: proto.debugger.v1.PriceSubmitService.SubmitPrice:input_type -> proto.debugger.v1.SubmitPriceRequest + 2, // 2: proto.debugger.v1.PriceSubmitService.SubmitPrice:output_type -> proto.debugger.v1.SubmitPriceResponse + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_service_proto_init() } +func file_service_proto_init() { + if File_service_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_service_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*PriceMsg); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_service_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*SubmitPriceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_service_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*SubmitPriceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_service_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_service_proto_goTypes, + DependencyIndexes: file_service_proto_depIdxs, + MessageInfos: file_service_proto_msgTypes, + }.Build() + File_service_proto = out.File + file_service_proto_rawDesc = nil + file_service_proto_goTypes = nil + file_service_proto_depIdxs = nil +} diff --git a/debugger/service_grpc.pb.go b/debugger/service_grpc.pb.go new file mode 100644 index 0000000..0c2978d --- /dev/null +++ b/debugger/service_grpc.pb.go @@ -0,0 +1,109 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.23.4 +// source: service.proto + +package debugger + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + PriceSubmitService_SubmitPrice_FullMethodName = "/proto.debugger.v1.PriceSubmitService/SubmitPrice" +) + +// PriceSubmitServiceClient is the client API for PriceSubmitService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type PriceSubmitServiceClient interface { + SubmitPrice(ctx context.Context, in *SubmitPriceRequest, opts ...grpc.CallOption) (*SubmitPriceResponse, error) +} + +type priceSubmitServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewPriceSubmitServiceClient(cc grpc.ClientConnInterface) PriceSubmitServiceClient { + return &priceSubmitServiceClient{cc} +} + +func (c *priceSubmitServiceClient) SubmitPrice(ctx context.Context, in *SubmitPriceRequest, opts ...grpc.CallOption) (*SubmitPriceResponse, error) { + out := new(SubmitPriceResponse) + err := c.cc.Invoke(ctx, PriceSubmitService_SubmitPrice_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// PriceSubmitServiceServer is the server API for PriceSubmitService service. +// All implementations must embed UnimplementedPriceSubmitServiceServer +// for forward compatibility +type PriceSubmitServiceServer interface { + SubmitPrice(context.Context, *SubmitPriceRequest) (*SubmitPriceResponse, error) + mustEmbedUnimplementedPriceSubmitServiceServer() +} + +// UnimplementedPriceSubmitServiceServer must be embedded to have forward compatible implementations. +type UnimplementedPriceSubmitServiceServer struct { +} + +func (UnimplementedPriceSubmitServiceServer) SubmitPrice(context.Context, *SubmitPriceRequest) (*SubmitPriceResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SubmitPrice not implemented") +} +func (UnimplementedPriceSubmitServiceServer) mustEmbedUnimplementedPriceSubmitServiceServer() {} + +// UnsafePriceSubmitServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to PriceSubmitServiceServer will +// result in compilation errors. +type UnsafePriceSubmitServiceServer interface { + mustEmbedUnimplementedPriceSubmitServiceServer() +} + +func RegisterPriceSubmitServiceServer(s grpc.ServiceRegistrar, srv PriceSubmitServiceServer) { + s.RegisterService(&PriceSubmitService_ServiceDesc, srv) +} + +func _PriceSubmitService_SubmitPrice_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SubmitPriceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PriceSubmitServiceServer).SubmitPrice(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: PriceSubmitService_SubmitPrice_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PriceSubmitServiceServer).SubmitPrice(ctx, req.(*SubmitPriceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// PriceSubmitService_ServiceDesc is the grpc.ServiceDesc for PriceSubmitService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var PriceSubmitService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.debugger.v1.PriceSubmitService", + HandlerType: (*PriceSubmitServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SubmitPrice", + Handler: _PriceSubmitService_SubmitPrice_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "service.proto", +} diff --git a/exoclient/client.go b/exoclient/client.go index 55eb5cb..073dfe3 100644 --- a/exoclient/client.go +++ b/exoclient/client.go @@ -1,6 +1,7 @@ package exoclient import ( + "errors" "fmt" "net" "net/http" @@ -56,7 +57,7 @@ type exoClient struct { } // NewExoClient creates a exocore-client used to do queries and send transactions to exocored -func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDebug string, privKey cryptotypes.PrivKey, encCfg params.EncodingConfig, chainID string) (*exoClient, error) { +func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDebug string, privKey cryptotypes.PrivKey, encCfg params.EncodingConfig, chainID string, txOnly bool) (*exoClient, error) { ec := &exoClient{ logger: logger, privKey: privKey, @@ -72,43 +73,47 @@ func NewExoClient(logger feedertypes.LoggerInf, endpoint, wsEndpoint, endpointDe } var err error - ec.logger.Info("establish grpc connection") - ec.grpcConn, err = createGrpcConn(endpoint, encCfg) - if err != nil { - return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create new Exoclient, endpoint:%s, error:%v", endpoint, err)) + if txOnly && len(endpointDebug) == 0 { + return nil, errors.New("rpc endpoint is empty under debug mode") } - - // setup txClient - ec.txClient = sdktx.NewServiceClient(ec.grpcConn) - if len(endpointDebug) > 0 { ec.txClientDebug, err = client.NewClientFromNode(endpointDebug) if err != nil { return nil, fmt.Errorf("failed to create new client for debug, endponit:%s, error:%v", endpointDebug, err) } } - // ec.txClient = sdktx.NewServiceClient(ec.grpcConn) - // setup queryClient - ec.oracleClient = oracletypes.NewQueryClient(ec.grpcConn) - // setup wsClient - u, err := url.Parse(wsEndpoint) - if err != nil { - return nil, fmt.Errorf("failed to parse wsEndpoint, wsEndpoint:%s, error:%w", wsEndpoint, err) - } - ec.wsDialer = &websocket.Dialer{ - NetDial: func(_, _ string) (net.Conn, error) { - return net.Dial("tcp", u.Host) - }, - Proxy: http.ProxyFromEnvironment, - } - ec.logger.Info("establish ws connection") - ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{}) - if err != nil { - return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err)) + // grpc connection, websocket is not needed for txOnly mode when do debug + if !txOnly { + ec.logger.Info("establish grpc connection") + ec.grpcConn, err = createGrpcConn(endpoint, encCfg) + if err != nil { + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create new Exoclient, endpoint:%s, error:%v", endpoint, err)) + } + + // setup txClient + ec.txClient = sdktx.NewServiceClient(ec.grpcConn) + // setup queryClient + ec.oracleClient = oracletypes.NewQueryClient(ec.grpcConn) + // setup wsClient + u, err := url.Parse(wsEndpoint) + if err != nil { + return nil, fmt.Errorf("failed to parse wsEndpoint, wsEndpoint:%s, error:%w", wsEndpoint, err) + } + ec.wsDialer = &websocket.Dialer{ + NetDial: func(_, _ string) (net.Conn, error) { + return net.Dial("tcp", u.Host) + }, + Proxy: http.ProxyFromEnvironment, + } + ec.logger.Info("establish ws connection") + ec.wsClient, _, err = ec.wsDialer.Dial(wsEndpoint, http.Header{}) + if err != nil { + return nil, feedertypes.ErrInitConnectionFail.Wrap(fmt.Sprintf("failed to create ws connection, error:%v", err)) + } + ec.wsClient.SetPongHandler(func(string) error { + return nil + }) } - ec.wsClient.SetPongHandler(func(string) error { - return nil - }) return ec, nil } diff --git a/exoclient/types.go b/exoclient/types.go index 34738fa..7d76786 100644 --- a/exoclient/types.go +++ b/exoclient/types.go @@ -356,7 +356,7 @@ var ( // Init intialize the exoclient with configuration including consensuskey info, chainID // func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) (*grpc.ClientConn, func(), error) { -func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) error { +func Init(conf *feedertypes.Config, mnemonic, privFile string, txOnly bool, standalone bool) error { if logger = feedertypes.GetLogger("exoclient"); logger == nil { panic("logger is not initialized") } @@ -415,7 +415,7 @@ func Init(conf feedertypes.Config, mnemonic, privFile string, standalone bool) e } var err error - if defaultExoClient, err = NewExoClient(logger, confExocore.Gprc, confExocore.Ws, conf.Exocore.Rpc, privKey, encCfg, confExocore.ChainID); err != nil { + if defaultExoClient, err = NewExoClient(logger, confExocore.Gprc, confExocore.Ws, conf.Exocore.Rpc, privKey, encCfg, confExocore.ChainID, txOnly); err != nil { if errors.Is(err, feedertypes.ErrInitConnectionFail) { return err } diff --git a/external/feeder.go b/external/feeder.go index fa1419a..da7a8da 100644 --- a/external/feeder.go +++ b/external/feeder.go @@ -12,8 +12,6 @@ func StartPriceFeeder(cfgFile, mnemonic, sourcesPath string, logger feedertypes. if len(cfgFile) == 0 { return false } - feedertypes.ConfigFile = cfgFile - feedertypes.SourcesConfigPath = sourcesPath conf := feedertypes.InitConfig(cfgFile) // Start price feeder diff --git a/proto/debugger/v1/service.proto b/proto/debugger/v1/service.proto new file mode 100644 index 0000000..5d8ee18 --- /dev/null +++ b/proto/debugger/v1/service.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; +package proto.debugger.v1; +option go_package = "github.com/ExocoreNetwork/price-feeder/debugger"; + +service PriceSubmitService { + rpc SubmitPrice(SubmitPriceRequest) returns (SubmitPriceResponse); +} + +message PriceMsg { + string price = 1; + string det_id = 2; + int32 decimal = 3; + string timestamp = 4; + int32 nonce = 5; + uint64 base_block = 6; +} + +message SubmitPriceRequest{ + int64 height = 1; + uint64 feeder_id = 2; + PriceMsg price = 3; +} + +message SubmitPriceResponse{ + bool check_tx_success = 1; + bool deliver_tx_success = 2; + string check_tx_log = 3; + string deliver_tx_log = 4; + int64 height = 5; + string tx_hash = 6; + string err = 7; +} diff --git a/types/types.go b/types/types.go index f023c15..3c2f31f 100644 --- a/types/types.go +++ b/types/types.go @@ -1,6 +1,7 @@ package types import ( + "errors" "fmt" "os" @@ -150,9 +151,7 @@ func (e *Err) Unwrap() error { } var ( - ConfigFile string - SourcesConfigPath string - v *viper.Viper + v *viper.Viper ErrInitFail = NewErr("failed to initialization") ErrInitConnectionFail = NewErr("failed to establish a connection") @@ -161,7 +160,13 @@ var ( ) // InitConfig will only read path cfgFile once, and for reload after InitConfig, should use ReloadConfig -func InitConfig(cfgFile string) Config { +func InitConfig(cfgFile string) (*Config, error) { + if len(cfgFile) == 0 { + return nil, errors.New("empty file name") + } + if _, err := os.Stat(cfgFile); os.IsNotExist(err) { + return nil, err + } if v == nil { v = viper.New() } @@ -176,7 +181,7 @@ func InitConfig(cfgFile string) Config { if err := v.Unmarshal(conf); err != nil { panic(ErrInitFail.Wrap(err.Error())) } - return *conf + return conf, nil } // ReloadConfig will reload config file with path set by InitConfig From 5b7145c530cdb746d5f950519633d58d3e1f3ea7 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Mon, 30 Dec 2024 09:08:30 +0800 Subject: [PATCH 08/10] typo --- cmd/feeder_debug.go | 2 +- types/types.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index fdf35c7..c1a3784 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -198,7 +198,7 @@ func sendTx(feederID uint64, height int64, price *debugger.PriceMsg) (*debugger. }) } -func sendTxImmediatly(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) { +func sendTxImmediately(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) { if err := exoclient.Init(feederConfig, mnemonic, privFile, true, true); err != nil { return nil, fmt.Errorf("failed to init exoclient in txOnly mode for debug, error:%w", err) } diff --git a/types/types.go b/types/types.go index 3c2f31f..d1e40ce 100644 --- a/types/types.go +++ b/types/types.go @@ -49,7 +49,7 @@ type Config struct { AppName string `mapstructure:"appname"` Gprc string `mapstructure:"grpc"` Ws string `mapstructure:"ws"` - Rpc string `mspstructure:"rpc"` + Rpc string `mapstructure:"rpc"` } `mapstructure:"exocore"` } From f716c61e372075155c58459285638c9326b4c5c0 Mon Sep 17 00:00:00 2001 From: leonz789 Date: Mon, 30 Dec 2024 09:38:41 +0800 Subject: [PATCH 09/10] typo, refactor --- cmd/debug.go | 5 +- cmd/feeder_debug.go | 124 ++++++++++++++++++++++++-------------------- exoclient/types.go | 2 +- types/types.go | 2 +- 4 files changed, 73 insertions(+), 60 deletions(-) diff --git a/cmd/debug.go b/cmd/debug.go index 9ca229f..f5e4ebc 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -84,7 +84,10 @@ var debugSendImmCmd = &cobra.Command{ if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil { return err } - res, err := sendTxImmediatly(feederID, msgPrice) + if err := msgPrice.validate(); err != nil { + return err + } + res, err := sendTxImmediately(feederID, msgPrice) if err != nil { return err } diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index c1a3784..f613611 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -15,6 +15,41 @@ import ( "google.golang.org/grpc/credentials/insecure" ) +type sendReq struct { + // on which block committed send this transaction, required + height int64 + + feederID uint64 + price *debugger.PriceMsg + result chan *debugger.SubmitPriceResponse +} +type pendingRequestManager map[int64][]*sendReq + +func newPendingRequestManager() pendingRequestManager { + return make(map[int64][]*sendReq) +} + +func (p pendingRequestManager) add(height int64, req *sendReq) { + if pendings, ok := p[height]; ok { + p[height] = append(pendings, req) + } else { + p[height] = []*sendReq{req} + } +} + +func (p pendingRequestManager) process(height int64, handler func(*sendReq) error) { + for h, pendings := range p { + if h <= height { + for _, req := range pendings { + if err := handler(req); err != nil { + req.result <- &debugger.SubmitPriceResponse{Err: err.Error()} + } + } + delete(p, h) + } + } +} + type server struct { sendCh chan *sendReq debugger.UnimplementedPriceSubmitServiceServer @@ -47,6 +82,19 @@ type PriceJSON struct { BaseBlock uint64 `json:"base_block"` } +func (p PriceJSON) validate() error { + if len(p.Price) == 0 { + return errors.New("price is required") + } + if len(p.DetID) == 0 { + return errors.New("det_id is required") + } + if p.Nonce == 0 { + return errors.New("nonce should be greater than 0") + } + return nil +} + func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { return fetchertypes.PriceInfo{ Price: p.Price, @@ -56,35 +104,7 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo { } } -type sendRes struct { - err error - checkTxSuccess bool - checkTxLog string - deliverTxSuccess bool - deliverTxLog string - txHash string - height int64 -} - -type sendReq struct { - // on which block committed send this transaction, required - height int64 - - feederID uint64 - price *debugger.PriceMsg - result chan *debugger.SubmitPriceResponse -} - -type statusCode string - -const ( - overwrite statusCode = "overwrite the pending transaction" - success statusCode = "successfully sent the transaction" - fail statusCode = "failed to send transaction" -) - var ( - sendCh = make(chan *sendReq) DebugRetryConfig = RetryConfig{ MaxAttempts: 10, Interval: 3 * time.Second, @@ -124,7 +144,8 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn defer ecClient.Close() ecClient.Subscribe() // var pendingReq *sendReq - pendingReqs := make(map[int64][]*sendReq) + // pendingReqs := make(map[int64][]*sendReq) + pendingReqs := newPendingRequestManager() sendCh := make(chan *sendReq, 10) go func() { for { @@ -132,38 +153,27 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn case event := <-ecClient.EventsCh(): if e, ok := event.(*exoclient.EventNewBlock); ok { logger.Info("new block commited", "height", e.Height()) - for h, pendings := range pendingReqs { - if h <= e.Height() { - for i, req := range pendings { - res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce) - if err != nil { - logger.Error("failed to send tx", "error", err) - req.result <- &debugger.SubmitPriceResponse{Err: err.Error()} - } - req.result <- &debugger.SubmitPriceResponse{ - CheckTxSuccess: res.CheckTx.Code == 0, - CheckTxLog: res.CheckTx.Log, - DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0, - DeliverTxLog: res.DeliverTx.Log, - TxHash: res.Hash.String(), - Height: res.Height, - } - if i == len(pendings)-1 { - delete(pendingReqs, h) - } else { - pendingReqs[h] = append(pendings[:i], pendings[i+1:]...) - } - } + + pendingReqs.process(e.Height(), func(req *sendReq) error { + res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce) + if err != nil { + logger.Error("failed to send tx", "error", err) + return err } - } + req.result <- &debugger.SubmitPriceResponse{ + CheckTxSuccess: res.CheckTx.Code == 0, + CheckTxLog: res.CheckTx.Log, + DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0, + DeliverTxLog: res.DeliverTx.Log, + TxHash: res.Hash.String(), + Height: res.Height, + } + return nil + }) } case req := <-sendCh: logger.Info("add a new send request", "height", req.height) - if pendings, ok := pendingReqs[req.height]; ok { - pendingReqs[req.height] = append(pendings, req) - } else { - pendingReqs[req.height] = []*sendReq{req} - } + pendingReqs.add(req.height, req) } } }() diff --git a/exoclient/types.go b/exoclient/types.go index 7d76786..2e79dde 100644 --- a/exoclient/types.go +++ b/exoclient/types.go @@ -415,7 +415,7 @@ func Init(conf *feedertypes.Config, mnemonic, privFile string, txOnly bool, stan } var err error - if defaultExoClient, err = NewExoClient(logger, confExocore.Gprc, confExocore.Ws, conf.Exocore.Rpc, privKey, encCfg, confExocore.ChainID, txOnly); err != nil { + if defaultExoClient, err = NewExoClient(logger, confExocore.Grpc, confExocore.Ws, conf.Exocore.Rpc, privKey, encCfg, confExocore.ChainID, txOnly); err != nil { if errors.Is(err, feedertypes.ErrInitConnectionFail) { return err } diff --git a/types/types.go b/types/types.go index d1e40ce..ad95c43 100644 --- a/types/types.go +++ b/types/types.go @@ -47,7 +47,7 @@ type Config struct { Exocore struct { ChainID string `mapstructure:"chainid"` AppName string `mapstructure:"appname"` - Gprc string `mapstructure:"grpc"` + Grpc string `mapstructure:"grpc"` Ws string `mapstructure:"ws"` Rpc string `mapstructure:"rpc"` } `mapstructure:"exocore"` From c563c5b6b64ea0f7d6b86ec4e7a191eafe4e1dad Mon Sep 17 00:00:00 2001 From: leonz789 Date: Mon, 30 Dec 2024 09:44:58 +0800 Subject: [PATCH 10/10] add config for grpc port --- cmd/debug.go | 2 +- cmd/feeder_debug.go | 7 ++++--- types/types.go | 3 +++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/debug.go b/cmd/debug.go index f5e4ebc..aacaeda 100644 --- a/cmd/debug.go +++ b/cmd/debug.go @@ -57,7 +57,7 @@ var debugSendCmd = &cobra.Command{ if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil { return err } - res, err := sendTx(feederID, height, msgPrice) + res, err := sendTx(feederID, height, msgPrice, feederConfig.Debugger.Grpc) if err != nil { return err } diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index f613611..dde28fd 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -177,7 +177,7 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn } } }() - lis, err := net.Listen("tcp", ":50051") + lis, err := net.Listen("tcp", conf.Debugger.Grpc) if err != nil { fmt.Printf("failed to listen: %v\r\n", err) return @@ -190,9 +190,10 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn } } -func sendTx(feederID uint64, height int64, price *debugger.PriceMsg) (*debugger.SubmitPriceResponse, error) { +func sendTx(feederID uint64, height int64, price *debugger.PriceMsg, port string) (*debugger.SubmitPriceResponse, error) { conn, err := grpc.Dial( - "localhost:50051", + fmt.Sprintf("localhost%s", port), + // "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) diff --git a/types/types.go b/types/types.go index ad95c43..970d782 100644 --- a/types/types.go +++ b/types/types.go @@ -51,6 +51,9 @@ type Config struct { Ws string `mapstructure:"ws"` Rpc string `mapstructure:"rpc"` } `mapstructure:"exocore"` + Debugger struct { + Grpc string `mapstructure:"grpc"` + } `mapstructure:"debugger"` } type LoggerInf log.Logger