Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IF: Process votes in a dedicated thread pool #24

Merged
merged 42 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7fc69ac
GH-3 Move vote processing off net threads into a dedicated thread pool
heifner Apr 10, 2024
c3a9c95
GH-3 Track num_messages per connection
heifner Apr 11, 2024
6b0c4c4
GH-3 Make vote processor thread pool size configurable including disa…
heifner Apr 11, 2024
fba5561
GH-3 Fix issue with not re-locking after unlock
heifner Apr 11, 2024
1dcf7c1
GH-3 Shutdown nodeos on vote thread pool exception
heifner Apr 11, 2024
295f7d4
GH-3 Add unittest for vote_processor. Modify vote_processor to make i…
heifner Apr 11, 2024
18c2fca
GH-3 std::latch not available on all platforms; use std::atomic and f…
heifner Apr 11, 2024
d5cef78
GH-3 ci/cd is slow, allow more time.
heifner Apr 11, 2024
da0106b
GH-3 Make test more robust by checking that node has received a hands…
heifner Apr 11, 2024
bf545e3
GH-3 Optimization: Don't emit duplicate votes
heifner Apr 11, 2024
d1fe636
GH-3 Fix: num_messages.clear() called before decrement
heifner Apr 11, 2024
f3ff3e8
GH-3 Avoid vote_message copies by using shared_ptr
heifner Apr 12, 2024
ae02196
GH-3 Avoid vote_message copies by using shared_ptr
heifner Apr 12, 2024
a6e4de8
GH-3 Fix tests now that duplicates are not signaled
heifner Apr 12, 2024
382f648
GH-3 Fix use of packed_transaction_ptr over shared_ptr<packed_transac…
heifner Apr 12, 2024
508b584
GH-3 There are three producers A,B,C if the test happens to hit this …
heifner Apr 12, 2024
4d70ab3
GH-3 Test failed on asan run because it took 55ms to load the WASM. I…
heifner Apr 12, 2024
23b5739
Merge remote-tracking branch 'spring/savanna' into GH-3-process-votes
heifner Apr 12, 2024
1641a07
GH-3 Report known lib instead of fork db root as these log statements…
heifner Apr 12, 2024
4b4eca6
GH-3 Change default --vote-threads to 0 to be disabled by default. EO…
heifner Apr 15, 2024
895cef7
GH-3 --vote-threads needed for all producers
heifner Apr 15, 2024
796dc5b
Merge remote-tracking branch 'spring/savanna' into GH-3-process-votes
heifner Apr 16, 2024
c3bdd8a
GH-12 Add vote threads needed for unittests
heifner Apr 16, 2024
e4010d2
GH-12 Add vote threads
heifner Apr 16, 2024
bf8fe18
GH-12 Add vote threads to tests
heifner Apr 16, 2024
8bfbe0a
GH-3 Modify named_thread_pool to be a no-op if given 0 for num_threads.
heifner Apr 16, 2024
117f02d
GH-3 Check for stopped in inner loop
heifner Apr 16, 2024
027d278
GH-3 Add better descriptions
heifner Apr 16, 2024
8f26a50
GH-3 Default vote-threads to 4 when a block producer
heifner Apr 16, 2024
6cc6820
GH-3 Simplify by adding a vote_signal_t type
heifner Apr 16, 2024
166a580
GH-3 Use chain pluging accept_votes to init p2p_accept_votes
heifner Apr 16, 2024
83434d1
GH-3 Add vote-threads to all nodes so bridge nodes process votes
heifner Apr 16, 2024
042ce30
GH-3 Simplify vote_processor by processing votes on a first-come-firs…
heifner Apr 17, 2024
d80def5
GH-3 Fix use of iterator after erase
heifner Apr 17, 2024
317dd61
GH-3 Remove unneeded if
heifner Apr 18, 2024
955a72a
GH-3 Move emit to controller.hpp and use in vote_processor
heifner Apr 18, 2024
4a759b3
GH-3 Use same value as default for producer
heifner Apr 18, 2024
51503c4
GH-3 Use unordered_map
heifner Apr 18, 2024
b694aad
GH-3 Add better log message
heifner Apr 18, 2024
344b778
GH-3 Do not clear num_messages if there are votes in the index to be …
heifner Apr 18, 2024
ecc8cd4
GH-3 More descriptive emit logs
heifner Apr 18, 2024
8d3a838
GH-3 Fix spelling
heifner Apr 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <eosio/chain/hotstuff/finalizer.hpp>
#include <eosio/chain/hotstuff/finalizer_policy.hpp>
#include <eosio/chain/hotstuff/hotstuff.hpp>
#include <eosio/chain/vote_processor.hpp>

#include <chainbase/chainbase.hpp>
#include <eosio/vm/allocator.hpp>
Expand Down Expand Up @@ -946,7 +947,14 @@ struct controller_impl {
signal<void(const block_signal_params&)> accepted_block;
signal<void(const block_signal_params&)> irreversible_block;
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)> applied_transaction;
signal<void(const vote_message&)> voted_block;
signal<void(const vote_signal_params&)> voted_block;

vote_processor_t vote_processor{voted_block,
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
[this](const block_id_type& id) -> block_state_ptr {
return fork_db.apply_s<block_state_ptr>([&](const auto& forkdb) {
return forkdb.get_block(id);
});
}};

int64_t set_proposed_producers( vector<producer_authority> producers );
int64_t set_proposed_producers_legacy( vector<producer_authority> producers );
Expand Down Expand Up @@ -1191,10 +1199,16 @@ struct controller_impl {
my_finalizers(fc::time_point::now(), cfg.finalizers_dir / "safety.dat"),
wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() )
{
thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) {
thread_pool.start( cfg.chain_thread_pool_size, [this]( const fc::exception& e ) {
linh2931 marked this conversation as resolved.
Show resolved Hide resolved
elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );
if (cfg.vote_thread_pool_size > 0) {
vote_processor.start(cfg.vote_thread_pool_size, [this]( const fc::exception& e ) {
elog( "Exception in vote thread pool, exiting: ${e}", ("e", e.to_detail_string()) );
if( shutdown ) shutdown();
} );
}

set_activation_handler<builtin_protocol_feature_t::preactivate_feature>();
set_activation_handler<builtin_protocol_feature_t::replace_deferred>();
Expand All @@ -1214,6 +1228,7 @@ struct controller_impl {
irreversible_block.connect([this](const block_signal_params& t) {
const auto& [ block, id] = t;
wasmif.current_lib(block->block_num());
vote_processor.notify_lib(block->block_num());
});


Expand Down Expand Up @@ -3552,19 +3567,10 @@ struct controller_impl {


// called from net threads and controller's thread pool
vote_status process_vote_message( const vote_message& vote ) {
// only aggregate votes on proper if blocks
auto aggregate_vote = [&vote](auto& forkdb) -> vote_status {
auto bsp = forkdb.get_block(vote.block_id);
if (bsp && bsp->block->is_proper_svnn_block()) {
return bsp->aggregate_vote(vote);
}
return vote_status::unknown_block;
};
auto aggregate_vote_legacy = [](auto&) -> vote_status {
return vote_status::unknown_block;
};
return fork_db.apply<vote_status>(aggregate_vote_legacy, aggregate_vote);
void process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) {
if (conf.vote_thread_pool_size > 0) {
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
vote_processor.process_vote_message(connection_id, vote);
}
}

bool node_has_voted_if_finalizer(const block_id_type& id) const {
Expand All @@ -3591,13 +3597,12 @@ struct controller_impl {

// Each finalizer configured on the node which is present in the active finalizer policy may create and sign a vote.
my_finalizers.maybe_vote(
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) {
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message_ptr& vote) {
// net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal
emit(voted_block, vote);
emit(voted_block, std::tuple{uint32_t{0}, vote_status::success, std::cref(vote)});
linh2931 marked this conversation as resolved.
Show resolved Hide resolved

// also aggregate our own vote into the pending_qc for this block.
boost::asio::post(thread_pool.get_executor(),
[control = this, vote]() { control->process_vote_message(vote); });
process_vote_message(0, vote);
});
}

Expand Down Expand Up @@ -5254,8 +5259,8 @@ void controller::set_proposed_finalizers( finalizer_policy&& fin_pol ) {
}

// called from net threads
vote_status controller::process_vote_message( const vote_message& vote ) {
return my->process_vote_message( vote );
void controller::process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) {
my->process_vote_message( connection_id, vote );
};

bool controller::node_has_voted_if_finalizer(const block_id_type& id) const {
Expand Down Expand Up @@ -5538,7 +5543,7 @@ signal<void(const block_signal_params&)>& controller::accepted_block_header() {
signal<void(const block_signal_params&)>& controller::accepted_block() { return my->accepted_block; }
signal<void(const block_signal_params&)>& controller::irreversible_block() { return my->irreversible_block; }
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& controller::applied_transaction() { return my->applied_transaction; }
signal<void(const vote_message&)>& controller::voted_block() { return my->voted_block; }
signal<void(const vote_signal_params&)>& controller::voted_block() { return my->voted_block; }

chain_id_type controller::extract_chain_id(snapshot_reader& snapshot) {
chain_snapshot_header header;
Expand Down
8 changes: 4 additions & 4 deletions libraries/chain/hotstuff/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ finalizer::vote_result finalizer::decide_vote(const block_state_ptr& bsp) {
}

// ----------------------------------------------------------------------------------------
std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
vote_message_ptr finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
finalizer::vote_decision decision = decide_vote(bsp).decision;
if (decision == vote_decision::strong_vote || decision == vote_decision::weak_vote) {
bls_signature sig;
Expand All @@ -99,7 +99,7 @@ std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
} else {
sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()});
}
return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }};
return std::make_shared<vote_message>(bsp->id(), decision == vote_decision::strong_vote, pub_key, sig);
}
return {};
}
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace eosio::chain {
// When block header is validated in block_header_state's next(),
// it is already validate if schedule_version == proper_svnn_schedule_version,
// finality extension must exist.
bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); }
bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); }

header_extension_multimap validate_and_extract_header_extensions()const;
std::optional<block_header_extension> extract_header_extension(uint16_t extension_id)const;
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eosio/chain/block_header_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct block_header_state {
digest_type compute_finality_digest() const;

// Returns true if the block is a Proper Savanna Block
bool is_proper_svnn_block() const;
bool is_proper_svnn_block() const { return header.is_proper_svnn_block(); }

// block descending from this need the provided qc in the block extension
bool is_needed(const qc_claim_t& qc_claim) const {
Expand Down
1 change: 1 addition & 0 deletions libraries/chain/include/eosio/chain/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ const static uint16_t default_max_auth_depth = 6;
const static uint32_t default_sig_cpu_bill_pct = 50 * percent_1; // billable percentage of signature recovery
const static uint32_t default_produce_block_offset_ms = 450;
const static uint16_t default_controller_thread_pool_size = 2;
const static uint16_t default_vote_thread_pool_size = 4;
const static uint32_t default_max_variable_signature_length = 16384u;
const static uint32_t default_max_action_return_value_size = 256;

Expand Down
11 changes: 6 additions & 5 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace eosio::chain {
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

using block_signal_params = std::tuple<const signed_block_ptr&, const block_id_type&>;
using vote_signal_params = std::tuple<uint32_t, vote_status, const vote_message_ptr&>;

enum class db_read_mode {
HEAD,
Expand Down Expand Up @@ -87,7 +88,8 @@ namespace eosio::chain {
uint64_t state_size = chain::config::default_state_size;
uint64_t state_guard_size = chain::config::default_state_guard_size;
uint32_t sig_cpu_bill_pct = chain::config::default_sig_cpu_bill_pct;
uint16_t thread_pool_size = chain::config::default_controller_thread_pool_size;
uint16_t chain_thread_pool_size = chain::config::default_controller_thread_pool_size;
uint16_t vote_thread_pool_size = chain::config::default_vote_thread_pool_size;
bool read_only = false;
bool force_all_checks = false;
bool disable_replay_opts = false;
Expand Down Expand Up @@ -326,7 +328,7 @@ namespace eosio::chain {
// called by host function set_finalizers
void set_proposed_finalizers( finalizer_policy&& fin_pol );
// called from net threads
vote_status process_vote_message( const vote_message& msg );
void process_vote_message( uint32_t connection_id, const vote_message_ptr& msg );
// thread safe, for testing
bool node_has_voted_if_finalizer(const block_id_type& id) const;

Expand Down Expand Up @@ -373,9 +375,8 @@ namespace eosio::chain {
signal<void(const block_signal_params&)>& accepted_block();
signal<void(const block_signal_params&)>& irreversible_block();
signal<void(std::tuple<const transaction_trace_ptr&, const packed_transaction_ptr&>)>& applied_transaction();

// Unlike other signals, voted_block can be signaled from other threads than the main thread.
signal<void(const vote_message&)>& voted_block();
// Unlike other signals, voted_block is signaled from other threads than the main thread.
signal<void(const vote_signal_params&)>& voted_block();

const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const;
wasm_interface& get_wasm_interface();
Expand Down
9 changes: 4 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ namespace eosio::chain {
finalizer_safety_information fsi;

vote_result decide_vote(const block_state_ptr& bsp);
std::optional<vote_message> maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp,
const digest_type& digest);
vote_message_ptr maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp, const digest_type& digest);
};

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -95,7 +94,7 @@ namespace eosio::chain {
if (finalizers.empty())
return;

std::vector<vote_message> votes;
std::vector<vote_message_ptr> votes;
votes.reserve(finalizers.size());

// Possible improvement in the future, look at locking only individual finalizers and releasing the lock for writing the file.
Expand All @@ -105,9 +104,9 @@ namespace eosio::chain {
// first accumulate all the votes
for (const auto& f : fin_pol.finalizers) {
if (auto it = finalizers.find(f.public_key); it != finalizers.end()) {
std::optional<vote_message> vote_msg = it->second.maybe_vote(it->first, bsp, digest);
vote_message_ptr vote_msg = it->second.maybe_vote(it->first, bsp, digest);
if (vote_msg)
votes.push_back(std::move(*vote_msg));
votes.push_back(std::move(vote_msg));
}
}
// then save the safety info and, if successful, gossip the votes
Expand Down
16 changes: 11 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ namespace eosio::chain {
bool strong{false};
bls_public_key finalizer_key;
bls_signature sig;

auto operator<=>(const vote_message&) const = default;
bool operator==(const vote_message&) const = default;
};

using vote_message_ptr = std::shared_ptr<vote_message>;

enum class vote_status {
success,
duplicate,
unknown_public_key,
invalid_signature,
unknown_block
duplicate, // duplicate vote, expected as votes arrive on multiple connections
unknown_public_key, // public key is invalid, indicates invalid vote
invalid_signature, // signature is invalid, indicates invalid vote
unknown_block, // block not available, possibly less than LIB, or too far in the future
max_exceeded // received too many votes for a connection
};

using bls_public_key = fc::crypto::blslib::bls_public_key;
Expand Down Expand Up @@ -159,7 +165,7 @@ namespace eosio::chain {


FC_REFLECT(eosio::chain::vote_message, (block_id)(strong)(finalizer_key)(sig));
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block))
FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)(max_exceeded))
FC_REFLECT(eosio::chain::valid_quorum_certificate, (_strong_votes)(_weak_votes)(_sig));
FC_REFLECT(eosio::chain::pending_quorum_certificate, (_valid_qc)(_quorum)(_max_weak_sum_before_weak_final)(_state)(_strong_sum)(_weak_sum)(_weak_votes)(_strong_votes));
FC_REFLECT_ENUM(eosio::chain::pending_quorum_certificate::state_t, (unrestricted)(restricted)(weak_achieved)(weak_final)(strong));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <eosio/chain/transaction_metadata.hpp>
#include <eosio/chain/trace.hpp>
#include <eosio/chain/block_state_legacy.hpp>
#include <eosio/chain/exceptions.hpp>

#include <boost/multi_index_container.hpp>
Expand Down
Loading
Loading