From 752680466d238b7e67db487556616c31b4bc7b2c Mon Sep 17 00:00:00 2001 From: Ceyhun Onur Date: Wed, 1 Jan 2025 21:47:06 +0300 Subject: [PATCH 1/3] move node types and handlings to separate packages --- peer/network.go | 1 + plugin/evm/atomic/atomic_syncer.go | 11 +- plugin/evm/atomic/atomic_syncer_test.go | 10 +- plugin/evm/atomic/atomic_trie.go | 4 +- plugin/evm/atomic/atomic_trie_iterator.go | 7 +- plugin/evm/message/handler.go | 10 +- plugin/evm/message/leafs_request.go | 30 +----- plugin/evm/message/leafs_request_test.go | 117 +--------------------- plugin/evm/network_handler.go | 51 ++++++---- plugin/evm/vm.go | 39 +++++++- sync/README.md | 6 +- sync/client/client_test.go | 6 +- sync/client/stats/stats.go | 34 +++---- sync/handlers/leafs_request.go | 34 ++----- sync/handlers/leafs_request_test.go | 2 +- sync/statesync/sync_test.go | 2 +- 16 files changed, 122 insertions(+), 242 deletions(-) diff --git a/peer/network.go b/peer/network.go index a4dfd015f6..6631c0a90a 100644 --- a/peer/network.go +++ b/peer/network.go @@ -228,6 +228,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u log.Debug("received AppRequest from node", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request)) + // TODO: investigate if we can move all these network logic to new SDK handlers var req message.Request if _, err := n.codec.Unmarshal(request, &req); err != nil { log.Debug("forwarding AppRequest to SDK network", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) diff --git a/plugin/evm/atomic/atomic_syncer.go b/plugin/evm/atomic/atomic_syncer.go index 52a8376319..f58ddc1e6c 100644 --- a/plugin/evm/atomic/atomic_syncer.go +++ b/plugin/evm/atomic/atomic_syncer.go @@ -19,6 +19,11 @@ import ( "github.com/ava-labs/coreth/trie" ) +const ( + // AtomicTrieNode represents a leaf node that belongs to the atomic trie. + AtomicTrieNode message.NodeType = 2 +) + var ( _ Syncer = &atomicSyncer{} _ syncclient.LeafSyncTask = &atomicSyncerLeafTask{} @@ -53,7 +58,7 @@ type atomicSyncer struct { // addZeros adds [common.HashLenth] zeros to [height] and returns the result as []byte func addZeroes(height uint64) []byte { - packer := wrappers.Packer{Bytes: make([]byte, atomicKeyLength)} + packer := wrappers.Packer{Bytes: make([]byte, AtomicTrieKeyLength)} packer.PackLong(height) packer.PackFixedBytes(bytes.Repeat([]byte{0x00}, common.HashLength)) return packer.Bytes @@ -91,7 +96,7 @@ func (s *atomicSyncer) Start(ctx context.Context) error { // onLeafs is the callback for the leaf syncer, which will insert the key-value pairs into the trie. func (s *atomicSyncer) onLeafs(keys [][]byte, values [][]byte) error { for i, key := range keys { - if len(key) != atomicKeyLength { + if len(key) != AtomicTrieKeyLength { return fmt.Errorf("unexpected key len (%d) in atomic trie sync", len(key)) } // key = height + blockchainID @@ -176,7 +181,7 @@ type atomicSyncerLeafTask struct { func (a *atomicSyncerLeafTask) Start() []byte { return addZeroes(a.atomicSyncer.lastHeight + 1) } func (a *atomicSyncerLeafTask) End() []byte { return nil } -func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return message.AtomicTrieNode } +func (a *atomicSyncerLeafTask) NodeType() message.NodeType { return AtomicTrieNode } func (a *atomicSyncerLeafTask) OnFinish(context.Context) error { return a.atomicSyncer.onFinish() } func (a *atomicSyncerLeafTask) OnStart() (bool, error) { return false, nil } func (a *atomicSyncerLeafTask) Root() common.Hash { return a.atomicSyncer.targetRoot } diff --git a/plugin/evm/atomic/atomic_syncer_test.go b/plugin/evm/atomic/atomic_syncer_test.go index 140a627710..a9abfa61f7 100644 --- a/plugin/evm/atomic/atomic_syncer_test.go +++ b/plugin/evm/atomic/atomic_syncer_test.go @@ -47,7 +47,7 @@ func testAtomicSyncer(t *testing.T, serverTrieDB *triedb.Database, targetHeight numLeaves := 0 mockClient := syncclient.NewMockClient( message.Codec, - handlers.NewLeafsRequestHandler(serverTrieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()), + handlers.NewLeafsRequestHandler(serverTrieDB, AtomicTrieKeyLength, nil, message.Codec, handlerstats.NewNoopHandlerStats()), nil, nil, ) @@ -157,7 +157,7 @@ func TestAtomicSyncer(t *testing.T) { rand.Seed(1) targetHeight := 10 * uint64(commitInterval) serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) - root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, int(targetHeight), atomicKeyLength) + root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, int(targetHeight), AtomicTrieKeyLength) testAtomicSyncer(t, serverTrieDB, targetHeight, root, nil, int64(targetHeight)) } @@ -167,7 +167,7 @@ func TestAtomicSyncerResume(t *testing.T) { targetHeight := 10 * uint64(commitInterval) serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) numTrieKeys := int(targetHeight) - 1 // no atomic ops for genesis - root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys, atomicKeyLength) + root, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys, AtomicTrieKeyLength) testAtomicSyncer(t, serverTrieDB, targetHeight, root, []atomicSyncTestCheckpoint{ { @@ -184,12 +184,12 @@ func TestAtomicSyncerResumeNewRootCheckpoint(t *testing.T) { targetHeight1 := 10 * uint64(commitInterval) serverTrieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) numTrieKeys1 := int(targetHeight1) - 1 // no atomic ops for genesis - root1, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys1, atomicKeyLength) + root1, _, _ := syncutils.GenerateTrie(t, serverTrieDB, numTrieKeys1, AtomicTrieKeyLength) targetHeight2 := 20 * uint64(commitInterval) numTrieKeys2 := int(targetHeight2) - 1 // no atomic ops for genesis root2, _, _ := syncutils.FillTrie( - t, numTrieKeys1, numTrieKeys2, atomicKeyLength, serverTrieDB, root1, + t, numTrieKeys1, numTrieKeys2, AtomicTrieKeyLength, serverTrieDB, root1, ) testAtomicSyncer(t, serverTrieDB, targetHeight1, root1, []atomicSyncTestCheckpoint{ diff --git a/plugin/evm/atomic/atomic_trie.go b/plugin/evm/atomic/atomic_trie.go index bbb299a391..d86642375f 100644 --- a/plugin/evm/atomic/atomic_trie.go +++ b/plugin/evm/atomic/atomic_trie.go @@ -29,7 +29,7 @@ import ( const ( progressLogFrequency = 30 * time.Second - atomicKeyLength = wrappers.LongLen + common.HashLength + AtomicTrieKeyLength = wrappers.LongLen + common.HashLength sharedMemoryApplyBatchSize = 10_000 // specifies the number of atomic operations to batch progress updates atomicTrieTipBufferSize = 1 // No need to support a buffer of previously accepted tries for the atomic trie @@ -232,7 +232,7 @@ func (a *atomicTrie) UpdateTrie(trie *trie.Trie, height uint64, atomicOps map[id } // key is [height]+[blockchainID] - keyPacker := wrappers.Packer{Bytes: make([]byte, atomicKeyLength)} + keyPacker := wrappers.Packer{Bytes: make([]byte, AtomicTrieKeyLength)} keyPacker.PackLong(height) keyPacker.PackFixedBytes(blockchainID[:]) if err := trie.Update(keyPacker.Bytes, valueBytes); err != nil { diff --git a/plugin/evm/atomic/atomic_trie_iterator.go b/plugin/evm/atomic/atomic_trie_iterator.go index 20be76416e..710b91af8c 100644 --- a/plugin/evm/atomic/atomic_trie_iterator.go +++ b/plugin/evm/atomic/atomic_trie_iterator.go @@ -13,11 +13,8 @@ import ( "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/coreth/trie" - "github.com/ethereum/go-ethereum/common" ) -const atomicTrieKeyLen = wrappers.LongLen + common.HashLength - // atomicTrieIterator is an implementation of types.AtomicTrieIterator that serves // parsed data with each iteration type atomicTrieIterator struct { @@ -55,8 +52,8 @@ func (a *atomicTrieIterator) Next() bool { keyLen := len(a.trieIterator.Key) // If the key has an unexpected length, set the error and stop the iteration since the data is // no longer reliable. - if keyLen != atomicTrieKeyLen { - a.resetFields(fmt.Errorf("expected atomic trie key length to be %d but was %d", atomicTrieKeyLen, keyLen)) + if keyLen != AtomicTrieKeyLength { + a.resetFields(fmt.Errorf("expected atomic trie key length to be %d but was %d", AtomicTrieKeyLength, keyLen)) return false } diff --git a/plugin/evm/message/handler.go b/plugin/evm/message/handler.go index 1b910e3826..d95d373fbd 100644 --- a/plugin/evm/message/handler.go +++ b/plugin/evm/message/handler.go @@ -15,10 +15,8 @@ var _ RequestHandler = NoopRequestHandler{} // Must have methods in format of handleType(context.Context, ids.NodeID, uint32, request Type) error // so that the Request object of relevant Type can invoke its respective handle method // on this struct. -// Also see GossipHandler for implementation style. type RequestHandler interface { - HandleStateTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) - HandleAtomicTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) + HandleLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) HandleBlockRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, request BlockRequest) ([]byte, error) HandleCodeRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, codeRequest CodeRequest) ([]byte, error) HandleMessageSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest MessageSignatureRequest) ([]byte, error) @@ -36,11 +34,7 @@ type ResponseHandler interface { type NoopRequestHandler struct{} -func (NoopRequestHandler) HandleStateTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) { - return nil, nil -} - -func (NoopRequestHandler) HandleAtomicTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) { +func (NoopRequestHandler) HandleLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest LeafsRequest) ([]byte, error) { return nil, nil } diff --git a/plugin/evm/message/leafs_request.go b/plugin/evm/message/leafs_request.go index 22629e62ef..84d10cccd5 100644 --- a/plugin/evm/message/leafs_request.go +++ b/plugin/evm/message/leafs_request.go @@ -9,7 +9,6 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" ) const MaxCodeHashesPerRequest = 5 @@ -22,23 +21,10 @@ var _ Request = LeafsRequest{} type NodeType uint8 const ( - // StateTrieNode represents a leaf node that belongs to the coreth State trie - StateTrieNode NodeType = iota + 1 - // AtomicTrieNode represents a leaf node that belongs to the coreth evm.AtomicTrie - AtomicTrieNode + StateTrieNode = NodeType(0) + StateTrieKeyLength = common.HashLength ) -func (nt NodeType) String() string { - switch nt { - case StateTrieNode: - return "StateTrie" - case AtomicTrieNode: - return "AtomicTrie" - default: - return "Unknown" - } -} - // LeafsRequest is a request to receive trie leaves at specified Root within Start and End byte range // Limit outlines maximum number of leaves to returns starting at Start // NodeType outlines which trie to read from state/atomic. @@ -53,21 +39,13 @@ type LeafsRequest struct { func (l LeafsRequest) String() string { return fmt.Sprintf( - "LeafsRequest(Root=%s, Account=%s, Start=%s, End=%s, Limit=%d, NodeType=%s)", + "LeafsRequest(Root=%s, Account=%s, Start=%s, End=%s, Limit=%d, NodeType=%d)", l.Root, l.Account, common.Bytes2Hex(l.Start), common.Bytes2Hex(l.End), l.Limit, l.NodeType, ) } func (l LeafsRequest) Handle(ctx context.Context, nodeID ids.NodeID, requestID uint32, handler RequestHandler) ([]byte, error) { - switch l.NodeType { - case StateTrieNode: - return handler.HandleStateTrieLeafsRequest(ctx, nodeID, requestID, l) - case AtomicTrieNode: - return handler.HandleAtomicTrieLeafsRequest(ctx, nodeID, requestID, l) - } - - log.Debug("node type is not recognised, dropping request", "nodeID", nodeID, "requestID", requestID, "nodeType", l.NodeType) - return nil, nil + return handler.HandleLeafsRequest(ctx, nodeID, requestID, l) } // LeafsResponse is a response to a LeafsRequest diff --git a/plugin/evm/message/leafs_request_test.go b/plugin/evm/message/leafs_request_test.go index ab6cab5124..22f95c2011 100644 --- a/plugin/evm/message/leafs_request_test.go +++ b/plugin/evm/message/leafs_request_test.go @@ -4,18 +4,16 @@ package message import ( - "bytes" - "context" "encoding/base64" "math/rand" "testing" - "github.com/ava-labs/avalanchego/ids" - "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/assert" ) +const nodeType = NodeType(0) + // TestMarshalLeafsRequest asserts that the structure or serialization logic hasn't changed, primarily to // ensure compatibility with the network. func TestMarshalLeafsRequest(t *testing.T) { @@ -37,7 +35,7 @@ func TestMarshalLeafsRequest(t *testing.T) { Start: startBytes, End: endBytes, Limit: 1024, - NodeType: StateTrieNode, + NodeType: nodeType, } base64LeafsRequest := "AAAAAAAAAAAAAAAAAAAAAABpbSBST09UaW5nIGZvciB5YQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIFL9/AchgmVPFj9fD5piHXKVZsdNEAN8TXu7BAfR4sZJAAAAIIGFWthoHQ2G0ekeABZ5OctmlNLEIqzSCKAHKTlIf2mZBAAB" @@ -108,112 +106,3 @@ func TestMarshalLeafsResponse(t *testing.T) { assert.False(t, l.More) // make sure it is not serialized assert.Equal(t, leafsResponse.ProofVals, l.ProofVals) } - -func TestLeafsRequestValidation(t *testing.T) { - mockRequestHandler := &mockHandler{} - - tests := map[string]struct { - request LeafsRequest - assertResponse func(t *testing.T) - }{ - "node type StateTrieNode": { - request: LeafsRequest{ - Root: common.BytesToHash([]byte("some hash goes here")), - Start: bytes.Repeat([]byte{0x00}, common.HashLength), - End: bytes.Repeat([]byte{0xff}, common.HashLength), - Limit: 10, - NodeType: StateTrieNode, - }, - assertResponse: func(t *testing.T) { - assert.True(t, mockRequestHandler.handleStateTrieCalled) - assert.False(t, mockRequestHandler.handleAtomicTrieCalled) - assert.False(t, mockRequestHandler.handleBlockRequestCalled) - assert.False(t, mockRequestHandler.handleCodeRequestCalled) - }, - }, - "node type AtomicTrieNode": { - request: LeafsRequest{ - Root: common.BytesToHash([]byte("some hash goes here")), - Start: bytes.Repeat([]byte{0x00}, common.HashLength), - End: bytes.Repeat([]byte{0xff}, common.HashLength), - Limit: 10, - NodeType: AtomicTrieNode, - }, - assertResponse: func(t *testing.T) { - assert.False(t, mockRequestHandler.handleStateTrieCalled) - assert.True(t, mockRequestHandler.handleAtomicTrieCalled) - assert.False(t, mockRequestHandler.handleBlockRequestCalled) - assert.False(t, mockRequestHandler.handleCodeRequestCalled) - }, - }, - "unknown node type": { - request: LeafsRequest{ - Root: common.BytesToHash([]byte("some hash goes here")), - Start: bytes.Repeat([]byte{0x00}, common.HashLength), - End: bytes.Repeat([]byte{0xff}, common.HashLength), - Limit: 10, - NodeType: NodeType(11), - }, - assertResponse: func(t *testing.T) { - assert.False(t, mockRequestHandler.handleStateTrieCalled) - assert.False(t, mockRequestHandler.handleAtomicTrieCalled) - assert.False(t, mockRequestHandler.handleBlockRequestCalled) - assert.False(t, mockRequestHandler.handleCodeRequestCalled) - }, - }, - } - for name, test := range tests { - t.Run(name, func(t *testing.T) { - _, _ = test.request.Handle(context.Background(), ids.GenerateTestNodeID(), 1, mockRequestHandler) - test.assertResponse(t) - mockRequestHandler.reset() - }) - } -} - -var _ RequestHandler = &mockHandler{} - -type mockHandler struct { - handleStateTrieCalled, - handleAtomicTrieCalled, - handleBlockRequestCalled, - handleCodeRequestCalled, - handleMessageSignatureCalled, - handleBlockSignatureCalled bool -} - -func (m *mockHandler) HandleStateTrieLeafsRequest(context.Context, ids.NodeID, uint32, LeafsRequest) ([]byte, error) { - m.handleStateTrieCalled = true - return nil, nil -} - -func (m *mockHandler) HandleAtomicTrieLeafsRequest(context.Context, ids.NodeID, uint32, LeafsRequest) ([]byte, error) { - m.handleAtomicTrieCalled = true - return nil, nil -} - -func (m *mockHandler) HandleBlockRequest(context.Context, ids.NodeID, uint32, BlockRequest) ([]byte, error) { - m.handleBlockRequestCalled = true - return nil, nil -} - -func (m *mockHandler) HandleCodeRequest(context.Context, ids.NodeID, uint32, CodeRequest) ([]byte, error) { - m.handleCodeRequestCalled = true - return nil, nil -} - -func (m *mockHandler) HandleMessageSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest MessageSignatureRequest) ([]byte, error) { - m.handleMessageSignatureCalled = true - return nil, nil -} -func (m *mockHandler) HandleBlockSignatureRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, signatureRequest BlockSignatureRequest) ([]byte, error) { - m.handleBlockSignatureCalled = true - return nil, nil -} - -func (m *mockHandler) reset() { - m.handleStateTrieCalled = false - m.handleAtomicTrieCalled = false - m.handleBlockRequestCalled = false - m.handleCodeRequestCalled = false -} diff --git a/plugin/evm/network_handler.go b/plugin/evm/network_handler.go index aa4b728ed5..52feb38b90 100644 --- a/plugin/evm/network_handler.go +++ b/plugin/evm/network_handler.go @@ -5,6 +5,7 @@ package evm import ( "context" + "fmt" "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/ids" @@ -21,38 +22,52 @@ import ( var _ message.RequestHandler = &networkHandler{} type networkHandler struct { - stateTrieLeafsRequestHandler *syncHandlers.LeafsRequestHandler - atomicTrieLeafsRequestHandler *syncHandlers.LeafsRequestHandler - blockRequestHandler *syncHandlers.BlockRequestHandler - codeRequestHandler *syncHandlers.CodeRequestHandler - signatureRequestHandler *warpHandlers.SignatureRequestHandler + leafRequestHandlers map[message.NodeType]*syncHandlers.LeafsRequestHandler + blockRequestHandler *syncHandlers.BlockRequestHandler + codeRequestHandler *syncHandlers.CodeRequestHandler + signatureRequestHandler *warpHandlers.SignatureRequestHandler +} + +type LeafRequestTypeConfig struct { + NodeType message.NodeType + NodeKeyLen int + TrieDB *triedb.Database + UseSnapshots bool + MetricName string } // newNetworkHandler constructs the handler for serving network requests. func newNetworkHandler( provider syncHandlers.SyncDataProvider, diskDB ethdb.KeyValueReader, - evmTrieDB *triedb.Database, - atomicTrieDB *triedb.Database, warpBackend warp.Backend, networkCodec codec.Manager, + leafRequesTypeConfigs map[message.NodeType]LeafRequestTypeConfig, ) message.RequestHandler { syncStats := syncStats.NewHandlerStats(metrics.Enabled) + leafRequestHandlers := make(map[message.NodeType]*syncHandlers.LeafsRequestHandler) + for _, config := range leafRequesTypeConfigs { + snapshotProvider := provider + if !config.UseSnapshots { + snapshotProvider = nil + } + leafRequestHandler := syncHandlers.NewLeafsRequestHandler(config.TrieDB, config.NodeKeyLen, snapshotProvider, networkCodec, syncStats) + leafRequestHandlers[config.NodeType] = leafRequestHandler + } return &networkHandler{ - stateTrieLeafsRequestHandler: syncHandlers.NewLeafsRequestHandler(evmTrieDB, provider, networkCodec, syncStats), - atomicTrieLeafsRequestHandler: syncHandlers.NewLeafsRequestHandler(atomicTrieDB, nil, networkCodec, syncStats), - blockRequestHandler: syncHandlers.NewBlockRequestHandler(provider, networkCodec, syncStats), - codeRequestHandler: syncHandlers.NewCodeRequestHandler(diskDB, networkCodec, syncStats), - signatureRequestHandler: warpHandlers.NewSignatureRequestHandler(warpBackend, networkCodec), + leafRequestHandlers: leafRequestHandlers, + blockRequestHandler: syncHandlers.NewBlockRequestHandler(provider, networkCodec, syncStats), + codeRequestHandler: syncHandlers.NewCodeRequestHandler(diskDB, networkCodec, syncStats), + signatureRequestHandler: warpHandlers.NewSignatureRequestHandler(warpBackend, networkCodec), } } -func (n networkHandler) HandleStateTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest message.LeafsRequest) ([]byte, error) { - return n.stateTrieLeafsRequestHandler.OnLeafsRequest(ctx, nodeID, requestID, leafsRequest) -} - -func (n networkHandler) HandleAtomicTrieLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest message.LeafsRequest) ([]byte, error) { - return n.atomicTrieLeafsRequestHandler.OnLeafsRequest(ctx, nodeID, requestID, leafsRequest) +func (n networkHandler) HandleLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest message.LeafsRequest) ([]byte, error) { + handler, ok := n.leafRequestHandlers[leafsRequest.NodeType] + if !ok { + return nil, fmt.Errorf("unknown node type %d", leafsRequest.NodeType) + } + return handler.OnLeafsRequest(ctx, nodeID, requestID, leafsRequest) } func (n networkHandler) HandleBlockRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, blockRequest message.BlockRequest) ([]byte, error) { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 0cf53150d9..6ed5edcfc3 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -282,6 +282,8 @@ type VM struct { vmsync.StateSyncServer vmsync.StateSyncClient + leafRequestTypeConfigs map[message.NodeType]LeafRequestTypeConfig + // Avalanche Warp Messaging backend // Used to serve BLS signatures of warp messages over RPC warpBackend warp.Backend @@ -723,6 +725,12 @@ func (vm *VM) initializeStateSyncClient(lastAcceptedHeight uint64) error { } } + // Get leaf metrics from config + leafMetricsNames := make(map[message.NodeType]string, len(vm.leafRequestTypeConfigs)) + for _, nodeType := range vm.leafRequestTypeConfigs { + leafMetricsNames[nodeType.NodeType] = nodeType.MetricName + } + vm.StateSyncClient = vmsync.NewStateSyncClient(&vmsync.StateSyncClientConfig{ Chain: vm.eth, State: vm.State, @@ -731,7 +739,7 @@ func (vm *VM) initializeStateSyncClient(lastAcceptedHeight uint64) error { &statesyncclient.ClientConfig{ NetworkClient: vm.client, Codec: vm.networkCodec, - Stats: stats.NewClientSyncerStats(), + Stats: stats.NewClientSyncerStats(leafMetricsNames), StateSyncNodeIDs: stateSyncIDs, BlockParser: vm, }, @@ -1234,7 +1242,7 @@ func (vm *VM) initBlockBuilding() error { // setAppRequestHandlers sets the request handlers for the VM to serve state sync // requests. -func (vm *VM) setAppRequestHandlers() { +func (vm *VM) setAppRequestHandlers() error { // Create standalone EVM TrieDB (read only) for serving leafs requests. // We create a standalone TrieDB here, so that it has a standalone cache from the one // used by the node when processing blocks. @@ -1246,15 +1254,38 @@ func (vm *VM) setAppRequestHandlers() { }, }, ) + if err := vm.RegisterLeafRequestHandler(message.StateTrieNode, "sync_state_trie_leaves", evmTrieDB, message.StateTrieKeyLength, true); err != nil { + return err + } + // Register atomic trieDB for serving atomic leafs requests. + if err := vm.RegisterLeafRequestHandler(atomic.AtomicTrieNode, "sync_atomic_trie_leaves", vm.atomicTrie.TrieDB(), atomic.AtomicTrieKeyLength, false); err != nil { + return err + } + networkHandler := newNetworkHandler( vm.blockChain, vm.chaindb, - evmTrieDB, - vm.atomicTrie.TrieDB(), vm.warpBackend, vm.networkCodec, + vm.leafRequestTypeConfigs, ) vm.Network.SetRequestHandler(networkHandler) + return nil +} + +func (vm *VM) RegisterLeafRequestHandler(nodeType message.NodeType, metricName string, trieDB *triedb.Database, trieKeyLen int, useSnapshot bool) error { + if _, ok := vm.leafRequestTypeConfigs[nodeType]; ok { + return fmt.Errorf("leaf request handler for node type %s already registered", nodeType) + } + handlerConfig := LeafRequestTypeConfig{ + NodeType: nodeType, + TrieDB: trieDB, + UseSnapshots: useSnapshot, + NodeKeyLen: trieKeyLen, + MetricName: metricName, + } + vm.leafRequestTypeConfigs[nodeType] = handlerConfig + return nil } // Shutdown implements the snowman.ChainVM interface diff --git a/sync/README.md b/sync/README.md index 2e6437a71f..0903f1629f 100644 --- a/sync/README.md +++ b/sync/README.md @@ -34,7 +34,7 @@ State sync code is structured as follows: - `plugin/evm/`: The engine expects the VM to implement `StateSyncableVM` interface, - `StateSyncServer`: Contains methods executed on nodes _serving_ state sync requests. - `StateSyncClient`: Contains methods executed on nodes joining the network via state sync, and orchestrates the top level steps of the sync. -- `peer`: Contains abstractions used by `sync/statesync` to send requests to peers (`AppRequest`) and receive responses from peers (`AppResponse`). +- `peer`: Contains abstractions used by `sync/statesync` to send requests to peers (`AppRequest`) and receive responses from peers (`AppResponse`). - `message`: Contains structs that are serialized and sent over the network during state sync. @@ -65,8 +65,8 @@ The following steps are executed by the VM to sync its state from peers (see `st Steps 3 and 4 involve syncing tries. To sync trie data, the VM will send a series of `LeafRequests` to its peers. Each request specifies: - Type of trie (`NodeType`): - - `message.StateTrieNode` (account trie and storage tries share the same database) - - `message.AtomicTrieNode` (atomic trie has an independent database) + - `statesync.StateTrieNode` (account trie and storage tries share the same database) + - `atomic.AtomicTrieNode` (atomic trie has an independent database) - `Root` of the trie to sync, - `Start` and `End` specify a range of keys. diff --git a/sync/client/client_test.go b/sync/client/client_test.go index 9902c4e694..167b5ce120 100644 --- a/sync/client/client_test.go +++ b/sync/client/client_test.go @@ -139,7 +139,7 @@ func TestGetBlocks(t *testing.T) { // set random seed for deterministic tests rand.Seed(1) - var gspec = &core.Genesis{ + gspec := &core.Genesis{ Config: params.TestChainConfig, } memdb := rawdb.NewMemoryDatabase() @@ -415,7 +415,7 @@ func TestGetLeafs(t *testing.T) { largeTrieRoot, largeTrieKeys, _ := syncutils.GenerateTrie(t, trieDB, 100_000, common.HashLength) smallTrieRoot, _, _ := syncutils.GenerateTrie(t, trieDB, leafsLimit, common.HashLength) - handler := handlers.NewLeafsRequestHandler(trieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()) + handler := handlers.NewLeafsRequestHandler(trieDB, message.StateTrieKeyLength, nil, message.Codec, handlerstats.NewNoopHandlerStats()) client := NewClient(&ClientConfig{ NetworkClient: &mockNetwork{}, Codec: message.Codec, @@ -797,7 +797,7 @@ func TestGetLeafsRetries(t *testing.T) { trieDB := triedb.NewDatabase(rawdb.NewMemoryDatabase(), nil) root, _, _ := syncutils.GenerateTrie(t, trieDB, 100_000, common.HashLength) - handler := handlers.NewLeafsRequestHandler(trieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()) + handler := handlers.NewLeafsRequestHandler(trieDB, message.StateTrieKeyLength, nil, message.Codec, handlerstats.NewNoopHandlerStats()) mockNetClient := &mockNetwork{} const maxAttempts = 8 diff --git a/sync/client/stats/stats.go b/sync/client/stats/stats.go index 14af154423..f8c7d2e3e9 100644 --- a/sync/client/stats/stats.go +++ b/sync/client/stats/stats.go @@ -75,19 +75,21 @@ func (m *messageMetric) UpdateRequestLatency(duration time.Duration) { } type clientSyncerStats struct { - atomicTrieLeavesMetric, - stateTrieLeavesMetric, + leafMetrics map[message.NodeType]MessageMetric codeRequestMetric, blockRequestMetric MessageMetric } // NewClientSyncerStats returns stats for the client syncer -func NewClientSyncerStats() ClientSyncerStats { +func NewClientSyncerStats(leafMetricNames map[message.NodeType]string) ClientSyncerStats { + leafMetrics := make(map[message.NodeType]MessageMetric, len(leafMetricNames)) + for nodeType, name := range leafMetricNames { + leafMetrics[nodeType] = NewMessageMetric(name) + } return &clientSyncerStats{ - atomicTrieLeavesMetric: NewMessageMetric("sync_atomic_trie_leaves"), - stateTrieLeavesMetric: NewMessageMetric("sync_state_trie_leaves"), - codeRequestMetric: NewMessageMetric("sync_code"), - blockRequestMetric: NewMessageMetric("sync_blocks"), + leafMetrics: leafMetrics, + codeRequestMetric: NewMessageMetric("sync_code"), + blockRequestMetric: NewMessageMetric("sync_blocks"), } } @@ -99,14 +101,11 @@ func (c *clientSyncerStats) GetMetric(msgIntf message.Request) (MessageMetric, e case message.CodeRequest: return c.codeRequestMetric, nil case message.LeafsRequest: - switch msg.NodeType { - case message.StateTrieNode: - return c.stateTrieLeavesMetric, nil - case message.AtomicTrieNode: - return c.atomicTrieLeavesMetric, nil - default: + metric, ok := c.leafMetrics[msg.NodeType] + if !ok { return nil, fmt.Errorf("invalid leafs request for node type: %T", msg.NodeType) } + return metric, nil default: return nil, fmt.Errorf("attempted to get metric for invalid request with type %T", msg) } @@ -133,12 +132,3 @@ func NewNoOpStats() ClientSyncerStats { func (n noopStats) GetMetric(_ message.Request) (MessageMetric, error) { return n.noop, nil } - -// NewStats returns syncer stats if enabled or a no-op version if disabled. -func NewStats(enabled bool) ClientSyncerStats { - if enabled { - return NewClientSyncerStats() - } else { - return NewNoOpStats() - } -} diff --git a/sync/handlers/leafs_request.go b/sync/handlers/leafs_request.go index 4bb0fcfb61..0c4e384052 100644 --- a/sync/handlers/leafs_request.go +++ b/sync/handlers/leafs_request.go @@ -6,13 +6,11 @@ package handlers import ( "bytes" "context" - "fmt" "sync" "time" "github.com/ava-labs/avalanchego/codec" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/coreth/core/state/snapshot" "github.com/ava-labs/coreth/core/types" "github.com/ava-labs/coreth/plugin/evm/message" @@ -48,14 +46,16 @@ type LeafsRequestHandler struct { codec codec.Manager stats stats.LeafsRequestHandlerStats pool sync.Pool + trieKeyLength int } -func NewLeafsRequestHandler(trieDB *triedb.Database, snapshotProvider SnapshotProvider, codec codec.Manager, syncerStats stats.LeafsRequestHandlerStats) *LeafsRequestHandler { +func NewLeafsRequestHandler(trieDB *triedb.Database, trieKeyLength int, snapshotProvider SnapshotProvider, codec codec.Manager, syncerStats stats.LeafsRequestHandlerStats) *LeafsRequestHandler { return &LeafsRequestHandler{ trieDB: trieDB, snapshotProvider: snapshotProvider, codec: codec, stats: syncerStats, + trieKeyLength: trieKeyLength, pool: sync.Pool{ New: func() interface{} { return make([][]byte, 0, maxLeavesLimit) }, }, @@ -70,7 +70,6 @@ func NewLeafsRequestHandler(trieDB *triedb.Database, snapshotProvider SnapshotPr // Specified Limit in message.LeafsRequest is overridden to maxLeavesLimit if it is greater than maxLeavesLimit // Expects returned errors to be treated as FATAL // Never returns errors -// Expects NodeType to be one of message.AtomicTrieNode or message.StateTrieNode // Returns nothing if NodeType is invalid or requested trie root is not found // Assumes ctx is active func (lrh *LeafsRequestHandler) OnLeafsRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, leafsRequest message.LeafsRequest) ([]byte, error) { @@ -85,16 +84,9 @@ func (lrh *LeafsRequestHandler) OnLeafsRequest(ctx context.Context, nodeID ids.N lrh.stats.IncInvalidLeafsRequest() return nil, nil } - keyLength, err := getKeyLength(leafsRequest.NodeType) - if err != nil { - // Note: LeafsRequest.Handle checks NodeType's validity so clients cannot cause the server to spam this error - log.Error("Failed to get key length for leafs request", "err", err) - lrh.stats.IncInvalidLeafsRequest() - return nil, nil - } - if len(leafsRequest.Start) != 0 && len(leafsRequest.Start) != keyLength || - len(leafsRequest.End) != 0 && len(leafsRequest.End) != keyLength { - log.Debug("invalid length for leafs request range, dropping request", "startLen", len(leafsRequest.Start), "endLen", len(leafsRequest.End), "expected", keyLength) + if len(leafsRequest.Start) != 0 && len(leafsRequest.Start) != lrh.trieKeyLength || + len(leafsRequest.End) != 0 && len(leafsRequest.End) != lrh.trieKeyLength { + log.Debug("invalid length for leafs request range, dropping request", "startLen", len(leafsRequest.Start), "endLen", len(leafsRequest.End), "expected", lrh.trieKeyLength) lrh.stats.IncInvalidLeafsRequest() return nil, nil } @@ -134,7 +126,7 @@ func (lrh *LeafsRequestHandler) OnLeafsRequest(ctx context.Context, nodeID ids.N request: &leafsRequest, response: &leafsResponse, t: t, - keyLength: keyLength, + keyLength: lrh.trieKeyLength, limit: limit, stats: lrh.stats, } @@ -455,18 +447,6 @@ func (rb *responseBuilder) fillFromTrie(ctx context.Context, end []byte) (bool, return more, it.Err } -// getKeyLength returns trie key length for given nodeType -// expects nodeType to be one of message.AtomicTrieNode or message.StateTrieNode -func getKeyLength(nodeType message.NodeType) (int, error) { - switch nodeType { - case message.AtomicTrieNode: - return wrappers.LongLen + common.HashLength, nil - case message.StateTrieNode: - return common.HashLength, nil - } - return 0, fmt.Errorf("cannot get key length for unknown node type: %s", nodeType) -} - // readLeafsFromSnapshot iterates the storage snapshot of the requested account // (or the main account trie if account is empty). Returns up to [rb.limit] key/value // pairs for keys that are in the request's range (inclusive). diff --git a/sync/handlers/leafs_request_test.go b/sync/handlers/leafs_request_test.go index 1c8201fdad..296e87371f 100644 --- a/sync/handlers/leafs_request_test.go +++ b/sync/handlers/leafs_request_test.go @@ -74,7 +74,7 @@ func TestLeafsRequestHandler_OnLeafsRequest(t *testing.T) { } } snapshotProvider := &TestSnapshotProvider{} - leafsHandler := NewLeafsRequestHandler(trieDB, snapshotProvider, message.Codec, mockHandlerStats) + leafsHandler := NewLeafsRequestHandler(trieDB, message.StateTrieKeyLength, snapshotProvider, message.Codec, mockHandlerStats) snapConfig := snapshot.Config{ CacheSize: 64, AsyncBuild: false, diff --git a/sync/statesync/sync_test.go b/sync/statesync/sync_test.go index 31b2518187..c64d36faa5 100644 --- a/sync/statesync/sync_test.go +++ b/sync/statesync/sync_test.go @@ -49,7 +49,7 @@ func testSync(t *testing.T, test syncTest) { ctx = test.ctx } clientDB, serverDB, serverTrieDB, root := test.prepareForTest(t) - leafsRequestHandler := handlers.NewLeafsRequestHandler(serverTrieDB, nil, message.Codec, handlerstats.NewNoopHandlerStats()) + leafsRequestHandler := handlers.NewLeafsRequestHandler(serverTrieDB, message.StateTrieKeyLength, nil, message.Codec, handlerstats.NewNoopHandlerStats()) codeRequestHandler := handlers.NewCodeRequestHandler(serverDB, message.Codec, handlerstats.NewNoopHandlerStats()) mockClient := statesyncclient.NewMockClient(message.Codec, leafsRequestHandler, codeRequestHandler, nil) // Set intercept functions for the mock client From 23149299dde1f7c64a83c769f9b1bdeb887a7172 Mon Sep 17 00:00:00 2001 From: Ceyhun Onur Date: Mon, 6 Jan 2025 16:08:21 +0300 Subject: [PATCH 2/3] fix fmt errorf --- plugin/evm/message/leafs_request.go | 2 +- plugin/evm/vm.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/evm/message/leafs_request.go b/plugin/evm/message/leafs_request.go index 84d10cccd5..ca51937ada 100644 --- a/plugin/evm/message/leafs_request.go +++ b/plugin/evm/message/leafs_request.go @@ -17,7 +17,7 @@ var _ Request = LeafsRequest{} // NodeType outlines the trie that a leaf node belongs to // handlers.LeafsRequestHandler uses this information to determine -// which of the two tries (state/atomic) to fetch the information from +// which trie type to fetch the information from type NodeType uint8 const ( diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 5c6b9f6c0a..5b4d021a92 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -1276,7 +1276,7 @@ func (vm *VM) setAppRequestHandlers() error { func (vm *VM) RegisterLeafRequestHandler(nodeType message.NodeType, metricName string, trieDB *triedb.Database, trieKeyLen int, useSnapshot bool) error { if _, ok := vm.leafRequestTypeConfigs[nodeType]; ok { - return fmt.Errorf("leaf request handler for node type %s already registered", nodeType) + return fmt.Errorf("leaf request handler for node type %d already registered", nodeType) } handlerConfig := LeafRequestTypeConfig{ NodeType: nodeType, From 0434accf34bce799e1ca6da4facd7fb208703093 Mon Sep 17 00:00:00 2001 From: Ceyhun Onur Date: Mon, 6 Jan 2025 16:35:21 +0300 Subject: [PATCH 3/3] fix tests --- plugin/evm/message/leafs_request_test.go | 4 +--- plugin/evm/vm.go | 3 +++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/plugin/evm/message/leafs_request_test.go b/plugin/evm/message/leafs_request_test.go index 22f95c2011..f70aad7bba 100644 --- a/plugin/evm/message/leafs_request_test.go +++ b/plugin/evm/message/leafs_request_test.go @@ -12,8 +12,6 @@ import ( "github.com/stretchr/testify/assert" ) -const nodeType = NodeType(0) - // TestMarshalLeafsRequest asserts that the structure or serialization logic hasn't changed, primarily to // ensure compatibility with the network. func TestMarshalLeafsRequest(t *testing.T) { @@ -35,7 +33,7 @@ func TestMarshalLeafsRequest(t *testing.T) { Start: startBytes, End: endBytes, Limit: 1024, - NodeType: nodeType, + NodeType: NodeType(1), } base64LeafsRequest := "AAAAAAAAAAAAAAAAAAAAAABpbSBST09UaW5nIGZvciB5YQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIFL9/AchgmVPFj9fD5piHXKVZsdNEAN8TXu7BAfR4sZJAAAAIIGFWthoHQ2G0ekeABZ5OctmlNLEIqzSCKAHKTlIf2mZBAAB" diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index 5b4d021a92..3e304d5e2f 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -1275,6 +1275,9 @@ func (vm *VM) setAppRequestHandlers() error { } func (vm *VM) RegisterLeafRequestHandler(nodeType message.NodeType, metricName string, trieDB *triedb.Database, trieKeyLen int, useSnapshot bool) error { + if vm.leafRequestTypeConfigs == nil { + vm.leafRequestTypeConfigs = make(map[message.NodeType]LeafRequestTypeConfig) + } if _, ok := vm.leafRequestTypeConfigs[nodeType]; ok { return fmt.Errorf("leaf request handler for node type %d already registered", nodeType) }