Skip to content

Commit

Permalink
feat: debug tools to send tx from cli
Browse files Browse the repository at this point in the history
  • Loading branch information
leonz789 committed Dec 30, 2024
1 parent 72d2c7f commit 129e5f8
Show file tree
Hide file tree
Showing 14 changed files with 866 additions and 126 deletions.
95 changes: 90 additions & 5 deletions cmd/debug.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,102 @@
package cmd

import (
"encoding/json"
"fmt"

"github.com/ExocoreNetwork/price-feeder/debugger"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

var (
flagFeederID = "feederID"
flagHeight = "height"
)

var debugCmd = &cobra.Command{
func init() {
debugStartCmd.PersistentFlags().Uint64(flagFeederID, 0, "feederID of the token")
debugStartCmd.PersistentFlags().Int64(flagHeight, 0, "committed block height after which the tx will be sent")

debugStartCmd.AddCommand(
debugSendCmd,
debugSendImmCmd,
)
}

var debugStartCmd = &cobra.Command{
Use: "debug",
Short: "",
Long: "",
Run: func(cmd *cobra.Command, args []string) {
Short: "start listening to new blocks",
Long: "start listening to new blocks",
RunE: func(cmd *cobra.Command, args []string) error {
logger := feedertypes.NewLogger(zapcore.DebugLevel)
DebugPriceFeeder(conf, logger, mnemonic, sourcesPath)
DebugPriceFeeder(feederConfig, logger, mnemonic, sourcesPath)
return nil
},
}

var debugSendCmd = &cobra.Command{
Use: `send --feederID [feederID] --height [height] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
height, err := cmd.Parent().PersistentFlags().GetInt64(flagHeight)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &debugger.PriceMsg{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
res, err := sendTx(feederID, height, msgPrice)
if err != nil {
return err
}
if len(res.Err) > 0 {
fmt.Println("")
}
printProto(res)
return nil
},
}

var debugSendImmCmd = &cobra.Command{
Use: `send-imm --feederID [feederID] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &PriceJSON{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
res, err := sendTxImmediatly(feederID, msgPrice)
if err != nil {
return err
}
printProto(res)
return nil
},
}

func printProto(m proto.Message) {
marshaled, err := protojson.MarshalOptions{EmitUnpopulated: true, UseProtoNames: true}.Marshal(m)
if err != nil {
fmt.Printf("failed to print proto message, error:%v", err)
}
fmt.Println(string(marshaled))
}
170 changes: 137 additions & 33 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,50 @@
package cmd

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/ExocoreNetwork/price-feeder/debugger"
"github.com/ExocoreNetwork/price-feeder/exoclient"
fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type server struct {
sendCh chan *sendReq
debugger.UnimplementedPriceSubmitServiceServer
}

func (s *server) SubmitPrice(ctx context.Context, req *debugger.SubmitPriceRequest) (*debugger.SubmitPriceResponse, error) {
result := make(chan *debugger.SubmitPriceResponse, 1)
s.sendCh <- &sendReq{
height: req.Height,
feederID: req.FeederId,
price: req.Price,
result: result,
}
r := <-result
return r, nil
}

func newServer(sendCh chan *sendReq) *server {
return &server{
sendCh: sendCh,
}
}

type PriceJSON struct {
Price string `json:"price"`
DetID string `json:"det_id"`
Decimal int32 `json:"decimal"`
Timestamp string `json:"timestamp"`
Nonce int32 `json:"nonce"`
BaseBlock uint64 `json:"base_block"`
}

func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {
Expand All @@ -27,26 +57,22 @@ func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {
}

type sendRes struct {
status statusCode
txHash string
details string
err error
checkTxSuccess bool
checkTxLog string
deliverTxSuccess bool
deliverTxLog string
txHash string
height int64
}

type sendReq struct {
// on which block committed send this transaction, required
height int64
baseBlock uint64
// optional
roundID int64
height int64

feederID uint64
price PriceJSON
// decimal int32
// detID string
// timestamp string
nonce int32

result chan *sendRes
price *debugger.PriceMsg
result chan *debugger.SubmitPriceResponse
}

type statusCode string
Expand All @@ -58,10 +84,14 @@ const (
)

var (
sendCh = make(chan *sendReq)
sendCh = make(chan *sendReq)
DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
}
)

func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) {
func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) {
// init logger
if logger = feedertypes.SetLogger(logger); logger == nil {
panic("logger is not initialized")
Expand All @@ -71,7 +101,7 @@ func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mne
// 2. sending create-price tx
count := 0
for count < DebugRetryConfig.MaxAttempts {
if err := exoclient.Init(conf, mnemonic, privFile, true); err != nil {
if err := exoclient.Init(conf, mnemonic, privFile, false, true); err != nil {
if errors.Is(err, feedertypes.ErrInitConnectionFail) {
logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err)
time.Sleep(DebugRetryConfig.Interval)
Expand All @@ -93,25 +123,99 @@ func DebugPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mne
ecClient, _ := exoclient.GetClient()
defer ecClient.Close()
ecClient.Subscribe()
var pendingReq *sendReq
for {
select {
case event := <-ecClient.EventsCh():
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())
if pendingReq != nil {
if pendingReq.height <= e.Height() {
// send this transaction
res, err := ecClient.SendTxDebug(pendingReq.feederID, pendingReq.baseBlock, pendingReq.price.getPriceInfo(), pendingReq.nonce)
fmt.Println("debug--> result")
fmt.Println(res)
fmt.Println(err)
pendingReq = nil
// var pendingReq *sendReq
pendingReqs := make(map[int64][]*sendReq)
sendCh := make(chan *sendReq, 10)
go func() {
for {
select {
case event := <-ecClient.EventsCh():
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())
for h, pendings := range pendingReqs {
if h <= e.Height() {
for i, req := range pendings {
res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
if err != nil {
logger.Error("failed to send tx", "error", err)
req.result <- &debugger.SubmitPriceResponse{Err: err.Error()}
}
req.result <- &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
if i == len(pendings)-1 {
delete(pendingReqs, h)
} else {
pendingReqs[h] = append(pendings[:i], pendings[i+1:]...)
}
}
}
}
}
case req := <-sendCh:
logger.Info("add a new send request", "height", req.height)
if pendings, ok := pendingReqs[req.height]; ok {
pendingReqs[req.height] = append(pendings, req)
} else {
pendingReqs[req.height] = []*sendReq{req}
}
}
case req := <-sendCh:
pendingReq = req
}
}()
lis, err := net.Listen("tcp", ":50051")
if err != nil {
fmt.Printf("failed to listen: %v\r\n", err)
return
}
grpcServer := grpc.NewServer()
debugger.RegisterPriceSubmitServiceServer(grpcServer, newServer(sendCh))
if err := grpcServer.Serve(lis); err != nil {
fmt.Printf("failed to serve:%v\r\n", err)
return
}
}

func sendTx(feederID uint64, height int64, price *debugger.PriceMsg) (*debugger.SubmitPriceResponse, error) {
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, err
}
defer conn.Close()
c := debugger.NewPriceSubmitServiceClient(conn)
return c.SubmitPrice(context.Background(), &debugger.SubmitPriceRequest{
Height: height,
FeederId: feederID,
Price: price,
})
}

func sendTxImmediatly(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) {
if err := exoclient.Init(feederConfig, mnemonic, privFile, true, true); err != nil {
return nil, fmt.Errorf("failed to init exoclient in txOnly mode for debug, error:%w", err)
}
ec, _ := exoclient.GetClient()
pInfo := price.getPriceInfo()

res, err := ec.SendTxDebug(feederID, price.BaseBlock, pInfo, price.Nonce)
if err != nil {
return nil, err
}
protoRes := &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
return protoRes, nil
}
21 changes: 4 additions & 17 deletions cmd/feeder_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,29 +27,16 @@ var DefaultRetryConfig = RetryConfig{
Interval: 2 * time.Second,
}

var DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
}

const (
statusOk = 0
privFile = "priv_validator_key.json"

//feeder_tokenName_feederID
// loggerTagPrefix = "feed_%s_%d"
)

// var updateConfig sync.Mutex

// RunPriceFeeder runs price feeder to fetching price and feed to exocorechain
func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
func RunPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
// init logger
if logger = feedertypes.SetLogger(logger); logger == nil {
panic("logger is not initialized")
}
// init logger, fetchers, exocoreclient
if err := initComponents(logger, conf, standalone); err != nil {
if err := initComponents(logger, conf, sourcesPath, standalone); err != nil {
logger.Error("failed to initialize components")
panic(err)
}
Expand Down Expand Up @@ -167,7 +154,7 @@ func ResetAllStakerValidators(ec exoclient.ExoClientInf, logger feedertypes.Logg
}

// // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed
func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) error {
func initComponents(logger types.LoggerInf, conf *types.Config, sourcesPath string, standalone bool) error {
count := 0
for count < DefaultRetryConfig.MaxAttempts {
count++
Expand All @@ -178,7 +165,7 @@ func initComponents(logger types.LoggerInf, conf types.Config, standalone bool)
}

// init exoclient
err = exoclient.Init(conf, mnemonic, privFile, standalone)
err = exoclient.Init(conf, mnemonic, privFile, false, standalone)
if err != nil {
if errors.Is(err, feedertypes.ErrInitConnectionFail) {
logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err)
Expand Down
Loading

0 comments on commit 129e5f8

Please sign in to comment.