Skip to content

Commit

Permalink
GH-3 Simplify vote_processor by processing votes on a first-come-firs…
Browse files Browse the repository at this point in the history
…t-serve basis.
  • Loading branch information
heifner committed Apr 17, 2024
1 parent 83434d1 commit 042ce30
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 118 deletions.
2 changes: 2 additions & 0 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3765,6 +3765,8 @@ struct controller_impl {

if (conf.terminate_at_block == 0 || bsp->block_num() <= conf.terminate_at_block) {
forkdb.add(bsp, mark_valid_t::no, ignore_duplicate_t::yes);
if constexpr (savanna_mode)
vote_processor.notify_new_block();
}

return block_handle{bsp};
Expand Down
220 changes: 125 additions & 95 deletions libraries/chain/include/eosio/chain/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class vote_processor_t {
// Even 3000 vote structs are less than 1MB per connection.
// 2500 should never be reached unless a specific connection is sending garbage.
static constexpr size_t max_votes_per_connection = 2500;
static constexpr std::chrono::milliseconds block_wait_time{10};
// If we have not processed a vote in this amount of time, give up on it.
static constexpr fc::microseconds too_old = fc::seconds(5);

struct by_block_num;
struct by_connection;
struct by_vote;
struct by_last_received;

struct vote {
uint32_t connection_id;
fc::time_point received;
vote_message_ptr msg;

const block_id_type& id() const { return msg->block_id; }
Expand All @@ -35,13 +37,9 @@ class vote_processor_t {

// Container of votes waiting to be processed. It exposes three non-unique ordered views:
//  - by_block_num:     keyed on vote::block_num(), highest block number first (std::greater)
//  - by_connection:    keyed on the id of the connection the vote arrived on
//  - by_last_received: keyed on vote::received, earliest first
using vote_index_type = boost::multi_index_container< vote,
   indexed_by<
      ordered_non_unique< tag<by_block_num>, const_mem_fun<vote, block_num_type, &vote::block_num>, std::greater<> >,
      ordered_non_unique< tag<by_connection>, member<vote, uint32_t, &vote::connection_id> >,
      ordered_non_unique< tag<by_last_received>, member<vote, fc::time_point, &vote::received> >
   >
>;

Expand All @@ -51,12 +49,14 @@ class vote_processor_t {
fetch_block_func_t fetch_block_func;

std::mutex mtx;
std::condition_variable cv;
vote_index_type index;
block_state_ptr last_bsp;
// connection, count of messages
std::map<uint32_t, uint16_t> num_messages;

std::atomic<block_num_type> lib{0};
std::atomic<block_num_type> largest_known_block_num{0};
std::atomic<uint32_t> queued_votes{0};
std::atomic<bool> stopped{true};
named_thread_pool<vote> thread_pool;

Expand All @@ -83,53 +83,111 @@ class vote_processor_t {
}
}

// called with unlocked mtx
// Reports the outcome of a processed vote through vote_signal, at most once per call.
//  connection_id - connection the vote came from; 0 is skipped because this node's vote was already signaled
//  status        - processing result; vote_status::duplicate results are not reported
//  msg           - the vote message, handed to the signal by const reference (std::cref)
void emit(uint32_t connection_id, vote_status status, const vote_message_ptr& msg) {
   if (connection_id == 0) // this node's vote was already signaled
      return;
   if (status == vote_status::duplicate) // don't bother emitting duplicates
      return;
   emit( vote_signal, std::tuple{connection_id, status, std::cref(msg)} );
}

// called with locked mtx
void remove_connection(uint32_t connection_id) {
auto& idx = index.get<by_connection>();
idx.erase(idx.lower_bound(connection_id), idx.upper_bound(connection_id));
}

// called with locked mtx
void remove_before_lib() {
auto& idx = index.get<by_block_num>();
idx.erase(idx.lower_bound(lib.load()), idx.end()); // descending
// don't decrement num_messages as too many before lib should be considered an error
}

bool remove_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && it->id() == id) {
if (auto& num = num_messages[it->connection_id]; num != 0)
--num;
// called with locked mtx
void remove_too_old() {
auto& idx = index.get<by_last_received>();
fc::time_point vote_too_old = fc::time_point::now() - too_old;
idx.erase(idx.lower_bound(fc::time_point::min()), idx.upper_bound(vote_too_old));
// don't decrement num_messages as too many that are too old should be considered an error
}

it = idx.erase(it);
// called with locked mtx
void queue_for_later(uint32_t connection_id, const vote_message_ptr& msg) {
fc::time_point now = fc::time_point::now();
remove_before_lib();
remove_too_old();
index.insert(vote{.connection_id = connection_id, .received = now, .msg = msg});
}

// called with locked mtx, returns with locked mtx
// (mtx is released and re-acquired through `g` while each individual vote is handled)
//
// Retries the votes currently held in the index, in by_last_received order:
//  - too-old votes and votes at or below lib are purged first
//  - a vote whose block get_block() now returns is passed to aggregate_vote(), reported via emit(),
//    and the sender's num_messages count is decremented (never below zero)
//  - a vote whose block is still unavailable is set aside and re-inserted once the index has been drained
// If `stopped` becomes true the function returns immediately; votes set aside so far are not re-inserted.
void process_any_queued_for_later(std::unique_lock<std::mutex>& g) {
   if (index.empty())
      return;
   remove_too_old();
   remove_before_lib();
   auto& idx = index.get<by_last_received>();
   std::vector<vote> unprocessed;
   unprocessed.reserve(std::min<size_t>(21u, idx.size())); // maybe increase if we increase # of finalizers from 21
   // Always take the front entry: mtx is released inside the loop, so no iterator may be kept across iterations.
   while (!idx.empty()) {
      if (stopped)
         return;
      const auto front = idx.begin();
      vote v = *front; // copy out first; the index entry is gone after the erase below
      idx.erase(front);
      // use the copy, not the erased iterator, to reach the message
      auto bsp = get_block(v.msg->block_id, g);
      // g is unlocked
      if (bsp) {
         vote_status s = bsp->aggregate_vote(*v.msg);
         emit(v.connection_id, s, v.msg);

         g.lock();
         if (auto& num = num_messages[v.connection_id]; num != 0)
            --num;
      } else {
         unprocessed.push_back(std::move(v));
         g.lock();
      }
   }
   for (auto& v : unprocessed) {
      index.insert(std::move(v));
   }
}

bool skip_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && it->id() == id) {
++it;
// called with locked mtx, returns with unlocked mtx
// Looks up the block state for `id`.
//  - if the cached last_bsp has that id it is returned and fetch_block_func is not called
//  - otherwise fetch_block_func(id) is called with mtx released; a non-empty result is stored as the new
//    last_bsp and largest_known_block_num is raised to its block number if that is larger (both under mtx)
// Returns the block state, or whatever empty value fetch_block_func produced when it has no such block.
block_state_ptr get_block(const block_id_type& id, std::unique_lock<std::mutex>& g) {
   block_state_ptr bsp;
   if (last_bsp && last_bsp->id() == id) {
      bsp = last_bsp;
   }
   g.unlock();

   if (!bsp) {
      bsp = fetch_block_func(id);
      if (bsp) {
         g.lock();
         last_bsp = bsp;
         largest_known_block_num = std::max(bsp->block_num(), largest_known_block_num.load());
         g.unlock();
      }
   }
   return bsp;
}

public:
// vote_signal - signal used to report processed votes
// get_block   - callable used to look up a block state by block id; moved into fetch_block_func
//               and asserted to be set
explicit vote_processor_t(vote_signal_t& vote_signal, fetch_block_func_t&& get_block)
   : vote_signal(vote_signal)
   , fetch_block_func(std::move(get_block))
{
   // check the member: the parameter has been moved from at this point
   assert(fetch_block_func);
}

// Marks the processor as stopped, then notifies cv while holding mtx.
~vote_processor_t() {
stopped = true;
std::lock_guard g(mtx);
// wakes one thread blocked in cv.wait(); the coordinator's wait predicate also tests `stopped`
cv.notify_one();
}

size_t size() {
size_t index_size() {
std::lock_guard g(mtx);
return index.size();
}
Expand All @@ -140,98 +198,70 @@ class vote_processor_t {

stopped = false;
thread_pool.start( num_threads, std::move(on_except));

// one coordinator thread
boost::asio::post(thread_pool.get_executor(), [&]() {
block_id_type not_in_forkdb_id{};
while (!stopped) {
std::unique_lock g(mtx);
cv.wait(g, [&]() {
return !index.empty() || stopped;
});
if (stopped)
break;
remove_before_lib();
if (index.empty()) {
num_messages.clear();
continue;
}
auto& idx = index.get<by_block_num>();
if (auto i = idx.begin(); i != idx.end() && not_in_forkdb_id == i->id()) { // same block as last while loop
g.unlock();
std::this_thread::sleep_for(block_wait_time);
g.lock();
}
for (auto i = idx.begin(); i != idx.end();) {
if (stopped)
break;
auto& vt = *i;
block_state_ptr bsp = fetch_block_func(vt.id());
if (bsp) {
if (!bsp->is_proper_svnn_block()) {
if (remove_all_for_block(idx, i, bsp->id()))
break;
continue;
}
auto iter_of_bsp = i;
std::vector<vote> to_process;
to_process.reserve(std::min<size_t>(21u, idx.size())); // increase if we increase # of finalizers from 21
for(; i != idx.end() && bsp->id() == i->id(); ++i) {
// although it is the highest contention on block state pending mutex posting all of the same bsp,
// the highest priority is processing votes for this block state.
to_process.push_back(*i);
}
bool should_break = remove_all_for_block(idx, iter_of_bsp, bsp->id());
g.unlock(); // do not hold lock when posting
for (auto& v : to_process) {
boost::asio::post(thread_pool.get_executor(), [this, bsp, v=std::move(v)]() {
vote_status s = bsp->aggregate_vote(*v.msg);
if (s != vote_status::duplicate) { // don't bother emitting duplicates
emit(v.connection_id, s, v.msg);
}
});
}
if (should_break)
break;
g.lock();
i = idx.begin();
} else {
not_in_forkdb_id = vt.id();
if (skip_all_for_block(idx, i, i->id()))
break;
}
}
}
dlog("Exiting vote processor coordinator thread");
});
}

// called from main thread
// Records block_num as the new value of `lib`.
void notify_lib(block_num_type block_num) {
   lib.store(block_num);
}

// called from net threads
// Posts a task to thread_pool that retries the votes held in the index.
void notify_new_block() {
   // would require a mtx lock to check if index is empty, post check to thread_pool
   auto retry_queued = [this] {
      std::unique_lock lock(mtx);
      process_any_queued_for_later(lock);
   };
   boost::asio::post(thread_pool.get_executor(), std::move(retry_queued));
}

/// called from net threads and controller's thread pool
/// msg is ignored if vote_processor is not start()ed
///
/// Votes for blocks at or below lib are dropped. Everything else is posted to thread_pool, where:
///  - a connection that exceeds max_votes_per_connection has its queued votes removed and
///    vote_status::max_exceeded is reported for the message
///  - if the block is available the vote is aggregated and reported right away, followed by a retry of
///    any votes queued earlier
///  - otherwise the vote is queued for later processing
void process_vote_message(uint32_t connection_id, const vote_message_ptr& msg) {
   if (stopped)
      return;
   assert(msg);
   block_num_type msg_block_num = block_header::num_from_id(msg->block_id);
   if (msg_block_num <= lib.load(std::memory_order_relaxed))
      return;
   ++queued_votes;
   boost::asio::post(thread_pool.get_executor(), [this, connection_id, msg] {
      // balance the increment above before any early return so the counter can reach zero again
      auto num_queued_votes = --queued_votes;
      if (stopped)
         return;
      bool reset_num_messages = num_queued_votes == 0; // caught up, so clear num_messages
      if (block_header::num_from_id(msg->block_id) <= lib.load(std::memory_order_relaxed))
         return; // ignore any votes lower than lib
      std::unique_lock g(mtx);
      if (reset_num_messages)
         num_messages.clear();
      if (++num_messages[connection_id] > max_votes_per_connection) {
         remove_connection(connection_id);
         g.unlock();
         // drop, too many from this connection to process, consider connection invalid
         // don't clear num_messages[connection_id] so we keep reporting max_exceeded until index is drained

         elog("Exceeded max votes per connection for ${c}", ("c", connection_id));
         emit(connection_id, vote_status::max_exceeded, msg);
      } else {
         block_state_ptr bsp = get_block(msg->block_id, g);
         // g is unlocked

         if (!bsp) {
            // queue up for later processing
            g.lock();
            queue_for_later(connection_id, msg);
         } else {
            vote_status s = bsp->aggregate_vote(*msg);
            emit(connection_id, s, msg);

            g.lock();
            if (auto& num = num_messages[connection_id]; num != 0)
               --num;

            process_any_queued_for_later(g);
         }
      }

   });
}

Expand Down
5 changes: 1 addition & 4 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -644,11 +644,8 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) {
chain_config->vote_thread_pool_size = options.count("vote-threads") ? options.at("vote-threads").as<uint16_t>() : 0;
if (chain_config->vote_thread_pool_size == 0 && options.count("producer-name")) {
chain_config->vote_thread_pool_size = config::default_vote_thread_pool_size;
ilog("Setting vote-threads to ${n} on producing node", ("n", chain_config->vote_thread_pool_size));
}
EOS_ASSERT( chain_config->vote_thread_pool_size > 1 || chain_config->vote_thread_pool_size == 0, plugin_config_exception,
"vote-threads ${num} must be greater than 1, or equal to 0 to disable. "
"Voting disabled if set to 0 (votes are not propagatged on P2P network).",
("num", chain_config->vote_thread_pool_size) );
accept_votes = chain_config->vote_thread_pool_size > 0;
}

Expand Down
Loading

0 comments on commit 042ce30

Please sign in to comment.