Skip to content

Commit

Permalink
udpate feeder and setup new feeder when updateparams event received, …
Browse files Browse the repository at this point in the history
…refactor feeders methods
  • Loading branch information
leonz789 committed Dec 27, 2024
1 parent 5aac5f8 commit 051df11
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 121 deletions.
14 changes: 6 additions & 8 deletions cmd/feeder_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ var DefaultRetryConfig = RetryConfig{
}

const (
statusOk = 0
privFile = "priv_validator_key.json"
baseCurrency = "USDT"
statusOk = 0
privFile = "priv_validator_key.json"

//feeder_tokenName_feederID
loggerTagPrefix = "feed_%s_%d"
// loggerTagPrefix = "feed_%s_%d"
)

// var updateConfig sync.Mutex
Expand Down Expand Up @@ -68,7 +67,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo

ecClient.Subscribe()

fsMap := NewFeederMap()
feeders := NewFeeders(feedertypes.GetLogger("feeders"), f, ecClient)
// we don't check empty tokenfeeders list
maxNonce := oracleP.MaxNonce
for feederID, feeder := range oracleP.TokenFeeders {
Expand All @@ -83,9 +82,8 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo
panic(fmt.Sprintf("source of nst:%s is not set", tokenName))
}
}
fsMap.Add(feeder, feederID, f, ecClient, source, tokenName, maxNonce, feedertypes.GetLogger(fmt.Sprintf(loggerTagPrefix, tokenName, feederID)))
feeders.SetupFeeder(feeder, feederID, source, tokenName, maxNonce)
}
feeders := fsMap.NewFeeders(logger)
feeders.Start()

for event := range ecClient.EventsCh() {
Expand All @@ -94,7 +92,7 @@ func RunPriceFeeder(conf feedertypes.Config, logger feedertypes.LoggerInf, mnemo
if paramsUpdate := e.ParamsUpdate(); paramsUpdate {
oracleP, err = getOracleParamsWithMaxRetry(DefaultRetryConfig.MaxAttempts, ecClient, logger)
if err != nil {
fmt.Printf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err)
logger.Error(fmt.Sprintf("Failed to get oracle params with maxRetry when params update detected, price-feeder will exit, error:%v", err))
return
}
feeders.UpdateOracleParams(oracleP)
Expand Down
220 changes: 107 additions & 113 deletions cmd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package cmd

import (
"errors"
"fmt"
"strings"
"sync"

oracletypes "github.com/ExocoreNetwork/exocore/x/oracle/types"
fetchertypes "github.com/ExocoreNetwork/price-feeder/fetcher/types"
Expand All @@ -11,6 +14,11 @@ import (
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
)

const (
loggerTagPrefix = "feed_%s_%d"
baseCurrency = "USDT"
)

type priceFetcher interface {
GetLatestPrice(source, token string) (fetchertypes.PriceInfo, error)
AddTokenForSource(source, token string) bool
Expand Down Expand Up @@ -92,38 +100,37 @@ type feeder struct {
paramsCh chan *updateParamsReq
}

type feederInfo struct {
source string
token string
tokenID uint64
feederID int
type FeederInfo struct {
Source string
Token string
TokenID uint64
FeederID int
// TODO: add check for rouleID, v1 can be skipped
// ruleID
startRoundID int64
startBaseBlock int64
interval int64
endBlock int64
lastPrice localPrice
lastSent signInfo
}

func (f *feeder) Info() feederInfo {
return feederInfo{
source: f.source,
token: f.token,
tokenID: f.tokenID,
feederID: f.feederID,
startRoundID: f.startRoundID,
startBaseBlock: f.startBaseBlock,
interval: f.interval,
endBlock: f.endBlock,
lastPrice: *f.lastPrice,
lastSent: *f.lastSent,
StartRoundID int64
StartBaseBlock int64
Interval int64
EndBlock int64
LastPrice localPrice
LastSent signInfo
}

func (f *feeder) Info() FeederInfo {
return FeederInfo{
Source: f.source,
Token: f.token,
TokenID: f.tokenID,
FeederID: f.feederID,
StartRoundID: f.startRoundID,
StartBaseBlock: f.startBaseBlock,
Interval: f.interval,
EndBlock: f.endBlock,
LastPrice: *f.lastPrice,
LastSent: *f.lastSent,
}
}

// NewFeeder new a feeder struct from oracletypes' tokenfeeder
func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder {
func newFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) *feeder {
return &feeder{
logger: logger,
source: source,
Expand All @@ -135,15 +142,12 @@ func NewFeeder(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher,
startBaseBlock: int64(tf.StartBaseBlock),
interval: int64(tf.Interval),
endBlock: int64(tf.EndBlock),

// maxNonce: maxNonce,

fetcher: fetcher,
submitter: submitter,
fetcher: fetcher,
submitter: submitter,
lastPrice: &localPrice{},
lastSent: &signInfo{
maxNonce: maxNonce,
},
lastPrice: &localPrice{},

priceCh: make(chan *updatePrice, 1),
heightsCh: make(chan *triggerHeights, 1),
Expand Down Expand Up @@ -213,7 +217,9 @@ func (f *feeder) start() {
// update latest height that price had been updated
f.lastPrice.height = price.txHeight
case req := <-f.paramsCh:
f.updateFeederParams(req.params)
if err := f.updateFeederParams(req.params); err != nil {
f.logger.Error("failed to update params", "new params", req.params)
}
req.result <- &updateParamsRes{}
}
}
Expand Down Expand Up @@ -253,12 +259,22 @@ func (f *feeder) trigger(commitHeight, priceHeight int64) {
}
}

func (f *feeder) updateFeederParams(p *oracletypes.Params) {
if p == nil || len(p.TokenFeeders) == 0 {
return
func (f *feeder) updateFeederParams(p *oracletypes.Params) error {
if p == nil || len(p.TokenFeeders) < f.feederID+1 {
return errors.New("invalid oracle parmas")
}
// TODO: update feeder's params

tokenFeeder := p.TokenFeeders[f.feederID]
if f.endBlock != int64(tokenFeeder.EndBlock) {
f.endBlock = int64(tokenFeeder.EndBlock)
}
if f.startBaseBlock != int64(tokenFeeder.StartBaseBlock) {
f.startBaseBlock = int64(tokenFeeder.StartBaseBlock)
}
if p.MaxNonce > 0 {
f.lastSent.maxNonce = p.MaxNonce
}
return nil
}

// TODO: stop feeder routine
Expand Down Expand Up @@ -292,64 +308,59 @@ type updatePricesReq struct {
prices []*finalPrice
}

type feederMap map[int]*feeder

// NewFeederMap new a map <feederID->feeder>
func NewFeederMap() feederMap {
return make(map[int]*feeder)
type Feeders struct {
locker *sync.Mutex
running bool
fetcher priceFetcher
submitter priceSubmitter
logger feedertypes.LoggerInf
feederMap map[int]*feeder
// TODO: feeder has sync management, so feeders could remove these channel
trigger chan *triggerReq
updatePrice chan *updatePricesReq
updateParams chan *oracletypes.Params
// updateNST chan *updateNSTReq
}
func (fm feederMap) NewFeeders(logger feedertypes.LoggerInf) *Feeders {

func NewFeeders(logger feedertypes.LoggerInf, fetcher priceFetcher, submitter priceSubmitter) *Feeders {
return &Feeders{
locker: new(sync.Mutex),
logger: logger,
feederMap: fm,
fetcher: fetcher,
submitter: submitter,
feederMap: make(map[int]*feeder),
// feederMap: fm,
// don't block on height increasing
trigger: make(chan *triggerReq, 1),
updatePrice: make(chan *updatePricesReq, 1),
// it's safe to have a buffer to not block running feeders,
// since for running feeders, only endBlock is possible to be modified
updateParams: make(chan *oracletypes.Params, 1),
}
}

// Add adds a new feeder with feederID into the map
func (fm feederMap) Add(tf *oracletypes.TokenFeeder, feederID int, fetcher priceFetcher, submitter priceSubmitter, source string, token string, maxNonce int32, logger feedertypes.LoggerInf) {
fm[feederID] = &feeder{
logger: logger,
source: source,
token: token,
tokenID: tf.TokenID,
feederID: feederID,
// these conversion a safe since the block height defined in cosmossdk is int64
startRoundID: int64(tf.StartRoundID),
startBaseBlock: int64(tf.StartBaseBlock),
interval: int64(tf.Interval),
endBlock: int64(tf.EndBlock),
fetcher: fetcher,
submitter: submitter,
lastPrice: &localPrice{},
lastSent: &signInfo{
maxNonce: maxNonce,
},

priceCh: make(chan *updatePrice, 1),
heightsCh: make(chan *triggerHeights, 1),
paramsCh: make(chan *updateParamsReq, 1),
}
}

type Feeders struct {
logger feedertypes.LoggerInf
feederMap map[int]*feeder
// TODO: feeder has sync management, so feeders could remove these channel
trigger chan *triggerReq
updatePrice chan *updatePricesReq
updateParams chan *oracletypes.Params
// updateNST chan *updateNSTReq
func (fs *Feeders) SetupFeeder(tf *oracletypes.TokenFeeder, feederID int, source string, token string, maxNonce int32) {
fs.locker.Lock()
defer fs.locker.Unlock()
if fs.running {
fs.logger.Error("failed to setup feeder for a running feeders, this should be called before feeders is started")
return
}
fs.feederMap[feederID] = newFeeder(tf, feederID, fs.fetcher, fs.submitter, source, token, maxNonce, fs.logger.With("feeder", fmt.Sprintf(loggerTagPrefix, token, feederID)))
}

// Start will start to listen the trigger(newHeight) and updatePrice events
// usd channels to avoid race condition on map
func (fs *Feeders) Start() {
fs.locker.Lock()
if fs.running {
fs.logger.Error("failed to start feeders since it's already running")
fs.locker.Unlock()
return
}
fs.running = true
fs.locker.Unlock()
for _, f := range fs.feederMap {
f.start()
}
Expand All @@ -358,16 +369,32 @@ func (fs *Feeders) Start() {
select {
case params := <-fs.updateParams:
results := []chan *updateParamsRes{}
existingFeederIDs := make(map[int64]struct{})
for _, f := range fs.feederMap {
res := f.updateParams(params)
results = append(results, res)
existingFeederIDs[int64(f.feederID)] = struct{}{}
}
// wait for all feeders to complete updateing params
for _, res := range results {
<-res
}
// TODO: add newly added tokenfeeders if exists

for tfID, tf := range params.TokenFeeders {
if _, ok := existingFeederIDs[int64(tfID)]; !ok {
// create and start a new feeder
tokenName := strings.ToLower(params.Tokens[tf.TokenID].Name)
source := fetchertypes.Chainlink
if fetchertypes.IsNSTToken(tokenName) {
nstToken := fetchertypes.NSTToken(tokenName)
if source = fetchertypes.GetNSTSource(nstToken); len(source) == 0 {
fs.logger.Error("failed to add new feeder, source of nst token is not set", "token", tokenName)
}
}
feeder := newFeeder(tf, tfID, fs.fetcher, fs.submitter, source, tokenName, params.MaxNonce, fs.logger)
fs.feederMap[tfID] = feeder
feeder.start()
}
}
case t := <-fs.trigger:
// the order does not matter
for _, f := range fs.feederMap {
Expand Down Expand Up @@ -420,39 +447,6 @@ func (fs *Feeders) UpdateOracleParams(p *oracletypes.Params) {
fs.updateParams <- p
}

// Define the types for the feeder
// type feederParams struct {
// startRoundID uint64
// startBlock uint64
// endBlock uint64
// interval uint64
// decimal int32
// tokenIDStr string
// feederID int64
// tokenName string
// }
//
// func (f *feederParams) update(p oracletypes.Params) (updated bool) {
// tokenFeeder := p.TokenFeeders[f.feederID]
// if tokenFeeder.StartBaseBlock != f.startBlock {
// f.startBlock = tokenFeeder.StartBaseBlock
// updated = true
// }
// if tokenFeeder.EndBlock != f.endBlock {
// f.endBlock = tokenFeeder.EndBlock
// updated = true
// }
// if tokenFeeder.Interval != f.interval {
// f.interval = tokenFeeder.Interval
// updated = true
// }
// if p.Tokens[tokenFeeder.TokenID].Decimal != f.decimal {
// f.decimal = p.Tokens[tokenFeeder.TokenID].Decimal
// updated = true
// }
// return
// }

func getLogger() types.LoggerInf {
return types.GetLogger("")
}

0 comments on commit 051df11

Please sign in to comment.