Skip to content

Commit

Permalink
Merge pull request #13 from ExocoreNetwork/develop
Browse files Browse the repository at this point in the history
update params on event, cli debug tools
  • Loading branch information
leonz789 authored Dec 30, 2024
2 parents da4ee7a + 90d5729 commit e858575
Show file tree
Hide file tree
Showing 16 changed files with 1,182 additions and 241 deletions.
105 changes: 105 additions & 0 deletions cmd/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package cmd

import (
"encoding/json"
"fmt"

"github.com/ExocoreNetwork/price-feeder/debugger"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"github.com/spf13/cobra"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)

var (
flagFeederID = "feederID"
flagHeight = "height"
)

func init() {
debugStartCmd.PersistentFlags().Uint64(flagFeederID, 0, "feederID of the token")
debugStartCmd.PersistentFlags().Int64(flagHeight, 0, "committed block height after which the tx will be sent")

debugStartCmd.AddCommand(
debugSendCmd,
debugSendImmCmd,
)
}

var debugStartCmd = &cobra.Command{
Use: "debug",
Short: "start listening to new blocks",
Long: "start listening to new blocks",
RunE: func(cmd *cobra.Command, args []string) error {
logger := feedertypes.NewLogger(zapcore.DebugLevel)
DebugPriceFeeder(feederConfig, logger, mnemonic, sourcesPath)
return nil
},
}

var debugSendCmd = &cobra.Command{
Use: `send --feederID [feederID] --height [height] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
height, err := cmd.Parent().PersistentFlags().GetInt64(flagHeight)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &debugger.PriceMsg{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
res, err := sendTx(feederID, height, msgPrice, feederConfig.Debugger.Grpc)
if err != nil {
return err
}
if len(res.Err) > 0 {
fmt.Println("")
}
printProto(res)
return nil
},
}

var debugSendImmCmd = &cobra.Command{
Use: `send-imm --feederID [feederID] [{"baseblock":1,"nonce":1,"price":"999","det_id":"123","decimal":8,"timestamp":"2006-01-02 15:16:17"}]`,
Short: "send a create-price message to exocored",
Long: "Send a create-price message to exocored, the flag -h is optional. The tx will be sent immediately if that value is not set.",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
feederID, err := cmd.Parent().PersistentFlags().GetUint64(flagFeederID)
if err != nil {
return err
}
msgStr := args[0]
msgPrice := &PriceJSON{}
if err := json.Unmarshal([]byte(msgStr), msgPrice); err != nil {
return err
}
if err := msgPrice.validate(); err != nil {
return err
}
res, err := sendTxImmediately(feederID, msgPrice)
if err != nil {
return err
}
printProto(res)
return nil
},
}

func printProto(m proto.Message) {
marshaled, err := protojson.MarshalOptions{EmitUnpopulated: true, UseProtoNames: true}.Marshal(m)
if err != nil {
fmt.Printf("failed to print proto message, error:%v", err)
}
fmt.Println(string(marshaled))
}
232 changes: 232 additions & 0 deletions cmd/feeder_debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package cmd

import (
"context"
"errors"
"fmt"
"net"
"time"

"github.com/ExocoreNetwork/price-feeder/debugger"
"github.com/ExocoreNetwork/price-feeder/exoclient"
fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types"
feedertypes "github.com/ExocoreNetwork/price-feeder/types"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type sendReq struct {
// on which block committed send this transaction, required
height int64

feederID uint64
price *debugger.PriceMsg
result chan *debugger.SubmitPriceResponse
}
type pendingRequestManager map[int64][]*sendReq

func newPendingRequestManager() pendingRequestManager {
return make(map[int64][]*sendReq)
}

func (p pendingRequestManager) add(height int64, req *sendReq) {
if pendings, ok := p[height]; ok {
p[height] = append(pendings, req)
} else {
p[height] = []*sendReq{req}
}
}

func (p pendingRequestManager) process(height int64, handler func(*sendReq) error) {
for h, pendings := range p {
if h <= height {
for _, req := range pendings {
if err := handler(req); err != nil {
req.result <- &debugger.SubmitPriceResponse{Err: err.Error()}
}
}
delete(p, h)
}
}
}

type server struct {
sendCh chan *sendReq
debugger.UnimplementedPriceSubmitServiceServer
}

func (s *server) SubmitPrice(ctx context.Context, req *debugger.SubmitPriceRequest) (*debugger.SubmitPriceResponse, error) {
result := make(chan *debugger.SubmitPriceResponse, 1)
s.sendCh <- &sendReq{
height: req.Height,
feederID: req.FeederId,
price: req.Price,
result: result,
}
r := <-result
return r, nil
}

func newServer(sendCh chan *sendReq) *server {
return &server{
sendCh: sendCh,
}
}

type PriceJSON struct {
Price string `json:"price"`
DetID string `json:"det_id"`
Decimal int32 `json:"decimal"`
Timestamp string `json:"timestamp"`
Nonce int32 `json:"nonce"`
BaseBlock uint64 `json:"base_block"`
}

func (p PriceJSON) validate() error {
if len(p.Price) == 0 {
return errors.New("price is required")
}
if len(p.DetID) == 0 {
return errors.New("det_id is required")
}
if p.Nonce == 0 {
return errors.New("nonce should be greater than 0")
}
return nil
}

func (p PriceJSON) getPriceInfo() fetchertypes.PriceInfo {
return fetchertypes.PriceInfo{
Price: p.Price,
Decimal: p.Decimal,
RoundID: p.DetID,
Timestamp: p.Timestamp,
}
}

var (
DebugRetryConfig = RetryConfig{
MaxAttempts: 10,
Interval: 3 * time.Second,
}
)

func DebugPriceFeeder(conf *feedertypes.Config, logger feedertypes.LoggerInf, mnemonic string, sourcesPath string) {
// init logger
if logger = feedertypes.SetLogger(logger); logger == nil {
panic("logger is not initialized")
}
// init exoclient
// 1. subscribing for heights
// 2. sending create-price tx
count := 0
for count < DebugRetryConfig.MaxAttempts {
if err := exoclient.Init(conf, mnemonic, privFile, false, true); err != nil {
if errors.Is(err, feedertypes.ErrInitConnectionFail) {
logger.Info("retry initComponents due to connectionfailed", "count", count, "maxRetry", DefaultRetryConfig.MaxAttempts, "error", err)
time.Sleep(DebugRetryConfig.Interval)
continue
}
logger.Error("failed to init exoclient", "error", err)
return
}
break
}
ec, _ := exoclient.GetClient()

_, err := getOracleParamsWithMaxRetry(1, ec, logger)
if err != nil {
logger.Error("failed to get oracle params", "error", err)
return
}
logger.Info("subscribe block heights...")
ecClient, _ := exoclient.GetClient()
defer ecClient.Close()
ecClient.Subscribe()
// var pendingReq *sendReq
// pendingReqs := make(map[int64][]*sendReq)
pendingReqs := newPendingRequestManager()
sendCh := make(chan *sendReq, 10)
go func() {
for {
select {
case event := <-ecClient.EventsCh():
if e, ok := event.(*exoclient.EventNewBlock); ok {
logger.Info("new block commited", "height", e.Height())

pendingReqs.process(e.Height(), func(req *sendReq) error {
res, err := ecClient.SendTxDebug(req.feederID, req.price.BaseBlock, req.price.GetPriceInfo(), req.price.Nonce)
if err != nil {
logger.Error("failed to send tx", "error", err)
return err
}
req.result <- &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
return nil
})
}
case req := <-sendCh:
logger.Info("add a new send request", "height", req.height)
pendingReqs.add(req.height, req)
}
}
}()
lis, err := net.Listen("tcp", conf.Debugger.Grpc)
if err != nil {
fmt.Printf("failed to listen: %v\r\n", err)
return
}
grpcServer := grpc.NewServer()
debugger.RegisterPriceSubmitServiceServer(grpcServer, newServer(sendCh))
if err := grpcServer.Serve(lis); err != nil {
fmt.Printf("failed to serve:%v\r\n", err)
return
}
}

func sendTx(feederID uint64, height int64, price *debugger.PriceMsg, port string) (*debugger.SubmitPriceResponse, error) {
conn, err := grpc.Dial(
fmt.Sprintf("localhost%s", port),
// "localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, err
}
defer conn.Close()
c := debugger.NewPriceSubmitServiceClient(conn)
return c.SubmitPrice(context.Background(), &debugger.SubmitPriceRequest{
Height: height,
FeederId: feederID,
Price: price,
})
}

func sendTxImmediately(feederID uint64, price *PriceJSON) (*debugger.SubmitPriceResponse, error) {
if err := exoclient.Init(feederConfig, mnemonic, privFile, true, true); err != nil {
return nil, fmt.Errorf("failed to init exoclient in txOnly mode for debug, error:%w", err)
}
ec, _ := exoclient.GetClient()
pInfo := price.getPriceInfo()

res, err := ec.SendTxDebug(feederID, price.BaseBlock, pInfo, price.Nonce)
if err != nil {
return nil, err
}
protoRes := &debugger.SubmitPriceResponse{
CheckTxSuccess: res.CheckTx.Code == 0,
CheckTxLog: res.CheckTx.Log,
DeliverTxSuccess: res.CheckTx.Code == 0 && res.DeliverTx.Code == 0,
DeliverTxLog: res.DeliverTx.Log,
TxHash: res.Hash.String(),
Height: res.Height,
}
return protoRes, nil
}
Loading

0 comments on commit e858575

Please sign in to comment.