Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move node types and handlings to separate packages #729

Open
wants to merge 8 commits into
base: move-atomic-sync
Choose a base branch
from
1 change: 1 addition & 0 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

log.Debug("received AppRequest from node", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request))

// TODO: investigate if we can move all these network logic to new SDK handlers
var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("forwarding AppRequest to SDK network", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
Expand Down
11 changes: 8 additions & 3 deletions plugin/evm/atomic/atomic_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ import (
"github.com/ava-labs/coreth/trie"
)

const (
// AtomicTrieNode represents a leaf node that belongs to the atomic trie.
AtomicTrieNode message.NodeType = 2
)

var (
_ Syncer = &atomicSyncer{}
_ syncclient.LeafSyncTask = &atomicSyncerLeafTask{}
Expand Down Expand Up @@ -53,7 +58,7 @@ type atomicSyncer struct {

// addZeros adds [common.HashLenth] zeros to [height] and returns the result as []byte
func addZeroes(height uint64) []byte {
packer := wrappers.Packer{Bytes: make([]byte, atomicKeyLength)}
packer := wrappers.Packer{Bytes: make([]byte, AtomicTrieKeyLength)}
packer.PackLong(height)
packer.PackFixedBytes(bytes.Repeat([]byte{0x00}, common.HashLength))
return packer.Bytes
Expand Down Expand Up @@ -91,7 +96,7 @@ func (s *atomicSyncer) Start(ctx context.Context) error {
// onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie.
func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error {
for i, key := range keys {
if len(key) != atomicKeyLength {
if len(key) != AtomicTrieKeyLength {
return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key))
}
// key = height + blockchainID
Expand Down Expand Up @@ -176,7 +181,7 @@ type atomicSyncerLeafTask struct {

func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.lastHeight + 1) }
func (a *atomicSyncerLeafTask) End() []byte { return nil }
func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return message.AtomicTrieNode }
func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return AtomicTrieNode }
func (a *atomicSyncerLeafTask) OnFinish(context.Context) error { return a.atomicSyncer.onFinish() }
func (a *atomicSyncerLeafTask) OnStart() (bool, error) { return false, nil }
func (a *atomicSyncerLeafTask) Root() common.Hash { return a.atomicSyncer.targetRoot }
Expand Down
10 changes: 5 additions & 5 deletions plugin/evm/atomic/atomic_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *triedb.Database, targetHeight
numLeaves := 0
mockClient := syncclient.NewMockClient(
message.Codec,
handlers.NewLeafsRequestHandler(serverTrieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()),
handlers.NewLeafsRequestHandler(serverTrieDB, AtomicTrieKeyLength, nil, message.Codec, handlerstats.NewNoopHandlerStats()),
nil,
nil,
)
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestAtomicSyncer(t *testing.T) {
rand.Seed(1)
targetHeight := 10 * uint64(commitInterval)
serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil)
root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, int(targetHeight), atomicKeyLength)
root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, int(targetHeight), AtomicTrieKeyLength)

testAtomicSyncer(t, serverTrieDB, targetHeight, root, nil, int64(targetHeight))
}
Expand All @@ -167,7 +167,7 @@ func TestAtomicSyncerResume(t *testing.T) {
targetHeight := 10 * uint64(commitInterval)
serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil)
numTrieKeys := int(targetHeight) - 1 // no atomic ops for genesis
root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys, atomicKeyLength)
root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys, AtomicTrieKeyLength)

testAtomicSyncer(t, serverTrieDB, targetHeight, root, []atomicSyncTestCheckpoint{
{
Expand All @@ -184,12 +184,12 @@ func TestAtomicSyncerResumeNewRootCheckpoint(t *testing.T) {
targetHeight1 := 10 * uint64(commitInterval)
serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil)
numTrieKeys1 := int(targetHeight1) - 1 // no atomic ops for genesis
root1, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys1, atomicKeyLength)
root1, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys1, AtomicTrieKeyLength)

targetHeight2 := 20 * uint64(commitInterval)
numTrieKeys2 := int(targetHeight2) - 1 // no atomic ops for genesis
root2, _, _ := syncutils.FillTrie(
t, numTrieKeys1, numTrieKeys2, atomicKeyLength, serverTrieDB, root1,
t, numTrieKeys1, numTrieKeys2, AtomicTrieKeyLength, serverTrieDB, root1,
)

testAtomicSyncer(t, serverTrieDB, targetHeight1, root1, []atomicSyncTestCheckpoint{
Expand Down
4 changes: 2 additions & 2 deletions plugin/evm/atomic/atomic_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (

const (
progressLogFrequency = 30 * time.Second
atomicKeyLength = wrappers.LongLen + common.HashLength
AtomicTrieKeyLength = wrappers.LongLen + common.HashLength
sharedMemoryApplyBatchSize = 10_000 // specifies the number of atomic operations to batch progress updates

atomicTrieTipBufferSize = 1 // No need to support a buffer of previously accepted tries for the atomic trie
Expand Down Expand Up @@ -232,7 +232,7 @@ func (a *atomicTrie) UpdateTrie(trie *trie.Trie, height uint64, atomicOps map[id
}

// key is [height]+[blockchainID]
keyPacker := wrappers.Packer{Bytes: make([]byte, atomicKeyLength)}
keyPacker := wrappers.Packer{Bytes: make([]byte, AtomicTrieKeyLength)}
keyPacker.PackLong(height)
keyPacker.PackFixedBytes(blockchainID[:])
if err := trie.Update(keyPacker.Bytes, valueBytes); err != nil {
Expand Down
7 changes: 2 additions & 5 deletions plugin/evm/atomic/atomic_trie_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"

"github.com/ava-labs/coreth/trie"
"github.com/ethereum/go-ethereum/common"
)

const atomicTrieKeyLen = wrappers.LongLen + common.HashLength

// atomicTrieIterator is an implementation of types.AtomicTrieIterator that serves
// parsed data with each iteration
type atomicTrieIterator struct {
Expand Down Expand Up @@ -55,8 +52,8 @@ func (a *atomicTrieIterator) Next() bool {
keyLen := len(a.trieIterator.Key)
// If the key has an unexpected length, set the error and stop the iteration since the data is
// no longer reliable.
if keyLen != atomicTrieKeyLen {
a.resetFields(fmt.Errorf("expected atomic trie key length to be %d but was %d", atomicTrieKeyLen, keyLen))
if keyLen != AtomicTrieKeyLength {
a.resetFields(fmt.Errorf("expected atomic trie key length to be %d but was %d", AtomicTrieKeyLength, keyLen))
return false
}

Expand Down
10 changes: 2 additions & 8 deletions plugin/evm/message/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ var _ RequestHandler = NoopRequestHandler{}
// Must have methods in format of handleType(context.Context, ids.NodeID, uint32, request Type) error
// so that the Request object of relevant Type can invoke its respective handle method
// on this struct.
// Also see GossipHandler for implementation style.
type RequestHandler interface {
HandleStateTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error)
HandleAtomicTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error)
HandleLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error)
HandleBlockRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, request BlockRequest) ([]byte, error)
HandleCodeRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, codeRequest CodeRequest) ([]byte, error)
HandleMessageSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest MessageSignatureRequest) ([]byte, error)
Expand All @@ -36,11 +34,7 @@ type ResponseHandler interface {

type NoopRequestHandler struct{}

func (NoopRequestHandler) HandleStateTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) {
return nil, nil
}

func (NoopRequestHandler) HandleAtomicTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) {
func (NoopRequestHandler) HandleLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) {
return nil, nil
}

Expand Down
32 changes: 5 additions & 27 deletions plugin/evm/message/leafs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
)

const MaxCodeHashesPerRequest = 5
Expand All @@ -18,27 +17,14 @@ var _ Request = LeafsRequest{}

// NodeType outlines the trie that a leaf node belongs to
// handlers.LeafsRequestHandler uses this information to determine
// which of the two tries (state/atomic) to fetch the information from
// which trie type to fetch the information from
type NodeType uint8

const (
// StateTrieNode represents a leaf node that belongs to the coreth State trie
StateTrieNode NodeType = iota + 1
// AtomicTrieNode represents a leaf node that belongs to the coreth evm.AtomicTrie
AtomicTrieNode
StateTrieNode = NodeType(0)
StateTrieKeyLength = common.HashLength
)

func (nt NodeType) String() string {
switch nt {
case StateTrieNode:
return "StateTrie"
case AtomicTrieNode:
return "AtomicTrie"
default:
return "Unknown"
}
}

// LeafsRequest is a request to receive trie leaves at specified Root within Start and End byte range
// Limit outlines maximum number of leaves to returns starting at Start
// NodeType outlines which trie to read from state/atomic.
Expand All @@ -53,21 +39,13 @@ type LeafsRequest struct {

func (l LeafsRequest) String() string {
return fmt.Sprintf(
"LeafsRequest(Root=%s, Account=%s, Start=%s, End=%s, Limit=%d, NodeType=%s)",
"LeafsRequest(Root=%s, Account=%s, Start=%s, End=%s, Limit=%d, NodeType=%d)",
l.Root, l.Account, common.Bytes2Hex(l.Start), common.Bytes2Hex(l.End), l.Limit, l.NodeType,
)
}

func (l LeafsRequest) Handle(ctx context.Context, nodeID ids.NodeID, requestID uint32, handler RequestHandler) ([]byte, error) {
switch l.NodeType {
case StateTrieNode:
return handler.HandleStateTrieLeafsRequest(ctx, nodeID, requestID, l)
case AtomicTrieNode:
return handler.HandleAtomicTrieLeafsRequest(ctx, nodeID, requestID, l)
}

log.Debug("node type is not recognised, dropping request", "nodeID", nodeID, "requestID", requestID, "nodeType", l.NodeType)
return nil, nil
return handler.HandleLeafsRequest(ctx, nodeID, requestID, l)
}

// LeafsResponse is a response to a LeafsRequest
Expand Down
115 changes: 1 addition & 114 deletions plugin/evm/message/leafs_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@
package message

import (
"bytes"
"context"
"encoding/base64"
"math/rand"
"testing"

"github.com/ava-labs/avalanchego/ids"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
)
Expand All @@ -37,7 +33,7 @@ func TestMarshalLeafsRequest(t *testing.T) {
Start: startBytes,
End: endBytes,
Limit: 1024,
NodeType: StateTrieNode,
NodeType: NodeType(1),
}

base64LeafsRequest := "AAAAAAAAAAAAAAAAAAAAAABpbSBST09UaW5nIGZvciB5YQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIFL9/AchgmVPFj9fD5piHXKVZsdNEAN8TXu7BAfR4sZJAAAAIIGFWthoHQ2G0ekeABZ5OctmlNLEIqzSCKAHKTlIf2mZBAAB"
Expand Down Expand Up @@ -108,112 +104,3 @@ func TestMarshalLeafsResponse(t *testing.T) {
assert.False(t, l.More) // make sure it is not serialized
assert.Equal(t, leafsResponse.ProofVals, l.ProofVals)
}

func TestLeafsRequestValidation(t *testing.T) {
mockRequestHandler := &mockHandler{}

tests := map[string]struct {
request LeafsRequest
assertResponse func(t *testing.T)
}{
"node type StateTrieNode": {
request: LeafsRequest{
Root: common.BytesToHash([]byte("some hash goes here")),
Start: bytes.Repeat([]byte{0x00}, common.HashLength),
End: bytes.Repeat([]byte{0xff}, common.HashLength),
Limit: 10,
NodeType: StateTrieNode,
},
assertResponse: func(t *testing.T) {
assert.True(t, mockRequestHandler.handleStateTrieCalled)
assert.False(t, mockRequestHandler.handleAtomicTrieCalled)
assert.False(t, mockRequestHandler.handleBlockRequestCalled)
assert.False(t, mockRequestHandler.handleCodeRequestCalled)
},
},
"node type AtomicTrieNode": {
request: LeafsRequest{
Root: common.BytesToHash([]byte("some hash goes here")),
Start: bytes.Repeat([]byte{0x00}, common.HashLength),
End: bytes.Repeat([]byte{0xff}, common.HashLength),
Limit: 10,
NodeType: AtomicTrieNode,
},
assertResponse: func(t *testing.T) {
assert.False(t, mockRequestHandler.handleStateTrieCalled)
assert.True(t, mockRequestHandler.handleAtomicTrieCalled)
assert.False(t, mockRequestHandler.handleBlockRequestCalled)
assert.False(t, mockRequestHandler.handleCodeRequestCalled)
},
},
"unknown node type": {
request: LeafsRequest{
Root: common.BytesToHash([]byte("some hash goes here")),
Start: bytes.Repeat([]byte{0x00}, common.HashLength),
End: bytes.Repeat([]byte{0xff}, common.HashLength),
Limit: 10,
NodeType: NodeType(11),
},
assertResponse: func(t *testing.T) {
assert.False(t, mockRequestHandler.handleStateTrieCalled)
assert.False(t, mockRequestHandler.handleAtomicTrieCalled)
assert.False(t, mockRequestHandler.handleBlockRequestCalled)
assert.False(t, mockRequestHandler.handleCodeRequestCalled)
},
},
}
for name, test := range tests {
t.Run(name, func(t *testing.T) {
_, _ = test.request.Handle(context.Background(), ids.GenerateTestNodeID(), 1, mockRequestHandler)
test.assertResponse(t)
mockRequestHandler.reset()
})
}
}

var _ RequestHandler = &mockHandler{}

type mockHandler struct {
handleStateTrieCalled,
handleAtomicTrieCalled,
handleBlockRequestCalled,
handleCodeRequestCalled,
handleMessageSignatureCalled,
handleBlockSignatureCalled bool
}

func (m *mockHandler) HandleStateTrieLeafsRequest(context.Context, ids.NodeID, uint32, LeafsRequest) ([]byte, error) {
m.handleStateTrieCalled = true
return nil, nil
}

func (m *mockHandler) HandleAtomicTrieLeafsRequest(context.Context, ids.NodeID, uint32, LeafsRequest) ([]byte, error) {
m.handleAtomicTrieCalled = true
return nil, nil
}

func (m *mockHandler) HandleBlockRequest(context.Context, ids.NodeID, uint32, BlockRequest) ([]byte, error) {
m.handleBlockRequestCalled = true
return nil, nil
}

func (m *mockHandler) HandleCodeRequest(context.Context, ids.NodeID, uint32, CodeRequest) ([]byte, error) {
m.handleCodeRequestCalled = true
return nil, nil
}

func (m *mockHandler) HandleMessageSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest MessageSignatureRequest) ([]byte, error) {
m.handleMessageSignatureCalled = true
return nil, nil
}
func (m *mockHandler) HandleBlockSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest BlockSignatureRequest) ([]byte, error) {
m.handleBlockSignatureCalled = true
return nil, nil
}

func (m *mockHandler) reset() {
m.handleStateTrieCalled = false
m.handleAtomicTrieCalled = false
m.handleBlockRequestCalled = false
m.handleCodeRequestCalled = false
}
Loading
Loading