Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: send multiple txs on the same height concurrently #16

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"sync"
"time"

"github.com/ExocoreNetwork/price-feeder/debugger"
Expand All @@ -23,28 +24,36 @@ type sendReq struct {
price *debugger.PriceMsg
result chan *debugger.SubmitPriceResponse
}
type pendingRequestManager map[int64][]*sendReq
type sendBytesReq struct {
txBytes []byte
result chan *debugger.SubmitPriceResponse
}
type pendingRequestManager map[int64][]*sendBytesReq

func newPendingRequestManager() pendingRequestManager {
return make(map[int64][]*sendReq)
return make(map[int64][]*sendBytesReq)
}

func (p pendingRequestManager) add(height int64, req *sendReq) {
func (p pendingRequestManager) add(height int64, req *sendBytesReq) {
if pendings, ok := p[height]; ok {
p[height] = append(pendings, req)
} else {
p[height] = []*sendReq{req}
p[height] = []*sendBytesReq{req}
}
}

func (p pendingRequestManager) process(height int64, handler func(*sendReq) error) {
func (p pendingRequestManager) process(height int64, handler func(*sendBytesReq)) {
wg := sync.WaitGroup{}
for h, pendings := range p {
if h <= height {
for _, req := range pendings {
if err := handler(req); err != nil {
req.result <- &debugger.SubmitPriceResponse{Err: err.Error()}
}
wg.Add(1)
go func(req *sendBytesReq) {
handler(req)
wg.Done()
}(req)
}
wg.Wait()
delete(p, h)
}
}
Expand Down Expand Up @@ -143,8 +152,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
ecClient, _ := exoclient.GetClient()
defer ecClient.Close()
ecClient.Subscribe()
// var pendingReq *sendReq
// pendingReqs := make(map[int64][]*sendReq)
pendingReqs := newPendingRequestManager()
sendCh := make(chan *sendReq, 10)
go func() {
Expand All @@ -154,11 +161,14 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())

pendingReqs.process(e.Height(), func(req *sendReq) error {
res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
pendingReqs.process(e.Height(), func(req *sendBytesReq) {
res, err := ecClient.SendSignedTxBytesDebug(req.txBytes)
if err != nil {
logger.Error("failed to send tx", "error", err)
return err
req.result <- &debugger.SubmitPriceResponse{
Err: err.Error(),
}
return
}
req.result <- &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
Expand All @@ -168,12 +178,21 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
TxHash: res.Hash.String(),
Height: res.Height,
}
return nil
})
}
case req := <-sendCh:
logger.Info("add a new send request", "height", req.height)
pendingReqs.add(req.height, req)
logger.Info("add a new send request", "height", req.height, "feederID", req.feederID)
_, txBytes, err := ecClient.GetSignedTxBytesDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
if err != nil {
req.result <- &debugger.SubmitPriceResponse{
Err: fmt.Sprintf("failed to sign tx from req:%v", req),
}
break
}
pendingReqs.add(req.height, &sendBytesReq{
txBytes: txBytes,
result: req.result,
})
}
}
}()
Expand All @@ -193,7 +212,6 @@ func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mn
func sendTx(feederID uint64, height int64, price *debugger.PriceMsg, port string) (*debugger.SubmitPriceResponse, error) {
conn, err := grpc.Dial(
fmt.Sprintf("localhost%s", port),
// "localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
Expand Down
39 changes: 26 additions & 13 deletions exoclient/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ func (ec exoClient) SendTx(feederID uint64, baseBlock uint64, price fetchertypes
return res, nil
}

func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) {
msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
if err != nil {
return nil, err
}
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}

// signMsg signs the message with consensusskey
func (ec exoClient) signMsg(msgs ...sdk.Msg) (authsigning.Tx, error) {
txBuilder := ec.txCfg.NewTxBuilder()
Expand Down Expand Up @@ -141,3 +128,29 @@ func (ec exoClient) getSignedTxBytes(feederID uint64, baseBlock uint64, price fe
}
return msg, txBytes, nil
}

func (ec exoClient) SendTxDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*coretypes.ResultBroadcastTxCommit, error) {
msg, txBytes, err := ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
if err != nil {
return nil, err
}
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, msg:%v, valConsAddr:%s, error:%w", msg, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}

func (ec exoClient) GetSignedTxBytesDebug(feederID uint64, baseBlock uint64, price fetchertypes.PriceInfo, nonce int32) (*oracletypes.MsgCreatePrice, []byte, error) {
return ec.getSignedTxBytes(feederID, baseBlock, price, nonce)
}

func (ec exoClient) SendSignedTxBytesDebug(txBytes []byte) (*coretypes.ResultBroadcastTxCommit, error) {
// broadcast txBytes
res, err := ec.txClientDebug.BroadcastTxCommit(context.Background(), txBytes)
if err != nil {
return nil, fmt.Errorf("failed to braodcast transaction, txBytes:%v, valConsAddr:%s, error:%w", txBytes, sdk.ConsAddress(ec.pubKey.Address()), err)
}
return res, nil
}
Loading