From 25d15b684a87a54bc1fb962b311ba7ba45235971 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Thu, 7 Nov 2024 16:09:50 -0600 Subject: [PATCH] lci pp: fix messages larger than INT_MAX --- .../include/hpx/parcelport_lci/header.hpp | 18 +- .../parcelport_lci/putva/receiver_putva.hpp | 7 +- .../sendrecv/receiver_connection_sendrecv.hpp | 2 +- .../sendrecv/sender_connection_sendrecv.hpp | 2 +- .../src/putva/sender_connection_putva.cpp | 4 +- .../sendrecv/receiver_connection_sendrecv.cpp | 22 ++- .../sendrecv/sender_connection_sendrecv.cpp | 12 +- .../tests/regressions/CMakeLists.txt | 41 ++++- .../tests/regressions/very_big_parcel.cpp | 162 ++++++++++++++++++ 9 files changed, 233 insertions(+), 37 deletions(-) create mode 100644 libs/full/parcelset/tests/regressions/very_big_parcel.cpp diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index 5d871462b3dd..aa0a718ac600 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci { { HPX_ASSERT(buffer.transmission_chunks_.size() == size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = - static_cast(buffer.transmission_chunks_.size() * - sizeof(typename parcel_buffer::transmission_chunk_type)); - if (tchunk_size <= int(max_header_size - current_header_size)) + size_t tchunk_size = buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type); + if (tchunk_size <= max_header_size - current_header_size) { current_header_size += tchunk_size; } @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci { { HPX_ASSERT(buffer.transmission_chunks_.size() == size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = - static_cast(buffer.transmission_chunks_.size() * - sizeof(typename parcel_buffer::transmission_chunk_type)); + size_t tchunk_size = buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type); set(static_cast(tchunk_size)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= max_header_size - current_header_size) { data_[pos_piggy_back_flag_tchunk] = 1; std::memcpy(&data_[current_header_size], diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp index 2419ff28257e..cdfdd1d01f82 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci { buffer.num_chunks_.second = num_non_zero_copy_chunks; auto& tchunks = buffer.transmission_chunks_; tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - int tchunks_length = static_cast(tchunks.size() * - sizeof(buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(buffer_type::transmission_chunk_type); char* piggy_back_tchunk = header_.piggy_back_tchunk(); if (piggy_back_tchunk) { @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci { buffer.chunks_.resize(num_zero_copy_chunks); for (int j = 0; j < num_zero_copy_chunks; ++j) { - std::size_t chunk_size = - buffer.transmission_chunks_[j].second; + size_t chunk_size = buffer.transmission_chunks_[j].second; HPX_ASSERT(iovec.lbuffers[i].length == chunk_size); buffer.chunks_[j] = serialization::create_pointer_chunk( iovec.lbuffers[i].address, chunk_size); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp index 87fd6a4076c2..67b1cf4fc50a 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci { rcvd_chunks, locked }; - LCI_comp_t unified_recv(void* address, int length); + LCI_comp_t unified_recv(void* address, size_t length); return_t receive_transmission_chunks(); return_t receive_data(); return_t receive_chunks(); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp index b54ef136ed77..e18b1419b4d6 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci { locked, }; return_t send_header(); - return_t unified_followup_send(void* address, int length); + return_t unified_followup_send(void* address, size_t length); return_t send_transmission_chunks(); return_t send_data(); return_t send_chunks(); diff --git a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp index e9da675f29ae..48ceed145eab 100644 --- a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp +++ b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci { std::vector< typename parcel_buffer_type::transmission_chunk_type>& tchunks = buffer_.transmission_chunks_; - int tchunks_length = static_cast(tchunks.size() * - sizeof(parcel_buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(parcel_buffer_type::transmission_chunk_type); iovec.lbuffers[i].address = tchunks.data(); iovec.lbuffers[i].length = tchunks_length; if (config_t::reg_mem) diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp index d3d6e2180d17..c46be6d9cd19 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci { buffer.num_chunks_.second = num_non_zero_copy_chunks; auto& tchunks = buffer.transmission_chunks_; tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - int tchunks_length = static_cast(tchunks.size() * - sizeof(receiver_base::buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type); char* piggy_back_tchunk = header_.piggy_back_tchunk(); if (piggy_back_tchunk) { @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci { } LCI_comp_t receiver_connection_sendrecv::unified_recv( - void* address, int length) + void* address, size_t length) { LCI_comp_t completion = device_p->completion_manager_p->recv_followup->alloc_completion(); - if (length <= LCI_MEDIUM_SIZE) + if (length <= (size_t) LCI_MEDIUM_SIZE) { LCI_mbuffer_t mbuffer; mbuffer.address = address; @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci { if (need_recv_tchunks) { auto& tchunks = buffer.transmission_chunks_; - int tchunk_length = static_cast(tchunks.size() * - sizeof(receiver_base::buffer_type::transmission_chunk_type)); + size_t tchunk_length = tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type); state.store(connection_state::locked, std::memory_order_relaxed); LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length); state.store(next_state, std::memory_order_release); @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci { if (need_recv_data) { state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = unified_recv( - buffer.data_.data(), static_cast(buffer.data_.size())); + LCI_comp_t completion = + unified_recv(buffer.data_.data(), buffer.data_.size()); state.store(next_state, std::memory_order_release); return {false, completion}; } @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci { HPX_UNUSED(chunk_size); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = - unified_recv(chunk.data(), static_cast(chunk.size())); + LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); return {false, completion}; } @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci { buffer.chunks_[idx] = serialization::create_pointer_chunk(chunk.data(), chunk.size()); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = - unified_recv(chunk.data(), static_cast(chunk.size())); + LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); return {false, completion}; } diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp index 6979f8b348dc..05a8607b704d 100644 --- a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci { } sender_connection_sendrecv::return_t - sender_connection_sendrecv::unified_followup_send(void* address, int length) + sender_connection_sendrecv::unified_followup_send( + void* address, size_t length) { - if (length <= LCI_MEDIUM_SIZE) + if (length <= (size_t) LCI_MEDIUM_SIZE) { LCI_mbuffer_t buffer; buffer.address = address; @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci { std::vector& tchunks = buffer_.transmission_chunks_; - int tchunks_size = (int) tchunks.size() * + size_t tchunks_size = tchunks.size() * sizeof(parcel_buffer_type::transmission_chunk_type); state.store(connection_state::locked, std::memory_order_relaxed); auto ret = unified_followup_send(tchunks.data(), tchunks_size); @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci { { state.store( connection_state::locked, std::memory_order_relaxed); - auto ret = - unified_followup_send(const_cast(chunk.data_.cpos_), - static_cast(chunk.size_)); + auto ret = unified_followup_send( + const_cast(chunk.data_.cpos_), chunk.size_); if (ret.status == return_status_t::done) { ++send_chunks_idx; diff --git a/libs/full/parcelset/tests/regressions/CMakeLists.txt b/libs/full/parcelset/tests/regressions/CMakeLists.txt index 2f7420810a42..34945680df49 100644 --- a/libs/full/parcelset/tests/regressions/CMakeLists.txt +++ b/libs/full/parcelset/tests/regressions/CMakeLists.txt @@ -1,5 +1,44 @@ -# Copyright (c) 2020-2021 The STE||AR-Group +# Copyright (c) 2024 The STE||AR-Group # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying # file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +# Copyright (c) 2024 Hartmut Kaiser +# +# SPDX-License-Identifier: BSL-1.0 +# Distributed under the Boost Software License, Version 1.0. (See accompanying +# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +if(HPX_WITH_NETWORKING) + set(tests ${tests} very_big_parcel) + set(very_big_parcel_PARAMETERS LOCALITIES 2) +endif() + +foreach(test ${tests}) + set(sources ${test}.cpp) + + source_group("Source Files" FILES ${sources}) + + # add example executable + add_hpx_executable( + ${test}_test INTERNAL_FLAGS + SOURCES ${sources} ${${test}_FLAGS} + EXCLUDE_FROM_ALL + HPX_PREFIX ${HPX_BUILD_PREFIX} + FOLDER "Tests/Regressions/Modules/Full/Parcelset" + ) + + add_hpx_regression_test("modules.parcelset" ${test} ${${test}_PARAMETERS}) + +endforeach() + +if(HPX_WITH_NETWORKING) + # very_big_parcel with one additional configurations + add_hpx_regression_test( + "modules.parcelset" very_big_parcel_int_max_plus_1 + EXECUTABLE very_big_parcel + PSEUDO_DEPS_NAME very_big_parcel ${very_big_parcel_PARAMETERS} + --nbytes-add=1 + ) +endif() diff --git a/libs/full/parcelset/tests/regressions/very_big_parcel.cpp b/libs/full/parcelset/tests/regressions/very_big_parcel.cpp new file mode 100644 index 000000000000..df90a03dae9f --- /dev/null +++ b/libs/full/parcelset/tests/regressions/very_big_parcel.cpp @@ -0,0 +1,162 @@ +// Copyright (c) 2024 Jiakun Yan +// Copyright (c) 2024 Marco Diers +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +const std::size_t nbytes_default = std::numeric_limits::max(); +const std::size_t nbytes_add_default = 0; + +struct config_t +{ + size_t nbytes; + size_t nbytes_add; +} config; +/////////////////////////////////////////////////////////////////////////////// +class Data +{ +public: + Data() = default; + Data(std::size_t size) + : _data(size, 'a') + { + } + auto size() const + { + return _data.size(); + } + + char& operator[](size_t idx) + { + return _data[idx]; + } + + char operator[](size_t idx) const + { + return _data[idx]; + } + + template + friend auto serialize(Archive& archive, Data& object, unsigned int version) + { + archive& object._data; + return; + } + +private: + std::vector _data{}; +}; + +class Component : public hpx::components::component_base +{ +public: + Component() = default; + + auto call(Data data) -> void + { + std::cout << "Data size: " << data.size() << '\n'; + bool flag = true; + size_t idx = 0; + for (; idx < data.size(); ++idx) + { + if (data[idx] != 'a') + { + flag = false; + break; + } + } + if (!flag) + std::cout << "Data[" << idx << "] = " << data[idx] + << " instead of a\n"; + else + std::cout << "data is correct\n"; + HPX_TEST_EQ(flag, true); + return; + } + + HPX_DEFINE_COMPONENT_ACTION(Component, call); +}; + +HPX_REGISTER_COMPONENT(hpx::components::component, Component); +HPX_REGISTER_ACTION(Component::call_action); + +class ComponentClient + : public hpx::components::client_base +{ + using BaseType = hpx::components::client_base; + +public: + template + ComponentClient(Arguments... arguments) + : BaseType(std::move(arguments)...) + { + } + + template + auto call(Arguments... arguments) + { + return hpx::async( + this->get_id(), std::move(arguments)...); + } +}; + +int hpx_main(hpx::program_options::variables_map& b_arg) +{ + config.nbytes = b_arg["nbytes"].as(); + config.nbytes_add = b_arg["nbytes-add"].as(); + + std::vector clients; + auto localities(hpx::find_all_localities()); + std::transform(std::begin(localities), std::end(localities), + std::back_inserter(clients), + [](auto& loc) { return hpx::new_(loc); }); + + Data data(config.nbytes + config.nbytes_add); + std::vector calls; + for (auto& client : clients) + { + calls.emplace_back(client.call(data)); + } + hpx::wait_all(calls); + + return hpx::finalize(); +} + +/////////////////////////////////////////////////////////////////////////////// +int main(int argc, char* argv[]) +{ + namespace po = hpx::program_options; + po::options_description description("HPX big parcel test"); + + description.add_options()("nbytes", + po::value()->default_value(nbytes_default), + "number of bytes to send")("nbytes-add", + po::value()->default_value(nbytes_add_default), + "number of additional bytes to send"); + + hpx::init_params init_args; + init_args.desc_cmdline = description; + + // Initialize and run HPX + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + + return hpx::util::report_errors(); +} +#endif