Skip to content

Commit

Permalink
GH-1091 Initial implementation of block_nack with block_nack_message …
Browse files Browse the repository at this point in the history
…and block_notice_message
  • Loading branch information
heifner committed Jan 6, 2025
1 parent d89dc9f commit b54eea7
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 27 deletions.
14 changes: 13 additions & 1 deletion plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ namespace eosio {
uint32_t end_block{0};
};

struct block_nack_message {
block_id_type id;
};

struct block_notice_message {
block_id_type id;
};

using net_message = std::variant<handshake_message,
chain_size_message,
go_away_message,
Expand All @@ -143,7 +151,9 @@ namespace eosio {
sync_request_message,
signed_block,
packed_transaction,
vote_message>;
vote_message,
block_nack_message,
block_notice_message>;

} // namespace eosio

Expand All @@ -162,6 +172,8 @@ FC_REFLECT( eosio::time_message, (org)(rec)(xmt)(dst) )
FC_REFLECT( eosio::notice_message, (known_trx)(known_blocks) )
FC_REFLECT( eosio::request_message, (req_trx)(req_blocks) )
FC_REFLECT( eosio::sync_request_message, (start_block)(end_block) )
FC_REFLECT( eosio::block_nack_message, (id) )
FC_REFLECT( eosio::block_notice_message, (id) )

/**
*
Expand Down
163 changes: 137 additions & 26 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ namespace eosio {
};
explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance );
static void send_handshakes();
static void send_block_nack_resets();
bool syncing_from_peer() const { return sync_state == lib_catchup; }
bool is_in_sync() const { return sync_state == in_sync; }
void sync_reset_fork_db_root_num( const connection_ptr& conn, bool closing );
Expand Down Expand Up @@ -265,6 +266,8 @@ namespace eosio {
signed_block = fc::get_index<net_message, signed_block>(),
packed_transaction = fc::get_index<net_message, packed_transaction>(),
vote_message = fc::get_index<net_message, vote_message>(),
block_nack_message = fc::get_index<net_message, block_nack_message>(),
block_notice_message = fc::get_index<net_message, block_notice_message>(),
unknown
};

Expand Down Expand Up @@ -598,10 +601,11 @@ namespace eosio {
constexpr uint16_t proto_dup_node_id_goaway = 6; // eosio 2.1: support peer node_id based duplicate connection resolution
constexpr uint16_t proto_leap_initial = 7; // leap client, needed because none of the 2.1 versions are supported
constexpr uint16_t proto_block_range = 8; // include block range in notice_message
constexpr uint16_t proto_savanna = 9; // savanna
constexpr uint16_t proto_savanna = 9; // savanna, adds vote_message
constexpr uint16_t proto_block_nack = 10; // adds block_nack_message & block_notice_message
#pragma GCC diagnostic pop

constexpr uint16_t net_version_max = proto_savanna;
constexpr uint16_t net_version_max = proto_block_nack;

/**
* Index by start_block_num
Expand Down Expand Up @@ -912,6 +916,11 @@ namespace eosio {

std::chrono::nanoseconds connection_start_time{0};

// block nack support
static constexpr uint16_t consecutive_block_nacks_threshold{3}; // stop sending blocks when reached
uint16_t consecutive_blocks_nacks{0};
block_id_type last_block_nack;

connection_status get_status()const;

/** \name Peer Timestamps
Expand Down Expand Up @@ -962,6 +971,7 @@ namespace eosio {
*/
bool process_next_message(uint32_t message_length);

void send_block_nack(const block_id_type& block_id);
void send_handshake();

/** \name Peer Timestamps
Expand Down Expand Up @@ -1042,6 +1052,8 @@ namespace eosio {
void handle_message( const packed_transaction_ptr& trx );
void handle_message( const vote_message_ptr& msg );
void handle_message( const vote_message& msg ) = delete; // vote_message_ptr overload used instead
void handle_message( const block_nack_message& msg);
void handle_message( const block_notice_message& msg);

// returns calculated number of blocks combined latency
uint32_t calc_block_latency();
Expand Down Expand Up @@ -1120,6 +1132,18 @@ namespace eosio {
peer_dlog( c, "handle sync_request_message" );
c->handle_message( msg );
}

void operator()( const block_nack_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle block_nack_message ${id}", ("id", msg.id) );
c->handle_message( msg );
}

void operator()( const block_notice_message& msg ) const {
// continue call to handle_message on connection strand
peer_dlog( c, "handle block_notice_message ${id}", ("id", msg.id) );
c->handle_message( msg );
}
};


Expand Down Expand Up @@ -1423,6 +1447,13 @@ namespace eosio {
block_sync_frame_bytes_sent = 0;
block_sync_throttling = false;
last_vote_received = time_point{};
consecutive_blocks_nacks = 0;
last_block_nack = block_id_type{};

uint32_t head_num = my_impl->get_chain_head_num();
if (last_received_block_num >= head_num) {
sync_manager::send_block_nack_resets();
}

if( reconnect && !shutdown && !incoming() ) {
my_impl->connections.start_conn_timer( std::chrono::milliseconds( 100 ),
Expand Down Expand Up @@ -1875,7 +1906,7 @@ namespace eosio {
queue_write(net_msg, send_buffer,
[conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) {
if (ec) {
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.to_string()));
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message()));
return;
}
if (net_msg == msg_type_t::signed_block)
Expand Down Expand Up @@ -2128,6 +2159,15 @@ namespace eosio {
} );
}

// static, thread safe
void sync_manager::send_block_nack_resets() {
my_impl->connections.for_each_block_connection( []( const connection_ptr& ci ) {
if( ci->current() ) {
ci->send_block_nack({});
}
} );
}

bool sync_manager::is_sync_required( uint32_t fork_db_head_block_num ) const REQUIRES(sync_mtx) {
fc_dlog( logger, "last req = ${req}, last recv = ${recv} known = ${known} our fhead = ${h}",
("req", sync_last_requested_num)( "recv", sync_next_expected_num-1 )( "known", sync_known_fork_db_root_num )
Expand Down Expand Up @@ -2691,12 +2731,26 @@ namespace eosio {
if(my_impl->sync_master->syncing_from_peer() ) return;

block_buffer_factory buff_factory;
buffer_factory block_id_buff_factory;
const auto bnum = b->block_num();
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory]( auto& cp ) {
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_id_buff_factory]( auto& cp ) {
fc_dlog( logger, "socket_is_open ${s}, state ${c}, syncing ${ss}, connection - ${cid}",
("s", cp->socket_is_open())("c", connection::state_str(cp->state()))("ss", cp->peer_syncing_from_us.load())("cid", cp->connection_id) );
if( !cp->current() ) return;

if (cp->protocol_version >= proto_block_nack) {
if (cp->consecutive_blocks_nacks > connection::consecutive_block_nacks_threshold) {
add_peer_block( id, cp->connection_id );
auto send_buffer = block_id_buff_factory.get_send_buffer( block_notice_message{id} );
boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::block_notice_message, send_buffer, 0, no_reason );
});
return;
}
}

if( !add_peer_block( id, cp->connection_id ) ) {
fc_dlog( logger, "not bcast block ${b} to connection - ${cid}", ("b", bnum)("cid", cp->connection_id) );
return;
Expand All @@ -2719,12 +2773,11 @@ namespace eosio {
my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) {
if( !cp->current() ) return true;
if( cp->connection_id == exclude_peer ) return true;
if (cp->protocol_version < proto_savanna) return true;
boost::asio::post(cp->strand, [cp, msg]() {
if (cp->protocol_version >= proto_savanna) {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg_type_t::vote_message, msg, 0, no_reason );
}
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg_type_t::vote_message, msg, 0, no_reason );
});
return true;
} );
Expand Down Expand Up @@ -3049,43 +3102,43 @@ namespace eosio {
const block_id_type blk_id = bh.calculate_id();
const uint32_t blk_num = last_received_block_num = block_header::num_from_id(blk_id);
const fc::microseconds age(fc::time_point::now() - bh.timestamp);
// don't add_peer_block because we have not validated this block header yet
if( my_impl->dispatcher.have_block( blk_id ) ) {
pending_message_buffer.advance_read_ptr( message_length ); // advance before any send

// if we have the block then it has been header validated, add for this connection_id
if (my_impl->dispatcher.add_peer_block(blk_id, connection_id)) {
send_block_nack(blk_id);
}
peer_dlog( this, "already received block ${num}, id ${id}..., latency ${l}ms",
("num", blk_num)("id", blk_id.str().substr(8,16))("l", age.count()/1000) );
my_impl->sync_master->sync_recv_block( shared_from_this(), blk_id, blk_num, age );

pending_message_buffer.advance_read_ptr( message_length );
return true;
}
peer_dlog( this, "received block ${num}, id ${id}..., latency: ${l}ms, head ${h}, fhead ${f}",
("num", bh.block_num())("id", blk_id.str().substr(8,16))("l", age.count()/1000)
("h", my_impl->get_chain_head_num())("f", my_impl->get_fork_db_head_num()));
if( !my_impl->sync_master->syncing_from_peer() ) { // guard against peer thinking it needs to send us old blocks
uint32_t fork_db_root_num = my_impl->get_fork_db_root_num();
block_num_type fork_db_root_num = my_impl->get_fork_db_root_num();
if( blk_num <= fork_db_root_num ) {
fc::unique_lock g( conn_mtx );
const auto last_sent_fork_db_root_num = last_handshake_sent.fork_db_root_num;
g.unlock();
peer_ilog( this, "received block ${n} less than ${which}froot ${fr}",
("n", blk_num)("which", blk_num < last_sent_fork_db_root_num ? "sent " : "")
("fr", blk_num < last_sent_fork_db_root_num ? last_sent_fork_db_root_num : fork_db_root_num) );
enqueue( (sync_request_message) {0, 0} );
send_handshake();
pending_message_buffer.advance_read_ptr( message_length ); // advance before any send
peer_ilog( this, "received block ${n} less than froot ${fr}", ("n", blk_num)("fr", fork_db_root_num) );
send_block_nack(blk_id);
cancel_sync_wait();

pending_message_buffer.advance_read_ptr( message_length );
return true;
}
} else {
block_sync_bytes_received += message_length;
uint32_t fork_db_root_num = my_impl->get_fork_db_root_num();
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, age);
if( blk_num <= fork_db_root_num ) {
const bool block_le_lib = blk_num <= fork_db_root_num;
if (block_le_lib) {
peer_dlog( this, "received block ${n} less than froot ${fr} while syncing", ("n", blk_num)("fr", fork_db_root_num) );
pending_message_buffer.advance_read_ptr( message_length );
return true;
pending_message_buffer.advance_read_ptr( message_length ); // advance before any send
}
my_impl->sync_master->sync_recv_block(shared_from_this(), blk_id, blk_num, age);
if (block_le_lib)
return true;
}

auto ds = pending_message_buffer.create_datastream();
Expand Down Expand Up @@ -3180,6 +3233,22 @@ namespace eosio {
return true;
}

// called from connection strand
void connection::send_block_nack(const block_id_type& block_id) {
if (protocol_version < proto_block_nack)
return;

if (my_impl->sync_master->syncing_from_peer())
return;

peer_dlog(this, "Sending nack ${n}", ("n", block_header::num_from_id(block_id)));

buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} );

enqueue_buffer( msg_type_t::block_nack_message, send_buffer, 0, no_reason );
}

void net_plugin_impl::plugin_shutdown() {
thread_pool.stop();
}
Expand Down Expand Up @@ -3674,6 +3743,7 @@ namespace eosio {
}
}

// called from connection strand
void connection::handle_message( const vote_message_ptr& msg ) {
last_vote_received = fc::time_point::now();
if (vote_logger.is_enabled(fc::log_level::debug)) {
Expand All @@ -3685,6 +3755,45 @@ namespace eosio {
cc.process_vote_message(connection_id, msg);
}

// called from connection strand
void connection::handle_message( const block_nack_message& msg ) {
auto block_num = block_header::num_from_id(msg.id);

peer_dlog(this, "received block nack #${bn}:${id}, consecutive ${c}", ("bn", block_num)("id", msg.id)("c", consecutive_blocks_nacks));

if (block_num == 0) { // peer requested reset
consecutive_blocks_nacks = 0;
last_block_nack = msg.id;
return;
}

latest_blk_time = std::chrono::steady_clock::now();
auto fork_db_root_num = my_impl->get_fork_db_root_num();
const bool before_lib = block_header::num_from_id(msg.id) <= fork_db_root_num;

if (before_lib || my_impl->dispatcher.have_block(msg.id)) {
if (block_num - 1 == block_header::num_from_id(last_block_nack)) {
++consecutive_blocks_nacks;
}
if (!before_lib) {
my_impl->dispatcher.add_peer_block(msg.id, connection_id);
}
}
last_block_nack = msg.id;
}

// called from connection strand
void connection::handle_message( const block_notice_message& msg ) {
auto fork_db_root_num = my_impl->get_fork_db_root_num();
if (block_header::num_from_id(msg.id) <= fork_db_root_num)
return;

latest_blk_time = std::chrono::steady_clock::now();
if (my_impl->dispatcher.have_block(msg.id)) {
my_impl->dispatcher.add_peer_block(msg.id, connection_id);
}
}

size_t calc_trx_size( const packed_transaction_ptr& trx ) {
return trx->get_estimated_size();
}
Expand Down Expand Up @@ -3730,8 +3839,10 @@ namespace eosio {

// may have come in on a different connection and posted into dispatcher strand before this one
if( block_header::num_from_id(id) <= fork_db_root_num || my_impl->dispatcher.have_block( id ) || cc.block_exists( id ) ) { // thread-safe
my_impl->dispatcher.add_peer_block( id, c->connection_id );
boost::asio::post(c->strand, [c, id, ptr{std::move(ptr)}]() {
if (my_impl->dispatcher.add_peer_block( id, c->connection_id )) {
c->send_block_nack(id);
}
const fc::microseconds age(fc::time_point::now() - ptr->timestamp);
my_impl->sync_master->sync_recv_block( c, id, block_header::num_from_id(id), age );
});
Expand Down

0 comments on commit b54eea7

Please sign in to comment.