diff --git a/libraries/chain/controller.cpp b/libraries/chain/controller.cpp index 613a7fb16b..a61bcc1413 100644 --- a/libraries/chain/controller.cpp +++ b/libraries/chain/controller.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include @@ -946,7 +947,14 @@ struct controller_impl { signal accepted_block; signal irreversible_block; signal)> applied_transaction; - signal voted_block; + vote_signal_t voted_block; + + vote_processor_t vote_processor{voted_block, + [this](const block_id_type& id) -> block_state_ptr { + return fork_db.apply_s([&](const auto& forkdb) { + return forkdb.get_block(id); + }); + }}; int64_t set_proposed_producers( vector producers ); int64_t set_proposed_producers_legacy( vector producers ); @@ -1191,10 +1199,15 @@ struct controller_impl { my_finalizers(fc::time_point::now(), cfg.finalizers_dir / "safety.dat"), wasmif( conf.wasm_runtime, conf.eosvmoc_tierup, db, conf.state_dir, conf.eosvmoc_config, !conf.profile_accounts.empty() ) { - thread_pool.start( cfg.thread_pool_size, [this]( const fc::exception& e ) { + assert(cfg.chain_thread_pool_size > 0); + thread_pool.start( cfg.chain_thread_pool_size, [this]( const fc::exception& e ) { elog( "Exception in chain thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); if( shutdown ) shutdown(); } ); + vote_processor.start(cfg.vote_thread_pool_size, [this]( const fc::exception& e ) { + elog( "Exception in vote thread pool, exiting: ${e}", ("e", e.to_detail_string()) ); + if( shutdown ) shutdown(); + } ); set_activation_handler(); set_activation_handler(); @@ -1214,6 +1227,7 @@ struct controller_impl { irreversible_block.connect([this](const block_signal_params& t) { const auto& [ block, id] = t; wasmif.current_lib(block->block_num()); + vote_processor.notify_lib(block->block_num()); }); @@ -1239,37 +1253,6 @@ struct controller_impl { }); } - /** - * Plugins / observers listening to signals emited might trigger - * errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or - * cause a node to fork. - * - * If it is ever desirable to let a signal handler bubble an exception out of this method - * a full audit of its uses needs to be undertaken. - * - */ - template - void emit( const Signal& s, Arg&& a ) { - try { - s( std::forward( a )); - } catch (std::bad_alloc& e) { - wlog( "std::bad_alloc: ${w}", ("w", e.what()) ); - throw e; - } catch (boost::interprocess::bad_alloc& e) { - wlog( "boost::interprocess::bad alloc: ${w}", ("w", e.what()) ); - throw e; - } catch ( controller_emit_signal_exception& e ) { - wlog( "controller_emit_signal_exception: ${details}", ("details", e.to_detail_string()) ); - throw e; - } catch ( fc::exception& e ) { - wlog( "fc::exception: ${details}", ("details", e.to_detail_string()) ); - } catch ( std::exception& e ) { - wlog( "std::exception: ${details}", ("details", e.what()) ); - } catch ( ... ) { - wlog( "signal handler threw exception" ); - } - } - void dmlog_applied_transaction(const transaction_trace_ptr& t, const signed_transaction* trx = nullptr) { // dmlog_applied_transaction is called by push_scheduled_transaction // where transient transactions are not possible, and by push_transaction @@ -1377,6 +1360,12 @@ struct controller_impl { } } + block_num_type latest_known_lib_num() const { + block_id_type irreversible_block_id = if_irreversible_block_id.load(); + block_num_type savanna_lib_num = block_header::num_from_id(irreversible_block_id); + return savanna_lib_num > 0 ? savanna_lib_num : fork_db_head_irreversible_blocknum(); + } + void log_irreversible() { EOS_ASSERT( fork_db_has_root(), fork_database_exception, "fork database not properly initialized" ); @@ -1428,7 +1417,7 @@ struct controller_impl { for( auto bitr = branch.rbegin(); bitr != branch.rend() && should_process(*bitr); ++bitr ) { apply_irreversible_block(forkdb, *bitr); - emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()) ); + emit( irreversible_block, std::tie((*bitr)->block, (*bitr)->id()), __FILE__, __LINE__ ); // blog.append could fail due to failures like running out of space. // Do it before commit so that in case it throws, DB can be rolled back. @@ -2548,7 +2537,7 @@ struct controller_impl { pending->_block_report.total_elapsed_time += trace->elapsed; pending->_block_report.total_time += trace->elapsed; dmlog_applied_transaction(trace); - emit( applied_transaction, std::tie(trace, trx->packed_trx()) ); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); undo_session.squash(); return trace; } @@ -2613,7 +2602,7 @@ struct controller_impl { trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta ); dmlog_applied_transaction(trace); - emit( applied_transaction, std::tie(trace, trx->packed_trx()) ); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); trx_context.squash(); undo_session.squash(); @@ -2657,7 +2646,7 @@ struct controller_impl { trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta ); trace->elapsed = fc::time_point::now() - start; dmlog_applied_transaction(trace); - emit( applied_transaction, std::tie(trace, trx->packed_trx()) ); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); undo_session.squash(); pending->_block_report.total_net_usage += trace->net_usage; if( trace->receipt ) pending->_block_report.total_cpu_usage_us += trace->receipt->cpu_usage_us; @@ -2701,12 +2690,12 @@ struct controller_impl { trace->account_ram_delta = account_delta( gtrx.payer, trx_removal_ram_delta ); dmlog_applied_transaction(trace); - emit( applied_transaction, std::tie(trace, trx->packed_trx()) ); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); undo_session.squash(); } else { dmlog_applied_transaction(trace); - emit( applied_transaction, std::tie(trace, trx->packed_trx()) ); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); } pending->_block_report.total_net_usage += trace->net_usage; @@ -2845,7 +2834,7 @@ struct controller_impl { } dmlog_applied_transaction(trace, &trn); - emit(applied_transaction, std::tie(trace, trx->packed_trx())); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); } } @@ -2889,7 +2878,7 @@ struct controller_impl { if (!trx->is_transient()) { dmlog_applied_transaction(trace); - emit(applied_transaction, std::tie(trace, trx->packed_trx())); + emit( applied_transaction, std::tie(trace, trx->packed_trx()), __FILE__, __LINE__ ); pending->_block_report.total_net_usage += trace->net_usage; if( trace->receipt ) pending->_block_report.total_cpu_usage_us += trace->receipt->cpu_usage_us; @@ -2910,7 +2899,7 @@ struct controller_impl { { EOS_ASSERT( !pending, block_validate_exception, "pending block already exists" ); - emit( block_start, chain_head.block_num() + 1 ); + emit( block_start, chain_head.block_num() + 1, __FILE__, __LINE__ ); // at block level, no transaction specific logging is possible if (auto dm_logger = get_deep_mind_logger(false)) { @@ -3155,7 +3144,7 @@ struct controller_impl { const auto& bsp = std::get>(cb.bsp.internal()); if( s == controller::block_status::incomplete ) { forkdb.add( bsp, mark_valid_t::yes, ignore_duplicate_t::no ); - emit( accepted_block_header, std::tie(bsp->block, bsp->id()) ); + emit( accepted_block_header, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); } else { assert(s != controller::block_status::irreversible); forkdb.mark_valid( bsp ); @@ -3165,7 +3154,7 @@ struct controller_impl { } chain_head = block_handle{cb.bsp}; - emit( accepted_block, std::tie(chain_head.block(), chain_head.id()) ); + emit( accepted_block, std::tie(chain_head.block(), chain_head.id()), __FILE__, __LINE__ ); apply(chain_head, [&](const auto& head) { #warning todo: support deep_mind_logger even when in IF mode @@ -3211,7 +3200,7 @@ struct controller_impl { ilog("Produced block ${id}... #${n} @ ${t} signed by ${p} " "[trxs: ${count}, lib: ${lib}, confirmed: ${confs}, net: ${net}, cpu: ${cpu}, elapsed: ${et}, time: ${tt}]", ("p", new_b->producer)("id", id.str().substr(8, 16))("n", new_b->block_num())("t", new_b->timestamp) - ("count", new_b->transactions.size())("lib", fork_db_root_block_num())("net", br.total_net_usage) + ("count", new_b->transactions.size())("lib", latest_known_lib_num())("net", br.total_net_usage) ("cpu", br.total_cpu_usage_us)("et", br.total_elapsed_time)("tt", br.total_time)("confs", new_b->confirmed)); } @@ -3369,7 +3358,7 @@ struct controller_impl { ilog("Received block ${id}... #${n} @ ${t} signed by ${p} " // "Received" instead of "Applied" so it matches existing log output "[trxs: ${count}, lib: ${lib}, net: ${net}, cpu: ${cpu}, elapsed: ${elapsed}, time: ${time}, latency: ${latency} ms]", ("p", bsp->producer())("id", bsp->id().str().substr(8, 16))("n", bsp->block_num())("t", bsp->timestamp()) - ("count", bsp->block->transactions.size())("lib", fork_db_root_block_num()) + ("count", bsp->block->transactions.size())("lib", latest_known_lib_num()) ("net", br.total_net_usage)("cpu", br.total_cpu_usage_us) ("elapsed", br.total_elapsed_time)("time", br.total_time)("latency", (now - bsp->timestamp()).count() / 1000)); const auto& hb_id = chain_head.id(); @@ -3378,7 +3367,7 @@ struct controller_impl { ilog("Block not applied to head ${id}... #${n} @ ${t} signed by ${p} " "[trxs: ${count}, lib: ${lib}, net: ${net}, cpu: ${cpu}, elapsed: ${elapsed}, time: ${time}, latency: ${latency} ms]", ("p", hb->producer)("id", hb_id.str().substr(8, 16))("n", hb->block_num())("t", hb->timestamp) - ("count", hb->transactions.size())("lib", fork_db_root_block_num()) + ("count", hb->transactions.size())("lib", latest_known_lib_num()) ("net", br.total_net_usage)("cpu", br.total_cpu_usage_us)("elapsed", br.total_elapsed_time)("time", br.total_time) ("latency", (now - hb->timestamp).count() / 1000)); } @@ -3552,19 +3541,8 @@ struct controller_impl { // called from net threads and controller's thread pool - vote_status process_vote_message( const vote_message& vote ) { - // only aggregate votes on proper if blocks - auto aggregate_vote = [&vote](auto& forkdb) -> vote_status { - auto bsp = forkdb.get_block(vote.block_id); - if (bsp && bsp->block->is_proper_svnn_block()) { - return bsp->aggregate_vote(vote); - } - return vote_status::unknown_block; - }; - auto aggregate_vote_legacy = [](auto&) -> vote_status { - return vote_status::unknown_block; - }; - return fork_db.apply(aggregate_vote_legacy, aggregate_vote); + void process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) { + vote_processor.process_vote_message(connection_id, vote); } bool node_has_voted_if_finalizer(const block_id_type& id) const { @@ -3591,13 +3569,12 @@ struct controller_impl { // Each finalizer configured on the node which is present in the active finalizer policy may create and sign a vote. my_finalizers.maybe_vote( - *bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) { + *bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message_ptr& vote) { // net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal - emit(voted_block, vote); + emit(voted_block, std::tuple{uint32_t{0}, vote_status::success, std::cref(vote)}, __FILE__, __LINE__); - // also aggregate our own vote into the pending_qc for this block. - boost::asio::post(thread_pool.get_executor(), - [control = this, vote]() { control->process_vote_message(vote); }); + // also aggregate our own vote into the pending_qc for this block, 0 connection_id indicates our own vote + process_vote_message(0, vote); }); } @@ -3755,6 +3732,8 @@ struct controller_impl { if (conf.terminate_at_block == 0 || bsp->block_num() <= conf.terminate_at_block) { forkdb.add(bsp, mark_valid_t::no, ignore_duplicate_t::yes); + if constexpr (savanna_mode) + vote_processor.notify_new_block(); } return block_handle{bsp}; @@ -3883,7 +3862,7 @@ struct controller_impl { if constexpr (std::is_same_v>) forkdb.add( bsp, mark_valid_t::no, ignore_duplicate_t::yes ); - emit( accepted_block_header, std::tie(bsp->block, bsp->id()) ); + emit( accepted_block_header, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); }; fork_db.apply(do_accept_block); @@ -3921,7 +3900,7 @@ struct controller_impl { trusted_producer_light_validation = true; }; - emit( accepted_block_header, std::tie(bsp->block, bsp->id()) ); + emit( accepted_block_header, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); if( read_mode != db_read_mode::IRREVERSIBLE ) { if constexpr (std::is_same_v>) @@ -3970,7 +3949,7 @@ struct controller_impl { }); } - emit(accepted_block_header, std::tie(bsp->block, bsp->id())); + emit( accepted_block_header, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); controller::block_report br; if (s == controller::block_status::irreversible) { @@ -3978,7 +3957,7 @@ struct controller_impl { // On replay, log_irreversible is not called and so no irreversible_block signal is emitted. // So emit it explicitly here. - emit(irreversible_block, std::tie(bsp->block, bsp->id())); + emit( irreversible_block, std::tie(bsp->block, bsp->id()), __FILE__, __LINE__ ); if (!skip_db_sessions(s)) { db.commit(bsp->block_num()); @@ -5254,8 +5233,8 @@ void controller::set_proposed_finalizers( finalizer_policy&& fin_pol ) { } // called from net threads -vote_status controller::process_vote_message( const vote_message& vote ) { - return my->process_vote_message( vote ); +void controller::process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) { + my->process_vote_message( connection_id, vote ); }; bool controller::node_has_voted_if_finalizer(const block_id_type& id) const { @@ -5538,7 +5517,7 @@ signal& controller::accepted_block_header() { signal& controller::accepted_block() { return my->accepted_block; } signal& controller::irreversible_block() { return my->irreversible_block; } signal)>& controller::applied_transaction() { return my->applied_transaction; } -signal& controller::voted_block() { return my->voted_block; } +vote_signal_t& controller::voted_block() { return my->voted_block; } chain_id_type controller::extract_chain_id(snapshot_reader& snapshot) { chain_snapshot_header header; diff --git a/libraries/chain/hotstuff/finalizer.cpp b/libraries/chain/hotstuff/finalizer.cpp index af0d3028c2..69f6e10feb 100644 --- a/libraries/chain/hotstuff/finalizer.cpp +++ b/libraries/chain/hotstuff/finalizer.cpp @@ -86,9 +86,9 @@ finalizer::vote_result finalizer::decide_vote(const block_state_ptr& bsp) { } // ---------------------------------------------------------------------------------------- -std::optional finalizer::maybe_vote(const bls_public_key& pub_key, - const block_state_ptr& bsp, - const digest_type& digest) { +vote_message_ptr finalizer::maybe_vote(const bls_public_key& pub_key, + const block_state_ptr& bsp, + const digest_type& digest) { finalizer::vote_decision decision = decide_vote(bsp).decision; if (decision == vote_decision::strong_vote || decision == vote_decision::weak_vote) { bls_signature sig; @@ -99,7 +99,7 @@ std::optional finalizer::maybe_vote(const bls_public_key& pub_key, } else { sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()}); } - return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }}; + return std::make_shared(bsp->id(), decision == vote_decision::strong_vote, pub_key, sig); } return {}; } diff --git a/libraries/chain/include/eosio/chain/block_header.hpp b/libraries/chain/include/eosio/chain/block_header.hpp index 514879ccd0..223a3e526d 100644 --- a/libraries/chain/include/eosio/chain/block_header.hpp +++ b/libraries/chain/include/eosio/chain/block_header.hpp @@ -90,7 +90,7 @@ namespace eosio::chain { // When block header is validated in block_header_state's next(), // it is already validate if schedule_version == proper_svnn_schedule_version, // finality extension must exist. - bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); } + bool is_proper_svnn_block() const { return ( schedule_version == proper_svnn_schedule_version ); } header_extension_multimap validate_and_extract_header_extensions()const; std::optional extract_header_extension(uint16_t extension_id)const; diff --git a/libraries/chain/include/eosio/chain/block_header_state.hpp b/libraries/chain/include/eosio/chain/block_header_state.hpp index 02291c3547..0e9566dd3e 100644 --- a/libraries/chain/include/eosio/chain/block_header_state.hpp +++ b/libraries/chain/include/eosio/chain/block_header_state.hpp @@ -78,7 +78,7 @@ struct block_header_state { digest_type compute_finality_digest() const; // Returns true if the block is a Proper Savanna Block - bool is_proper_svnn_block() const; + bool is_proper_svnn_block() const { return header.is_proper_svnn_block(); } // block descending from this need the provided qc in the block extension bool is_needed(const qc_claim_t& qc_claim) const { diff --git a/libraries/chain/include/eosio/chain/config.hpp b/libraries/chain/include/eosio/chain/config.hpp index 9dd10a1b85..74af7b59ca 100644 --- a/libraries/chain/include/eosio/chain/config.hpp +++ b/libraries/chain/include/eosio/chain/config.hpp @@ -80,6 +80,7 @@ const static uint16_t default_max_auth_depth = 6; const static uint32_t default_sig_cpu_bill_pct = 50 * percent_1; // billable percentage of signature recovery const static uint32_t default_produce_block_offset_ms = 450; const static uint16_t default_controller_thread_pool_size = 2; +const static uint16_t default_vote_thread_pool_size = 4; const static uint32_t default_max_variable_signature_length = 16384u; const static uint32_t default_max_action_return_value_size = 256; diff --git a/libraries/chain/include/eosio/chain/controller.hpp b/libraries/chain/include/eosio/chain/controller.hpp index d2ac81dc32..6281e35fb7 100644 --- a/libraries/chain/include/eosio/chain/controller.hpp +++ b/libraries/chain/include/eosio/chain/controller.hpp @@ -58,6 +58,9 @@ namespace eosio::chain { using trx_meta_cache_lookup = std::function; using block_signal_params = std::tuple; + // connection_id, vote result status, vote_message processed + using vote_signal_params = std::tuple; + using vote_signal_t = signal; enum class db_read_mode { HEAD, @@ -87,7 +90,8 @@ namespace eosio::chain { uint64_t state_size = chain::config::default_state_size; uint64_t state_guard_size = chain::config::default_state_guard_size; uint32_t sig_cpu_bill_pct = chain::config::default_sig_cpu_bill_pct; - uint16_t thread_pool_size = chain::config::default_controller_thread_pool_size; + uint16_t chain_thread_pool_size = chain::config::default_controller_thread_pool_size; + uint16_t vote_thread_pool_size = 0; bool read_only = false; bool force_all_checks = false; bool disable_replay_opts = false; @@ -326,7 +330,7 @@ namespace eosio::chain { // called by host function set_finalizers void set_proposed_finalizers( finalizer_policy&& fin_pol ); // called from net threads - vote_status process_vote_message( const vote_message& msg ); + void process_vote_message( uint32_t connection_id, const vote_message_ptr& msg ); // thread safe, for testing bool node_has_voted_if_finalizer(const block_id_type& id) const; @@ -373,9 +377,8 @@ namespace eosio::chain { signal& accepted_block(); signal& irreversible_block(); signal)>& applied_transaction(); - - // Unlike other signals, voted_block can be signaled from other threads than the main thread. - signal& voted_block(); + // Unlike other signals, voted_block is signaled from other threads than the main thread. + vote_signal_t& voted_block(); const apply_handler* find_apply_handler( account_name contract, scope_name scope, action_name act )const; wasm_interface& get_wasm_interface(); @@ -406,6 +409,37 @@ namespace eosio::chain { chainbase::database& mutable_db()const; std::unique_ptr my; - }; + }; // controller + + /** + * Plugins / observers listening to signals emited might trigger + * errors and throw exceptions. Unless those exceptions are caught it could impact consensus and/or + * cause a node to fork. + * + * If it is ever desirable to let a signal handler bubble an exception out of this method + * a full audit of its uses needs to be undertaken. + * + */ + template + void emit( const Signal& s, Arg&& a, const char* file, uint32_t line ) { + try { + s( std::forward( a )); + } catch (std::bad_alloc& e) { + wlog( "${f}:${l} std::bad_alloc: ${w}", ("f", file)("l", line)("w", e.what()) ); + throw e; + } catch (boost::interprocess::bad_alloc& e) { + wlog( "${f}:${l} boost::interprocess::bad alloc: ${w}", ("f", file)("l", line)("w", e.what()) ); + throw e; + } catch ( controller_emit_signal_exception& e ) { + wlog( "${f}:${l} controller_emit_signal_exception: ${details}", ("f", file)("l", line)("details", e.to_detail_string()) ); + throw e; + } catch ( fc::exception& e ) { + wlog( "${f}:${l} fc::exception: ${details}", ("f", file)("l", line)("details", e.to_detail_string()) ); + } catch ( std::exception& e ) { + wlog( "std::exception: ${details}", ("f", file)("l", line)("details", e.what()) ); + } catch ( ... ) { + wlog( "${f}:${l} signal handler threw exception", ("f", file)("l", line) ); + } + } } /// eosio::chain diff --git a/libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp b/libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp index 762524a46b..296e11eae0 100644 --- a/libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp @@ -61,8 +61,7 @@ namespace eosio::chain { finalizer_safety_information fsi; vote_result decide_vote(const block_state_ptr& bsp); - std::optional maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp, - const digest_type& digest); + vote_message_ptr maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp, const digest_type& digest); }; // ---------------------------------------------------------------------------------------- @@ -95,7 +94,7 @@ namespace eosio::chain { if (finalizers.empty()) return; - std::vector votes; + std::vector votes; votes.reserve(finalizers.size()); // Possible improvement in the future, look at locking only individual finalizers and releasing the lock for writing the file. @@ -105,9 +104,9 @@ namespace eosio::chain { // first accumulate all the votes for (const auto& f : fin_pol.finalizers) { if (auto it = finalizers.find(f.public_key); it != finalizers.end()) { - std::optional vote_msg = it->second.maybe_vote(it->first, bsp, digest); + vote_message_ptr vote_msg = it->second.maybe_vote(it->first, bsp, digest); if (vote_msg) - votes.push_back(std::move(*vote_msg)); + votes.push_back(std::move(vote_msg)); } } // then save the safety info and, if successful, gossip the votes diff --git a/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp b/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp index b54f8d7416..9e8682b905 100644 --- a/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp +++ b/libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp @@ -25,14 +25,20 @@ namespace eosio::chain { bool strong{false}; bls_public_key finalizer_key; bls_signature sig; + + auto operator<=>(const vote_message&) const = default; + bool operator==(const vote_message&) const = default; }; + using vote_message_ptr = std::shared_ptr; + enum class vote_status { success, - duplicate, - unknown_public_key, - invalid_signature, - unknown_block + duplicate, // duplicate vote, expected as votes arrive on multiple connections + unknown_public_key, // public key is invalid, indicates invalid vote + invalid_signature, // signature is invalid, indicates invalid vote + unknown_block, // block not available, possibly less than LIB, or too far in the future + max_exceeded // received too many votes for a connection }; using bls_public_key = fc::crypto::blslib::bls_public_key; @@ -159,7 +165,7 @@ namespace eosio::chain { FC_REFLECT(eosio::chain::vote_message, (block_id)(strong)(finalizer_key)(sig)); -FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)) +FC_REFLECT_ENUM(eosio::chain::vote_status, (success)(duplicate)(unknown_public_key)(invalid_signature)(unknown_block)(max_exceeded)) FC_REFLECT(eosio::chain::valid_quorum_certificate, (_strong_votes)(_weak_votes)(_sig)); FC_REFLECT(eosio::chain::pending_quorum_certificate, (_valid_qc)(_quorum)(_max_weak_sum_before_weak_final)(_state)(_strong_sum)(_weak_sum)(_weak_votes)(_strong_votes)); FC_REFLECT_ENUM(eosio::chain::pending_quorum_certificate::state_t, (unrestricted)(restricted)(weak_achieved)(weak_final)(strong)); diff --git a/libraries/chain/include/eosio/chain/thread_utils.hpp b/libraries/chain/include/eosio/chain/thread_utils.hpp index b4e4d5a673..81dbb24dbe 100644 --- a/libraries/chain/include/eosio/chain/thread_utils.hpp +++ b/libraries/chain/include/eosio/chain/thread_utils.hpp @@ -108,7 +108,7 @@ namespace eosio { namespace chain { /// Blocks until all threads are created and completed their init function, or an exception is thrown /// during thread startup or an init function. Exceptions thrown during these stages are rethrown from start() /// but some threads might still have been started. Calling stop() after such a failure is safe. - /// @param num_threads is number of threads spawned + /// @param num_threads is number of threads spawned, if 0 then no threads are spawned and stop() is a no-op. /// @param on_except is the function to call if io_context throws an exception, is called from thread pool thread. /// if an empty function then logs and rethrows exception on thread which will terminate. Not called /// for exceptions during the init function (such exceptions are rethrown from start()) @@ -116,6 +116,8 @@ namespace eosio { namespace chain { /// @throw assert_exception if already started and not stopped. void start( size_t num_threads, on_except_t on_except, init_t init = {} ) { FC_ASSERT( !_ioc_work, "Thread pool already started" ); + if (num_threads == 0) + return; _ioc_work.emplace( boost::asio::make_work_guard( _ioc ) ); _ioc.restart(); _thread_pool.reserve( num_threads ); @@ -141,13 +143,16 @@ namespace eosio { namespace chain { } /// destroy work guard, stop io_context, join thread_pool + /// not thread safe, expected to only be called from thread that called start() void stop() { - _ioc_work.reset(); - _ioc.stop(); - for( auto& t : _thread_pool ) { - t.join(); + if (_thread_pool.size() > 0) { + _ioc_work.reset(); + _ioc.stop(); + for( auto& t : _thread_pool ) { + t.join(); + } + _thread_pool.clear(); } - _thread_pool.clear(); } private: diff --git a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp index e1231bedcb..7c73856773 100644 --- a/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp +++ b/libraries/chain/include/eosio/chain/unapplied_transaction_queue.hpp @@ -2,7 +2,6 @@ #include #include -#include #include #include diff --git a/libraries/chain/include/eosio/chain/vote_processor.hpp b/libraries/chain/include/eosio/chain/vote_processor.hpp new file mode 100644 index 0000000000..6db78e286e --- /dev/null +++ b/libraries/chain/include/eosio/chain/vote_processor.hpp @@ -0,0 +1,249 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace eosio::chain { + +/** + * Process votes in a dedicated thread pool. + */ +class vote_processor_t { + // Even 3000 vote structs are less than 1MB per connection. + // 2500 should never be reached unless a specific connection is sending garbage. + static constexpr size_t max_votes_per_connection = 2500; + // If we have not processed a vote in this amount of time, give up on it. + static constexpr fc::microseconds too_old = fc::seconds(5); + + struct by_block_num; + struct by_connection; + struct by_last_received; + + struct vote { + uint32_t connection_id; + fc::time_point received; + vote_message_ptr msg; + + const block_id_type& id() const { return msg->block_id; } + block_num_type block_num() const { return block_header::num_from_id(msg->block_id); } + }; + + using vote_index_type = boost::multi_index_container< vote, + indexed_by< + ordered_non_unique< tag, const_mem_fun, std::greater<> >, // descending + ordered_non_unique< tag, member >, + ordered_non_unique< tag, member > + > + >; + + using fetch_block_func_t = std::function; + + vote_signal_t& vote_signal; + fetch_block_func_t fetch_block_func; + + std::mutex mtx; + vote_index_type index; + block_state_ptr last_bsp; + // connection, count of messages + std::unordered_map num_messages; + + std::atomic lib{0}; + std::atomic largest_known_block_num{0}; + std::atomic queued_votes{0}; + std::atomic stopped{true}; + named_thread_pool thread_pool; + +private: + // called with unlocked mtx + void emit(uint32_t connection_id, vote_status status, const vote_message_ptr& msg) { + if (connection_id != 0) { // this nodes vote was already signaled + if (status != vote_status::duplicate) { // don't bother emitting duplicates + chain::emit( vote_signal, std::tuple{connection_id, status, std::cref(msg)}, __FILE__, __LINE__ ); + } + } + } + + // called with locked mtx + void remove_connection(uint32_t connection_id) { + auto& idx = index.get(); + idx.erase(idx.lower_bound(connection_id), idx.upper_bound(connection_id)); + } + + // called with locked mtx + void remove_before_lib() { + auto& idx = index.get(); + idx.erase(idx.lower_bound(lib.load()), idx.end()); // descending + // don't decrement num_messages as too many before lib should be considered an error + } + + // called with locked mtx + void remove_too_old() { + auto& idx = index.get(); + fc::time_point vote_too_old = fc::time_point::now() - too_old; + idx.erase(idx.lower_bound(fc::time_point::min()), idx.upper_bound(vote_too_old)); + // don't decrement num_messages as too many that are too old should be considered an error + } + + // called with locked mtx + void queue_for_later(uint32_t connection_id, const vote_message_ptr& msg) { + fc::time_point now = fc::time_point::now(); + remove_before_lib(); + remove_too_old(); + index.insert(vote{.connection_id = connection_id, .received = now, .msg = msg}); + } + + // called with locked mtx, returns with a locked mutex + void process_any_queued_for_later(std::unique_lock& g) { + if (index.empty()) + return; + remove_too_old(); + remove_before_lib(); + auto& idx = index.get(); + std::vector unprocessed; + for (auto i = idx.begin(); i != idx.end();) { + if (stopped) + return; + vote v = std::move(*i); + idx.erase(i); + auto bsp = get_block(v.msg->block_id, g); + // g is unlocked + if (bsp) { + vote_status s = bsp->aggregate_vote(*v.msg); + emit(v.connection_id, s, v.msg); + + g.lock(); + if (auto& num = num_messages[v.connection_id]; num != 0) + --num; + } else { + unprocessed.push_back(std::move(v)); + g.lock(); + } + i = idx.begin(); // need to update since unlocked in loop + } + for (auto& v : unprocessed) { + index.insert(std::move(v)); + } + } + + // called with locked mtx, returns with unlocked mtx + block_state_ptr get_block(const block_id_type& id, std::unique_lock& g) { + block_state_ptr bsp; + if (last_bsp && last_bsp->id() == id) { + bsp = last_bsp; + } + g.unlock(); + + if (!bsp) { + bsp = fetch_block_func(id); + if (bsp) { + g.lock(); + last_bsp = bsp; + largest_known_block_num = std::max(bsp->block_num(), largest_known_block_num.load()); + g.unlock(); + } + } + return bsp; + } + +public: + explicit vote_processor_t(vote_signal_t& vote_signal, fetch_block_func_t&& get_block) + : vote_signal(vote_signal) + , fetch_block_func(get_block) + { + assert(get_block); + } + + ~vote_processor_t() { + stopped = true; + } + + size_t index_size() { + std::lock_guard g(mtx); + return index.size(); + } + + void start(size_t num_threads, decltype(thread_pool)::on_except_t&& on_except) { + if (num_threads == 0) + return; + + stopped = false; + thread_pool.start( num_threads, std::move(on_except)); + } + + // called from main thread + void notify_lib(block_num_type block_num) { + lib = block_num; + } + + // called from net threads + void notify_new_block() { + // would require a mtx lock to check if index is empty, post check to thread_pool + boost::asio::post(thread_pool.get_executor(), [this] { + std::unique_lock g(mtx); + process_any_queued_for_later(g); + }); + } + + /// called from net threads and controller's thread pool + /// msg is ignored vote_processor not start()ed + void process_vote_message(uint32_t connection_id, const vote_message_ptr& msg) { + if (stopped) + return; + assert(msg); + block_num_type msg_block_num = block_header::num_from_id(msg->block_id); + if (msg_block_num <= lib.load(std::memory_order_relaxed)) + return; + ++queued_votes; + boost::asio::post(thread_pool.get_executor(), [this, connection_id, msg] { + if (stopped) + return; + auto num_queued_votes = --queued_votes; + if (block_header::num_from_id(msg->block_id) <= lib.load(std::memory_order_relaxed)) + return; // ignore any votes lower than lib + std::unique_lock g(mtx); + if (num_queued_votes == 0 && index.empty()) // caught up, clear num_messages + num_messages.clear(); + if (auto& num_msgs = ++num_messages[connection_id]; num_msgs > max_votes_per_connection) { + remove_connection(connection_id); + g.unlock(); + // drop, too many from this connection to process, consider connection invalid + // don't clear num_messages[connection_id] so we keep reporting max_exceeded until index is drained + + ilog("Exceeded max votes per connection ${n} > ${max} for ${c}", + ("n", num_msgs)("max", max_votes_per_connection)("c", connection_id)); + emit(connection_id, vote_status::max_exceeded, msg); + } else { + block_state_ptr bsp = get_block(msg->block_id, g); + // g is unlocked + + if (!bsp) { + // queue up for later processing + g.lock(); + queue_for_later(connection_id, msg); + } else { + vote_status s = bsp->aggregate_vote(*msg); + emit(connection_id, s, msg); + + g.lock(); + if (auto& num = num_messages[connection_id]; num != 0) + --num; + + process_any_queued_for_later(g); + } + } + + }); + } + +}; + +} // namespace eosio::chain diff --git a/libraries/libfc/include/fc/crypto/bls_signature.hpp b/libraries/libfc/include/fc/crypto/bls_signature.hpp index d8c2191d4e..ebf0390f1e 100644 --- a/libraries/libfc/include/fc/crypto/bls_signature.hpp +++ b/libraries/libfc/include/fc/crypto/bls_signature.hpp @@ -44,6 +44,13 @@ namespace fc::crypto::blslib { return _jacobian_montgomery_le.equal(sig._jacobian_montgomery_le); } + auto operator<=>(const bls_signature& rhs) const { + return _affine_non_montgomery_le <=> rhs._affine_non_montgomery_le; + } + auto operator==(const bls_signature& rhs) const { + return _affine_non_montgomery_le == rhs._affine_non_montgomery_le; + } + template friend T& operator<<(T& ds, const bls_signature& sig) { // Serialization as variable length array when it is stored as a fixed length array. This makes for easier deserialization by external tools diff --git a/libraries/testing/include/eosio/testing/tester.hpp b/libraries/testing/include/eosio/testing/tester.hpp index 6ca6dbf922..210ff4b67a 100644 --- a/libraries/testing/include/eosio/testing/tester.hpp +++ b/libraries/testing/include/eosio/testing/tester.hpp @@ -426,6 +426,7 @@ namespace eosio { namespace testing { cfg.state_guard_size = 0; cfg.contracts_console = true; cfg.eosvmoc_config.cache_size = 1024*1024*8; + cfg.vote_thread_pool_size = 3; // don't enforce OC compilation subject limits for tests, // particularly EOS EVM tests may run over those limits diff --git a/libraries/testing/tester.cpp b/libraries/testing/tester.cpp index 719851a767..e9ccf4efa4 100644 --- a/libraries/testing/tester.cpp +++ b/libraries/testing/tester.cpp @@ -495,7 +495,7 @@ namespace eosio { namespace testing { // wait for this node's vote to be processed size_t retrys = 200; while (!c.node_has_voted_if_finalizer(c.head_block_id()) && --retrys) { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } FC_ASSERT(retrys, "Never saw this nodes vote processed before timeout"); } diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index c98d503664..193691a051 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -158,6 +158,7 @@ class chain_plugin_impl { std::filesystem::path state_dir; bool readonly = false; flat_map loaded_checkpoints; + bool accept_votes = false; bool accept_transactions = false; bool api_accept_transactions = true; bool account_queries_enabled = false; @@ -291,6 +292,8 @@ void chain_plugin::set_program_options(options_description& cli, options_descrip "Percentage of actual signature recovery cpu to bill. Whole number percentages, e.g. 50 for 50%") ("chain-threads", bpo::value()->default_value(config::default_controller_thread_pool_size), "Number of worker threads in controller thread pool") + ("vote-threads", bpo::value(), + "Number of worker threads in vote processor thread pool. If set to 0, voting disabled, votes are not propagatged on P2P network. Defaults to 4 on producer nodes.") ("contracts-console", bpo::bool_switch()->default_value(false), "print contract's output to console") ("deep-mind", bpo::bool_switch()->default_value(false), @@ -632,9 +635,18 @@ void chain_plugin_impl::plugin_initialize(const variables_map& options) { } if( options.count( "chain-threads" )) { - chain_config->thread_pool_size = options.at( "chain-threads" ).as(); - EOS_ASSERT( chain_config->thread_pool_size > 0, plugin_config_exception, - "chain-threads ${num} must be greater than 0", ("num", chain_config->thread_pool_size) ); + chain_config->chain_thread_pool_size = options.at( "chain-threads" ).as(); + EOS_ASSERT( chain_config->chain_thread_pool_size > 0, plugin_config_exception, + "chain-threads ${num} must be greater than 0", ("num", chain_config->chain_thread_pool_size) ); + } + + if (options.count("producer-name") || options.count("vote-threads")) { + chain_config->vote_thread_pool_size = options.count("vote-threads") ? options.at("vote-threads").as() : 0; + if (chain_config->vote_thread_pool_size == 0 && options.count("producer-name")) { + chain_config->vote_thread_pool_size = config::default_vote_thread_pool_size; + ilog("Setting vote-threads to ${n} on producing node", ("n", chain_config->vote_thread_pool_size)); + } + accept_votes = chain_config->vote_thread_pool_size > 0; } chain_config->sig_cpu_bill_pct = options.at("signature-cpu-billable-pct").as(); @@ -1222,6 +1234,10 @@ void chain_plugin::enable_accept_transactions() { my->enable_accept_transactions(); } +bool chain_plugin::accept_votes() const { + return my->accept_votes; +} + void chain_plugin_impl::log_guard_exception(const chain::guard_exception&e ) { if (e.code() == chain::database_guard_exception::code_value) { diff --git a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp index 45576abe0e..33441f652c 100644 --- a/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eosio/chain_plugin/chain_plugin.hpp @@ -987,6 +987,8 @@ class chain_plugin : public plugin { // set true by other plugins if any plugin allows transactions bool accept_transactions() const; void enable_accept_transactions(); + // true if vote processing is enabled + bool accept_votes() const; static void handle_guard_exception(const chain::guard_exception& e); diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 60d8c579f2..815b2815e9 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -309,7 +309,7 @@ namespace eosio { bool have_txn( const transaction_id_type& tid ) const; void expire_txns(); - void bcast_vote_msg( const std::optional& exclude_peer, send_buffer_type msg ); + void bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg ); void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) { std::optional rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id); @@ -343,6 +343,7 @@ namespace eosio { constexpr uint32_t signed_block_which = fc::get_index(); // see protocol net_message constexpr uint32_t packed_transaction_which = fc::get_index(); // see protocol net_message + constexpr uint32_t vote_message_which = fc::get_index(); // see protocol net_message class connections_manager { public: @@ -480,6 +481,7 @@ namespace eosio { uint32_t max_nodes_per_host = 1; bool p2p_accept_transactions = true; + bool p2p_accept_votes = true; fc::microseconds p2p_dedup_cache_expire_time_us{}; chain_id_type chain_id; @@ -533,12 +535,12 @@ namespace eosio { void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id ); void on_accepted_block(); - void on_voted_block ( const vote_message& vote ); + void on_voted_block( uint32_t connection_id, vote_status stauts, const vote_message_ptr& vote ); void transaction_ack(const std::pair&); void on_irreversible_block( const block_id_type& id, uint32_t block_num ); - void bcast_vote_message( const std::optional& exclude_peer, const chain::vote_message& msg ); + void bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg ); void warn_message( uint32_t sender_peer, const chain::hs_message_warning& code ); void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr from_connection); @@ -1009,6 +1011,7 @@ namespace eosio { bool process_next_block_message(uint32_t message_length); bool process_next_trx_message(uint32_t message_length); + bool process_next_vote_message(uint32_t message_length); void update_endpoints(const tcp::endpoint& endpoint = tcp::endpoint()); public: @@ -1109,8 +1112,9 @@ namespace eosio { void handle_message( const signed_block& msg ) = delete; // signed_block_ptr overload used instead void handle_message( const block_id_type& id, signed_block_ptr ptr ); void handle_message( const packed_transaction& msg ) = delete; // packed_transaction_ptr overload used instead - void handle_message( packed_transaction_ptr trx ); - void handle_message( const vote_message& msg ); + void handle_message( const packed_transaction_ptr& trx ); + void handle_message( const vote_message_ptr& msg ); + void handle_message( const vote_message& msg ) = delete; // vote_message_ptr overload used instead // returns calculated number of blocks combined latency uint32_t calc_block_latency(); @@ -1191,12 +1195,6 @@ namespace eosio { peer_dlog( c, "handle sync_request_message" ); c->handle_message( msg ); } - - void operator()( const chain::vote_message& msg ) const { - // continue call to handle_message on connection strand - peer_dlog( c, "handle vote_message" ); - c->handle_message( msg ); - } }; @@ -2678,10 +2676,10 @@ namespace eosio { } ); } - void dispatch_manager::bcast_vote_msg( const std::optional& exclude_peer, send_buffer_type msg ) { + void dispatch_manager::bcast_vote_msg( uint32_t exclude_peer, send_buffer_type msg ) { my_impl->connections.for_each_block_connection( [exclude_peer, msg{std::move(msg)}]( auto& cp ) { if( !cp->current() ) return true; - if( exclude_peer.has_value() && cp->connection_id == exclude_peer.value() ) return true; + if( cp->connection_id == exclude_peer ) return true; cp->strand.post( [cp, msg]() { if (cp->protocol_version >= proto_instant_finality) { peer_dlog(cp, "sending vote msg"); @@ -3089,6 +3087,8 @@ namespace eosio { return process_next_block_message( message_length ); } else if( which == packed_transaction_which ) { return process_next_trx_message( message_length ); + } else if( which == vote_message_which ) { + return process_next_vote_message( message_length ); } else { auto ds = pending_message_buffer.create_datastream(); net_message msg; @@ -3201,7 +3201,8 @@ namespace eosio { auto ds = pending_message_buffer.create_datastream(); unsigned_int which{}; fc::raw::unpack( ds, which ); - shared_ptr ptr = std::make_shared(); + // shared_ptr needed here because packed_transaction_ptr is shared_ptr + std::shared_ptr ptr = std::make_shared(); fc::raw::unpack( ds, *ptr ); if( trx_in_progress_sz > def_max_trx_in_progress_size) { char reason[72]; @@ -3224,7 +3225,26 @@ namespace eosio { return true; } - handle_message( std::move( ptr ) ); + handle_message( ptr ); + return true; + } + + // called from connection strand + bool connection::process_next_vote_message(uint32_t message_length) { + if( !my_impl->p2p_accept_votes ) { + peer_dlog( this, "p2p_accept_votes=false - dropping vote" ); + pending_message_buffer.advance_read_ptr( message_length ); + return true; + } + + auto ds = pending_message_buffer.create_datastream(); + unsigned_int which{}; + fc::raw::unpack( ds, which ); + assert(which == vote_message_which); + vote_message_ptr ptr = std::make_shared(); + fc::raw::unpack( ds, *ptr ); + + handle_message( ptr ); return true; } @@ -3720,29 +3740,12 @@ namespace eosio { } } - void connection::handle_message( const vote_message& msg ) { + void connection::handle_message( const vote_message_ptr& msg ) { peer_dlog(this, "received vote: block #${bn}:${id}.., ${v}, key ${k}..", - ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) - ("v", msg.strong ? "strong" : "weak")("k", msg.finalizer_key.to_string().substr(8, 16))); + ("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16)) + ("v", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8, 16))); controller& cc = my_impl->chain_plug->chain(); - - switch( cc.process_vote_message(msg) ) { - case vote_status::success: - my_impl->bcast_vote_message(connection_id, msg); - break; - case vote_status::unknown_public_key: - case vote_status::invalid_signature: // close peer immediately - close( false ); // do not reconnect after closing - break; - case vote_status::unknown_block: // track the failure - peer_dlog(this, "vote unknown block #${bn}:${id}..", ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16))); - block_status_monitor_.rejected(); - break; - case vote_status::duplicate: // do nothing - break; - default: - assert(false); // should never happen - } + cc.process_vote_message(connection_id, msg); } size_t calc_trx_size( const packed_transaction_ptr& trx ) { @@ -3750,7 +3753,7 @@ namespace eosio { } // called from connection strand - void connection::handle_message( packed_transaction_ptr trx ) { + void connection::handle_message( const packed_transaction_ptr& trx ) { const auto& tid = trx->id(); peer_dlog( this, "received packed_transaction ${id}", ("id", tid) ); @@ -4010,20 +4013,47 @@ namespace eosio { } // called from other threads including net threads - void net_plugin_impl::on_voted_block(const vote_message& msg) { - fc_dlog(logger, "on voted signal: block #${bn} ${id}.., ${t}, key ${k}..", - ("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) - ("t", msg.strong ? "strong" : "weak")("k", msg.finalizer_key.to_string().substr(8, 16))); - bcast_vote_message(std::nullopt, msg); + void net_plugin_impl::on_voted_block(uint32_t connection_id, vote_status status, const vote_message_ptr& msg) { + fc_dlog(logger, "connection - ${c} on voted signal: ${s} block #${bn} ${id}.., ${t}, key ${k}..", + ("c", connection_id)("s", status)("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16)) + ("t", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8, 16))); + + switch( status ) { + case vote_status::success: + bcast_vote_message(connection_id, msg); + break; + case vote_status::unknown_public_key: + case vote_status::invalid_signature: + case vote_status::max_exceeded: // close peer immediately + my_impl->connections.for_each_connection([connection_id](const connection_ptr& c) { + if (c->connection_id == connection_id) { + c->close( false ); + } + }); + break; + case vote_status::unknown_block: // track the failure + fc_dlog(logger, "connection - ${c} vote unknown block #${bn}:${id}..", + ("c", connection_id)("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16))); + my_impl->connections.for_each_connection([connection_id](const connection_ptr& c) { + if (c->connection_id == connection_id) { + c->block_status_monitor_.rejected(); + } + }); + break; + case vote_status::duplicate: // do nothing + break; + default: + assert(false); // should never happen + } } - void net_plugin_impl::bcast_vote_message( const std::optional& exclude_peer, const chain::vote_message& msg ) { + void net_plugin_impl::bcast_vote_message( uint32_t exclude_peer, const chain::vote_message_ptr& msg ) { buffer_factory buff_factory; - auto send_buffer = buff_factory.get_send_buffer( msg ); + auto send_buffer = buff_factory.get_send_buffer( *msg ); fc_dlog(logger, "bcast ${t} vote: block #${bn} ${id}.., ${v}, key ${k}..", - ("t", exclude_peer ? "received" : "our")("bn", block_header::num_from_id(msg.block_id))("id", msg.block_id.str().substr(8,16)) - ("v", msg.strong ? "strong" : "weak")("k", msg.finalizer_key.to_string().substr(8,16))); + ("t", exclude_peer ? "received" : "our")("bn", block_header::num_from_id(msg->block_id))("id", msg->block_id.str().substr(8,16)) + ("v", msg->strong ? "strong" : "weak")("k", msg->finalizer_key.to_string().substr(8,16))); dispatcher.strand.post( [this, exclude_peer, msg{std::move(send_buffer)}]() mutable { dispatcher.bcast_vote_msg( exclude_peer, std::move(msg) ); @@ -4396,6 +4426,8 @@ namespace eosio { "***********************************\n" ); } + p2p_accept_votes = chain_plug->accept_votes(); + std::vector listen_addresses = p2p_addresses; EOS_ASSERT( p2p_addresses.size() == p2p_server_addresses.size(), chain::plugin_config_exception, "" ); @@ -4434,8 +4466,8 @@ namespace eosio { my->on_irreversible_block( id, block->block_num() ); } ); - cc.voted_block().connect( [my = shared_from_this()]( const vote_message& vote ) { - my->on_voted_block(vote); + cc.voted_block().connect( [my = shared_from_this()]( const vote_signal_params& vote_signal ) { + my->on_voted_block(std::get<0>(vote_signal), std::get<1>(vote_signal), std::get<2>(vote_signal)); } ); } diff --git a/tests/TestHarness/Cluster.py b/tests/TestHarness/Cluster.py index 292adaae21..40f127368a 100644 --- a/tests/TestHarness/Cluster.py +++ b/tests/TestHarness/Cluster.py @@ -256,6 +256,8 @@ def launch(self, pnodes=1, unstartedNodes=0, totalNodes=1, prodCount=21, topo="m if self.staging: argsArr.append("--nogen") nodeosArgs="" + if "--vote-threads" not in extraNodeosArgs: + nodeosArgs += " --vote-threads 4" if "--max-transaction-time" not in extraNodeosArgs: nodeosArgs += " --max-transaction-time -1" if "--abi-serializer-max-time-ms" not in extraNodeosArgs: diff --git a/tests/auto_bp_peering_test.py b/tests/auto_bp_peering_test.py index 666cbd5536..b7c8dd3011 100755 --- a/tests/auto_bp_peering_test.py +++ b/tests/auto_bp_peering_test.py @@ -113,6 +113,8 @@ def neighbors_in_schedule(name, schedule): for conn in connections["payload"]: peer_addr = conn["peer"] if len(peer_addr) == 0: + if len(conn["last_handshake"]["p2p_address"]) == 0: + continue peer_addr = conn["last_handshake"]["p2p_address"].split()[0] if peer_names[peer_addr] != "bios": peers.append(peer_names[peer_addr]) diff --git a/tests/nodeos_snapshot_forked_test.py b/tests/nodeos_snapshot_forked_test.py index c61372c1e4..803a667aa8 100755 --- a/tests/nodeos_snapshot_forked_test.py +++ b/tests/nodeos_snapshot_forked_test.py @@ -144,7 +144,7 @@ def getSnapshotsCount(nodeId): while nonProdNode.verifyAlive() and count > 0: # wait on prodNode 0 since it will continue to advance, since defproducera and defproducerb are its producers Print("Wait for next block") - assert prodAB.waitForNextBlock(timeout=6), "Production node AB should continue to advance, even after bridge node is killed" + assert prodAB.waitForNextBlock(timeout=10), "Production node AB should continue to advance, even after bridge node is killed" count -= 1 # schedule a snapshot that should get finalized diff --git a/unittests/api_tests.cpp b/unittests/api_tests.cpp index f96fe14ec3..ff6f32fcf6 100644 --- a/unittests/api_tests.cpp +++ b/unittests/api_tests.cpp @@ -1226,7 +1226,7 @@ BOOST_AUTO_TEST_CASE(checktime_pause_block_deadline_not_extended_while_loading_t // WASM load times on my machine was 35ms. // Since checktime only kicks in after WASM is loaded this needs to be large enough to load the WASM, but should be // considerably lower than the 150ms max_transaction_time - BOOST_CHECK_MESSAGE( dur < 50'000, "elapsed " << dur << "us" ); + BOOST_CHECK_MESSAGE( dur < 65'000, "elapsed " << dur << "us" ); BOOST_REQUIRE_MESSAGE( dur < 150'000, "elapsed " << dur << "us" ); // should never fail BOOST_REQUIRE_EQUAL( t.validate(), true ); diff --git a/unittests/finality_test_cluster.cpp b/unittests/finality_test_cluster.cpp index 0ea13c13ed..5f0156591f 100644 --- a/unittests/finality_test_cluster.cpp +++ b/unittests/finality_test_cluster.cpp @@ -10,13 +10,20 @@ finality_test_cluster::finality_test_cluster() { produce_and_push_block(); // make setfinalizer irreversible + // node0's votes + node0.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + last_vote_status = std::get<1>(v); + last_connection_vote = std::get<0>(v); + }); // collect node1's votes - node1.node.control->voted_block().connect( [&]( const eosio::chain::vote_message& vote ) { - node1.votes.emplace_back(vote); + node1.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + std::lock_guard g(node1.votes_mtx); + node1.votes.emplace_back(std::get<2>(v)); }); // collect node2's votes - node2.node.control->voted_block().connect( [&]( const eosio::chain::vote_message& vote ) { - node2.votes.emplace_back(vote); + node2.node.control->voted_block().connect( [&]( const eosio::chain::vote_signal_params& v ) { + std::lock_guard g(node2.votes_mtx); + node2.votes.emplace_back(std::get<2>(v)); }); // form a 3-chain to make LIB advacing on node0 @@ -35,11 +42,27 @@ finality_test_cluster::finality_test_cluster() { // clean up processed votes for (auto& n : nodes) { + std::lock_guard g(n.votes_mtx); n.votes.clear(); n.prev_lib_num = n.node.control->if_irreversible_block_num(); } } +eosio::chain::vote_status finality_test_cluster::wait_on_vote(uint32_t connection_id, bool duplicate) { + // wait for this node's vote to be processed + // duplicates are not signaled + size_t retrys = 200; + while ( (last_connection_vote != connection_id) && --retrys) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + if (!duplicate && last_connection_vote != connection_id) { + FC_ASSERT(false, "Never received vote"); + } else if (duplicate && last_connection_vote == connection_id) { + FC_ASSERT(false, "Duplicate should not have been signaled"); + } + return duplicate ? eosio::chain::vote_status::duplicate : last_vote_status.load(); +} + // node0 produces a block and pushes it to node1 and node2 void finality_test_cluster::produce_and_push_block() { auto b = node0.node.produce_block(); @@ -48,8 +71,8 @@ void finality_test_cluster::produce_and_push_block() { } // send node1's vote identified by "vote_index" in the collected votes -eosio::chain::vote_status finality_test_cluster::process_node1_vote(uint32_t vote_index, vote_mode mode) { - return process_vote( node1, vote_index, mode ); +eosio::chain::vote_status finality_test_cluster::process_node1_vote(uint32_t vote_index, vote_mode mode, bool duplicate) { + return process_vote( node1, vote_index, mode, duplicate ); } // send node1's latest vote @@ -87,8 +110,11 @@ bool finality_test_cluster::node2_lib_advancing() { // node1_votes and node2_votes when starting. bool finality_test_cluster::produce_blocks_and_verify_lib_advancing() { // start from fresh - node1.votes.clear(); - node2.votes.clear(); + { + std::scoped_lock g(node1.votes_mtx, node2.votes_mtx); + node1.votes.clear(); + node2.votes.clear(); + } for (auto i = 0; i < 3; ++i) { produce_and_push_block(); @@ -103,36 +129,40 @@ bool finality_test_cluster::produce_blocks_and_verify_lib_advancing() { } void finality_test_cluster::node1_corrupt_vote_proposal_id() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; - if( node1.votes[0].block_id.data()[0] == 'a' ) { - node1.votes[0].block_id.data()[0] = 'b'; + if( node1.votes[0]->block_id.data()[0] == 'a' ) { + node1.votes[0]->block_id.data()[0] = 'b'; } else { - node1.votes[0].block_id.data()[0] = 'a'; + node1.votes[0]->block_id.data()[0] = 'a'; } } void finality_test_cluster::node1_corrupt_vote_finalizer_key() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; // corrupt the finalizer_key (manipulate so it is different) - auto g1 = node1.votes[0].finalizer_key.jacobian_montgomery_le(); + auto g1 = node1.votes[0]->finalizer_key.jacobian_montgomery_le(); g1 = bls12_381::aggregate_public_keys(std::array{g1, g1}); auto affine = g1.toAffineBytesLE(bls12_381::from_mont::yes); - node1.votes[0].finalizer_key = fc::crypto::blslib::bls_public_key(affine); + node1.votes[0]->finalizer_key = fc::crypto::blslib::bls_public_key(affine); } void finality_test_cluster::node1_corrupt_vote_signature() { + std::lock_guard g(node1.votes_mtx); node1_orig_vote = node1.votes[0]; // corrupt the signature - auto g2 = node1.votes[0].sig.jacobian_montgomery_le(); + auto g2 = node1.votes[0]->sig.jacobian_montgomery_le(); g2 = bls12_381::aggregate_signatures(std::array{g2, g2}); auto affine = g2.toAffineBytesLE(bls12_381::from_mont::yes); - node1.votes[0].sig = fc::crypto::blslib::bls_signature(affine); + node1.votes[0]->sig = fc::crypto::blslib::bls_signature(affine); } void finality_test_cluster::node1_restore_to_original_vote() { + std::lock_guard g(node1.votes_mtx); node1.votes[0] = node1_orig_vote; } @@ -176,21 +206,27 @@ void finality_test_cluster::setup_node(node_info& node, eosio::chain::account_na } // send a vote to node0 -eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, size_t vote_index, vote_mode mode) { +eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, size_t vote_index, vote_mode mode, bool duplicate) { + std::unique_lock g(node.votes_mtx); FC_ASSERT( vote_index < node.votes.size(), "out of bound index in process_vote" ); auto& vote = node.votes[vote_index]; if( mode == vote_mode::strong ) { - vote.strong = true; + vote->strong = true; } else { - vote.strong = false; + vote->strong = false; // fetch the strong digest - auto strong_digest = node.node.control->get_strong_digest_by_id(vote.block_id); + auto strong_digest = node.node.control->get_strong_digest_by_id(vote->block_id); // convert the strong digest to weak and sign it - vote.sig = node.priv_key.sign(eosio::chain::create_weak_digest(strong_digest)); + vote->sig = node.priv_key.sign(eosio::chain::create_weak_digest(strong_digest)); } + g.unlock(); - return node0.node.control->process_vote_message( vote ); + static uint32_t connection_id = 0; + node0.node.control->process_vote_message( ++connection_id, vote ); + if (eosio::chain::block_header::num_from_id(vote->block_id) > node0.node.control->last_irreversible_block_num()) + return wait_on_vote(connection_id, duplicate); + return eosio::chain::vote_status::unknown_block; } eosio::chain::vote_status finality_test_cluster::process_vote(node_info& node, vote_mode mode) { diff --git a/unittests/finality_test_cluster.hpp b/unittests/finality_test_cluster.hpp index 97ab1aa4f0..2aefb153a9 100644 --- a/unittests/finality_test_cluster.hpp +++ b/unittests/finality_test_cluster.hpp @@ -36,7 +36,7 @@ class finality_test_cluster { void produce_and_push_block(); // send node1's vote identified by "index" in the collected votes - eosio::chain::vote_status process_node1_vote(uint32_t vote_index, vote_mode mode = vote_mode::strong); + eosio::chain::vote_status process_node1_vote(uint32_t vote_index, vote_mode mode = vote_mode::strong, bool duplicate = false); // send node1's latest vote eosio::chain::vote_status process_node1_vote(vote_mode mode = vote_mode::strong); @@ -78,16 +78,20 @@ class finality_test_cluster { struct node_info { eosio::testing::tester node; uint32_t prev_lib_num{0}; - std::vector votes; + std::mutex votes_mtx; + std::vector votes; fc::crypto::blslib::bls_private_key priv_key; }; + std::atomic last_connection_vote{0}; + std::atomic last_vote_status{}; + std::array nodes; node_info& node0 = nodes[0]; node_info& node1 = nodes[1]; node_info& node2 = nodes[2]; - eosio::chain::vote_message node1_orig_vote; + eosio::chain::vote_message_ptr node1_orig_vote; // sets up "node_index" node void setup_node(node_info& node, eosio::chain::account_name local_finalizer); @@ -96,8 +100,10 @@ class finality_test_cluster { bool lib_advancing(node_info& node); // send "vote_index" vote on node to node0 - eosio::chain::vote_status process_vote(node_info& node, size_t vote_index, vote_mode mode); + eosio::chain::vote_status process_vote(node_info& node, size_t vote_index, vote_mode mode, bool duplicate = false); // send the latest vote on "node_index" node to node0 eosio::chain::vote_status process_vote(node_info& node, vote_mode mode); + + eosio::chain::vote_status wait_on_vote(uint32_t connection_id, bool duplicate); }; diff --git a/unittests/finality_tests.cpp b/unittests/finality_tests.cpp index 6d68774fe5..1864d4cf61 100644 --- a/unittests/finality_tests.cpp +++ b/unittests/finality_tests.cpp @@ -462,7 +462,7 @@ BOOST_AUTO_TEST_CASE(delayed_strong_weak_lost_vote) { try { BOOST_REQUIRE(!cluster.node1_lib_advancing()); // The delayed vote arrives - cluster.process_node1_vote(delayed_index); + cluster.process_node1_vote(delayed_index, finality_test_cluster::vote_mode::strong, true); cluster.produce_and_push_block(); BOOST_REQUIRE(!cluster.node2_lib_advancing()); BOOST_REQUIRE(!cluster.node1_lib_advancing()); @@ -481,9 +481,9 @@ BOOST_AUTO_TEST_CASE(duplicate_votes) { try { cluster.produce_and_push_block(); for (auto i = 0; i < 5; ++i) { - cluster.process_node1_vote(i); + cluster.process_node1_vote(i, finality_test_cluster::vote_mode::strong); // vote again to make it duplicate - BOOST_REQUIRE(cluster.process_node1_vote(i) == eosio::chain::vote_status::duplicate); + BOOST_REQUIRE(cluster.process_node1_vote(i, finality_test_cluster::vote_mode::strong, true) == eosio::chain::vote_status::duplicate); cluster.produce_and_push_block(); // verify duplicate votes do not affect LIB advancing @@ -502,8 +502,7 @@ BOOST_AUTO_TEST_CASE(unknown_proposal_votes) { try { cluster.node1_corrupt_vote_proposal_id(); // process the corrupted vote - cluster.process_node1_vote(0); - BOOST_REQUIRE(cluster.process_node1_vote(0) == eosio::chain::vote_status::unknown_block); + BOOST_REQUIRE_THROW(cluster.process_node1_vote(0), fc::exception); // throws because it times out waiting on vote cluster.produce_and_push_block(); BOOST_REQUIRE(cluster.node2_lib_advancing()); @@ -512,7 +511,7 @@ BOOST_AUTO_TEST_CASE(unknown_proposal_votes) { try { // process the original vote. LIB should advance cluster.produce_and_push_block(); - cluster.process_node1_vote(0); + cluster.process_node1_vote(0, finality_test_cluster::vote_mode::strong, true); BOOST_REQUIRE(cluster.produce_blocks_and_verify_lib_advancing()); } FC_LOG_AND_RETHROW() } diff --git a/unittests/subjective_billing_tests.cpp b/unittests/subjective_billing_tests.cpp index f80e5ea909..4cb25fbaf9 100644 --- a/unittests/subjective_billing_tests.cpp +++ b/unittests/subjective_billing_tests.cpp @@ -1,6 +1,6 @@ #include -#include "eosio/chain/subjective_billing.hpp" +#include #include #include diff --git a/unittests/vote_processor_tests.cpp b/unittests/vote_processor_tests.cpp new file mode 100644 index 0000000000..7c45fab7fb --- /dev/null +++ b/unittests/vote_processor_tests.cpp @@ -0,0 +1,247 @@ +#include + +#include +#include +#include +#include +#include + +namespace std { +std::ostream& operator<<(std::ostream& os, const eosio::chain::vote_message& v) { + os << "vote_message{" << v.block_id << std::endl; + return os; +} +std::ostream& operator<<(std::ostream& os, const eosio::chain::vote_status& v) { + os << fc::reflector::to_string(v) << std::endl; + return os; +} +} + +namespace { + +using namespace eosio; +using namespace eosio::chain; + +block_id_type make_block_id(uint32_t block_num) { + block_id_type block_id; + block_id._hash[0] &= 0xffffffff00000000; + block_id._hash[0] += fc::endian_reverse_u32(block_num); + return block_id; +} + +bls_private_key bls_priv_key_0 = bls_private_key::generate(); +bls_private_key bls_priv_key_1 = bls_private_key::generate(); +bls_private_key bls_priv_key_2 = bls_private_key::generate(); +std::vector bls_priv_keys{bls_priv_key_0, bls_priv_key_1, bls_priv_key_2}; + +auto create_genesis_block_state() { // block 2 + signed_block_ptr block = std::make_shared(); + + block->producer = eosio::chain::config::system_account_name; + auto pub_key = eosio::testing::base_tester::get_public_key( block->producer, "active" ); + + std::vector finalizers; + finalizers.push_back(finalizer_authority{.description = "first", .weight = 1, .public_key = bls_priv_keys.at(0).get_public_key()}); + finalizers.push_back(finalizer_authority{.description = "first", .weight = 1, .public_key = bls_priv_keys.at(1).get_public_key()}); + finalizers.push_back(finalizer_authority{.description = "first", .weight = 1, .public_key = bls_priv_keys.at(2).get_public_key()}); + finalizer_policy new_finalizer_policy{.finalizers = finalizers}; + qc_claim_t initial_if_claim { .block_num = 2, + .is_strong_qc = false }; + emplace_extension(block->header_extensions, instant_finality_extension::extension_id(), + fc::raw::pack(instant_finality_extension{ initial_if_claim, new_finalizer_policy, {} })); + + producer_authority_schedule schedule = { 0, { producer_authority{block->producer, block_signing_authority_v0{ 1, {{pub_key, 1}} } } } }; + auto genesis = std::make_shared(); + genesis->block = block; + genesis->active_finalizer_policy = std::make_shared(new_finalizer_policy); + genesis->block->previous = make_block_id(1); + genesis->active_proposer_policy = std::make_shared(proposer_policy{.proposer_schedule = schedule}); + genesis->core = finality_core::create_core_for_genesis_block(1); + genesis->block_id = genesis->block->calculate_id(); + return genesis; +} + +auto create_test_block_state(const block_state_ptr& prev) { + static block_timestamp_type timestamp; + timestamp = timestamp.next(); // each test block state will be unique + signed_block_ptr block = std::make_shared(prev->block->clone()); + block->producer = eosio::chain::config::system_account_name; + block->previous = prev->id(); + block->timestamp = timestamp; + + auto priv_key = eosio::testing::base_tester::get_private_key( block->producer, "active" ); + auto pub_key = eosio::testing::base_tester::get_public_key( block->producer, "active" ); + + auto sig_digest = digest_type::hash("something"); + block->producer_signature = priv_key.sign( sig_digest ); + + vector signing_keys; + signing_keys.emplace_back( std::move( priv_key ) ); + + auto signer = [&]( digest_type d ) { + std::vector result; + result.reserve(signing_keys.size()); + for (const auto& k: signing_keys) + result.emplace_back(k.sign(d)); + return result; + }; + block_header_state bhs = *prev; + bhs.header = *block; + bhs.header.timestamp = timestamp; + bhs.header.previous = prev->id(); + bhs.header.schedule_version = block_header::proper_svnn_schedule_version; + bhs.block_id = block->calculate_id(); + + auto bsp = std::make_shared(bhs, + deque{}, + deque{}, + std::optional{}, + std::optional{}, + signer, + block_signing_authority_v0{ 1, {{pub_key, 1}} }, + digest_type{}); + + return bsp; +} + +vote_message_ptr make_empty_message(const block_id_type& id) { + vote_message_ptr vm = std::make_shared(); + vm->block_id = id; + return vm; +} + +vote_message_ptr make_vote_message(const block_state_ptr& bsp) { + vote_message_ptr vm = std::make_shared(); + vm->block_id = bsp->id(); + vm->strong = true; + size_t i = bsp->block_num() % bls_priv_keys.size(); + vm->finalizer_key = bls_priv_keys.at(i).get_public_key(); + vm->sig = bls_priv_keys.at(i).sign({(uint8_t*)bsp->strong_digest.data(), (uint8_t*)bsp->strong_digest.data() + bsp->strong_digest.data_size()}); + return vm; +} + +BOOST_AUTO_TEST_SUITE(vote_processor_tests) + +BOOST_AUTO_TEST_CASE( vote_processor_test ) { + vote_signal_t voted_block; + + uint32_t received_connection_id = 0; + vote_status received_vote_status = vote_status::unknown_block; + vote_message_ptr received_vote_message{}; + + std::atomic signaled = 0; + std::mutex forkdb_mtx; + std::map forkdb; + auto add_to_forkdb = [&](const block_state_ptr& bsp) { + std::lock_guard g(forkdb_mtx); + forkdb[bsp->id()] = bsp; + }; + + voted_block.connect( [&]( const vote_signal_params& vote_signal ) { + received_connection_id = std::get<0>(vote_signal); + received_vote_status = std::get<1>(vote_signal); + received_vote_message = std::get<2>(vote_signal); + ++signaled; + } ); + + vote_processor_t vp{voted_block, [&](const block_id_type& id) -> block_state_ptr { + std::lock_guard g(forkdb_mtx); + return forkdb[id]; + }}; + vp.start(2, [](const fc::exception& e) { + edump((e)); + BOOST_REQUIRE(false); + }); + + { // empty fork db, block never found, never signaled + vote_message_ptr vm1 = make_empty_message(make_block_id(1)); + signaled = 0; + vp.process_vote_message(1, vm1); + for (size_t i = 0; i < 50 && vp.index_size() < 1; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(vp.index_size() == 1); + // move lib past block + vp.notify_lib(2); + vp.notify_new_block(); + for (size_t i = 0; i < 50 && vp.index_size() > 0; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(vp.index_size() == 0); + } + { // process a valid vote + signaled = 0; + auto gensis = create_genesis_block_state(); + auto bsp = create_test_block_state(gensis); + BOOST_CHECK_EQUAL(bsp->block_num(), 3); + vote_message_ptr m1 = make_vote_message(bsp); + add_to_forkdb(bsp); + vp.process_vote_message(1, m1); + // duplicate ignored + vp.process_vote_message(1, m1); + for (size_t i = 0; i < 50 && signaled.load() < 1; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(signaled.load() == 1); + BOOST_TEST(1 == received_connection_id); + BOOST_TEST(vote_status::success == received_vote_status); + BOOST_TEST(m1 == received_vote_message); + } + { // process an invalid signature vote + signaled = 0; + auto gensis = create_genesis_block_state(); + auto bsp = create_test_block_state(gensis); + BOOST_CHECK_EQUAL(bsp->block_num(), 3); + vote_message_ptr m1 = make_vote_message(bsp); + m1->strong = false; // signed with strong_digest + add_to_forkdb(bsp); + vp.process_vote_message(1, m1); + for (size_t i = 0; i < 50 && signaled.load() < 1; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(signaled.load() == 1); + BOOST_TEST(1 == received_connection_id); + BOOST_TEST(vote_status::invalid_signature == received_vote_status); + BOOST_TEST(m1 == received_vote_message); + } + { // process two diff block votes + signaled = 0; + auto gensis = create_genesis_block_state(); + auto bsp = create_test_block_state(gensis); + auto bsp2 = create_test_block_state(bsp); + vote_message_ptr m1 = make_vote_message(bsp); + vote_message_ptr m2 = make_vote_message(bsp2); + vp.process_vote_message(2, m1); + vp.process_vote_message(3, m2); + for (size_t i = 0; i < 5; ++i) { + if (vp.index_size() == 2) break; + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(vp.index_size() == 2); + std::this_thread::sleep_for(std::chrono::milliseconds{5}); // no votes for awhile + BOOST_TEST(signaled.load() == 0); + add_to_forkdb(bsp); + vp.notify_new_block(); + for (size_t i = 0; i < 50 && signaled.load() < 2; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(signaled.load() == 1); + BOOST_TEST(2 == received_connection_id); + BOOST_TEST(vote_status::success == received_vote_status); + BOOST_CHECK(m1 == received_vote_message); + + add_to_forkdb(bsp2); + vp.notify_new_block(); + for (size_t i = 0; i < 50 && signaled.load() < 2; ++i) { + std::this_thread::sleep_for(std::chrono::milliseconds{5}); + } + BOOST_TEST(signaled.load() == 2); + BOOST_TEST(3 == received_connection_id); + BOOST_TEST(vote_status::success == received_vote_status); + BOOST_CHECK(m2 == received_vote_message); + } +} + +BOOST_AUTO_TEST_SUITE_END() + +}