a different workaround for GCC10 coro ICE
spoonincode committed May 10, 2024
1 parent 87d59b3 commit 63aa492
Showing 1 changed file with 68 additions and 74 deletions.
@@ -119,33 +119,6 @@ class session final : public session_base {
       drop_exceptions([this](){ awake_if_idle(); });
    }
 
-   //this reads better as a lambda directly inside of read_loop(), but gcc10.x ICEs on capturing this inside a coro
-   boost::asio::awaitable<void> read_loop_main_thread(const state_request& req) {
-      std::visit(chain::overloaded {
-         [this]<typename GetStatusRequestV0orV1, typename = std::enable_if_t<std::is_base_of_v<get_status_request_v0, GetStatusRequestV0orV1>>>(const GetStatusRequestV0orV1&) {
-            queued_status_requests.emplace_back(std::is_same_v<GetStatusRequestV0orV1, get_status_request_v1>);
-         },
-         [this]<typename GetBlocksRequestV0orV1, typename = std::enable_if_t<std::is_base_of_v<get_blocks_request_v0, GetBlocksRequestV0orV1>>>(const GetBlocksRequestV0orV1& gbr) {
-            current_blocks_request_v1_finality.reset();
-            current_blocks_request = gbr;
-            if constexpr(std::is_same_v<GetBlocksRequestV0orV1, get_blocks_request_v1>)
-               current_blocks_request_v1_finality = gbr.fetch_finality_data;
-
-            for(const block_position& haveit : current_blocks_request.have_positions) {
-               if(current_blocks_request.start_block_num <= haveit.block_num)
-                  continue;
-               if(const std::optional<chain::block_id_type> id = get_block_id(haveit.block_num); !id || *id != haveit.block_id)
-                  current_blocks_request.start_block_num = std::min(current_blocks_request.start_block_num, haveit.block_num);
-            }
-            current_blocks_request.have_positions.clear();
-         },
-         [this](const get_blocks_ack_request_v0& gbar0) {
-            send_credits += gbar0.num_messages;
-         }
-      }, req);
-      co_return;
-   }
-
    boost::asio::awaitable<void> read_loop() {
       co_await readwrite_coro_exception_wrapper([this]() -> boost::asio::awaitable<void> {
          wake_timer.expires_at(std::chrono::steady_clock::time_point::max());
@@ -166,7 +139,32 @@ class session final : public session_base {
             const state_request req = fc::raw::unpack<std::remove_const_t<decltype(req)>>(static_cast<const char*>(b.cdata().data()), b.size());
 
             //TODO: how can we set main thread priority on this?
-            co_await boost::asio::co_spawn(app().get_io_service(), read_loop_main_thread(req), boost::asio::use_awaitable);
+            auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro
+            co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<void> {
+               std::visit(chain::overloaded {
+                  [&self]<typename GetStatusRequestV0orV1, typename = std::enable_if_t<std::is_base_of_v<get_status_request_v0, GetStatusRequestV0orV1>>>(const GetStatusRequestV0orV1&) {
+                     self.queued_status_requests.emplace_back(std::is_same_v<GetStatusRequestV0orV1, get_status_request_v1>);
+                  },
+                  [&self]<typename GetBlocksRequestV0orV1, typename = std::enable_if_t<std::is_base_of_v<get_blocks_request_v0, GetBlocksRequestV0orV1>>>(const GetBlocksRequestV0orV1& gbr) {
+                     self.current_blocks_request_v1_finality.reset();
+                     self.current_blocks_request = gbr;
+                     if constexpr(std::is_same_v<GetBlocksRequestV0orV1, get_blocks_request_v1>)
+                        self.current_blocks_request_v1_finality = gbr.fetch_finality_data;
+
+                     for(const block_position& haveit : self.current_blocks_request.have_positions) {
+                        if(self.current_blocks_request.start_block_num <= haveit.block_num)
+                           continue;
+                        if(const std::optional<chain::block_id_type> id = self.get_block_id(haveit.block_num); !id || *id != haveit.block_id)
+                           self.current_blocks_request.start_block_num = std::min(self.current_blocks_request.start_block_num, haveit.block_num);
+                     }
+                     self.current_blocks_request.have_positions.clear();
+                  },
+                  [&self](const get_blocks_ack_request_v0& gbar0) {
+                     self.send_credits += gbar0.num_messages;
+                  }
+               }, req);
+               co_return;
+            }, boost::asio::use_awaitable);
 
             awake_if_idle();
          }
@@ -216,53 +214,17 @@ class session final : public session_base {
       }
    }
 
-   struct block_package {
-      get_blocks_result_base blocks_result_base;
-      bool is_v1_request = false;
-      chain::block_num_type this_block_num = 0; //this shouldn't be needed post log de-mutexing
-      std::optional<locked_decompress_stream> trace_stream;
-      std::optional<locked_decompress_stream> state_stream;
-      std::optional<locked_decompress_stream> finality_stream;
-   };
-
-   //this reads better as a lambda directly inside of write_loop(), but gcc10.x ICEs on capturing this inside a coro
-   boost::asio::awaitable<void> write_loop_main_thread(std::deque<bool>& status_requests, std::optional<block_package>& block_to_send) {
-      status_requests = std::move(queued_status_requests);
-
-      //decide what block -- if any -- to send out
-      const chain::block_num_type latest_to_consider = current_blocks_request.irreversible_only ?
-                                                          controller.last_irreversible_block_num() : controller.head_block_num();
-      if(send_credits && next_block_cursor <= latest_to_consider && next_block_cursor < current_blocks_request.end_block_num) {
-         block_to_send.emplace( block_package{
-            .blocks_result_base = {
-               .head = {controller.head_block_num(), controller.head_block_id()},
-               .last_irreversible = {controller.last_irreversible_block_num(), controller.last_irreversible_block_id()}
-            },
-            .is_v1_request = current_blocks_request_v1_finality.has_value(),
-            .this_block_num = next_block_cursor
-         });
-         if(const std::optional<chain::block_id_type> this_block_id = get_block_id(next_block_cursor)) {
-            block_to_send->blocks_result_base.this_block = {current_blocks_request.start_block_num, *this_block_id};
-            if(const std::optional<chain::block_id_type> last_block_id = get_block_id(next_block_cursor - 1))
-               block_to_send->blocks_result_base.prev_block = {next_block_cursor - 1, *last_block_id};
-            if(chain::signed_block_ptr sbp = get_block(next_block_cursor); sbp && current_blocks_request.fetch_block)
-               block_to_send->blocks_result_base.block = fc::raw::pack(*sbp);
-            if(current_blocks_request.fetch_traces && trace_log)
-               block_to_send->trace_stream.emplace(trace_log->create_locked_decompress_stream());
-            if(current_blocks_request.fetch_deltas && chain_state_log)
-               block_to_send->state_stream.emplace(chain_state_log->create_locked_decompress_stream());
-            if(block_to_send->is_v1_request && *current_blocks_request_v1_finality && finality_data_log)
-               block_to_send->finality_stream.emplace(finality_data_log->create_locked_decompress_stream());
-         }
-         ++next_block_cursor;
-         --send_credits;
-      }
-      co_return;
-   }
-
    boost::asio::awaitable<void> write_loop() {
       co_await readwrite_coro_exception_wrapper([this]() -> boost::asio::awaitable<void> {
          get_status_result_v1 current_status_result;
+         struct block_package {
+            get_blocks_result_base blocks_result_base;
+            bool is_v1_request = false;
+            chain::block_num_type this_block_num = 0; //this shouldn't be needed post log de-mutexing
+            std::optional<locked_decompress_stream> trace_stream;
+            std::optional<locked_decompress_stream> state_stream;
+            std::optional<locked_decompress_stream> finality_stream;
+         };
 
          while(true) {
             if(!stream.is_open())
@@ -271,9 +233,41 @@ class session final : public session_base {
            std::deque<bool> status_requests;
            std::optional<block_package> block_to_send;
 
-            //write_loop_main_thread() will populate status_requests and block_to_send with what to send for this iteration of the loop
            ///TODO: How to set main thread priority?
-            co_await boost::asio::co_spawn(app().get_io_service(), write_loop_main_thread(status_requests, block_to_send), boost::asio::use_awaitable);
+            auto& self = *this; //gcc10 ICE workaround wrt capturing 'this' in a coro
+            co_await boost::asio::co_spawn(app().get_io_service(), [&]() -> boost::asio::awaitable<void> {
+               status_requests = std::move(self.queued_status_requests);
+
+               //decide what block -- if any -- to send out
+               const chain::block_num_type latest_to_consider = self.current_blocks_request.irreversible_only ?
+                                                                   self.controller.last_irreversible_block_num() : self.controller.head_block_num();
+               if(self.send_credits && self.next_block_cursor <= latest_to_consider && self.next_block_cursor < self.current_blocks_request.end_block_num) {
+                  block_to_send.emplace( block_package{
+                     .blocks_result_base = {
+                        .head = {self.controller.head_block_num(), self.controller.head_block_id()},
+                        .last_irreversible = {self.controller.last_irreversible_block_num(), self.controller.last_irreversible_block_id()}
+                     },
+                     .is_v1_request = self.current_blocks_request_v1_finality.has_value(),
+                     .this_block_num = self.next_block_cursor
+                  });
+                  if(const std::optional<chain::block_id_type> this_block_id = self.get_block_id(self.next_block_cursor)) {
+                     block_to_send->blocks_result_base.this_block = {self.current_blocks_request.start_block_num, *this_block_id};
+                     if(const std::optional<chain::block_id_type> last_block_id = self.get_block_id(self.next_block_cursor - 1))
+                        block_to_send->blocks_result_base.prev_block = {self.next_block_cursor - 1, *last_block_id};
+                     if(chain::signed_block_ptr sbp = get_block(self.next_block_cursor); sbp && self.current_blocks_request.fetch_block)
+                        block_to_send->blocks_result_base.block = fc::raw::pack(*sbp);
+                     if(self.current_blocks_request.fetch_traces && self.trace_log)
+                        block_to_send->trace_stream.emplace(self.trace_log->create_locked_decompress_stream());
+                     if(self.current_blocks_request.fetch_deltas && self.chain_state_log)
+                        block_to_send->state_stream.emplace(self.chain_state_log->create_locked_decompress_stream());
+                     if(block_to_send->is_v1_request && *self.current_blocks_request_v1_finality && self.finality_data_log)
+                        block_to_send->finality_stream.emplace(self.finality_data_log->create_locked_decompress_stream());
+                  }
+                  ++self.next_block_cursor;
+                  --self.send_credits;
+               }
+               co_return;
+            }, boost::asio::use_awaitable);
 
            //if there is nothing to send, go to sleep
            if(status_requests.empty() && !block_to_send) {
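
A note on the pattern itself: gcc 10.x can crash with an internal compiler error when a coroutine lambda captures 'this', so both loops now bind the object through a local reference (auto& self = *this;) and capture that instead. Below is a minimal self-contained sketch of the workaround, using a hypothetical widget type in place of the session class above; the names and values are illustrative, not from the plugin.

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <iostream>

struct widget {
   int counter = 0;

   boost::asio::awaitable<void> run() {
      //gcc10 ICE workaround: capture a reference to *this, never 'this' itself
      auto& self = *this;
      co_await boost::asio::co_spawn(co_await boost::asio::this_coro::executor,
         [&]() -> boost::asio::awaitable<void> {
            ++self.counter; //all member access inside the coro lambda goes through 'self'
            co_return;
         }, boost::asio::use_awaitable);
      std::cout << "counter = " << counter << '\n';
   }
};

int main() {
   boost::asio::io_context ctx;
   widget w;
   boost::asio::co_spawn(ctx, w.run(), boost::asio::detached);
   ctx.run();
}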
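The co_spawn(app().get_io_service(), ..., use_awaitable) calls in both loops hop the coroutine onto the application's main io_context for the duration of the lambda, then resume the session on its own executor once the lambda co_returns; the remaining TODO concerns only the priority of that work, not the hop itself. A reduced sketch of the executor hop, with net_ctx standing in for the session's thread and main_ctx for app().get_io_service() (both names are illustrative):

#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <iostream>
#include <thread>

int main() {
   boost::asio::io_context net_ctx;  //stands in for the session's own thread
   boost::asio::io_context main_ctx; //stands in for app().get_io_service()

   boost::asio::co_spawn(net_ctx, [&]() -> boost::asio::awaitable<void> {
      std::cout << "on net thread:      " << std::this_thread::get_id() << '\n';
      //suspend here; the inner lambda runs on main_ctx, then we resume on net_ctx
      co_await boost::asio::co_spawn(main_ctx, [&]() -> boost::asio::awaitable<void> {
         std::cout << "on main thread:     " << std::this_thread::get_id() << '\n';
         co_return;
      }, boost::asio::use_awaitable);
      std::cout << "back on net thread: " << std::this_thread::get_id() << '\n';
   }, boost::asio::detached);

   //keep main_ctx alive until the hop has happened, then let it drain
   auto guard = boost::asio::make_work_guard(main_ctx);
   std::thread main_thread([&]{ main_ctx.run(); });
   net_ctx.run();  //returns once the outer coroutine finishes
   guard.reset();
   main_thread.join();
}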

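chain::overloaded is the usual build-an-overload-set helper for std::visit, and the enable_if-constrained template lambdas let one handler serve both the v0 and v1 flavor of a request while still telling them apart at compile time. Below is a generic sketch of the idiom; the overloaded helper and the request types are illustrative stand-ins, not the plugin's actual definitions.

#include <iostream>
#include <type_traits>
#include <variant>

//the classic overload-set helper; chain::overloaded is the same idea
template<typename... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template<typename... Ts> overloaded(Ts...) -> overloaded<Ts...>;

//illustrative stand-ins for the state-history request types
struct get_status_request_v0 {};
struct get_status_request_v1 : get_status_request_v0 {};
struct get_blocks_ack_request_v0 { unsigned num_messages = 0; };

using state_request = std::variant<get_status_request_v0, get_status_request_v1, get_blocks_ack_request_v0>;

int main() {
   unsigned send_credits = 0;

   auto handle = overloaded {
      //one template lambda covers v0 and v1, constrained to the get_status family
      []<typename T, typename = std::enable_if_t<std::is_base_of_v<get_status_request_v0, T>>>(const T&) {
         std::cout << "status request, v1? " << std::is_same_v<T, get_status_request_v1> << '\n';
      },
      [&send_credits](const get_blocks_ack_request_v0& ack) {
         send_credits += ack.num_messages;
      }
   };

   std::visit(handle, state_request{get_status_request_v1{}});
   std::visit(handle, state_request{get_blocks_ack_request_v0{.num_messages = 3}});
   std::cout << "credits: " << send_credits << '\n';
}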