Skip to content

Commit

Permalink
GH-1091 Use an enum instead of a bool for to_sync_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Jan 17, 2025
1 parent f414b80 commit 4e5acfa
Showing 1 changed file with 23 additions and 22 deletions.
45 changes: 23 additions & 22 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,18 +667,19 @@ namespace eosio {
return ready;
}

enum class queue_t { block_sync, general };
// @param callback must not callback into queued_buffer
bool add_write_queue(msg_type_t net_msg,
bool to_sync_queue,
queue_t queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
fc::lock_guard g( _mtx );
if( net_msg == msg_type_t::packed_transaction ) {
_trx_write_queue.push_back( {buff, std::move(callback)} );
} else if (to_sync_queue) {
_sync_write_queue.push_back( {buff, std::move(callback)} );
_trx_write_queue.emplace_back( buff, std::move(callback) );
} else if (queue == queue_t::block_sync) {
_sync_write_queue.emplace_back( buff, std::move(callback) );
} else {
_write_queue.push_back( {buff, std::move(callback)} );
_write_queue.emplace_back( buff, std::move(callback) );
}
_write_queue_size += buff->size();
if( _write_queue_size > 2 * def_max_write_queue_size ) {
Expand Down Expand Up @@ -923,7 +924,7 @@ namespace eosio {

// block nack support
static constexpr uint16_t consecutive_block_nacks_threshold{2}; // stop sending blocks when reached
uint16_t consecutive_blocks_nacks{0};
block_num_type consecutive_blocks_nacks{0};
block_id_type last_block_nack;
block_id_type last_block_notice;
request_message last_request_message GUARDED_BY(conn_mtx);
Expand Down Expand Up @@ -1012,9 +1013,9 @@ namespace eosio {
void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num );

void enqueue( const net_message& msg );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num, bool to_sync_queue );
size_t enqueue_block( const std::vector<char>& sb, uint32_t block_num, queued_buffer::queue_t queue );
void enqueue_buffer( msg_type_t net_msg,
bool to_sync_queue,
queued_buffer::queue_t queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num,
go_away_reason close_after_send);
Expand All @@ -1027,7 +1028,7 @@ namespace eosio {
void sync_wait();

void queue_write(msg_type_t net_msg,
bool to_sync_queue,
queued_buffer::queue_t queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback);
void do_queue_write();
Expand Down Expand Up @@ -1620,10 +1621,10 @@ namespace eosio {

// called from connection strand
void connection::queue_write(msg_type_t net_msg,
bool to_sync_queue,
queued_buffer::queue_t queue,
const std::shared_ptr<vector<char>>& buff,
std::function<void(boost::system::error_code, std::size_t)> callback) {
if( !buffer_queue.add_write_queue( net_msg, to_sync_queue, buff, std::move(callback) )) {
if( !buffer_queue.add_write_queue( net_msg, queue, buff, std::move(callback) )) {
peer_wlog( this, "write_queue full ${s} bytes, giving up on connection", ("s", buffer_queue.write_queue_size()) );
close();
return;
Expand Down Expand Up @@ -1739,7 +1740,7 @@ namespace eosio {
}
}
block_sync_throttling = false;
auto sent = enqueue_block( sb, num, true );
auto sent = enqueue_block( sb, num, queued_buffer::queue_t::block_sync );
block_sync_total_bytes_sent += sent;
block_sync_frame_bytes_sent += sent;
++peer_requested->last;
Expand Down Expand Up @@ -1897,30 +1898,30 @@ namespace eosio {

buffer_factory buff_factory;
const auto& send_buffer = buff_factory.get_send_buffer( m );
enqueue_buffer( to_msg_type_t(m.index()), false, send_buffer, 0, close_after_send );
enqueue_buffer( to_msg_type_t(m.index()), queued_buffer::queue_t::general, send_buffer, 0, close_after_send );
}

// called from connection strand
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num, bool to_sync_queue ) {
size_t connection::enqueue_block( const std::vector<char>& b, uint32_t block_num, queued_buffer::queue_t queue ) {
peer_dlog( this, "enqueue block ${num}", ("num", block_num) );
verify_strand_in_this_thread( strand, __func__, __LINE__ );

block_buffer_factory buff_factory;
const auto& sb = buff_factory.get_send_buffer( b );
latest_blk_time = std::chrono::steady_clock::now();
enqueue_buffer( msg_type_t::signed_block, to_sync_queue, sb, block_num, no_reason);
enqueue_buffer( msg_type_t::signed_block, queue, sb, block_num, no_reason);
return sb->size();
}

// called from connection strand
void connection::enqueue_buffer( msg_type_t net_msg,
bool to_sync_queue,
queued_buffer::queue_t queue,
const std::shared_ptr<std::vector<char>>& send_buffer,
block_num_type block_num, // only valid for net_msg == signed_block variant which
go_away_reason close_after_send)
{
connection_ptr self = shared_from_this();
queue_write(net_msg, to_sync_queue, send_buffer,
queue_write(net_msg, queue, send_buffer,
[conn{std::move(self)}, close_after_send, net_msg, block_num](boost::system::error_code ec, std::size_t ) {
if (ec) {
fc_elog(logger, "Connection - ${cid} - send failed with: ${e}", ("cid", conn->connection_id)("e", ec.message()));
Expand Down Expand Up @@ -2770,7 +2771,7 @@ namespace eosio {
boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::block_notice_message, false, send_buffer, 0, no_reason );
cp->enqueue_buffer( msg_type_t::block_notice_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason );
});
return;
}
Expand All @@ -2784,7 +2785,7 @@ namespace eosio {
bool has_block = cp->peer_fork_db_root_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
cp->enqueue_buffer( msg_type_t::signed_block, false, sb, bnum, no_reason );
cp->enqueue_buffer( msg_type_t::signed_block, queued_buffer::queue_t::general, sb, bnum, no_reason );
}
});
} );
Expand All @@ -2798,7 +2799,7 @@ namespace eosio {
boost::asio::post(cp->strand, [cp, msg]() {
if (vote_logger.is_enabled(fc::log_level::debug))
peer_dlog(cp, "sending vote msg");
cp->enqueue_buffer( msg_type_t::vote_message, false, msg, 0, no_reason );
cp->enqueue_buffer( msg_type_t::vote_message, queued_buffer::queue_t::general, msg, 0, no_reason );
});
return true;
} );
Expand All @@ -2819,7 +2820,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( trx );
fc_dlog( logger, "sending trx: ${id}, to connection - ${cid}", ("id", trx->id())("cid", cp->connection_id) );
boost::asio::post(cp->strand, [cp, sb{std::move(sb)}]() {
cp->enqueue_buffer( msg_type_t::packed_transaction, false, sb, 0, no_reason );
cp->enqueue_buffer( msg_type_t::packed_transaction, queued_buffer::queue_t::general, sb, 0, no_reason );
} );
} );
}
Expand Down Expand Up @@ -3267,7 +3268,7 @@ namespace eosio {
buffer_factory buff_factory;
auto send_buffer = buff_factory.get_send_buffer( block_nack_message{block_id} );

enqueue_buffer( msg_type_t::block_nack_message, false, send_buffer, 0, no_reason );
enqueue_buffer( msg_type_t::block_nack_message, queued_buffer::queue_t::general, send_buffer, 0, no_reason );
}

void net_plugin_impl::plugin_shutdown() {
Expand Down

0 comments on commit 4e5acfa

Please sign in to comment.