Skip to content

Commit

Permalink
Merge pull request #802 from AntelopeIO/GH-529-optimize-block-process…
Browse files Browse the repository at this point in the history
…ing-part3

Optimize block processing - Part 3
  • Loading branch information
heifner authored Sep 26, 2024
2 parents 6520b77 + a57af4b commit d589ea5
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 36 deletions.
2 changes: 1 addition & 1 deletion libraries/custom_appbase/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_library(custom_appbase INTERFACE)
target_include_directories(custom_appbase INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include")
target_link_libraries(custom_appbase INTERFACE appbase)
target_link_libraries(custom_appbase INTERFACE appbase Boost::heap)

add_subdirectory(tests)
12 changes: 12 additions & 0 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ class priority_queue_executor {
return main_thread_id_;
}

template <typename Func>
void post( handler_id id, int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_service which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(id, priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_service as the main thread may be blocked on io_service.run_one() in application::exec()
boost::asio::post(io_serv_, pri_queue_.wrap(id, priority, q, --order_, std::forward<Func>(func)));
}
}

template <typename Func>
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
Expand Down
117 changes: 88 additions & 29 deletions libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/heap/binomial_heap.hpp>

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

namespace appbase {
// adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp

// Indicate non-unique handlers. If an existing handler at the specified priority already exists then there is
// no reason to insert a new handler to be processed.
//
// Add entries for each new non-unique handler type.
enum class handler_id {
unique, // identifies handler is unique, will not de-dup
process_incoming_block // process blocks already added to forkdb
};

enum class exec_queue {
read_only, // the queue storing tasks which are safe to execute
// in parallel with other read-only & read_exclusive tasks in the read-only
Expand All @@ -30,6 +39,12 @@ class exec_pri_queue : public boost::asio::execution_context
{
public:

~exec_pri_queue() {
clear(read_only_handlers_);
clear(read_write_handlers_);
clear(read_exclusive_handlers_);
}

// inform how many read_threads will be calling read_only/read_exclusive queues
// expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_
void init_read_threads(size_t num_read_threads) {
Expand Down Expand Up @@ -61,20 +76,50 @@ class exec_pri_queue : public boost::asio::execution_context
should_exit_ = [](){ assert(false); return true; }; // should not be called when locking is disabled
}

// called from appbase::application_base::exec poll_one() or run_one()
template <typename Function>
void add(int priority, exec_queue q, size_t order, Function function) {
void add(int priority, exec_queue q, size_t order, Function&& function) {
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
prio_queue& que = priority_que(q);
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(priority, order, std::move(function)));
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(handler_id::unique, priority, order, std::forward<Function>(function)));
if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
std::lock_guard g( mtx_ );
que.push( std::move( handler ) );
que.push( handler.release() );
if (num_waiting_)
cond_.notify_one();
} else {
que.push( std::move( handler ) );
que.push( handler.release() );
}
}

// called from appbase::application_base::exec poll_one() or run_one()
template <typename Function>
void add(handler_id id, int priority, exec_queue q, size_t order, Function&& function) {
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
if (id == handler_id::unique) {
return add(priority, q, order, std::forward<Function>(function));
}
prio_queue& que = priority_que(q);
std::unique_lock g( mtx_, std::defer_lock );
if (lock_enabled_ || q == exec_queue::read_exclusive) {
// called directly from any thread for read_exclusive
g.lock();
}
if (!que.empty()) {
// find the associated priority
auto end = que.ordered_end();
auto i = std::lower_bound(que.ordered_begin(), end, priority, [](const auto& h, int priority) {
return h->priority() > priority;
});
// boost::heap ordered iterator is a forward iterator
// if an existing handler with the id exists within the same priority then do not post
for (; i != end && (*i)->priority() == priority; ++i) {
if ((*i)->id() == id)
return;
}
}
que.push( new queued_handler<Function>(id, priority, order, std::forward<Function>(function)) );
if (g.owns_lock() && num_waiting_)
cond_.notify_one();
}

// only call when no lock required
Expand Down Expand Up @@ -158,8 +203,8 @@ class exec_pri_queue : public boost::asio::execution_context
class executor
{
public:
executor(exec_pri_queue& q, int p, size_t o, exec_queue que)
: context_(q), que_(que), priority_(p), order_(o)
executor(exec_pri_queue& q, handler_id id, int p, size_t o, exec_queue que)
: context_(q), que_(que), id_(id), priority_(p), order_(o)
{
}

Expand All @@ -171,19 +216,19 @@ class exec_pri_queue : public boost::asio::execution_context
template <typename Function, typename Allocator>
void dispatch(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

template <typename Function, typename Allocator>
void post(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

template <typename Function, typename Allocator>
void defer(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

void on_work_started() const noexcept {}
Expand All @@ -202,23 +247,32 @@ class exec_pri_queue : public boost::asio::execution_context
private:
exec_pri_queue& context_;
exec_queue que_;
handler_id id_;
int priority_;
size_t order_;
};

template <typename Function>
boost::asio::executor_binder<Function, executor>
wrap(handler_id id, int priority, exec_queue q, size_t order, Function&& func)
{
return boost::asio::bind_executor( executor(*this, id, priority, order, q), std::forward<Function>(func) );
}

template <typename Function>
boost::asio::executor_binder<Function, executor>
wrap(int priority, exec_queue q, size_t order, Function&& func)
{
return boost::asio::bind_executor( executor(*this, priority, order, q), std::forward<Function>(func) );
return boost::asio::bind_executor( executor(*this, handler_id::unique, priority, order, q), std::forward<Function>(func) );
}

private:
class queued_handler_base
{
public:
queued_handler_base( int p, size_t order )
: priority_( p )
queued_handler_base( handler_id id, int p, size_t order )
: id_( id )
, priority_( p )
, order_( order )
{
}
Expand All @@ -227,32 +281,31 @@ class exec_pri_queue : public boost::asio::execution_context

virtual void execute() = 0;

handler_id id() const { return id_; }
int priority() const { return priority_; }
// C++20
// friend std::weak_ordering operator<=>(const queued_handler_base&,
// const queued_handler_base&) noexcept = default;
friend bool operator<(const queued_handler_base& a,
const queued_handler_base& b) noexcept
{

friend bool operator<(const queued_handler_base& a, const queued_handler_base& b) noexcept {
// exclude id_
return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ );
}

private:
int priority_;
size_t order_;
handler_id id_; // unique identifier of handler
int priority_; // priority of handler, see application_base priority
size_t order_; // maintain order within priority grouping
};

template <typename Function>
class queued_handler : public queued_handler_base
{
public:
queued_handler(int p, size_t order, Function f)
: queued_handler_base( p, order )
queued_handler(handler_id id, int p, size_t order, Function f)
: queued_handler_base( id, p, order )
, function_( std::move(f) )
{
}

void execute() override
void execute() final
{
function_();
}
Expand All @@ -264,13 +317,13 @@ class exec_pri_queue : public boost::asio::execution_context
struct deref_less
{
template<typename Pointer>
bool operator()(const Pointer& a, const Pointer& b) noexcept(noexcept(*a < *b))
bool operator()(const Pointer& a, const Pointer& b) const noexcept(noexcept(*a < *b))
{
return *a < *b;
}
};

using prio_queue = std::priority_queue<std::unique_ptr<queued_handler_base>, std::deque<std::unique_ptr<queued_handler_base>>, deref_less>;
using prio_queue = boost::heap::binomial_heap<queued_handler_base*, boost::heap::compare<deref_less>>;

prio_queue& priority_que(exec_queue q) {
switch (q) {
Expand Down Expand Up @@ -299,12 +352,18 @@ class exec_pri_queue : public boost::asio::execution_context
}

static std::unique_ptr<exec_pri_queue::queued_handler_base> pop(prio_queue& que) {
// work around std::priority_queue not having a pop() that returns value
auto t = std::move(const_cast<std::unique_ptr<queued_handler_base>&>(que.top()));
// work around priority_queue not having a pop() that returns value
// take back ownership of pointer
auto t = std::unique_ptr<queued_handler_base>(que.top());
que.pop();
return t;
}

void clear(prio_queue& que) {
while (!que.empty())
pop(que);
}

size_t num_read_threads_ = 0;
bool lock_enabled_ = false;
mutable std::mutex mtx_;
Expand Down
2 changes: 1 addition & 1 deletion libraries/custom_appbase/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ endif()

file(GLOB UNIT_TESTS "*.cpp")
add_executable( custom_appbase_test ${UNIT_TESTS} )
target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework Boost::heap ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
target_include_directories( custom_appbase_test PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../../appbase/include" )

add_test( custom_appbase_test custom_appbase_test )
Loading

0 comments on commit d589ea5

Please sign in to comment.