Skip to content

Commit

Permalink
add feat
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Jul 22, 2024
1 parent b43ce17 commit fd1adbd
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 44 deletions.
46 changes: 37 additions & 9 deletions core/blockarchiver/block_archiver_service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blockarchiver

import (
"context"
"errors"
"time"

Expand All @@ -13,6 +14,7 @@ import (
const (
GetBlockRetry = 3
GetBlockRetryInterval = 2 * time.Second
RPCTimeout = 10 * time.Second
)

var _ BlockArchiver = (*BlockArchiverService)(nil)
Expand All @@ -38,12 +40,12 @@ type BlockArchiverService struct {

// NewBlockArchiverService creates a new block archiver service
// the bodyCache and headerCache are injected from the BlockChain
func NewBlockArchiverService(blockHub string,
func NewBlockArchiverService(blockArchiver, sp, bucketName string,
bodyCache *lru.Cache[common.Hash, *types.Body],
headerCache *lru.Cache[common.Hash, *types.Header],
cacheSize int,
) (BlockArchiver, error) {
client, err := New(blockHub)
client, err := New(blockArchiver, sp, bucketName)
if err != nil {
return nil, err
}
Expand All @@ -60,12 +62,16 @@ func NewBlockArchiverService(blockHub string,

// GetLatestBlock returns the latest block
func (c *BlockArchiverService) GetLatestBlock() (*GeneralBlock, error) {
blockResp, err := c.client.GetLatestBlock()
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
blockResp, err := c.client.GetLatestBlock(ctx)
if err != nil {
log.Error("failed to get latest block", "err", err)
return nil, err
}
block, err := convertBlock(blockResp)
if err != nil {
log.Error("failed to convert block", "block", blockResp, "err", err)
return nil, err
}
return block, nil
Expand All @@ -75,16 +81,19 @@ func (c *BlockArchiverService) GetLatestBlock() (*GeneralBlock, error) {
func (c *BlockArchiverService) GetLatestHeader() (*types.Header, error) {
block, err := c.GetLatestBlock()
if err != nil {
log.Error("failed to get latest block", "err", err)
return nil, err
}
return block.Header(), nil
}

// GetBlockByNumber returns the block by number
func (c *BlockArchiverService) GetBlockByNumber(number uint64) (*types.Body, *types.Header, error) {
log.Info("get block by number", "number", number)
// check if the block is in the cache
hash, found := c.hashCache.Get(number)
if found {
log.Debug("GetBlockByNumber found in cache", number)
body, foundB := c.bodyCache.Get(hash)
header, foundH := c.headerCache.Get(hash)
if foundB && foundH {
Expand All @@ -100,6 +109,7 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty
// if the number is within any of the ranges, should not fetch the bundle from the block archiver service but
// wait for a while and fetch from the cache
if c.requestLock.IsWithinAnyRange(number) {
log.Info("getBlockByNumber is within any range", number)
// wait for a while, and fetch from the cache
for retry := 0; retry < GetBlockRetry; retry++ {
hash, found := c.hashCache.Get(number)
Expand All @@ -117,17 +127,29 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty
}
// fetch the bundle range
log.Info("fetching bundle of blocks", "number", number)
start, end, err := c.client.GetBundleBlocksRange(number)
log.Debug("bundle of blocks", "start", start, "end", end, "err", err)
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()

bundleName, err := c.client.GetBundleName(ctx, number)
if err != nil {
log.Error("failed to get bundle name", "number", number, "err", err)
return nil, nil, err
}

start, end, err := ParseBundleName(bundleName)
if err != nil {
log.Error("failed to parse bundle name", "bundleName", bundleName, "err", err)
return nil, nil, err
}
// add lock to avoid concurrent fetching of the same bundle of blocks
c.requestLock.AddRange(start, end)
defer c.requestLock.RemoveRange(start, end)
//todo can fetch the bundle by request SP directly and extract blocks instead of calling the block archiver service if bundle name is known.
blocks, err := c.client.GetBundleBlocksByBlockNum(number)
ctx, cancel = context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()

blocks, err := c.client.GetBundleBlocks(ctx, bundleName)
if err != nil {
log.Error("failed to get bundle blocks", "bundleName", bundleName, "err", err)
return nil, nil, err
}
var body *types.Body
Expand All @@ -137,6 +159,7 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty
for _, b := range blocks {
block, err := convertBlock(b)
if err != nil {
log.Error("failed to convert block", "block", b, "err", err)
return nil, nil, err
}
c.bodyCache.Add(block.Hash(), block.Body())
Expand All @@ -152,21 +175,26 @@ func (c *BlockArchiverService) getBlockByNumber(number uint64) (*types.Body, *ty

// GetBlockByHash returns the block by hash
func (c *BlockArchiverService) GetBlockByHash(hash common.Hash) (*types.Body, *types.Header, error) {
log.Info("get block by hash", "hash", hash)
body, foundB := c.bodyCache.Get(hash)
header, foundH := c.headerCache.Get(hash)
if foundB && foundH {
return body, header, nil
}

block, err := c.client.GetBlockByHash(hash)
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()
block, err := c.client.GetBlockByHash(ctx, hash)
if err != nil {
log.Error("failed to get block by hash", "hash", hash, "err", err)
return nil, nil, err
}
if block == nil {
log.Debug("block is nil", "hash", hash)
return nil, nil, nil
}
number, err := HexToUint64(block.Number)
if err != nil {
log.Error("failed to convert block number", "block", block, "err", err)
return nil, nil, err
}
return c.getBlockByNumber(number)
Expand Down
125 changes: 93 additions & 32 deletions core/blockarchiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,30 @@ package blockarchiver

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"os"
"strings"
"time"

"github.com/ethereum/go-ethereum/common"

bundlesdk "github.com/bnb-chain/greenfield-bundle-sdk/bundle"
)

// Client is a client to interact with the block archiver service
type Client struct {
hc *http.Client
blockArchiverHost string
spHost string
bucketName string
}

func New(blockHubHost string) (*Client, error) {
func New(blockAchieverHost, spHost, bucketName string) (*Client, error) {
transport := &http.Transport{
DisableCompression: true,
MaxIdleConnsPerHost: 1000,
Expand All @@ -31,12 +36,12 @@ func New(blockHubHost string) (*Client, error) {
Timeout: 10 * time.Minute,
Transport: transport,
}
return &Client{hc: client, blockArchiverHost: blockHubHost}, nil
return &Client{hc: client, blockArchiverHost: blockAchieverHost, spHost: spHost, bucketName: bucketName}, nil
}

func (c *Client) GetBlockByHash(hash common.Hash) (*Block, error) {
func (c *Client) GetBlockByHash(ctx context.Context, hash common.Hash) (*Block, error) {
payload := preparePayload("eth_getBlockByHash", []interface{}{hash.String(), "true"})
body, err := c.postRequest(payload)
body, err := c.postRequest(ctx, payload)
if err != nil {
return nil, err
}
Expand All @@ -48,9 +53,9 @@ func (c *Client) GetBlockByHash(hash common.Hash) (*Block, error) {
return getBlockResp.Result, nil
}

func (c *Client) GetBlockByNumber(number uint64) (*Block, error) {
func (c *Client) GetBlockByNumber(ctx context.Context, number uint64) (*Block, error) {
payload := preparePayload("eth_getBlockByNumber", []interface{}{Int64ToHex(int64(number)), "true"})
body, err := c.postRequest(payload)
body, err := c.postRequest(ctx, payload)
if err != nil {
return nil, err
}
Expand All @@ -62,9 +67,9 @@ func (c *Client) GetBlockByNumber(number uint64) (*Block, error) {
return getBlockResp.Result, nil
}

func (c *Client) GetLatestBlock() (*Block, error) {
func (c *Client) GetLatestBlock(ctx context.Context) (*Block, error) {
payload := preparePayload("eth_getBlockByNumber", []interface{}{"latest", "true"})
body, err := c.postRequest(payload)
body, err := c.postRequest(ctx, payload)
if err != nil {
return nil, err
}
Expand All @@ -76,46 +81,36 @@ func (c *Client) GetLatestBlock() (*Block, error) {
return getBlockResp.Result, nil
}

// GetBundleBlocksRange returns the bundle blocks range
func (c *Client) GetBundleBlocksRange(blockNum uint64) (uint64, uint64, error) {
req, err := http.NewRequest("GET", c.blockArchiverHost+fmt.Sprintf("/bsc/v1/blocks/%d/bundle/name", blockNum), nil)
// GetBundleName returns the bundle name by a specific block number
func (c *Client) GetBundleName(ctx context.Context, blockNum uint64) (string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", c.blockArchiverHost+fmt.Sprintf("/bsc/v1/blocks/%d/bundle/name", blockNum), nil)
if err != nil {
return 0, 0, err
return "", err
}
resp, err := c.hc.Do(req)
if err != nil {
return 0, 0, err
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, 0, errors.New("failed to get bundle name")
return "", errors.New("failed to get bundle name")
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, 0, err
return "", err
}
getBundleNameResp := GetBundleNameResponse{}
err = json.Unmarshal(body, &getBundleNameResp)
if err != nil {
return 0, 0, err
}
bundleName := getBundleNameResp.Data
parts := strings.Split(bundleName, "_")
startSlot, err := strconv.ParseUint(parts[1][1:], 10, 64)
if err != nil {
return 0, 0, err
}
endSlot, err := strconv.ParseUint(parts[2][1:], 10, 64)
if err != nil {
return 0, 0, err
return "", err
}
return startSlot, endSlot, nil
return getBundleNameResp.Data, nil
}

// GetBundleBlocksByBlockNum returns the bundle blocks by block number that within the range
func (c *Client) GetBundleBlocksByBlockNum(blockNum uint64) ([]*Block, error) {
func (c *Client) GetBundleBlocksByBlockNum(ctx context.Context, blockNum uint64) ([]*Block, error) {
payload := preparePayload("eth_getBundledBlockByNumber", []interface{}{Int64ToHex(int64(blockNum))})
body, err := c.postRequest(payload)
body, err := c.postRequest(ctx, payload)
if err != nil {
return nil, err
}
Expand All @@ -127,16 +122,82 @@ func (c *Client) GetBundleBlocksByBlockNum(blockNum uint64) ([]*Block, error) {
return getBlocksResp.Result, nil
}

// GetBundleBlocks returns the bundle blocks by object name
func (c *Client) GetBundleBlocks(ctx context.Context, objectName string) ([]*Block, error) {
var urlStr string
parts := strings.Split(c.spHost, "//")
urlStr = parts[0] + "//" + c.bucketName + "." + parts[1] + "/" + objectName

req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
if err != nil {
return nil, err
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

tempFile, err := os.CreateTemp("", "bundle")
if err != nil {
fmt.Printf("Failed to create temporary file: %v\n", err)
return nil, err
}
defer os.Remove(tempFile.Name())
// Write the content to the temporary file
_, err = tempFile.Write(body)
if err != nil {
fmt.Printf("Failed to write downloaded bundle to file: %v\n", err)
return nil, err
}
defer tempFile.Close()

bundleObjects, err := bundlesdk.NewBundleFromFile(tempFile.Name())
if err != nil {
fmt.Printf("Failed to create bundle from file: %v\n", err)
return nil, err
}
var blocksInfo []*Block
for _, objMeta := range bundleObjects.GetBundleObjectsMeta() {
objFile, _, err := bundleObjects.GetObject(objMeta.Name)
if err != nil {
return nil, err
}

var objectInfo []byte
objectInfo, err = io.ReadAll(objFile)
if err != nil {
objFile.Close()
return nil, err
}
objFile.Close()

var blockInfo *Block
err = json.Unmarshal(objectInfo, &blockInfo)
if err != nil {
return nil, err
}
blocksInfo = append(blocksInfo, blockInfo)
}

return blocksInfo, nil
}

// postRequest sends a POST request to the block archiver service
func (c *Client) postRequest(payload map[string]interface{}) ([]byte, error) {
func (c *Client) postRequest(ctx context.Context, payload map[string]interface{}) ([]byte, error) {
// Encode payload to JSON
payloadBytes, err := json.Marshal(payload)
if err != nil {
return nil, err
}

// post call to block archiver
req, err := http.NewRequest("POST", c.blockArchiverHost, bytes.NewBuffer(payloadBytes))
req, err := http.NewRequestWithContext(ctx, "POST", c.blockArchiverHost, bytes.NewBuffer(payloadBytes))
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions core/blockarchiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package blockarchiver

type BlockArchiverConfig struct {
RPCAddress string
SPAddress string
BucketName string
BlockCacheSize int64
}

Expand Down
4 changes: 2 additions & 2 deletions core/blockarchiver/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func convertBlock(block *Block) (*GeneralBlock, error) {
if err != nil {
return nil, err
}
totalDiffficulty, err := HexToBigInt(block.TotalDifficulty)
totalDifficulty, err := HexToBigInt(block.TotalDifficulty)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,6 +316,6 @@ func convertBlock(block *Block) (*GeneralBlock, error) {
newBlock := types.NewBlockWithHeader(header).WithBody(txs, nil)
return &GeneralBlock{
Block: newBlock,
TotalDifficulty: totalDiffficulty,
TotalDifficulty: totalDifficulty,
}, nil
}
Loading

0 comments on commit fd1adbd

Please sign in to comment.