Skip to content

Commit

Permalink
Merge branch 'GH-529-optimize-block-processing-part2' into GH-529-opt…
Browse files Browse the repository at this point in the history
…imize-block-processing-part3
  • Loading branch information
heifner authored Sep 26, 2024
2 parents 2f0da1f + 5cc4828 commit a57af4b
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 24 deletions.
57 changes: 39 additions & 18 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -450,13 +450,13 @@ namespace eosio {

void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id );
void on_accepted_block( const signed_block_ptr& block, const block_id_type& id );
void on_irreversible_block( const signed_block_ptr& block, const block_id_type& id );
void broadcast_vote_message( uint32_t connection_id, vote_result_t status,
const vote_message_ptr& vote,
const finalizer_authority_ptr& active_auth,
const finalizer_authority_ptr& pending_auth);

void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
void on_irreversible_block( const block_id_type& id, uint32_t block_num );

void bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg );

Expand Down Expand Up @@ -3721,8 +3721,10 @@ namespace eosio {
my_impl->dispatcher.strand.post([id, c{shared_from_this()}, ptr{std::move(ptr)}, cid=connection_id]() mutable {
controller& cc = my_impl->chain_plug->chain();

auto lib_num = my_impl->get_chain_lib_num();

// may have come in on a different connection and posted into dispatcher strand before this one
if( my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe
if( block_header::num_from_id(id) <= lib_num || my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe
my_impl->dispatcher.add_peer_block( id, c->connection_id );
c->strand.post( [c, id, ptr{std::move(ptr)}]() {
const fc::microseconds age(fc::time_point::now() - ptr->timestamp);
Expand Down Expand Up @@ -3788,6 +3790,14 @@ namespace eosio {
my_impl->dispatcher.bcast_block( obh->block(), obh->id() );
c->block_status_monitor_.accepted();

if (my_impl->chain_plug->chain().get_read_mode() == db_read_mode::IRREVERSIBLE) {
// non-irreversible notifies sync_manager when block is applied
my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), bh=*obh]() {
const fc::microseconds age(fc::time_point::now() - bh.timestamp());
sync_master->sync_recv_block(connection_ptr{}, bh.id(), bh.block_num(), age);
});
}

if (best_head) {
++c->unique_blocks_rcvd_count;
fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num()));
Expand Down Expand Up @@ -3860,23 +3870,26 @@ namespace eosio {

// called from application thread
void net_plugin_impl::on_accepted_block_header(const signed_block_ptr& block, const block_id_type& id) {
fc_dlog(logger, "on_accpeted_block_header ${bn} ${id}", ("bn", block->block_num())("id", id));
fc_dlog(logger, "on_accepted_block_header ${bn} ${id}", ("bn", block->block_num())("id", id));
update_chain_info();

dispatcher.strand.post([block, id]() {
boost::asio::post( my_impl->thread_pool.get_executor(), [block, id]() {
fc_dlog(logger, "signaled accepted_block_header, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id));
my_impl->dispatcher.bcast_block(block, id);
});
}

void net_plugin_impl::on_accepted_block( const signed_block_ptr& block, const block_id_type& id) {
fc_dlog(logger, "on_accpeted_block ${bn} ${id}", ("bn", block->block_num())("id", id));
fc_dlog(logger, "on_accepted_block ${bn} ${id}", ("bn", block->block_num())("id", id));
update_chain_info();

my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() {
const fc::microseconds age(fc::time_point::now() - block->timestamp);
sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age);
});
if (my_impl->chain_plug->chain().get_read_mode() != db_read_mode::IRREVERSIBLE) {
// irreversible notifies sync_manager when added to forkdb, non-irreversible notifies when applied
my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() {
const fc::microseconds age(fc::time_point::now() - block->timestamp);
sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age);
});
}

sync_master->send_handshakes_if_synced(fc::time_point::now() - block->timestamp);
if (const auto* pending_producers = chain_plug->chain().pending_producers()) {
Expand All @@ -3885,6 +3898,20 @@ namespace eosio {
on_active_schedule(chain_plug->chain().active_producers());
}

// called from application thread
void net_plugin_impl::on_irreversible_block( const signed_block_ptr& block, const block_id_type& id ) {
fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block->block_num())("id", id) );
update_chain_info(id);

if (my_impl->chain_plug->chain().get_read_mode() == db_read_mode::IRREVERSIBLE) {
// irreversible notifies sync_manager when added to forkdb, non-irreversible notifies when applied
my_impl->dispatcher.strand.post([sync_master = my_impl->sync_master.get(), block, id]() {
const fc::microseconds age(fc::time_point::now() - block->timestamp);
sync_master->sync_recv_block(connection_ptr{}, id, block->block_num(), age);
});
}
}

// called from other threads including net threads
void net_plugin_impl::broadcast_vote_message(uint32_t connection_id, vote_result_t status,
const vote_message_ptr& msg,
Expand Down Expand Up @@ -3941,17 +3968,11 @@ namespace eosio {
("t", exclude_peer ? "received" : "our")("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16))
("v", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8,16)));

dispatcher.strand.post( [this, exclude_peer, msg{std::move(send_buffer)}]() mutable {
dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) );
boost::asio::post( my_impl->thread_pool.get_executor(), [exclude_peer, msg{std::move(send_buffer)}]() mutable {
my_impl->dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) );
});
}

// called from application thread
void net_plugin_impl::on_irreversible_block( const block_id_type& id, uint32_t block_num) {
fc_dlog( logger, "on_irreversible_block, blk num = ${num}, id = ${id}", ("num", block_num)("id", id) );
update_chain_info(id);
}

// called from application thread
void net_plugin_impl::transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>& results) {
boost::asio::post( my_impl->thread_pool.get_executor(), [&dispatcher = my_impl->dispatcher, results]() {
Expand Down Expand Up @@ -4347,7 +4368,7 @@ namespace eosio {
} );
cc.irreversible_block().connect( [my = shared_from_this()]( const block_signal_params& t ) {
const auto& [ block, id ] = t;
my->on_irreversible_block( id, block->block_num() );
my->on_irreversible_block( block, id );
} );

auto broadcast_vote = [my = shared_from_this()]( const vote_signal_params& vote_signal ) {
Expand Down
4 changes: 2 additions & 2 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ set_property(TEST p2p_multiple_listen_if_test PROPERTY LABELS nonparallelizable_
add_test(NAME p2p_no_listen_test COMMAND tests/p2p_no_listen_test.py -v ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_no_listen_test PROPERTY LABELS nonparallelizable_tests)
add_test(NAME p2p_sync_throttle_test COMMAND tests/p2p_sync_throttle_test.py -v -d 2 ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_sync_throttle_test PROPERTY LABELS nonparallelizable_tests)
set_property(TEST p2p_sync_throttle_test PROPERTY LABELS long_running_tests)
add_test(NAME p2p_sync_throttle_if_test COMMAND tests/p2p_sync_throttle_test.py -v -d 2 --activate-if ${UNSHARE} WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set_property(TEST p2p_sync_throttle_if_test PROPERTY LABELS nonparallelizable_tests)
set_property(TEST p2p_sync_throttle_if_test PROPERTY LABELS long_running_tests)

# needs iproute-tc or iproute2 depending on platform
#add_test(NAME p2p_high_latency_test COMMAND tests/p2p_high_latency_test.py -v WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
Expand Down
9 changes: 7 additions & 2 deletions tests/nodeos_startup_catchup.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,13 @@ def waitForNodeStarted(node):
if irreversible:
# for now just use a larger tolerance, later when the logs include calculated lib this can be more precise
# See https://github.com/AntelopeIO/spring/issues/806
if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*10):
errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}")
if activateIF:
if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*10):
errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}")
else:
if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*2+360):
errorExit(f"Requested range too far head of fork head {fhead} in irreversible mode, sync-fetch-span {sync_fetch_span}: {line}")

else:
if endBlockNum > fhead and fhead > libNum and endBlockNum - fhead > (sync_fetch_span*2-1):
errorExit(f"Requested range too far head of fork head {fhead} sync-fetch-span {sync_fetch_span}: {line}")
Expand Down
5 changes: 3 additions & 2 deletions tests/p2p_sync_throttle_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#
# Compare the sync time of throttledNode and unThrottledNode
if cluster.launch(pnodes=pnodes, unstartedNodes=3, totalNodes=total_nodes, prodCount=prod_count,
extraNodeosArgs="--sync-fetch-span 5",
topo='./tests/p2p_sync_throttle_test_shape.json', delay=delay, activateIF=activateIF) is False:
errorExit("Failed to stand up eos cluster.")

Expand Down Expand Up @@ -91,7 +92,7 @@

Print("Configure and launch txn generators")
targetTpsPerGenerator = 500
testTrxGenDurationSec=60
testTrxGenDurationSec=90
trxGeneratorCnt=1
cluster.launchTrxGenerators(contractOwnerAcctName=cluster.eosioAccount.name, acctNamesList=[accounts[0].name,accounts[1].name],
acctPrivKeysList=[account1PrivKey,account2PrivKey], nodeId=prodNode.nodeId, tpsPerGenerator=targetTpsPerGenerator,
Expand Down Expand Up @@ -132,7 +133,7 @@
assert unThrottledNode.waitForBlock(endLargeBlocksHeadBlock), f'wait for block {endLargeBlocksHeadBlock} on un-throttled node timed out'
endUnThrottledSync = time.time()

assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=240), f'Wait for block {endLargeBlocksHeadBlock} on throttled node timed out'
assert throttledNode.waitForBlock(endLargeBlocksHeadBlock, timeout=endLargeBlocksHeadBlock*2), f'Wait for block {endLargeBlocksHeadBlock} on throttled node timed out'
endThrottledSync = time.time()

throttledElapsed = endThrottledSync - clusterStart
Expand Down

0 comments on commit a57af4b

Please sign in to comment.