diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 505ac3892a..214edd850b 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -450,13 +450,13 @@ namespace eosio { void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id ); void on_accepted_block( const signed_block_ptr& block, const block_id_type& id ); + void on_irreversible_block( const signed_block_ptr& block, const block_id_type& id ); void broadcast_vote_message( uint32_t connection_id, vote_result_t status, const vote_message_ptr& vote, const finalizer_authority_ptr& active_auth, const finalizer_authority_ptr& pending_auth); void transaction_ack(const std::pair&); - void on_irreversible_block( const block_id_type& id, uint32_t block_num ); void bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg ); @@ -3721,8 +3721,10 @@ namespace eosio { my_impl->dispatcher.strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable { controller& cc = my_impl->chain_plug->chain(); + auto lib_num = my_impl->get_chain_lib_num(); + // may have come in on a different connection and posted into dispatcher strand before this one - if( my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe + if( block_header::num_from_id(id) <= lib_num || my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe my_impl->dispatcher.add_peer_block( id, c->connection_id ); c->strand.post( [c, id, ptr{std::move(ptr)}]() { const fc::microseconds age(fc::time_point::now() - ptr->timestamp); @@ -3788,6 +3790,14 @@ namespace eosio { my_impl->dispatcher.bcast_block( obh->block(), obh->id() ); c->block_status_monitor_.accepted(); + if (my_impl->chain_plug->chain().get_read_mode() == db_read_mode::IRREVERSIBLE) { + // non-irreversible notifies sync_manager when block is applied + my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), bh=*obh]() { + const fc::microseconds age(fc::time_point::now() - bh.timestamp()); + sync_master->sync_recv_block(connection_ptr{}, bh.id(), bh.block_num(), age); + }); + } + if (best_head) { ++c->unique_blocks_rcvd_count; fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num())); @@ -3860,23 +3870,26 @@ namespace eosio { // called from application thread void net_plugin_impl::on_accepted_block_header(const signed_block_ptr& block, const block_id_type& id) { - fc_dlog(logger, "on_accpeted_block_header ${bn} ${id}", ("bn", block->block_num())("id", id)); + fc_dlog(logger, "on_accepted_block_header ${bn} ${id}", ("bn", block->block_num())("id", id)); update_chain_info(); - dispatcher.strand.post([block, id]() { + boost::asio::post( my_impl->thread_pool.get_executor(), [block, id]() { fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id)); my_impl->dispatcher.bcast_block(block, id); }); } void net_plugin_impl::on_accepted_block( const signed_block_ptr& block, const block_id_type& id) { - fc_dlog(logger, "on_accpeted_block ${bn} ${id}", ("bn", block->block_num())("id", id)); + fc_dlog(logger, "on_accepted_block ${bn} ${id}", ("bn", block->block_num())("id", id)); update_chain_info(); - my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() { - const fc::microseconds age(fc::time_point::now() - block->timestamp); - sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age); - }); + if (my_impl->chain_plug->chain().get_read_mode() != db_read_mode::IRREVERSIBLE) { + // irreversible notifies sync_manager when added to forkdb, non-irreversible notifies when applied + my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() { + const fc::microseconds age(fc::time_point::now() - block->timestamp); + sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age); + }); + } sync_master->send_handshakes_if_synced(fc::time_point::now() - block->timestamp); if (const auto* pending_producers = chain_plug->chain().pending_producers()) { @@ -3885,6 +3898,20 @@ namespace eosio { on_active_schedule(chain_plug->chain().active_producers()); } + // called from application thread + void net_plugin_impl::on_irreversible_block( const signed_block_ptr& block, const block_id_type& id ) { + fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id) ); + update_chain_info(id); + + if (my_impl->chain_plug->chain().get_read_mode() == db_read_mode::IRREVERSIBLE) { + // irreversible notifies sync_manager when added to forkdb, non-irreversible notifies when applied + my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() { + const fc::microseconds age(fc::time_point::now() - block->timestamp); + sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age); + }); + } + } + // called from other threads including net threads void net_plugin_impl::broadcast_vote_message(uint32_t connection_id, vote_result_t status, const vote_message_ptr& msg, @@ -3941,17 +3968,11 @@ namespace eosio { ("t", exclude_peer ? "received" : "our")("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16)) ("v", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8,16))); - dispatcher.strand.post( [this, exclude_peer, msg{std::move(send_buffer)}]() mutable { - dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) ); + boost::asio::post( my_impl->thread_pool.get_executor(), [exclude_peer, msg{std::move(send_buffer)}]() mutable { + my_impl->dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) ); }); } - // called from application thread - void net_plugin_impl::on_irreversible_block( const block_id_type& id, uint32_t block_num) { - fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block_num)("id", id) ); - update_chain_info(id); - } - // called from application thread void net_plugin_impl::transaction_ack(const std::pair& results) { boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, results]() { @@ -4347,7 +4368,7 @@ namespace eosio { } ); cc.irreversible_block().connect( [my = shared_from_this()]( const block_signal_params& t ) { const auto& [ block, id ] = t; - my->on_irreversible_block( id, block->block_num() ); + my->on_irreversible_block( block, id ); } ); auto broadcast_vote = [my = shared_from_this()]( const vote_signal_params& vote_signal ) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a99d2b7d96..a6194c13f1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -292,9 +292,9 @@ set_property(TEST p2p_multiple_listen_if_test PROPERTY LABELS nonparallelizable_ add_test(NAME p2p_no_listen_test COMMAND tests/p2p_no_listen_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) set_property(TEST p2p_no_listen_test PROPERTY LABELS nonparallelizable_tests) add_test(NAME p2p_sync_throttle_test COMMAND tests/p2p_sync_throttle_test.py -v -d 2 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) -set_property(TEST p2p_sync_throttle_test PROPERTY LABELS nonparallelizable_tests) +set_property(TEST p2p_sync_throttle_test PROPERTY LABELS long_running_tests) add_test(NAME p2p_sync_throttle_if_test COMMAND tests/p2p_sync_throttle_test.py -v -d 2 --activate-if ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) -set_property(TEST p2p_sync_throttle_if_test PROPERTY LABELS nonparallelizable_tests) +set_property(TEST p2p_sync_throttle_if_test PROPERTY LABELS long_running_tests) # needs iproute-tc or iproute2 depending on platform #add_test(NAME p2p_high_latency_test COMMAND tests/p2p_high_latency_test.py -v WORKING_DIRECTORY ${CMAKE_BINARY_DIR}) diff --git a/tests/nodeos_startup_catchup.py b/tests/nodeos_startup_catchup.py index f74b8578f8..6126345425 100755 --- a/tests/nodeos_startup_catchup.py +++ b/tests/nodeos_startup_catchup.py @@ -239,8 +239,13 @@ def waitForNodeStarted(node): if irreversible: # for now just use a larger tolerance, later when the logs include calculated lib this can be more precise # See https://github.com/AntelopeIO/spring/issues/806 - if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*10): - errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}") + if activateIF: + if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*10): + errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}") + else: + if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*2+360): + errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}") + else: if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*2-1): errorExit(f"Requested range too far head of fork head {fhead} sync-fetch-span {sync_fetch_span}: {line}") diff --git a/tests/p2p_sync_throttle_test.py b/tests/p2p_sync_throttle_test.py index 4cc9fe277f..cf7a12abe9 100755 --- a/tests/p2p_sync_throttle_test.py +++ b/tests/p2p_sync_throttle_test.py @@ -57,6 +57,7 @@ # # Compare the sync time of throttledNode and unThrottledNode if cluster.launch(pnodes=pnodes, unstartedNodes=3, totalNodes=total_nodes, prodCount=prod_count, + extraNodeosArgs="--sync-fetch-span 5", topo='./tests/p2p_sync_throttle_test_shape.json', delay=delay, activateIF=activateIF) is False: errorExit("Failed to stand up eos cluster.") @@ -91,7 +92,7 @@ Print("Configure and launch txn generators") targetTpsPerGenerator = 500 - testTrxGenDurationSec=60 + testTrxGenDurationSec=90 trxGeneratorCnt=1 cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[accounts[0].name,accounts[1].name], acctPrivKeysList=[account1PrivKey,account2PrivKey], nodeId=prodNode.nodeId, tpsPerGenerator=targetTpsPerGenerator, @@ -132,7 +133,7 @@ assert unThrottledNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on un-throttled node timed out' endUnThrottledSync = time.time() - assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=240), f'Wait for block {endLargeBlocksHeadBlock} on throttled node timed out' + assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=endLargeBlocksHeadBlock*2), f'Wait for block {endLargeBlocksHeadBlock} on throttled node timed out' endThrottledSync = time.time() throttledElapsed = endThrottledSync - clusterStart