From 9a872d992704b3fb051a50659efeec7d8c9d3e0c Mon Sep 17 00:00:00 2001 From: leonz789 Date: Wed, 1 Jan 2025 14:50:25 +0800 Subject: [PATCH] perf: send multiple txs on the same height concurrently --- cmd/feeder_debug.go | 52 ++++++++++++++++++++++++++++++--------------- exoclient/tx.go | 39 ++++++++++++++++++++++------------ 2 files changed, 61 insertions(+), 30 deletions(-) diff --git a/cmd/feeder_debug.go b/cmd/feeder_debug.go index 0cfe62c..caa3540 100644 --- a/cmd/feeder_debug.go +++ b/cmd/feeder_debug.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "sync" "time" "github.com/ExocoreNetwork/price-feeder/debugger" @@ -23,28 +24,36 @@ type sendReq struct { price *debugger.PriceMsg result chan *debugger.SubmitPriceResponse } -type pendingRequestManager map[int64][]*sendReq +type sendBytesReq struct { + txBytes []byte + result chan *debugger.SubmitPriceResponse +} +type pendingRequestManager map[int64][]*sendBytesReq func newPendingRequestManager() pendingRequestManager { - return make(map[int64][]*sendReq) + return make(map[int64][]*sendBytesReq) } -func (p pendingRequestManager) add(height int64, req *sendReq) { +func (p pendingRequestManager) add(height int64, req *sendBytesReq) { if pendings, ok := p[height]; ok { p[height] = append(pendings, req) } else { - p[height] = []*sendReq{req} + p[height] = []*sendBytesReq{req} } } -func (p pendingRequestManager) process(height int64, handler func(*sendReq) error) { +func (p pendingRequestManager) process(height int64, handler func(*sendBytesReq)) { + wg := sync.WaitGroup{} for h, pendings := range p { if h <= height { for _, req := range pendings { - if err := handler(req); err != nil { - req.result <- &debugger.SubmitPriceResponse{Err: err.Error()} - } + wg.Add(1) + go func(req *sendBytesReq) { + handler(req) + wg.Done() + }(req) } + wg.Wait() delete(p, h) } } @@ -143,8 +152,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn ecClient, _ := exoclient.GetClient() defer ecClient.Close() ecClient.Subscribe() - // var pendingReq *sendReq - // pendingReqs := make(map[int64][]*sendReq) pendingReqs := newPendingRequestManager() sendCh := make(chan *sendReq, 10) go func() { @@ -154,11 +161,14 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn if e, ok := event.(*exoclient.EventNewBlock); ok { logger.Info("new block commited", "height", e.Height()) - pendingReqs.process(e.Height(), func(req *sendReq) error { - res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce) + pendingReqs.process(e.Height(), func(req *sendBytesReq) { + res, err := ecClient.SendSignedTxBytesDebug(req.txBytes) if err != nil { logger.Error("failed to send tx", "error", err) - return err + req.result <- &debugger.SubmitPriceResponse{ + Err: err.Error(), + } + return } req.result <- &debugger.SubmitPriceResponse{ CheckTxSuccess: res.CheckTx.Code == 0, @@ -168,12 +178,21 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn TxHash: res.Hash.String(), Height: res.Height, } - return nil }) } case req := <-sendCh: - logger.Info("add a new send request", "height", req.height) - pendingReqs.add(req.height, req) + logger.Info("add a new send request", "height", req.height, "feederID", req.feederID) + _, txBytes, err := ecClient.GetSignedTxBytesDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce) + if err != nil { + req.result <- &debugger.SubmitPriceResponse{ + Err: fmt.Sprintf("failed to sign tx from req:%v", req), + } + break + } + pendingReqs.add(req.height, &sendBytesReq{ + txBytes: txBytes, + result: req.result, + }) } } }() @@ -193,7 +212,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn func sendTx(feederID uint64, height int64, price *debugger.PriceMsg, port string) (*debugger.SubmitPriceResponse, error) { conn, err := grpc.Dial( fmt.Sprintf("localhost%s", port), - // "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), ) diff --git a/exoclient/tx.go b/exoclient/tx.go index 4837e5d..dc85101 100644 --- a/exoclient/tx.go +++ b/exoclient/tx.go @@ -37,19 +37,6 @@ func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes return res, nil } -func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) { - msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce) - if err != nil { - return nil, err - } - // broadcast txBytes - res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes) - if err != nil { - return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) - } - return res, nil -} - // signMsg signs the message with consensusskey func (ec exoClient) signMsg(msgs ...sdk.Msg) (authsigning.Tx, error) { txBuilder := ec.txCfg.NewTxBuilder() @@ -141,3 +128,29 @@ func (ec exoClient) getSignedTxBytes(feederID uint64, baseBlock uint64, price fe } return msg, txBytes, nil } + +func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) { + msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce) + if err != nil { + return nil, err + } + // broadcast txBytes + res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes) + if err != nil { + return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err) + } + return res, nil +} + +func (ec exoClient) GetSignedTxBytesDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*oracletypes.MsgCreatePrice, []byte, error) { + return ec.getSignedTxBytes(feederID, baseBlock, price, nonce) +} + +func (ec exoClient) SendSignedTxBytesDebug(txBytes []byte) (*coretypes.ResultBroadcastTxCommit, error) { + // broadcast txBytes + res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes) + if err != nil { + return nil, fmt.Errorf("failed to braodcast transaction, txBytes:%v, valConsAddr:%s, error:%w", txBytes, sdk.ConsAddress(ec.pubKey.Address()), err) + } + return res, nil +}