From fd1adbd1fece864ab75475077e677ea183bf1ab7 Mon Sep 17 00:00:00 2001 From: Alexgao001 Date: Mon, 22 Jul 2024 15:17:14 +0800 Subject: [PATCH] add feat --- core/blockarchiver/block_archiver_service.go | 46 +++++-- core/blockarchiver/client.go | 125 ++++++++++++++----- core/blockarchiver/config.go | 2 + core/blockarchiver/converter.go | 4 +- core/blockarchiver/types.go | 15 +++ core/blockchain.go | 2 + go.mod | 3 +- go.sum | 3 + 8 files changed, 156 insertions(+), 44 deletions(-) diff --git a/core/blockarchiver/block_archiver_service.go b/core/blockarchiver/block_archiver_service.go index 7e65088770..5a697a4ac1 100644 --- a/core/blockarchiver/block_archiver_service.go +++ b/core/blockarchiver/block_archiver_service.go @@ -1,6 +1,7 @@ package blockarchiver import ( + "context" "errors" "time" @@ -13,6 +14,7 @@ import ( const ( GetBlockRetry = 3 GetBlockRetryInterval = 2 * time.Second + RPCTimeout = 10 * time.Second ) var _ BlockArchiver = (*BlockArchiverService)(nil) @@ -38,12 +40,12 @@ type BlockArchiverService struct { // NewBlockArchiverService creates a new block archiver service // the bodyCache and headerCache are injected from the BlockChain -func NewBlockArchiverService(blockHub string, +func NewBlockArchiverService(blockArchiver, sp, bucketName string, bodyCache *lru.Cache[common.Hash, *types.Body], headerCache *lru.Cache[common.Hash, *types.Header], cacheSize int, ) (BlockArchiver, error) { - client, err := New(blockHub) + client, err := New(blockArchiver, sp, bucketName) if err != nil { return nil, err } @@ -60,12 +62,16 @@ func NewBlockArchiverService(blockHub string, // GetLatestBlock returns the latest block func (c *BlockArchiverService) GetLatestBlock() (*GeneralBlock, error) { - blockResp, err := c.client.GetLatestBlock() + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + blockResp, err := c.client.GetLatestBlock(ctx) if err != nil { + log.Error("failed to get latest block", "err", err) return nil, err } block, err := convertBlock(blockResp) if err != nil { + log.Error("failed to convert block", "block", blockResp, "err", err) return nil, err } return block, nil @@ -75,6 +81,7 @@ func (c *BlockArchiverService) GetLatestBlock() (*GeneralBlock, error) { func (c *BlockArchiverService) GetLatestHeader() (*types.Header, error) { block, err := c.GetLatestBlock() if err != nil { + log.Error("failed to get latest block", "err", err) return nil, err } return block.Header(), nil @@ -82,9 +89,11 @@ func (c *BlockArchiverService) GetLatestHeader() (*types.Header, error) { // GetBlockByNumber returns the block by number func (c *BlockArchiverService) GetBlockByNumber(number uint64) (*types.Body, *types.Header, error) { + log.Info("get block by number", "number", number) // check if the block is in the cache hash, found := c.hashCache.Get(number) if found { + log.Debug("GetBlockByNumber found in cache", number) body, foundB := c.bodyCache.Get(hash) header, foundH := c.headerCache.Get(hash) if foundB && foundH { @@ -100,6 +109,7 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty // if the number is within any of the ranges, should not fetch the bundle from the block archiver service but // wait for a while and fetch from the cache if c.requestLock.IsWithinAnyRange(number) { + log.Info("getBlockByNumber is within any range", number) // wait for a while, and fetch from the cache for retry := 0; retry < GetBlockRetry; retry++ { hash, found := c.hashCache.Get(number) @@ -117,17 +127,29 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty } // fetch the bundle range log.Info("fetching bundle of blocks", "number", number) - start, end, err := c.client.GetBundleBlocksRange(number) - log.Debug("bundle of blocks", "start", start, "end", end, "err", err) + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + + bundleName, err := c.client.GetBundleName(ctx, number) if err != nil { + log.Error("failed to get bundle name", "number", number, "err", err) + return nil, nil, err + } + + start, end, err := ParseBundleName(bundleName) + if err != nil { + log.Error("failed to parse bundle name", "bundleName", bundleName, "err", err) return nil, nil, err } // add lock to avoid concurrent fetching of the same bundle of blocks c.requestLock.AddRange(start, end) defer c.requestLock.RemoveRange(start, end) - //todo can fetch the bundle by request SP directly and extract blocks instead of calling the block archiver service if bundle name is known. - blocks, err := c.client.GetBundleBlocksByBlockNum(number) + ctx, cancel = context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + + blocks, err := c.client.GetBundleBlocks(ctx, bundleName) if err != nil { + log.Error("failed to get bundle blocks", "bundleName", bundleName, "err", err) return nil, nil, err } var body *types.Body @@ -137,6 +159,7 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty for _, b := range blocks { block, err := convertBlock(b) if err != nil { + log.Error("failed to convert block", "block", b, "err", err) return nil, nil, err } c.bodyCache.Add(block.Hash(), block.Body()) @@ -152,21 +175,26 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty // GetBlockByHash returns the block by hash func (c *BlockArchiverService) GetBlockByHash(hash common.Hash) (*types.Body, *types.Header, error) { + log.Info("get block by hash", "hash", hash) body, foundB := c.bodyCache.Get(hash) header, foundH := c.headerCache.Get(hash) if foundB && foundH { return body, header, nil } - - block, err := c.client.GetBlockByHash(hash) + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + block, err := c.client.GetBlockByHash(ctx, hash) if err != nil { + log.Error("failed to get block by hash", "hash", hash, "err", err) return nil, nil, err } if block == nil { + log.Debug("block is nil", "hash", hash) return nil, nil, nil } number, err := HexToUint64(block.Number) if err != nil { + log.Error("failed to convert block number", "block", block, "err", err) return nil, nil, err } return c.getBlockByNumber(number) diff --git a/core/blockarchiver/client.go b/core/blockarchiver/client.go index b3738643b9..97d9398b2e 100644 --- a/core/blockarchiver/client.go +++ b/core/blockarchiver/client.go @@ -2,25 +2,30 @@ package blockarchiver import ( "bytes" + "context" "encoding/json" "errors" "fmt" "io" "net/http" - "strconv" + "os" "strings" "time" "github.com/ethereum/go-ethereum/common" + + bundlesdk "github.com/bnb-chain/greenfield-bundle-sdk/bundle" ) // Client is a client to interact with the block archiver service type Client struct { hc *http.Client blockArchiverHost string + spHost string + bucketName string } -func New(blockHubHost string) (*Client, error) { +func New(blockAchieverHost, spHost, bucketName string) (*Client, error) { transport := &http.Transport{ DisableCompression: true, MaxIdleConnsPerHost: 1000, @@ -31,12 +36,12 @@ func New(blockHubHost string) (*Client, error) { Timeout: 10 * time.Minute, Transport: transport, } - return &Client{hc: client, blockArchiverHost: blockHubHost}, nil + return &Client{hc: client, blockArchiverHost: blockAchieverHost, spHost: spHost, bucketName: bucketName}, nil } -func (c *Client) GetBlockByHash(hash common.Hash) (*Block, error) { +func (c *Client) GetBlockByHash(ctx context.Context, hash common.Hash) (*Block, error) { payload := preparePayload("eth_getBlockByHash", []interface{}{hash.String(), "true"}) - body, err := c.postRequest(payload) + body, err := c.postRequest(ctx, payload) if err != nil { return nil, err } @@ -48,9 +53,9 @@ func (c *Client) GetBlockByHash(hash common.Hash) (*Block, error) { return getBlockResp.Result, nil } -func (c *Client) GetBlockByNumber(number uint64) (*Block, error) { +func (c *Client) GetBlockByNumber(ctx context.Context, number uint64) (*Block, error) { payload := preparePayload("eth_getBlockByNumber", []interface{}{Int64ToHex(int64(number)), "true"}) - body, err := c.postRequest(payload) + body, err := c.postRequest(ctx, payload) if err != nil { return nil, err } @@ -62,9 +67,9 @@ func (c *Client) GetBlockByNumber(number uint64) (*Block, error) { return getBlockResp.Result, nil } -func (c *Client) GetLatestBlock() (*Block, error) { +func (c *Client) GetLatestBlock(ctx context.Context) (*Block, error) { payload := preparePayload("eth_getBlockByNumber", []interface{}{"latest", "true"}) - body, err := c.postRequest(payload) + body, err := c.postRequest(ctx, payload) if err != nil { return nil, err } @@ -76,46 +81,36 @@ func (c *Client) GetLatestBlock() (*Block, error) { return getBlockResp.Result, nil } -// GetBundleBlocksRange returns the bundle blocks range -func (c *Client) GetBundleBlocksRange(blockNum uint64) (uint64, uint64, error) { - req, err := http.NewRequest("GET", c.blockArchiverHost+fmt.Sprintf("/bsc/v1/blocks/%d/bundle/name", blockNum), nil) +// GetBundleName returns the bundle name by a specific block number +func (c *Client) GetBundleName(ctx context.Context, blockNum uint64) (string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", c.blockArchiverHost+fmt.Sprintf("/bsc/v1/blocks/%d/bundle/name", blockNum), nil) if err != nil { - return 0, 0, err + return "", err } resp, err := c.hc.Do(req) if err != nil { - return 0, 0, err + return "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return 0, 0, errors.New("failed to get bundle name") + return "", errors.New("failed to get bundle name") } body, err := io.ReadAll(resp.Body) if err != nil { - return 0, 0, err + return "", err } getBundleNameResp := GetBundleNameResponse{} err = json.Unmarshal(body, &getBundleNameResp) if err != nil { - return 0, 0, err - } - bundleName := getBundleNameResp.Data - parts := strings.Split(bundleName, "_") - startSlot, err := strconv.ParseUint(parts[1][1:], 10, 64) - if err != nil { - return 0, 0, err - } - endSlot, err := strconv.ParseUint(parts[2][1:], 10, 64) - if err != nil { - return 0, 0, err + return "", err } - return startSlot, endSlot, nil + return getBundleNameResp.Data, nil } // GetBundleBlocksByBlockNum returns the bundle blocks by block number that within the range -func (c *Client) GetBundleBlocksByBlockNum(blockNum uint64) ([]*Block, error) { +func (c *Client) GetBundleBlocksByBlockNum(ctx context.Context, blockNum uint64) ([]*Block, error) { payload := preparePayload("eth_getBundledBlockByNumber", []interface{}{Int64ToHex(int64(blockNum))}) - body, err := c.postRequest(payload) + body, err := c.postRequest(ctx, payload) if err != nil { return nil, err } @@ -127,8 +122,74 @@ func (c *Client) GetBundleBlocksByBlockNum(blockNum uint64) ([]*Block, error) { return getBlocksResp.Result, nil } +// GetBundleBlocks returns the bundle blocks by object name +func (c *Client) GetBundleBlocks(ctx context.Context, objectName string) ([]*Block, error) { + var urlStr string + parts := strings.Split(c.spHost, "//") + urlStr = parts[0] + "//" + c.bucketName + "." + parts[1] + "/" + objectName + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil) + if err != nil { + return nil, err + } + resp, err := c.hc.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + tempFile, err := os.CreateTemp("", "bundle") + if err != nil { + fmt.Printf("Failed to create temporary file: %v\n", err) + return nil, err + } + defer os.Remove(tempFile.Name()) + // Write the content to the temporary file + _, err = tempFile.Write(body) + if err != nil { + fmt.Printf("Failed to write downloaded bundle to file: %v\n", err) + return nil, err + } + defer tempFile.Close() + + bundleObjects, err := bundlesdk.NewBundleFromFile(tempFile.Name()) + if err != nil { + fmt.Printf("Failed to create bundle from file: %v\n", err) + return nil, err + } + var blocksInfo []*Block + for _, objMeta := range bundleObjects.GetBundleObjectsMeta() { + objFile, _, err := bundleObjects.GetObject(objMeta.Name) + if err != nil { + return nil, err + } + + var objectInfo []byte + objectInfo, err = io.ReadAll(objFile) + if err != nil { + objFile.Close() + return nil, err + } + objFile.Close() + + var blockInfo *Block + err = json.Unmarshal(objectInfo, &blockInfo) + if err != nil { + return nil, err + } + blocksInfo = append(blocksInfo, blockInfo) + } + + return blocksInfo, nil +} + // postRequest sends a POST request to the block archiver service -func (c *Client) postRequest(payload map[string]interface{}) ([]byte, error) { +func (c *Client) postRequest(ctx context.Context, payload map[string]interface{}) ([]byte, error) { // Encode payload to JSON payloadBytes, err := json.Marshal(payload) if err != nil { @@ -136,7 +197,7 @@ func (c *Client) postRequest(payload map[string]interface{}) ([]byte, error) { } // post call to block archiver - req, err := http.NewRequest("POST", c.blockArchiverHost, bytes.NewBuffer(payloadBytes)) + req, err := http.NewRequestWithContext(ctx, "POST", c.blockArchiverHost, bytes.NewBuffer(payloadBytes)) if err != nil { return nil, err } diff --git a/core/blockarchiver/config.go b/core/blockarchiver/config.go index b88f91aaa7..567b1a585e 100644 --- a/core/blockarchiver/config.go +++ b/core/blockarchiver/config.go @@ -2,6 +2,8 @@ package blockarchiver type BlockArchiverConfig struct { RPCAddress string + SPAddress string + BucketName string BlockCacheSize int64 } diff --git a/core/blockarchiver/converter.go b/core/blockarchiver/converter.go index cbfeb51a47..fb381a9b82 100644 --- a/core/blockarchiver/converter.go +++ b/core/blockarchiver/converter.go @@ -68,7 +68,7 @@ func convertBlock(block *Block) (*GeneralBlock, error) { if err != nil { return nil, err } - totalDiffficulty, err := HexToBigInt(block.TotalDifficulty) + totalDifficulty, err := HexToBigInt(block.TotalDifficulty) if err != nil { return nil, err } @@ -316,6 +316,6 @@ func convertBlock(block *Block) (*GeneralBlock, error) { newBlock := types.NewBlockWithHeader(header).WithBody(txs, nil) return &GeneralBlock{ Block: newBlock, - TotalDifficulty: totalDiffficulty, + TotalDifficulty: totalDifficulty, }, nil } diff --git a/core/blockarchiver/types.go b/core/blockarchiver/types.go index 5a59721c20..0f84b37823 100644 --- a/core/blockarchiver/types.go +++ b/core/blockarchiver/types.go @@ -2,6 +2,8 @@ package blockarchiver import ( "math/big" + "strconv" + "strings" "sync" "github.com/ethereum/go-ethereum/core/types" @@ -152,3 +154,16 @@ func (rc *RequestLock) RemoveRange(from, to uint64) { delete(rc.lookupMap, i) } } + +func ParseBundleName(bundleName string) (uint64, uint64, error) { + parts := strings.Split(bundleName, "_") + startHeight, err := strconv.ParseUint(parts[1][1:], 10, 64) + if err != nil { + return 0, 0, err + } + endHeight, err := strconv.ParseUint(parts[2][1:], 10, 64) + if err != nil { + return 0, 0, err + } + return startHeight, endHeight, nil +} diff --git a/core/blockchain.go b/core/blockchain.go index a344868719..90e1d48123 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -440,6 +440,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis // block archiver service blockArchiverService, err := blockarchiver.NewBlockArchiverService( bc.blockArchiverConfig.RPCAddress, + bc.blockArchiverConfig.SPAddress, + bc.blockArchiverConfig.BucketName, bc.bodyCache, bc.hc.headerCache, cacheSize, diff --git a/go.mod b/go.mod index b59bbab5a6..018e7d0375 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.13.43 github.com/aws/aws-sdk-go-v2/service/route53 v1.30.2 github.com/bnb-chain/fastssz v0.1.2 + github.com/bnb-chain/greenfield-bundle-sdk v1.1.0 github.com/bnb-chain/ics23 v0.1.0 github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/cespare/cp v1.1.1 @@ -38,7 +39,6 @@ require ( github.com/golang/protobuf v1.5.4 github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb github.com/google/gofuzz v1.2.0 - github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 github.com/google/uuid v1.4.0 github.com/gorilla/websocket v1.5.1 github.com/graph-gophers/graphql-go v1.3.0 @@ -154,6 +154,7 @@ require ( github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gopacket v1.1.19 // indirect + github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect diff --git a/go.sum b/go.sum index dc721876c6..e3581fa194 100644 --- a/go.sum +++ b/go.sum @@ -363,6 +363,8 @@ github.com/blizzy78/varnamelen v0.8.0/go.mod h1:V9TzQZ4fLJ1DSrjVDfl89H7aMnTvKkAp github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40/go.mod h1:8rLXio+WjiTceGBHIoTvn60HIbs7Hm7bcHjyrSqYB9c= github.com/bnb-chain/fastssz v0.1.2 h1:vTcXw5SwCtRYnl/BEclujiml7GXiVOZ74tub4GHpvlM= github.com/bnb-chain/fastssz v0.1.2/go.mod h1:KcabV+OEw2QwgyY8Fc88ZG79CKYkFdu0kKWyfA3dI6o= +github.com/bnb-chain/greenfield-bundle-sdk v1.1.0 h1:0BWQsV+c32wHxEEpJY9igBSBg5N1Fm3KoSLC+Yef2n0= +github.com/bnb-chain/greenfield-bundle-sdk v1.1.0/go.mod h1:NCjQp0sniAbBR5yR5pYiXpYwYd1okSIBLj+31sTpmXA= github.com/bnb-chain/greenfield-tendermint v0.0.0-20230417032003-4cda1f296fb2 h1:jubavYCs/mCFj/g6Utl+l4SfpykdBdWJFPsvb9FcEXU= github.com/bnb-chain/greenfield-tendermint v0.0.0-20230417032003-4cda1f296fb2/go.mod h1:9q11eHNRY9FDwFH+4pompzPNGv//Z3VcfvkELaHJPMs= github.com/bnb-chain/ics23 v0.1.0 h1:DvjGOts2FBfbxB48384CYD1LbcrfjThFz8kowY/7KxU= @@ -2525,6 +2527,7 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=