Skip to content

Commit

Permalink
GH-284 Add timelimit to apply_blocks and return if complete or incomp…
Browse files Browse the repository at this point in the history
…lete
  • Loading branch information
heifner committed Oct 10, 2024
1 parent 346e84e commit dd5a078
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 30 deletions.
45 changes: 30 additions & 15 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1826,8 +1826,9 @@ struct controller_impl {
// loading from snapshot without a block log so fork_db can't be considered valid
fork_db_reset_root_to_chain_head();
} else if( !except_ptr && !check_shutdown() && !irreversible_mode() && forkdb.head()) {
// applies all blocks up to forkdb head from forkdb
maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{});
// applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
auto head = forkdb.head();
ilog( "reversible blocks replayed to ${bn} : ${id}", ("bn", head->block_num())("id", head->id()) );
}
Expand Down Expand Up @@ -2008,7 +2009,9 @@ struct controller_impl {
// See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0`
if (chain_head_is_root || conf.num_configured_p2p_peers > 0) {
ilog("applying branch from fork database ending with block: ${id}", ("id", pending_head->id()));
maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{});
// applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
}
}
} else {
Expand Down Expand Up @@ -4296,23 +4299,25 @@ struct controller_impl {
} FC_LOG_AND_RETHROW( )
}

void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
try {
if( !irreversible_mode() ) {
maybe_apply_blocks( cb, trx_lookup );
} else {
log_irreversible();
transition_to_savanna_if_needed();
return maybe_apply_blocks( cb, trx_lookup );
}

log_irreversible();
transition_to_savanna_if_needed();
return controller::apply_blocks_result::complete;
} FC_LOG_AND_RETHROW( )
}

void maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
controller::apply_blocks_result maybe_apply_blocks( const forked_callback_t& forked_cb, const trx_meta_cache_lookup& trx_lookup )
{
controller::apply_blocks_result result = controller::apply_blocks_result::complete;
auto do_apply_blocks = [&](auto& forkdb) {
auto new_head = forkdb.head(); // use best head
if (!new_head)
return; // nothing to do, forkdb at root
return;// nothing to do, forkdb at root
auto [new_head_branch, old_head_branch] = forkdb.fetch_branch_from( new_head->id(), chain_head.id() );

bool switch_fork = !old_head_branch.empty();
Expand Down Expand Up @@ -4358,9 +4363,17 @@ struct controller_impl {
try {
bool applied = apply_block( bsp, bsp->is_valid() ? controller::block_status::validated
: controller::block_status::complete, trx_lookup );
if (!switch_fork && (!applied || check_shutdown())) {
shutdown();
break;
if (!switch_fork) { // always complete a switch fork
if (!applied || check_shutdown()) {
shutdown();
break; // result should be complete since we are shutting down
}
// Break every ~500ms to allow other tasks (e.g. get_info, SHiP) opportunity to run. User expected
// to call apply_blocks again if this returns incomplete.
if (!replaying && fc::time_point::now() - start > fc::milliseconds(500)) {
result = controller::apply_blocks_result::incomplete;
break;
}
}
} catch ( const std::bad_alloc& ) {
throw;
Expand Down Expand Up @@ -4417,6 +4430,8 @@ struct controller_impl {
};

fork_db.apply<void>(do_apply_blocks);

return result;
}

deque<transaction_metadata_ptr> abort_block() {
Expand Down Expand Up @@ -5179,9 +5194,9 @@ void controller::set_async_aggregation(async_t val) {
my->async_aggregation = val;
}

void controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
controller::apply_blocks_result controller::apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup) {
validate_db_available_size();
my->apply_blocks(cb, trx_lookup);
return my->apply_blocks(cb, trx_lookup);
}


Expand Down
10 changes: 7 additions & 3 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,20 @@ namespace eosio::chain {
void set_async_voting(async_t val);
void set_async_aggregation(async_t val);

/// Apply any blocks that are ready from the forkdb
void apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

struct accepted_block_result {
const bool is_new_best_head = false; // true if new best head
std::optional<block_handle> block; // empty optional if block is unlinkable
};
// thread-safe
accepted_block_result accept_block( const block_id_type& id, const signed_block_ptr& b ) const;

/// Apply any blocks that are ready from the forkdb
enum class apply_blocks_result {
complete, // all ready blocks in forkdb have been applied
incomplete // time limit reached, additional blocks may be available in forkdb to process
};
apply_blocks_result apply_blocks(const forked_callback_t& cb, const trx_meta_cache_lookup& trx_lookup);

boost::asio::io_context& get_thread_pool();

const chainbase::database& db()const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, controller::apply_blocks_result(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class producer_plugin : public appbase::plugin<producer_plugin> {
virtual void plugin_shutdown();
void handle_sighup() override;

bool on_incoming_block();
controller::apply_blocks_result on_incoming_block();

void pause();
void resume();
Expand Down
25 changes: 15 additions & 10 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

// called on incoming blocks from net_plugin on the main thread. Will notify controller to process any
// blocks ready in the fork database.
bool on_incoming_block() {
controller::apply_blocks_result on_incoming_block() {
auto now = fc::time_point::now();
_time_tracker.add_idle_time(now);

Expand All @@ -905,12 +905,14 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
("num", fhead.block_num())("id", fhead.id()));
}
_time_tracker.add_other_time();
return true; // return true because block was accepted
// return complete as we are producing and don't want to be interrupted right now. Next start_block will
// give an opportunity for this incoming block to be processed.
return controller::apply_blocks_result::complete;
}

// no reason to abort_block if we have nothing ready to process
if (chain.head().id() == chain.fork_db_head().id()) {
return true; // return true as nothing failed
return controller::apply_blocks_result::complete; // nothing to do
}

// start a new speculative block, adds to time tracker which includes this method's time
Expand All @@ -919,21 +921,22 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
// abort the pending block
abort_block();

controller::apply_blocks_result result = controller::apply_blocks_result::complete;
try {
chain.apply_blocks(
result = chain.apply_blocks(
[this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
} catch (const guard_exception& e) {
chain_plugin::handle_guard_exception(e);
return false;
return controller::apply_blocks_result::complete; // shutting down
} catch (const std::bad_alloc&) {
chain_apis::api_base::handle_bad_alloc();
} catch (boost::interprocess::bad_alloc&) {
chain_apis::api_base::handle_db_exhaustion();
} catch (const fork_database_exception& e) {
fc_elog(_log, "Cannot recover from ${e}. Shutting down.", ("e", e.to_detail_string()));
appbase::app().quit();
return false;
return controller::apply_blocks_result::complete; // shutting down
} catch (const fc::exception& e) {
throw;
} catch (const std::exception& e) {
Expand All @@ -945,7 +948,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_production_enabled = true;
}

return true;
return result;
}

void restart_speculative_block() {
Expand Down Expand Up @@ -1650,7 +1653,7 @@ void producer_plugin::handle_sighup() {
fc::logger::update(transient_trx_failed_trace_logger_name, _transient_trx_failed_trace_log);
}

bool producer_plugin::on_incoming_block() {
controller::apply_blocks_result producer_plugin::on_incoming_block() {
return my->on_incoming_block();
}

Expand Down Expand Up @@ -1992,8 +1995,10 @@ producer_plugin_impl::start_block_result producer_plugin_impl::start_block() {

abort_block();

chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
auto r = chain.apply_blocks([this](const transaction_metadata_ptr& trx) { _unapplied_transactions.add_forked(trx); },
[this](const transaction_id_type& id) { return _unapplied_transactions.get_trx(id); });
if (r != controller::apply_blocks_result::complete)
return start_block_result::failed;

if (chain.should_terminate()) {
app().quit();
Expand Down

0 comments on commit dd5a078

Please sign in to comment.