Skip to content

Commit

Permalink
perf: send multiple txs on the same height concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
leonz789 committed Jan 1, 2025
1 parent 0c42dc8 commit 9a872d9
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 30 deletions.
52 changes: 35 additions & 17 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"

"github.com/ExocoreNetwork/price-feeder/debugger"
Expand All @@ -23,28 +24,36 @@ type sendReq struct {
price *debugger.PriceMsg
result chan *debugger.SubmitPriceResponse
}
type pendingRequestManager map[int64][]*sendReq
type sendBytesReq struct {
txBytes []byte
result chan *debugger.SubmitPriceResponse
}
type pendingRequestManager map[int64][]*sendBytesReq

func newPendingRequestManager() pendingRequestManager {
return make(map[int64][]*sendReq)
return make(map[int64][]*sendBytesReq)
}

func (p pendingRequestManager) add(height int64, req *sendReq) {
func (p pendingRequestManager) add(height int64, req *sendBytesReq) {
if pendings, ok := p[height]; ok {
p[height] = append(pendings, req)
} else {
p[height] = []*sendReq{req}
p[height] = []*sendBytesReq{req}
}
}

func (p pendingRequestManager) process(height int64, handler func(*sendReq) error) {
func (p pendingRequestManager) process(height int64, handler func(*sendBytesReq)) {
wg := sync.WaitGroup{}
for h, pendings := range p {
if h <= height {
for _, req := range pendings {
if err := handler(req); err != nil {
req.result <- &debugger.SubmitPriceResponse{Err: err.Error()}
}
wg.Add(1)
go func(req *sendBytesReq) {
handler(req)
wg.Done()
}(req)
}
wg.Wait()
delete(p, h)
}
}
Expand Down Expand Up @@ -143,8 +152,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
ecClient, _ := exoclient.GetClient()
defer ecClient.Close()
ecClient.Subscribe()
// var pendingReq *sendReq
// pendingReqs := make(map[int64][]*sendReq)
pendingReqs := newPendingRequestManager()
sendCh := make(chan *sendReq, 10)
go func() {
Expand All @@ -154,11 +161,14 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())

pendingReqs.process(e.Height(), func(req *sendReq) error {
res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
pendingReqs.process(e.Height(), func(req *sendBytesReq) {
res, err := ecClient.SendSignedTxBytesDebug(req.txBytes)
if err != nil {
logger.Error("failed to send tx", "error", err)
return err
req.result <- &debugger.SubmitPriceResponse{
Err: err.Error(),
}
return
}
req.result <- &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
Expand All @@ -168,12 +178,21 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
TxHash: res.Hash.String(),
Height: res.Height,
}
return nil
})
}
case req := <-sendCh:
logger.Info("add a new send request", "height", req.height)
pendingReqs.add(req.height, req)
logger.Info("add a new send request", "height", req.height, "feederID", req.feederID)
_, txBytes, err := ecClient.GetSignedTxBytesDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
if err != nil {
req.result <- &debugger.SubmitPriceResponse{
Err: fmt.Sprintf("failed to sign tx from req:%v", req),
}
break
}
pendingReqs.add(req.height, &sendBytesReq{
txBytes: txBytes,
result: req.result,
})
}
}
}()
Expand All @@ -193,7 +212,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
func sendTx(feederID uint64, height int64, price *debugger.PriceMsg, port string) (*debugger.SubmitPriceResponse, error) {
conn, err := grpc.Dial(
fmt.Sprintf("localhost%s", port),
// "localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
Expand Down
39 changes: 26 additions & 13 deletions exoclient/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes
return res, nil
}

func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) {
msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
if err != nil {
return nil, err
}
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}

// signMsg signs the message with consensusskey
func (ec exoClient) signMsg(msgs ...sdk.Msg) (authsigning.Tx, error) {
txBuilder := ec.txCfg.NewTxBuilder()
Expand Down Expand Up @@ -141,3 +128,29 @@ func (ec exoClient) getSignedTxBytes(feederID uint64, baseBlock uint64, price fe
}
return msg, txBytes, nil
}

func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) {
msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
if err != nil {
return nil, err
}
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}

func (ec exoClient) GetSignedTxBytesDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*oracletypes.MsgCreatePrice, []byte, error) {
return ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
}

func (ec exoClient) SendSignedTxBytesDebug(txBytes []byte) (*coretypes.ResultBroadcastTxCommit, error) {
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, txBytes:%v, valConsAddr:%s, error:%w", txBytes, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}

0 comments on commit 9a872d9

Please sign in to comment.