Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/debug tools #12

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions cmd/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package cmd

import (
"encoding/json"
"fmt"

"github.com/ExocoreNetwork/price-feeder/debugger"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

var (
flagFeederID = "feederID"
flagHeight = "height"
)

func init() {
debugStartCmd.PersistentFlags().Uint64(flagFeederID, 0, "feederID of the token")
debugStartCmd.PersistentFlags().Int64(flagHeight, 0, "committed block height after which the tx will be sent")

debugStartCmd.AddCommand(
debugSendCmd,
debugSendImmCmd,
)
}

var debugStartCmd = &cobra.Command{
Use: "debug",
Short: "start listening to new blocks",
Long: "start listening to new blocks",
RunE: func(cmd *cobra.Command, args []string) error {
logger := feedertypes.NewLogger(zapcore.DebugLevel)
DebugPriceFeeder(feederConfig, logger, mnemonic, sourcesPath)
return nil
},
}

var debugSendCmd = &cobra.Command{
Use: `send --feederID [feederID] --height [height] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
height, err := cmd.Parent().PersistentFlags().GetInt64(flagHeight)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &debugger.PriceMsg{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
res, err := sendTx(feederID, height, msgPrice)
if err != nil {
return err
}
if len(res.Err) > 0 {
fmt.Println("")
}
printProto(res)
return nil
},
}

var debugSendImmCmd = &cobra.Command{
Use: `send-imm --feederID [feederID] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &PriceJSON{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
res, err := sendTxImmediatly(feederID, msgPrice)
if err != nil {
return err
}
printProto(res)
return nil
},
}

func printProto(m proto.Message) {
marshaled, err := protojson.MarshalOptions{EmitUnpopulated: true, UseProtoNames: true}.Marshal(m)
if err != nil {
fmt.Printf("failed to print proto message, error:%v", err)
}
fmt.Println(string(marshaled))
}
221 changes: 221 additions & 0 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package cmd

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/ExocoreNetwork/price-feeder/debugger"
"github.com/ExocoreNetwork/price-feeder/exoclient"
fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type server struct {
sendCh chan *sendReq
debugger.UnimplementedPriceSubmitServiceServer
}

func (s *server) SubmitPrice(ctx context.Context, req *debugger.SubmitPriceRequest) (*debugger.SubmitPriceResponse, error) {
result := make(chan *debugger.SubmitPriceResponse, 1)
s.sendCh <- &sendReq{
height: req.Height,
feederID: req.FeederId,
price: req.Price,
result: result,
}
r := <-result
return r, nil
}

func newServer(sendCh chan *sendReq) *server {
return &server{
sendCh: sendCh,
}
}

type PriceJSON struct {
Price string `json:"price"`
DetID string `json:"det_id"`
Decimal int32 `json:"decimal"`
Timestamp string `json:"timestamp"`
Nonce int32 `json:"nonce"`
BaseBlock uint64 `json:"base_block"`
}

func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {
return fetchertypes.PriceInfo{
Price: p.Price,
Decimal: p.Decimal,
RoundID: p.DetID,
Timestamp: p.Timestamp,
}
}
leonz789 marked this conversation as resolved.
Show resolved Hide resolved

type sendRes struct {
err error
checkTxSuccess bool
checkTxLog string
deliverTxSuccess bool
deliverTxLog string
txHash string
height int64
}

type sendReq struct {
// on which block committed send this transaction, required
height int64

feederID uint64
price *debugger.PriceMsg
result chan *debugger.SubmitPriceResponse
}

type statusCode string

const (
overwrite statusCode = "overwrite the pending transaction"
success statusCode = "successfully sent the transaction"
fail statusCode = "failed to send transaction"
)

var (
sendCh = make(chan *sendReq)
DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
}
)

func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) {
// init logger
if logger = feedertypes.SetLogger(logger); logger == nil {
panic("logger is not initialized")
}
// init exoclient
// 1. subscribing for heights
// 2. sending create-price tx
count := 0
for count < DebugRetryConfig.MaxAttempts {
if err := exoclient.Init(conf, mnemonic, privFile, false, true); err != nil {
if errors.Is(err, feedertypes.ErrInitConnectionFail) {
logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err)
time.Sleep(DebugRetryConfig.Interval)
continue
}
logger.Error("failed to init exoclient", "error", err)
return
}
break
}
ec, _ := exoclient.GetClient()

_, err := getOracleParamsWithMaxRetry(1, ec, logger)
if err != nil {
logger.Error("failed to get oracle params", "error", err)
return
}
logger.Info("subscribe block heights...")
ecClient, _ := exoclient.GetClient()
defer ecClient.Close()
ecClient.Subscribe()
// var pendingReq *sendReq
pendingReqs := make(map[int64][]*sendReq)
sendCh := make(chan *sendReq, 10)
go func() {
for {
select {
case event := <-ecClient.EventsCh():
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())
for h, pendings := range pendingReqs {
if h <= e.Height() {
for i, req := range pendings {
res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
if err != nil {
logger.Error("failed to send tx", "error", err)
req.result <- &debugger.SubmitPriceResponse{Err: err.Error()}
}
req.result <- &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
if i == len(pendings)-1 {
delete(pendingReqs, h)
} else {
pendingReqs[h] = append(pendings[:i], pendings[i+1:]...)
}
}
}
}
}
case req := <-sendCh:
logger.Info("add a new send request", "height", req.height)
if pendings, ok := pendingReqs[req.height]; ok {
pendingReqs[req.height] = append(pendings, req)
} else {
pendingReqs[req.height] = []*sendReq{req}
}
}
}
}()
leonz789 marked this conversation as resolved.
Show resolved Hide resolved
lis, err := net.Listen("tcp", ":50051")
if err != nil {
fmt.Printf("failed to listen: %v\r\n", err)
return
}
grpcServer := grpc.NewServer()
debugger.RegisterPriceSubmitServiceServer(grpcServer, newServer(sendCh))
if err := grpcServer.Serve(lis); err != nil {
fmt.Printf("failed to serve:%v\r\n", err)
return
}
}
leonz789 marked this conversation as resolved.
Show resolved Hide resolved

func sendTx(feederID uint64, height int64, price *debugger.PriceMsg) (*debugger.SubmitPriceResponse, error) {
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, err
}
defer conn.Close()
c := debugger.NewPriceSubmitServiceClient(conn)
return c.SubmitPrice(context.Background(), &debugger.SubmitPriceRequest{
Height: height,
FeederId: feederID,
Price: price,
})
}
leonz789 marked this conversation as resolved.
Show resolved Hide resolved

func sendTxImmediatly(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) {
if err := exoclient.Init(feederConfig, mnemonic, privFile, true, true); err != nil {
return nil, fmt.Errorf("failed to init exoclient in txOnly mode for debug, error:%w", err)
}
ec, _ := exoclient.GetClient()
pInfo := price.getPriceInfo()

res, err := ec.SendTxDebug(feederID, price.BaseBlock, pInfo, price.Nonce)
if err != nil {
return nil, err
}
protoRes := &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
return protoRes, nil
}
leonz789 marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 4 additions & 12 deletions cmd/feeder_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,16 @@ var DefaultRetryConfig = RetryConfig{
Interval: 2 * time.Second,
}

const (
statusOk = 0
privFile = "priv_validator_key.json"

//feeder_tokenName_feederID
// loggerTagPrefix = "feed_%s_%d"
)

// var updateConfig sync.Mutex

// RunPriceFeeder runs price feeder to fetching price and feed to exocorechain
func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
func RunPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string, standalone bool) {
// init logger
if logger = feedertypes.SetLogger(logger); logger == nil {
panic("logger is not initialized")
}
// init logger, fetchers, exocoreclient
if err := initComponents(logger, conf, standalone); err != nil {
if err := initComponents(logger, conf, sourcesPath, standalone); err != nil {
logger.Error("failed to initialize components")
panic(err)
}
Expand Down Expand Up @@ -162,7 +154,7 @@ func ResetAllStakerValidators(ec exoclient.ExoClientInf, logger feedertypes.Logg
}

// // initComponents, initialize fetcher, exoclient, it will panic if any initialization fialed
func initComponents(logger types.LoggerInf, conf types.Config, standalone bool) error {
func initComponents(logger types.LoggerInf, conf *types.Config, sourcesPath string, standalone bool) error {
count := 0
for count < DefaultRetryConfig.MaxAttempts {
count++
Expand All @@ -173,7 +165,7 @@ func initComponents(logger types.LoggerInf, conf types.Config, standalone bool)
}

// init exoclient
err = exoclient.Init(conf, mnemonic, privFile, standalone)
err = exoclient.Init(conf, mnemonic, privFile, false, standalone)
if err != nil {
if errors.Is(err, feedertypes.ErrInitConnectionFail) {
logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err)
Expand Down
Loading
Loading