From 9c7c5272c71341354e2f33be116ce9225b2f7ac4 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Sep 2024 14:22:22 -0500 Subject: [PATCH 01/11] GH-529 Add an optional id for the posted function. Non-unique handlers can be consolidated. If one already exists no reason to add another. --- .../include/eosio/chain/application.hpp | 12 ++ .../include/eosio/chain/exec_pri_queue.hpp | 110 +++++++++++++----- .../tests/custom_appbase_tests.cpp | 92 +++++++++++++++ 3 files changed, 186 insertions(+), 28 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/application.hpp b/libraries/custom_appbase/include/eosio/chain/application.hpp index 6026df97dc..e957f7e738 100644 --- a/libraries/custom_appbase/include/eosio/chain/application.hpp +++ b/libraries/custom_appbase/include/eosio/chain/application.hpp @@ -45,6 +45,18 @@ class priority_queue_executor { return main_thread_id_; } + template + void post( handler_id id, int priority, exec_queue q, Func&& func ) { + if (q == exec_queue::read_exclusive) { + // no reason to post to io_service which then places this in the read_exclusive_handlers queue. + // read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue. + pri_queue_.add(id, priority, q, --order_, std::forward(func)); + } else { + // post to io_service as the main thread may be blocked on io_service.run_one() in application::exec() + boost::asio::post(io_serv_, pri_queue_.wrap(id, priority, q, --order_, std::forward(func))); + } + } + template void post( int priority, exec_queue q, Func&& func ) { if (q == exec_queue::read_exclusive) { diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index f6d49c45af..b9e74f1b54 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -1,14 +1,23 @@ #pragma once #include +#include #include #include -#include #include namespace appbase { // adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp +// Indicate non-unique handlers. If an existing handler at the specified priority already exists then there is +// no reason to insert a new handler to be processed. +// +// Add entries for each new non-unique handler type. +enum class handler_id { + unique, // identifies handler is unique, will not de-dup + process_incoming_block // process blocks already added to forkdb +}; + enum class exec_queue { read_only, // the queue storing tasks which are safe to execute // in parallel with other read-only & read_exclusive tasks in the read-only @@ -30,6 +39,12 @@ class exec_pri_queue : public boost::asio::execution_context { public: + ~exec_pri_queue() { + clear(read_only_handlers_); + clear(read_write_handlers_); + clear(read_exclusive_handlers_); + } + // inform how many read_threads will be calling read_only/read_exclusive queues // expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_ void init_read_threads(size_t num_read_threads) { @@ -61,22 +76,47 @@ class exec_pri_queue : public boost::asio::execution_context should_exit_ = [](){ assert(false); return true; }; // should not be called when locking is disabled } - // called from appbase::application_base::exec poll_one() or run_one() template - void add(int priority, exec_queue q, size_t order, Function function) { + void add(int priority, exec_queue q, size_t order, Function&& function) { assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive); prio_queue& que = priority_que(q); - std::unique_ptr handler(new queued_handler(priority, order, std::move(function))); + std::unique_ptr handler(new queued_handler(handler_id::unique, priority, order, std::forward(function))); if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive std::lock_guard g( mtx_ ); - que.push( std::move( handler ) ); + que.push( handler.release() ); if (num_waiting_) cond_.notify_one(); } else { - que.push( std::move( handler ) ); + que.push( handler.release() ); } } + // called from appbase::application_base::exec poll_one() or run_one() + template + void add(handler_id id, int priority, exec_queue q, size_t order, Function&& function) { + assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive); + if (id == handler_id::unique) { + return add(priority, q, order, std::forward(function)); + } + prio_queue& que = priority_que(q); + std::unique_lock g( mtx_, std::defer_lock ); + if (lock_enabled_ || q == exec_queue::read_exclusive) { + // called directly from any thread for read_exclusive + g.lock(); + } + if (!que.empty()) { + // find the associated priority + auto i = std::lower_bound(que.ordered_begin(), que.ordered_end(), priority, [](const auto& h, int priority) { + return h->priority() < priority; + }); + if (i != que.ordered_end() && (*i)->priority() == priority && (*i)->id() == id) + return; + } + que.push( new queued_handler(id, priority, order, std::forward(function)) ); + if (g.owns_lock() && num_waiting_) + cond_.notify_one(); + } + // only call when no lock required void clear() { read_only_handlers_ = prio_queue(); @@ -158,8 +198,8 @@ class exec_pri_queue : public boost::asio::execution_context class executor { public: - executor(exec_pri_queue& q, int p, size_t o, exec_queue que) - : context_(q), que_(que), priority_(p), order_(o) + executor(exec_pri_queue& q, handler_id id, int p, size_t o, exec_queue que) + : context_(q), que_(que), id_(id), priority_(p), order_(o) { } @@ -171,19 +211,19 @@ class exec_pri_queue : public boost::asio::execution_context template void dispatch(Function f, const Allocator&) const { - context_.add(priority_, que_, order_, std::move(f)); + context_.add(id_, priority_, que_, order_, std::move(f)); } template void post(Function f, const Allocator&) const { - context_.add(priority_, que_, order_, std::move(f)); + context_.add(id_, priority_, que_, order_, std::move(f)); } template void defer(Function f, const Allocator&) const { - context_.add(priority_, que_, order_, std::move(f)); + context_.add(id_, priority_, que_, order_, std::move(f)); } void on_work_started() const noexcept {} @@ -202,23 +242,32 @@ class exec_pri_queue : public boost::asio::execution_context private: exec_pri_queue& context_; exec_queue que_; + handler_id id_; int priority_; size_t order_; }; + template + boost::asio::executor_binder + wrap(handler_id id, int priority, exec_queue q, size_t order, Function&& func) + { + return boost::asio::bind_executor( executor(*this, id, priority, order, q), std::forward(func) ); + } + template boost::asio::executor_binder wrap(int priority, exec_queue q, size_t order, Function&& func) { - return boost::asio::bind_executor( executor(*this, priority, order, q), std::forward(func) ); + return boost::asio::bind_executor( executor(*this, handler_id::unique, priority, order, q), std::forward(func) ); } private: class queued_handler_base { public: - queued_handler_base( int p, size_t order ) - : priority_( p ) + queued_handler_base( handler_id id, int p, size_t order ) + : id_( id ) + , priority_( p ) , order_( order ) { } @@ -227,27 +276,26 @@ class exec_pri_queue : public boost::asio::execution_context virtual void execute() = 0; + handler_id id() const { return id_; } int priority() const { return priority_; } - // C++20 - // friend std::weak_ordering operator<=>(const queued_handler_base&, - // const queued_handler_base&) noexcept = default; - friend bool operator<(const queued_handler_base& a, - const queued_handler_base& b) noexcept - { + + friend bool operator<(const queued_handler_base& a, const queued_handler_base& b) noexcept { + // exclude id_ return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ ); } private: - int priority_; - size_t order_; + handler_id id_; // unique identifier of handler + int priority_; // priority of handler, see application_base priority + size_t order_; // maintain order within priority grouping }; template class queued_handler : public queued_handler_base { public: - queued_handler(int p, size_t order, Function f) - : queued_handler_base( p, order ) + queued_handler(handler_id id, int p, size_t order, Function f) + : queued_handler_base( id, p, order ) , function_( std::move(f) ) { } @@ -264,13 +312,13 @@ class exec_pri_queue : public boost::asio::execution_context struct deref_less { template - bool operator()(const Pointer& a, const Pointer& b) noexcept(noexcept(*a < *b)) + bool operator()(const Pointer& a, const Pointer& b) const noexcept(noexcept(*a < *b)) { return *a < *b; } }; - using prio_queue = std::priority_queue, std::deque>, deref_less>; + using prio_queue = boost::heap::binomial_heap>; prio_queue& priority_que(exec_queue q) { switch (q) { @@ -299,12 +347,18 @@ class exec_pri_queue : public boost::asio::execution_context } static std::unique_ptr pop(prio_queue& que) { - // work around std::priority_queue not having a pop() that returns value - auto t = std::move(const_cast&>(que.top())); + // work around priority_queue not having a pop() that returns value + // take back ownership of pointer + auto t = std::unique_ptr(que.top()); que.pop(); return t; } + void clear(prio_queue& que) { + while (!que.empty()) + pop(que); + } + size_t num_read_threads_ = 0; bool lock_enabled_ = false; mutable std::mutex mtx_; diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index f62f10b0a5..2a5201489f 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -125,6 +125,98 @@ BOOST_AUTO_TEST_CASE( default_exec_window ) { BOOST_CHECK_LT( rslts[6], rslts[7] ); } +BOOST_AUTO_TEST_CASE( exec_with_unique_handler_id ) { + scoped_app_thread app(true); + + // post functions + std::map rslts {}; + int seq_num = 0; + app->executor().post( handler_id::unique, priority::medium, exec_queue::read_only, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::high, exec_queue::read_only, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::low, exec_queue::read_only, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::highest,exec_queue::read_only, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( handler_id::unique, priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + + // Stop app. Use the lowest priority to make sure this function to execute the last + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { + // read_only_queue should only contain the current lambda function, + // and read_write_queue should have executed all its functions + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); + app->quit(); + } ); + + app.start_exec(); + app.join(); + + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); + + // exactly number of both queues' functions processed + BOOST_REQUIRE_EQUAL( rslts.size(), 8u ); + + // same priority of functions executed by the post order + BOOST_CHECK_LT( rslts[0], rslts[1] ); // medium + BOOST_CHECK_LT( rslts[2], rslts[3] ); // high + BOOST_CHECK_LT( rslts[3], rslts[7] ); // high + BOOST_CHECK_LT( rslts[4], rslts[5] ); // low + + // higher priority posted earlier executed earlier + BOOST_CHECK_LT( rslts[3], rslts[4] ); + BOOST_CHECK_LT( rslts[6], rslts[7] ); +} + +BOOST_AUTO_TEST_CASE( exec_with_handler_id ) { + scoped_app_thread app(true); + + // post functions + std::map rslts {}; + int seq_num = 0; + auto id = handler_id::process_incoming_block; + auto un = handler_id::unique; + app->executor().post( id, priority::medium, exec_queue::read_write, [&]() { rslts[0]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::medium, exec_queue::read_write, [&]() { rslts[1]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[2]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[3]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::low, exec_queue::read_write, [&]() { rslts[4]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::low, exec_queue::read_write, [&]() { rslts[5]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::highest,exec_queue::read_write, [&]() { rslts[6]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); + app->executor().post( un, priority::high, exec_queue::read_write, [&]() { rslts[8]=seq_num; ++seq_num; } ); + + // Stop app. Use the lowest priority to make sure this function to execute the last + app->executor().post( priority::lowest, exec_queue::read_write, [&]() { + // read_only_queue should only contain the current lambda function, + // and read_write_queue should have executed all its functions + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); + app->quit(); + } ); + + app.start_exec(); + app.join(); + + // all queues are cleared when exiting application::exec() + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_empty(), true); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); + + // does not post if one already exists at the same priority + BOOST_REQUIRE_EQUAL( rslts.size(), 5u ); + + BOOST_TEST(!rslts.contains(1)); // not added to execute + BOOST_TEST(!rslts.contains(3)); // not added to execute + BOOST_TEST(!rslts.contains(5)); // not added to execute + BOOST_TEST(!rslts.contains(7)); // not added to execute +} + // verify functions only from read_only queue are processed during read window on the main thread BOOST_AUTO_TEST_CASE( execute_from_read_only_queue ) { scoped_app_thread app; From a81dd0f2391331ba95ac3308b214182a61668fd7 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Sep 2024 14:23:01 -0500 Subject: [PATCH 02/11] GH-773 Use non-unique handler id to allow de-dup of posted functions --- plugins/net_plugin/net_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 8c23682cb3..a558126c9e 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -3754,7 +3754,7 @@ namespace eosio { if (best_head) { ++c->unique_blocks_rcvd_count; fc_dlog(logger, "posting incoming_block to app thread, block ${n}", ("n", ptr->block_num())); - app().executor().post(priority::medium, exec_queue::read_write, + app().executor().post(handler_id::process_incoming_block, priority::medium, exec_queue::read_write, []() { try { my_impl->producer_plug->on_incoming_block(); From 10673fc68b35f3cb5e38b0a1d90f0c32dfd2f0ac Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Sep 2024 14:40:34 -0500 Subject: [PATCH 03/11] GH-529 Add Boost::heap --- libraries/custom_appbase/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/custom_appbase/CMakeLists.txt b/libraries/custom_appbase/CMakeLists.txt index d936c95c91..bbc48702fb 100644 --- a/libraries/custom_appbase/CMakeLists.txt +++ b/libraries/custom_appbase/CMakeLists.txt @@ -1,5 +1,5 @@ add_library(custom_appbase INTERFACE) target_include_directories(custom_appbase INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include") -target_link_libraries(custom_appbase INTERFACE appbase) +target_link_libraries(custom_appbase INTERFACE appbase Boost::heap) add_subdirectory(tests) From 2cfe33220548f34175c915c84176d5c2470d9390 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Sep 2024 14:50:35 -0500 Subject: [PATCH 04/11] GH-529 Add Boost::heap --- libraries/custom_appbase/tests/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/custom_appbase/tests/CMakeLists.txt b/libraries/custom_appbase/tests/CMakeLists.txt index e0837f4896..95310ce4b3 100644 --- a/libraries/custom_appbase/tests/CMakeLists.txt +++ b/libraries/custom_appbase/tests/CMakeLists.txt @@ -7,7 +7,7 @@ endif() file(GLOB UNIT_TESTS "*.cpp") add_executable( custom_appbase_test ${UNIT_TESTS} ) -target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) +target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework Boost::heap ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} ) target_include_directories( custom_appbase_test PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../../appbase/include" ) add_test( custom_appbase_test custom_appbase_test ) From 64593d92487a67ace2625393f93d10e54a306979 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 18 Sep 2024 15:21:21 -0500 Subject: [PATCH 05/11] GH-529 priority is in reverse order --- libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index b9e74f1b54..0101339029 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -107,7 +107,7 @@ class exec_pri_queue : public boost::asio::execution_context if (!que.empty()) { // find the associated priority auto i = std::lower_bound(que.ordered_begin(), que.ordered_end(), priority, [](const auto& h, int priority) { - return h->priority() < priority; + return h->priority() > priority; }); if (i != que.ordered_end() && (*i)->priority() == priority && (*i)->id() == id) return; From cf8d7c12d680ef00f8d15a4e4a68d0129980e740 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 20 Sep 2024 17:55:57 -0500 Subject: [PATCH 06/11] GH-529 De-dup non-unique on the last inserted item of the same priority --- .../include/eosio/chain/exec_pri_queue.hpp | 15 ++++++++++++--- .../custom_appbase/tests/custom_appbase_tests.cpp | 8 +++++++- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 0101339029..922091db99 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -106,11 +106,20 @@ class exec_pri_queue : public boost::asio::execution_context } if (!que.empty()) { // find the associated priority - auto i = std::lower_bound(que.ordered_begin(), que.ordered_end(), priority, [](const auto& h, int priority) { + auto end = que.ordered_end(); + auto i = std::lower_bound(que.ordered_begin(), end, priority, [](const auto& h, int priority) { return h->priority() > priority; }); - if (i != que.ordered_end() && (*i)->priority() == priority && (*i)->id() == id) - return; + if (i != end) { + // ordered iterator appears to only be a forward iterator + auto p = i; + for (; i != end; p = i, ++i) { + if ((*i)->priority() != priority) + break; + } + if ((*p)->priority() == priority && (*p)->id() == id) + return; + } } que.push( new queued_handler(id, priority, order, std::forward(function)) ); if (g.owns_lock() && num_waiting_) diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 2a5201489f..12b77de93b 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -189,6 +189,9 @@ BOOST_AUTO_TEST_CASE( exec_with_handler_id ) { app->executor().post( id, priority::highest,exec_queue::read_write, [&]() { rslts[6]=seq_num; ++seq_num; } ); app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[7]=seq_num; ++seq_num; } ); app->executor().post( un, priority::high, exec_queue::read_write, [&]() { rslts[8]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[9]=seq_num; ++seq_num; } ); + app->executor().post( un, priority::high, exec_queue::read_write, [&]() { rslts[10]=seq_num; ++seq_num; } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { rslts[11]=seq_num; ++seq_num; } ); // Stop app. Use the lowest priority to make sure this function to execute the last app->executor().post( priority::lowest, exec_queue::read_write, [&]() { @@ -209,12 +212,15 @@ BOOST_AUTO_TEST_CASE( exec_with_handler_id ) { BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // does not post if one already exists at the same priority - BOOST_REQUIRE_EQUAL( rslts.size(), 5u ); + BOOST_TEST( rslts.size() == 8u ); BOOST_TEST(!rslts.contains(1)); // not added to execute BOOST_TEST(!rslts.contains(3)); // not added to execute BOOST_TEST(!rslts.contains(5)); // not added to execute BOOST_TEST(!rslts.contains(7)); // not added to execute + BOOST_TEST(rslts.contains(9)); + BOOST_TEST(rslts.contains(10)); + BOOST_TEST(rslts.contains(11)); } // verify functions only from read_only queue are processed during read window on the main thread From acaa23c0c6c0db12791ebd71519bce0cac200faf Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 20 Sep 2024 17:58:45 -0500 Subject: [PATCH 07/11] GH-529 Add additional test cases and produce some more blocks before syncing --- tests/nodeos_startup_catchup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/nodeos_startup_catchup.py b/tests/nodeos_startup_catchup.py index 760715f3e6..9d63d91589 100755 --- a/tests/nodeos_startup_catchup.py +++ b/tests/nodeos_startup_catchup.py @@ -62,7 +62,7 @@ specificExtraNodeosArgs[pnodes+2] = f' --sync-fetch-span 5 ' specificExtraNodeosArgs[pnodes+3] = f' --sync-fetch-span 21 ' specificExtraNodeosArgs[pnodes+4] = f' --sync-fetch-span 89 ' - specificExtraNodeosArgs[pnodes+5] = f' --sync-fetch-span 377 ' + specificExtraNodeosArgs[pnodes+5] = f' --sync-fetch-span 177 ' specificExtraNodeosArgs[pnodes+6] = f' --sync-fetch-span 1597 ' specificExtraNodeosArgs[pnodes+7] = f' --sync-fetch-span 2500 ' specificExtraNodeosArgs[pnodes+8] = f' --sync-fetch-span 6765 ' @@ -71,7 +71,7 @@ specificExtraNodeosArgs[pnodes+11] = f' --sync-fetch-span 1 --read-mode irreversible ' specificExtraNodeosArgs[pnodes+12] = f' --sync-fetch-span 5 --read-mode irreversible ' specificExtraNodeosArgs[pnodes+13] = f' --sync-fetch-span 89 --read-mode irreversible ' - specificExtraNodeosArgs[pnodes+14] = f' --sync-fetch-span 200 --read-mode irreversible ' + specificExtraNodeosArgs[pnodes+14] = f' --sync-fetch-span 150 --read-mode irreversible ' specificExtraNodeosArgs[pnodes+15] = f' --sync-fetch-span 2500 --read-mode irreversible ' if cluster.launch(prodCount=prodCount, specificExtraNodeosArgs=specificExtraNodeosArgs, activateIF=activateIF, onlyBios=False, pnodes=pnodes, totalNodes=totalNodes, totalProducers=pnodes*prodCount, unstartedNodes=catchupCount, @@ -135,9 +135,9 @@ def waitForNodeStarted(node): transactionsPerBlock=targetTpsPerGenerator*trxGeneratorCnt*timePerBlock/1000 steadyStateWait=20 startBlockNum=blockNum+steadyStateWait - numBlocks=20 + numBlocks=200 endBlockNum=startBlockNum+numBlocks - waitForBlock(node0, endBlockNum) + waitForBlock(node0, endBlockNum, timeout=numBlocks) steadyStateWindowTrxs=0 steadyStateAvg=0 steadyStateWindowBlks=0 From d2cda13264e03bd4dade139ce956d63cb93fa371 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Fri, 20 Sep 2024 17:59:19 -0500 Subject: [PATCH 08/11] GH-529 Do not attempt to sync beyond the known lib as it causes net_plugin to disconnect --- plugins/net_plugin/net_plugin.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index a97bce9563..bef9e2ee23 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2093,7 +2093,7 @@ namespace eosio { // Use last received number instead so when end of range is reached we check the IRREVERSIBLE conditions below. blk_num = sync_next_expected_num-1; } - if (blk_num >= sync_last_requested_num) { + if (blk_num >= sync_last_requested_num && sync_last_requested_num < sync_known_lib_num) { // do not allow to get too far ahead (sync_fetch_span) of chain head // use chain head instead of fork head so we do not get too far ahead of applied blocks uint32_t head_num = my_impl->get_chain_head_num(); @@ -2684,7 +2684,7 @@ namespace eosio { } void dispatch_manager::rejected_block(const block_id_type& id) { - fc_dlog( logger, "rejected block ${id}", ("id", id) ); + fc_dlog( logger, "rejected block ${bn} ${id}", ("bn", block_header::num_from_id(id))("id", id) ); } // called from any thread From 8f202201ca02656f81f02d73eb297f78126e1225 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Sat, 21 Sep 2024 09:34:31 -0500 Subject: [PATCH 09/11] GH-529 Add a perf test to compare boost heap implementations --- .../include/eosio/chain/exec_pri_queue.hpp | 2 +- .../tests/custom_appbase_tests.cpp | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 922091db99..0a9f7d326e 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -309,7 +309,7 @@ class exec_pri_queue : public boost::asio::execution_context { } - void execute() override + void execute() final { function_(); } diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index 12b77de93b..ab12697859 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -172,6 +172,37 @@ BOOST_AUTO_TEST_CASE( exec_with_unique_handler_id ) { BOOST_CHECK_LT( rslts[6], rslts[7] ); } +#if 0 // benchmarking, of all the boost::heap implementations, binomial_heap was the clear winner for this test case +BOOST_AUTO_TEST_CASE( exec_perf ) { + scoped_app_thread app; + + // post functions + constexpr size_t num = 4000; + auto start = fc::time_point::now(); + auto id = handler_id::process_incoming_block; + auto un = handler_id::unique; + for (size_t i = 0; i < num; i++) { + app->executor().post( un, priority::low, exec_queue::read_write, [&]() { std::this_thread::sleep_for( std::chrono::microseconds(100) ); } ); + app->executor().post( id, priority::high, exec_queue::read_write, [&]() { std::this_thread::sleep_for( std::chrono::milliseconds(50) ); } ); + } + + // Stop app. Use the lowest priority to make sure this function to execute the last + app->executor().post( priority::lowest, exec_queue::read_only, [&]() { + // read_only_queue should only contain the current lambda function, + // and read_write_queue should have executed all its functions + BOOST_REQUIRE_EQUAL( app->executor().read_only_queue_size(), 0u); // pop()s before execute + BOOST_REQUIRE_EQUAL( app->executor().read_exclusive_queue_size(), 0u ); + BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_size(), 0u ); + app->quit(); + } ); + + app.join(); + + fc::microseconds elapsed = fc::time_point::now() - start; + std::cout << "time: " << elapsed.count() << " us" << std::endl; +} +#endif + BOOST_AUTO_TEST_CASE( exec_with_handler_id ) { scoped_app_thread app(true); From 4763f13edacebf9438af2316bd719892dfeba88f Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Tue, 24 Sep 2024 15:54:50 -0500 Subject: [PATCH 10/11] GH-529 Add some comments --- libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 0a9f7d326e..07f220867e 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -112,11 +112,13 @@ class exec_pri_queue : public boost::asio::execution_context }); if (i != end) { // ordered iterator appears to only be a forward iterator + // find last posted handler with same priority auto p = i; for (; i != end; p = i, ++i) { if ((*i)->priority() != priority) break; } + // if last posted handler with the same priority is same id then do not post it if ((*p)->priority() == priority && (*p)->id() == id) return; } From fd21a457f8d3c5071c3261d74c6f89c31f1af7b6 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Wed, 25 Sep 2024 07:16:13 -0500 Subject: [PATCH 11/11] GH-529 If an existing handler with the same priority exists for a non-unique id then do not post it --- .../include/eosio/chain/exec_pri_queue.hpp | 14 ++++--------- .../tests/custom_appbase_tests.cpp | 20 +++++++++++-------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp index 07f220867e..39b2a31b20 100644 --- a/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp +++ b/libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp @@ -110,16 +110,10 @@ class exec_pri_queue : public boost::asio::execution_context auto i = std::lower_bound(que.ordered_begin(), end, priority, [](const auto& h, int priority) { return h->priority() > priority; }); - if (i != end) { - // ordered iterator appears to only be a forward iterator - // find last posted handler with same priority - auto p = i; - for (; i != end; p = i, ++i) { - if ((*i)->priority() != priority) - break; - } - // if last posted handler with the same priority is same id then do not post it - if ((*p)->priority() == priority && (*p)->id() == id) + // boost::heap ordered iterator is a forward iterator + // if an existing handler with the id exists within the same priority then do not post + for (; i != end && (*i)->priority() == priority; ++i) { + if ((*i)->id() == id) return; } } diff --git a/libraries/custom_appbase/tests/custom_appbase_tests.cpp b/libraries/custom_appbase/tests/custom_appbase_tests.cpp index ab12697859..e76c05cf49 100644 --- a/libraries/custom_appbase/tests/custom_appbase_tests.cpp +++ b/libraries/custom_appbase/tests/custom_appbase_tests.cpp @@ -243,15 +243,19 @@ BOOST_AUTO_TEST_CASE( exec_with_handler_id ) { BOOST_REQUIRE_EQUAL( app->executor().read_write_queue_empty(), true); // does not post if one already exists at the same priority - BOOST_TEST( rslts.size() == 8u ); - - BOOST_TEST(!rslts.contains(1)); // not added to execute - BOOST_TEST(!rslts.contains(3)); // not added to execute - BOOST_TEST(!rslts.contains(5)); // not added to execute - BOOST_TEST(!rslts.contains(7)); // not added to execute - BOOST_TEST(rslts.contains(9)); + BOOST_TEST( rslts.size() == 6u ); + + BOOST_TEST(!rslts.contains(1)); + BOOST_TEST(rslts.contains(2)); + BOOST_TEST(!rslts.contains(3)); + BOOST_TEST(rslts.contains(4)); + BOOST_TEST(!rslts.contains(5)); + BOOST_TEST(rslts.contains(6)); + BOOST_TEST(!rslts.contains(7)); + BOOST_TEST(rslts.contains(8)); + BOOST_TEST(!rslts.contains(9)); BOOST_TEST(rslts.contains(10)); - BOOST_TEST(rslts.contains(11)); + BOOST_TEST(!rslts.contains(11)); } // verify functions only from read_only queue are processed during read window on the main thread