Skip to content

Commit

Permalink
GH-1091 Add back sync_write_queue for sync blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Jan 8, 2025
1 parent 1d18678 commit dd0417b
Showing 1 changed file with 34 additions and 29 deletions.
63 changes: 34 additions & 29 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ namespace eosio {
void reset() {
fc::lock_guard g( _mtx );
_write_queue.clear();
_sync_write_queue.clear();
_trx_write_queue.clear();
_write_queue_size = 0;
_out_queue.clear();
Expand All @@ -635,6 +636,7 @@ namespace eosio {
void clear_write_queue() {
fc::lock_guard g( _mtx );
_write_queue.clear();
_sync_write_queue.clear();
_trx_write_queue.clear();
_write_queue_size = 0;
}
Expand All @@ -650,17 +652,13 @@ namespace eosio {
return _write_queue_size;
}

bool is_out_queue_empty() const {
fc::lock_guard g( _mtx );
return _out_queue.empty();
}

// called from connection strand
bool ready_to_send(uint32_t connection_id) const {
fc::unique_lock g( _mtx );
// if out_queue is not empty then async_write is in progress
bool async_write_in_progress = !_out_queue.empty();
bool ready = ((!_trx_write_queue.empty() || !_write_queue.empty()) && !async_write_in_progress);
const bool async_write_in_progress = !_out_queue.empty();
const bool ready = !async_write_in_progress &&
(!_sync_write_queue.empty() || !_write_queue.empty() || !_trx_write_queue.empty());
g.unlock();
if (async_write_in_progress) {
fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id));
Expand All @@ -669,12 +667,15 @@ namespace eosio {
}

// @param callback must not callback into queued_buffer
bool add_write_queue( const std::shared_ptr<vector<char>>& buff,
std::function<void( boost::system::error_code, std::size_t )> callback,
msg_type_t net_msg ) {
bool add_write_queue(msg_type_t net_msg,
bool to_sync_queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
fc::lock_guard g( _mtx );
if( net_msg == msg_type_t::packed_transaction ) {
_trx_write_queue.push_back( {buff, std::move(callback)} );
} else if (to_sync_queue) {
_sync_write_queue.push_back( {buff, std::move(callback)} );
} else {
_write_queue.push_back( {buff, std::move(callback)} );
}
Expand All @@ -687,11 +688,13 @@ namespace eosio {

void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
fc::lock_guard g( _mtx );
if( !_write_queue.empty() ) { // always send msgs from write_queue first
if (!_sync_write_queue.empty()) { // always send msgs from sync_write_queue first
fill_out_buffer( bufs, _sync_write_queue );
} else if (!_write_queue.empty()) { // always send msgs from write_queue before trx queue
fill_out_buffer( bufs, _write_queue );
} else {
fill_out_buffer( bufs, _trx_write_queue );
assert(_trx_write_queue.empty() && _write_queue.empty() && _write_queue_size == 0);
assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0);
}
}

Expand Down Expand Up @@ -722,8 +725,9 @@ namespace eosio {

alignas(hardware_destructive_interference_sz)
mutable fc::mutex _mtx;
uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue and _trx_write_queue
deque<queued_write> _write_queue GUARDED_BY(_mtx); // queued messages, all messages except trxs
uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue
deque<queued_write> _write_queue GUARDED_BY(_mtx); // queued messages, all messages except sync & trxs
deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue blocks will be sent first
deque<queued_write> _trx_write_queue GUARDED_BY(_mtx); // queued trx messages, trx_write_queue will be sent last
deque<queued_write> _out_queue GUARDED_BY(_mtx); // currently being async_write

Expand Down Expand Up @@ -1006,8 +1010,9 @@ namespace eosio {
void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num );

void enqueue( const net_message& msg );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num, bool to_sync_queue );
void enqueue_buffer( msg_type_t net_msg,
bool to_sync_queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num,
go_away_reason close_after_send);
Expand All @@ -1020,6 +1025,7 @@ namespace eosio {
void sync_wait();

void queue_write(msg_type_t net_msg,
bool to_sync_queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback);
void do_queue_write();
Expand Down Expand Up @@ -1607,9 +1613,10 @@ namespace eosio {

// called from connection strand
void connection::queue_write(msg_type_t net_msg,
bool to_sync_queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
if( !buffer_queue.add_write_queue( buff, std::move(callback), net_msg )) {
if( !buffer_queue.add_write_queue( net_msg, to_sync_queue, buff, std::move(callback) )) {
peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
close();
return;
Expand Down Expand Up @@ -1725,7 +1732,7 @@ namespace eosio {
}
}
block_sync_throttling = false;
auto sent = enqueue_block( sb, num );
auto sent = enqueue_block( sb, num, true );
block_sync_total_bytes_sent += sent;
block_sync_frame_bytes_sent += sent;
++peer_requested->last;
Expand Down Expand Up @@ -1883,29 +1890,30 @@ namespace eosio {

buffer_factory buff_factory;
const auto& send_buffer = buff_factory.get_send_buffer( m );
enqueue_buffer( to_msg_type_t(m.index()), send_buffer, 0, close_after_send );
enqueue_buffer( to_msg_type_t(m.index()), false, send_buffer, 0, close_after_send );
}

// called from connection strand
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num ) {
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num, bool to_sync_queue ) {
peer_dlog( this, "enqueue block ${num}", ("num", block_num) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );

block_buffer_factory buff_factory;
const auto& sb = buff_factory.get_send_buffer( b );
latest_blk_time = std::chrono::steady_clock::now();
enqueue_buffer( msg_type_t::signed_block, sb, block_num, no_reason);
enqueue_buffer( msg_type_t::signed_block, to_sync_queue, sb, block_num, no_reason);
return sb->size();
}

// called from connection strand
void connection::enqueue_buffer( msg_type_t net_msg,
bool to_sync_queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num, // only valid for net_msg == signed_block variant which
go_away_reason close_after_send)
{
connection_ptr self = shared_from_this();
queue_write(net_msg, send_buffer,
queue_write(net_msg, to_sync_queue, send_buffer,
[conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) {
if (ec) {
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message()));
Expand Down Expand Up @@ -2750,16 +2758,12 @@ namespace eosio {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::block_notice_message, send_buffer, 0, no_reason );
cp->enqueue_buffer( msg_type_t::block_notice_message, false, send_buffer, 0, no_reason );
});
return;
}
}

if( !add_peer_block( id, cp->connection_id ) ) {
fc_dlog( logger, "not bcast block ${b} to connection - ${cid}", ("b", bnum)("cid", cp->connection_id) );
return;
}

send_buffer_type sb = buff_factory.get_send_buffer( b );

boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() {
Expand All @@ -2768,6 +2772,7 @@ namespace eosio {
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::signed_block, sb, bnum, no_reason );
cp->enqueue_buffer( msg_type_t::signed_block, false, sb, bnum, no_reason );
}
});
} );
Expand All @@ -2781,7 +2786,7 @@ namespace eosio {
boost::asio::post(cp->strand, [cp, msg]() {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg_type_t::vote_message, msg, 0, no_reason );
cp->enqueue_buffer( msg_type_t::vote_message, false, msg, 0, no_reason );
});
return true;
} );
Expand All @@ -2802,7 +2807,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( msg_type_t::packed_transaction, sb, 0, no_reason );
cp->enqueue_buffer( msg_type_t::packed_transaction, false, sb, 0, no_reason );
} );
} );
}
Expand Down Expand Up @@ -3250,7 +3255,7 @@ namespace eosio {
buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} );

enqueue_buffer( msg_type_t::block_nack_message, send_buffer, 0, no_reason );
enqueue_buffer( msg_type_t::block_nack_message, false, send_buffer, 0, no_reason );
}

void net_plugin_impl::plugin_shutdown() {
Expand Down

0 comments on commit dd0417b

Please sign in to comment.