diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index b5280b12f0..ec23fbc3c8 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -627,6 +627,7 @@ namespace eosio { void reset() { fc::lock_guard g( _mtx ); _write_queue.clear(); + _sync_write_queue.clear(); _trx_write_queue.clear(); _write_queue_size = 0; _out_queue.clear(); @@ -635,6 +636,7 @@ namespace eosio { void clear_write_queue() { fc::lock_guard g( _mtx ); _write_queue.clear(); + _sync_write_queue.clear(); _trx_write_queue.clear(); _write_queue_size = 0; } @@ -650,17 +652,13 @@ namespace eosio { return _write_queue_size; } - bool is_out_queue_empty() const { - fc::lock_guard g( _mtx ); - return _out_queue.empty(); - } - // called from connection strand bool ready_to_send(uint32_t connection_id) const { fc::unique_lock g( _mtx ); // if out_queue is not empty then async_write is in progress - bool async_write_in_progress = !_out_queue.empty(); - bool ready = ((!_trx_write_queue.empty() || !_write_queue.empty()) && !async_write_in_progress); + const bool async_write_in_progress = !_out_queue.empty(); + const bool ready = !async_write_in_progress && + (!_sync_write_queue.empty() || !_write_queue.empty() || !_trx_write_queue.empty()); g.unlock(); if (async_write_in_progress) { fc_dlog(logger, "Connection - ${id} not ready to send data, async write in progress", ("id", connection_id)); @@ -669,12 +667,15 @@ namespace eosio { } // @param callback must not callback into queued_buffer - bool add_write_queue( const std::shared_ptr>& buff, - std::function callback, - msg_type_t net_msg ) { + bool add_write_queue(msg_type_t net_msg, + bool to_sync_queue, + const std::shared_ptr>& buff, + std::function callback) { fc::lock_guard g( _mtx ); if( net_msg == msg_type_t::packed_transaction ) { _trx_write_queue.push_back( {buff, std::move(callback)} ); + } else if (to_sync_queue) { + _sync_write_queue.push_back( {buff, std::move(callback)} ); } else { _write_queue.push_back( {buff, std::move(callback)} ); } @@ -687,11 +688,13 @@ namespace eosio { void fill_out_buffer( std::vector& bufs ) { fc::lock_guard g( _mtx ); - if( !_write_queue.empty() ) { // always send msgs from write_queue first + if (!_sync_write_queue.empty()) { // always send msgs from sync_write_queue first + fill_out_buffer( bufs, _sync_write_queue ); + } else if (!_write_queue.empty()) { // always send msgs from write_queue before trx queue fill_out_buffer( bufs, _write_queue ); } else { fill_out_buffer( bufs, _trx_write_queue ); - assert(_trx_write_queue.empty() && _write_queue.empty() && _write_queue_size == 0); + assert(_trx_write_queue.empty() && _write_queue.empty() && _sync_write_queue.empty() && _write_queue_size == 0); } } @@ -722,8 +725,9 @@ namespace eosio { alignas(hardware_destructive_interference_sz) mutable fc::mutex _mtx; - uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue and _trx_write_queue - deque _write_queue GUARDED_BY(_mtx); // queued messages, all messages except trxs + uint32_t _write_queue_size GUARDED_BY(_mtx) {0}; // size of _write_queue + _sync_write_queue + _trx_write_queue + deque _write_queue GUARDED_BY(_mtx); // queued messages, all messages except sync & trxs + deque _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue blocks will be sent first deque _trx_write_queue GUARDED_BY(_mtx); // queued trx messages, trx_write_queue will be sent last deque _out_queue GUARDED_BY(_mtx); // currently being async_write @@ -1006,8 +1010,9 @@ namespace eosio { void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num ); void enqueue( const net_message& msg ); - size_t enqueue_block( const std::vector& sb, uint32_t block_num ); + size_t enqueue_block( const std::vector& sb, uint32_t block_num, bool to_sync_queue ); void enqueue_buffer( msg_type_t net_msg, + bool to_sync_queue, const std::shared_ptr>& send_buffer, block_num_type block_num, go_away_reason close_after_send); @@ -1020,6 +1025,7 @@ namespace eosio { void sync_wait(); void queue_write(msg_type_t net_msg, + bool to_sync_queue, const std::shared_ptr>& buff, std::function callback); void do_queue_write(); @@ -1607,9 +1613,10 @@ namespace eosio { // called from connection strand void connection::queue_write(msg_type_t net_msg, + bool to_sync_queue, const std::shared_ptr>& buff, std::function callback) { - if( !buffer_queue.add_write_queue( buff, std::move(callback), net_msg )) { + if( !buffer_queue.add_write_queue( net_msg, to_sync_queue, buff, std::move(callback) )) { peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) ); close(); return; @@ -1725,7 +1732,7 @@ namespace eosio { } } block_sync_throttling = false; - auto sent = enqueue_block( sb, num ); + auto sent = enqueue_block( sb, num, true ); block_sync_total_bytes_sent += sent; block_sync_frame_bytes_sent += sent; ++peer_requested->last; @@ -1883,29 +1890,30 @@ namespace eosio { buffer_factory buff_factory; const auto& send_buffer = buff_factory.get_send_buffer( m ); - enqueue_buffer( to_msg_type_t(m.index()), send_buffer, 0, close_after_send ); + enqueue_buffer( to_msg_type_t(m.index()), false, send_buffer, 0, close_after_send ); } // called from connection strand - size_t connection::enqueue_block( const std::vector& b, uint32_t block_num ) { + size_t connection::enqueue_block( const std::vector& b, uint32_t block_num, bool to_sync_queue ) { peer_dlog( this, "enqueue block ${num}", ("num", block_num) ); verify_strand_in_this_thread( strand, __func__, __LINE__ ); block_buffer_factory buff_factory; const auto& sb = buff_factory.get_send_buffer( b ); latest_blk_time = std::chrono::steady_clock::now(); - enqueue_buffer( msg_type_t::signed_block, sb, block_num, no_reason); + enqueue_buffer( msg_type_t::signed_block, to_sync_queue, sb, block_num, no_reason); return sb->size(); } // called from connection strand void connection::enqueue_buffer( msg_type_t net_msg, + bool to_sync_queue, const std::shared_ptr>& send_buffer, block_num_type block_num, // only valid for net_msg == signed_block variant which go_away_reason close_after_send) { connection_ptr self = shared_from_this(); - queue_write(net_msg, send_buffer, + queue_write(net_msg, to_sync_queue, send_buffer, [conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) { if (ec) { fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message())); @@ -2750,16 +2758,12 @@ namespace eosio { cp->latest_blk_time = std::chrono::steady_clock::now(); peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) ); cp->enqueue_buffer( msg_type_t::block_notice_message, send_buffer, 0, no_reason ); + cp->enqueue_buffer( msg_type_t::block_notice_message, false, send_buffer, 0, no_reason ); }); return; } } - if( !add_peer_block( id, cp->connection_id ) ) { - fc_dlog( logger, "not bcast block ${b} to connection - ${cid}", ("b", bnum)("cid", cp->connection_id) ); - return; - } - send_buffer_type sb = buff_factory.get_send_buffer( b ); boost::asio::post(cp->strand, [cp, bnum, sb{std::move(sb)}]() { @@ -2768,6 +2772,7 @@ namespace eosio { if( !has_block ) { peer_dlog( cp, "bcast block ${b}", ("b", bnum) ); cp->enqueue_buffer( msg_type_t::signed_block, sb, bnum, no_reason ); + cp->enqueue_buffer( msg_type_t::signed_block, false, sb, bnum, no_reason ); } }); } ); @@ -2781,7 +2786,7 @@ namespace eosio { boost::asio::post(cp->strand, [cp, msg]() { if (vote_logger.is_enabled(fc::log_level::debug)) peer_dlog(cp, "sending vote msg"); - cp->enqueue_buffer( msg_type_t::vote_message, msg, 0, no_reason ); + cp->enqueue_buffer( msg_type_t::vote_message, false, msg, 0, no_reason ); }); return true; } ); @@ -2802,7 +2807,7 @@ namespace eosio { send_buffer_type sb = buff_factory.get_send_buffer( trx ); fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) ); boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() { - cp->enqueue_buffer( msg_type_t::packed_transaction, sb, 0, no_reason ); + cp->enqueue_buffer( msg_type_t::packed_transaction, false, sb, 0, no_reason ); } ); } ); } @@ -3250,7 +3255,7 @@ namespace eosio { buffer_factory buff_factory; auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} ); - enqueue_buffer( msg_type_t::block_nack_message, send_buffer, 0, no_reason ); + enqueue_buffer( msg_type_t::block_nack_message, false, send_buffer, 0, no_reason ); } void net_plugin_impl::plugin_shutdown() {