From 6e8335264082444251f32415e3a568efff7d84ab Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Thu, 7 Nov 2024 16:09:50 -0600 Subject: [PATCH 1/4] lci pp: fix messages larger than INT_MAX; add very_big_parcel_test --- .../include/hpx/parcelport_lci/header.hpp | 18 +- .../parcelport_lci/putva/receiver_putva.hpp | 7 +- .../sendrecv/receiver_connection_sendrecv.hpp | 2 +- .../sendrecv/sender_connection_sendrecv.hpp | 2 +- .../src/putva/sender_connection_putva.cpp | 4 +- .../sendrecv/receiver_connection_sendrecv.cpp | 22 ++- .../sendrecv/sender_connection_sendrecv.cpp | 12 +- .../tests/regressions/CMakeLists.txt | 44 ++++- .../tests/regressions/very_big_parcel.cpp | 165 ++++++++++++++++++ 9 files changed, 239 insertions(+), 37 deletions(-) create mode 100644 libs/full/parcelset/tests/regressions/very_big_parcel.cpp diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp index 5d871462b3dd..aa0a718ac600 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci { { HPX_ASSERT(buffer.transmission_chunks_.size() == size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = - static_cast(buffer.transmission_chunks_.size() * - sizeof(typename parcel_buffer::transmission_chunk_type)); - if (tchunk_size <= int(max_header_size - current_header_size)) + size_t tchunk_size = buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type); + if (tchunk_size <= max_header_size - current_header_size) { current_header_size += tchunk_size; } @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci { { HPX_ASSERT(buffer.transmission_chunks_.size() == size_t(num_zero_copy_chunks + num_non_zero_copy_chunks)); - int tchunk_size = - static_cast(buffer.transmission_chunks_.size() * - sizeof(typename parcel_buffer::transmission_chunk_type)); + size_t tchunk_size = buffer.transmission_chunks_.size() * + sizeof(typename parcel_buffer::transmission_chunk_type); set(static_cast(tchunk_size)); - if (tchunk_size <= int(max_header_size - current_header_size)) + if (tchunk_size <= max_header_size - current_header_size) { data_[pos_piggy_back_flag_tchunk] = 1; std::memcpy(&data_[current_header_size], diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp index 2419ff28257e..cdfdd1d01f82 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/putva/receiver_putva.hpp @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci { buffer.num_chunks_.second = num_non_zero_copy_chunks; auto& tchunks = buffer.transmission_chunks_; tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - int tchunks_length = static_cast(tchunks.size() * - sizeof(buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(buffer_type::transmission_chunk_type); char* piggy_back_tchunk = header_.piggy_back_tchunk(); if (piggy_back_tchunk) { @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci { buffer.chunks_.resize(num_zero_copy_chunks); for (int j = 0; j < num_zero_copy_chunks; ++j) { - std::size_t chunk_size = - buffer.transmission_chunks_[j].second; + size_t chunk_size = buffer.transmission_chunks_[j].second; HPX_ASSERT(iovec.lbuffers[i].length == chunk_size); buffer.chunks_[j] = serialization::create_pointer_chunk( iovec.lbuffers[i].address, chunk_size); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp index 87fd6a4076c2..67b1cf4fc50a 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/receiver_connection_sendrecv.hpp @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci { rcvd_chunks, locked }; - LCI_comp_t unified_recv(void* address, int length); + LCI_comp_t unified_recv(void* address, size_t length); return_t receive_transmission_chunks(); return_t receive_data(); return_t receive_chunks(); diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp index b54ef136ed77..e18b1419b4d6 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sendrecv/sender_connection_sendrecv.hpp @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci { locked, }; return_t send_header(); - return_t unified_followup_send(void* address, int length); + return_t unified_followup_send(void* address, size_t length); return_t send_transmission_chunks(); return_t send_data(); return_t send_chunks(); diff --git a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp index e9da675f29ae..48ceed145eab 100644 --- a/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp +++ b/libs/full/parcelport_lci/src/putva/sender_connection_putva.cpp @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci { std::vector< typename parcel_buffer_type::transmission_chunk_type>& tchunks = buffer_.transmission_chunks_; - int tchunks_length = static_cast(tchunks.size() * - sizeof(parcel_buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(parcel_buffer_type::transmission_chunk_type); iovec.lbuffers[i].address = tchunks.data(); iovec.lbuffers[i].length = tchunks_length; if (config_t::reg_mem) diff --git a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp index d3d6e2180d17..c46be6d9cd19 100644 --- a/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/receiver_connection_sendrecv.cpp @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci { buffer.num_chunks_.second = num_non_zero_copy_chunks; auto& tchunks = buffer.transmission_chunks_; tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks); - int tchunks_length = static_cast(tchunks.size() * - sizeof(receiver_base::buffer_type::transmission_chunk_type)); + size_t tchunks_length = tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type); char* piggy_back_tchunk = header_.piggy_back_tchunk(); if (piggy_back_tchunk) { @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci { } LCI_comp_t receiver_connection_sendrecv::unified_recv( - void* address, int length) + void* address, size_t length) { LCI_comp_t completion = device_p->completion_manager_p->recv_followup->alloc_completion(); - if (length <= LCI_MEDIUM_SIZE) + if (length <= (size_t) LCI_MEDIUM_SIZE) { LCI_mbuffer_t mbuffer; mbuffer.address = address; @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci { if (need_recv_tchunks) { auto& tchunks = buffer.transmission_chunks_; - int tchunk_length = static_cast(tchunks.size() * - sizeof(receiver_base::buffer_type::transmission_chunk_type)); + size_t tchunk_length = tchunks.size() * + sizeof(receiver_base::buffer_type::transmission_chunk_type); state.store(connection_state::locked, std::memory_order_relaxed); LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length); state.store(next_state, std::memory_order_release); @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci { if (need_recv_data) { state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = unified_recv( - buffer.data_.data(), static_cast(buffer.data_.size())); + LCI_comp_t completion = + unified_recv(buffer.data_.data(), buffer.data_.size()); state.store(next_state, std::memory_order_release); return {false, completion}; } @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci { HPX_UNUSED(chunk_size); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = - unified_recv(chunk.data(), static_cast(chunk.size())); + LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); return {false, completion}; } @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci { buffer.chunks_[idx] = serialization::create_pointer_chunk(chunk.data(), chunk.size()); state.store(connection_state::locked, std::memory_order_relaxed); - LCI_comp_t completion = - unified_recv(chunk.data(), static_cast(chunk.size())); + LCI_comp_t completion = unified_recv(chunk.data(), chunk.size()); state.store(current_state, std::memory_order_release); return {false, completion}; } diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp index 6979f8b348dc..05a8607b704d 100644 --- a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci { } sender_connection_sendrecv::return_t - sender_connection_sendrecv::unified_followup_send(void* address, int length) + sender_connection_sendrecv::unified_followup_send( + void* address, size_t length) { - if (length <= LCI_MEDIUM_SIZE) + if (length <= (size_t) LCI_MEDIUM_SIZE) { LCI_mbuffer_t buffer; buffer.address = address; @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci { std::vector& tchunks = buffer_.transmission_chunks_; - int tchunks_size = (int) tchunks.size() * + size_t tchunks_size = tchunks.size() * sizeof(parcel_buffer_type::transmission_chunk_type); state.store(connection_state::locked, std::memory_order_relaxed); auto ret = unified_followup_send(tchunks.data(), tchunks_size); @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci { { state.store( connection_state::locked, std::memory_order_relaxed); - auto ret = - unified_followup_send(const_cast(chunk.data_.cpos_), - static_cast(chunk.size_)); + auto ret = unified_followup_send( + const_cast(chunk.data_.cpos_), chunk.size_); if (ret.status == return_status_t::done) { ++send_chunks_idx; diff --git a/libs/full/parcelset/tests/regressions/CMakeLists.txt b/libs/full/parcelset/tests/regressions/CMakeLists.txt index 2f7420810a42..5d01459d7f51 100644 --- a/libs/full/parcelset/tests/regressions/CMakeLists.txt +++ b/libs/full/parcelset/tests/regressions/CMakeLists.txt @@ -1,5 +1,47 @@ -# Copyright (c) 2020-2021 The STE||AR-Group +# Copyright (c) 2024 The STE||AR-Group # # SPDX-License-Identifier: BSL-1.0 # Distributed under the Boost Software License, Version 1.0. (See accompanying # file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +# Copyright (c) 2024 Hartmut Kaiser +# +# SPDX-License-Identifier: BSL-1.0 Distributed under the Boost Software License, +# Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at +# http://www.boost.org/LICENSE_1_0.txt) + +if(HPX_WITH_NETWORKING) + set(tests ${tests} very_big_parcel) + set(very_big_parcel_PARAMETERS LOCALITIES 2) +endif() + +foreach(test ${tests}) + set(sources ${test}.cpp) + + source_group("Source Files" FILES ${sources}) + + # add example executable + add_hpx_executable( + ${test}_test INTERNAL_FLAGS + SOURCES ${sources} ${${test}_FLAGS} + EXCLUDE_FROM_ALL + HPX_PREFIX ${HPX_BUILD_PREFIX} + FOLDER "Tests/Regressions/Modules/Full/Parcelset" + ) + + add_hpx_regression_test( + "modules.parcelset" ${test} ${${test}_PARAMETERS} TIMEOUT 900 + ) + +endforeach() + +if(HPX_WITH_NETWORKING) + # very_big_parcel with one additional configurations + add_hpx_regression_test( + "modules.parcelset" very_big_parcel_int_max_plus_1 + EXECUTABLE very_big_parcel + PARCELPORTS tcp lci TIMEOUT 900 + PSEUDO_DEPS_NAME very_big_parcel ${very_big_parcel_PARAMETERS} + --nbytes-add=1 + ) +endif() diff --git a/libs/full/parcelset/tests/regressions/very_big_parcel.cpp b/libs/full/parcelset/tests/regressions/very_big_parcel.cpp new file mode 100644 index 000000000000..145a9ca2432c --- /dev/null +++ b/libs/full/parcelset/tests/regressions/very_big_parcel.cpp @@ -0,0 +1,165 @@ +// Copyright (c) 2024 Jiakun Yan +// Copyright (c) 2024 Marco Diers +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#if !defined(HPX_COMPUTE_DEVICE_CODE) +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +/////////////////////////////////////////////////////////////////////////////// +const std::size_t nbytes_default = (std::numeric_limits::max)(); +const std::size_t nbytes_add_default = 0; + +struct config_t +{ + size_t nbytes; + size_t nbytes_add; +} config; +/////////////////////////////////////////////////////////////////////////////// +class Data +{ +public: + Data() = default; + Data(std::size_t size) + : _data(size, 'a') + { + } + auto size() const + { + return _data.size(); + } + + char& operator[](size_t idx) + { + return _data[idx]; + } + + char operator[](size_t idx) const + { + return _data[idx]; + } + + template + void serialize(Archive& ar, const unsigned int) + { + // clang-format off + ar & _data; + // clang-format on + } + +private: + std::vector _data{}; +}; + +class Component : public hpx::components::component_base +{ +public: + Component() = default; + + auto call(Data data) -> void + { + std::cout << "Data size: " << data.size() << '\n'; + bool flag = true; + size_t idx = 0; + for (; idx < data.size(); ++idx) + { + if (data[idx] != 'a') + { + flag = false; + break; + } + } + if (!flag) + std::cout << "Data[" << idx << "] = " << data[idx] + << " instead of a\n"; + else + std::cout << "data is correct\n"; + HPX_TEST_EQ(flag, true); + return; + } + + HPX_DEFINE_COMPONENT_ACTION(Component, call) +}; + +HPX_REGISTER_COMPONENT(hpx::components::component, Component); +HPX_REGISTER_ACTION(Component::call_action); + +class ComponentClient + : public hpx::components::client_base +{ + using BaseType = hpx::components::client_base; + +public: + template + ComponentClient(Arguments... arguments) + : BaseType(std::move(arguments)...) + { + } + + template + auto call(Arguments... arguments) + { + return hpx::async( + this->get_id(), std::move(arguments)...); + } +}; + +int hpx_main(hpx::program_options::variables_map& b_arg) +{ + config.nbytes = b_arg["nbytes"].as(); + config.nbytes_add = b_arg["nbytes-add"].as(); + + std::vector clients; + auto localities(hpx::find_remote_localities()); + std::transform(std::begin(localities), std::end(localities), + std::back_inserter(clients), + [](auto& loc) { return hpx::new_(loc); }); + + Data data(config.nbytes + config.nbytes_add); + std::vector calls; + for (auto& client : clients) + { + calls.emplace_back(client.call(data)); + } + hpx::wait_all(calls); + + return hpx::finalize(); +} + +/////////////////////////////////////////////////////////////////////////////// +int main(int argc, char* argv[]) +{ + namespace po = hpx::program_options; + po::options_description description("HPX big parcel test"); + + description.add_options()("nbytes", + po::value()->default_value(nbytes_default), + "number of bytes to send")("nbytes-add", + po::value()->default_value(nbytes_add_default), + "number of additional bytes to send"); + + hpx::init_params init_args; + init_args.desc_cmdline = description; + + // Initialize and run HPX + HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0); + + return hpx::util::report_errors(); +} +#endif From c0d453e3e22d7b539ba585942bed6648489014f9 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Sat, 9 Nov 2024 22:13:43 -0600 Subject: [PATCH 2/4] bump HPX_WITH_LCI_TAG to v1.7.8 --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 943d75d0bc52..7d3dc674a23a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1229,7 +1229,7 @@ if(HPX_WITH_NETWORKING) ADVANCED ) hpx_option( - HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.7" + HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.8" CATEGORY "Build Targets" ADVANCED ) From e353f0241c99f235a5aa04daf4cfefb312baf27f Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Mon, 18 Nov 2024 16:20:32 -0500 Subject: [PATCH 3/4] Fix messages larger than INT_MAX for mpi --- .../include/hpx/mpi_base/mpi_environment.hpp | 5 + libs/core/mpi_base/src/mpi_environment.cpp | 92 +++++++++++++++++++ .../parcelport_mpi/receiver_connection.hpp | 68 ++++---------- .../hpx/parcelport_mpi/sender_connection.hpp | 69 ++++---------- 4 files changed, 130 insertions(+), 104 deletions(-) diff --git a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp index dde6149131bc..384b993b35b0 100644 --- a/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp +++ b/libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp @@ -1,4 +1,5 @@ // Copyright (c) 2013-2015 Thomas Heller +// Copyright (c) 2024 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -42,6 +43,10 @@ namespace hpx::util { static std::string get_processor_name(); + static MPI_Datatype type_contiguous(size_t nbytes); + static MPI_Request isend(void* address, size_t size, int rank, int tag); + static MPI_Request irecv(void* address, size_t size, int rank, int tag); + struct HPX_CORE_EXPORT scoped_lock { scoped_lock(); diff --git a/libs/core/mpi_base/src/mpi_environment.cpp b/libs/core/mpi_base/src/mpi_environment.cpp index a155bc262e45..a251fc579932 100644 --- a/libs/core/mpi_base/src/mpi_environment.cpp +++ b/libs/core/mpi_base/src/mpi_environment.cpp @@ -2,6 +2,7 @@ // Copyright (c) 2020 Google // Copyright (c) 2022 Patrick Diehl // Copyright (c) 2023 Hartmut Kaiser +// Copyright (c) 2024 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -467,6 +468,97 @@ namespace hpx::util { report_error(sl, error_code); } + + // Acknowledgement: code adapted from github.com/jeffhammond/BigMPI + MPI_Datatype mpi_environment::type_contiguous(size_t nbytes) + { + size_t int_max = (std::numeric_limits::max)(); + + size_t c = nbytes / int_max; + size_t r = nbytes % int_max; + + HPX_ASSERT(c < int_max); + HPX_ASSERT(r < int_max); + + MPI_Datatype chunks; + MPI_Type_vector(c, int_max, int_max, MPI_BYTE, &chunks); + + MPI_Datatype remainder; + MPI_Type_contiguous(r, MPI_BYTE, &remainder); + + MPI_Aint remdisp = (MPI_Aint) c * int_max; + int blocklengths[2] = {1, 1}; + MPI_Aint displacements[2] = {0, remdisp}; + MPI_Datatype types[2] = {chunks, remainder}; + MPI_Datatype newtype; + MPI_Type_create_struct(2, blocklengths, displacements, types, &newtype); + + MPI_Type_free(&chunks); + MPI_Type_free(&remainder); + + return newtype; + } + + MPI_Request mpi_environment::isend( + void* address, size_t size, int rank, int tag) + { + MPI_Request request; + MPI_Datatype datatype; + int length; + if (size > static_cast((std::numeric_limits::max)())) + { + datatype = type_contiguous(size); + MPI_Type_commit(&datatype); + length = 1; + } + else + { + datatype = MPI_BYTE; + length = static_cast(size); + } + + { + scoped_lock l; + int const ret = MPI_Isend( + address, length, datatype, rank, tag, communicator(), &request); + check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret); + } + + if (datatype != MPI_BYTE) + MPI_Type_free(&datatype); + return request; + } + + MPI_Request mpi_environment::irecv( + void* address, size_t size, int rank, int tag) + { + MPI_Request request; + MPI_Datatype datatype; + int length; + if (size > static_cast((std::numeric_limits::max)())) + { + datatype = type_contiguous(size); + MPI_Type_commit(&datatype); + length = 1; + } + else + { + datatype = MPI_BYTE; + length = static_cast(size); + } + + { + scoped_lock l; + int const ret = MPI_Irecv( + address, length, datatype, rank, tag, communicator(), &request); + check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret); + } + + if (datatype != MPI_BYTE) + MPI_Type_free(&datatype); + + return request; + } } // namespace hpx::util #endif diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp index bed4e218af3a..0bbdeb00a185 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/receiver_connection.hpp @@ -1,6 +1,6 @@ // Copyright (c) 2014-2015 Thomas Heller // Copyright (c) 2007-2024 Hartmut Kaiser -// Copyright (c) 2023 Jiakun Yan +// Copyright (c) 2023-2024 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -163,14 +163,11 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - int const ret = MPI_Irecv(buffer_.transmission_chunks_.data(), - static_cast(buffer_.transmission_chunks_.size() * - sizeof(buffer_type::transmission_chunk_type)), - MPI_BYTE, src_, tag_, util::mpi_environment::communicator(), - &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::irecv( + buffer_.transmission_chunks_.data(), + buffer_.transmission_chunks_.size() * + sizeof(buffer_type::transmission_chunk_type), + src_, tag_); request_ptr_ = &request_; state_ = connection_state::rcvd_transmission_chunks; @@ -207,12 +204,8 @@ namespace hpx::parcelset::policies::mpi { ack_ = static_cast( connection_state::acked_transmission_chunks); - int const ret = - MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::isend( + &ack_, sizeof(ack_), src_, ack_tag()); request_ptr_ = &request_; } @@ -241,14 +234,8 @@ namespace hpx::parcelset::policies::mpi { if (need_recv_data) { - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Irecv(buffer_.data_.data(), - static_cast(buffer_.data_.size()), MPI_BYTE, src_, - tag_, util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::irecv( + buffer_.data_.data(), buffer_.data_.size(), src_, tag_); request_ptr_ = &request_; state_ = connection_state::rcvd_data; @@ -276,15 +263,8 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(request_ptr_ == nullptr); { - util::mpi_environment::scoped_lock l; - - ack_ = static_cast(connection_state::acked_data); - int const ret = - MPI_Isend(&ack_, sizeof(ack_), MPI_BYTE, src_, ack_tag(), - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::isend( + &ack_, sizeof(ack_), src_, ack_tag()); request_ptr_ = &request_; } @@ -372,17 +352,9 @@ namespace hpx::parcelset::policies::mpi { "zero-copy chunk buffers should have been initialized " "during de-serialization"); - { - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Irecv(c.data(), - static_cast(chunk_size), MPI_BYTE, src_, tag_, - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - - request_ptr_ = &request_; - } + request_ = util::mpi_environment::irecv( + c.data(), chunk_size, src_, tag_); + request_ptr_ = &request_; } HPX_ASSERT_MSG( zero_copy_chunks_idx_ == buffer_.num_chunks_.first, @@ -412,14 +384,8 @@ namespace hpx::parcelset::policies::mpi { c.data(), chunk_size); { - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Irecv(c.data(), - static_cast(c.size()), MPI_BYTE, src_, tag_, - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::irecv( + c.data(), c.size(), src_, tag_); request_ptr_ = &request_; } } diff --git a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp index 7b80fec43900..9eb969d3708e 100644 --- a/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp +++ b/libs/full/parcelport_mpi/include/hpx/parcelport_mpi/sender_connection.hpp @@ -1,6 +1,6 @@ // Copyright (c) 2007-2024 Hartmut Kaiser // Copyright (c) 2014-2015 Thomas Heller -// Copyright (c) 2023 Jiakun Yan +// Copyright (c) 2023-2024 Jiakun Yan // // SPDX-License-Identifier: BSL-1.0 // Distributed under the Boost Software License, Version 1.0. (See accompanying @@ -177,17 +177,9 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(state_ == connection_state::initialized); HPX_ASSERT(request_ptr_ == nullptr); - { - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Isend(header_buffer.data(), - static_cast(header_buffer.size()), MPI_BYTE, dst_, 0, - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - - request_ptr_ = &request_; - } + request_ = util::mpi_environment::isend( + header_buffer.data(), header_buffer.size(), dst_, 0); + request_ptr_ = &request_; state_ = connection_state::sent_header; return send_transmission_chunks(); @@ -206,16 +198,10 @@ namespace hpx::parcelset::policies::mpi { auto const& chunks = buffer_.transmission_chunks_; if (!chunks.empty() && !header_.piggy_back_tchunk()) { - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Isend(chunks.data(), - static_cast(chunks.size() * - sizeof(parcel_buffer_type::transmission_chunk_type)), - MPI_BYTE, dst_, tag_, util::mpi_environment::communicator(), - &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::isend( + const_cast( + reinterpret_cast(chunks.data())), + chunks.size(), dst_, tag_); request_ptr_ = &request_; state_ = connection_state::sent_transmission_chunks; @@ -250,14 +236,8 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(request_ptr_ == nullptr); { - util::mpi_environment::scoped_lock l; - - int const ret = - MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::irecv( + &ack_, sizeof(ack_), dst_, ack_tag()); request_ptr_ = &request_; } @@ -283,12 +263,8 @@ namespace hpx::parcelset::policies::mpi { { util::mpi_environment::scoped_lock l; - int const ret = MPI_Isend(buffer_.data_.data(), - static_cast(buffer_.data_.size()), MPI_BYTE, dst_, - tag_, util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::isend( + buffer_.data_.data(), buffer_.data_.size(), dst_, tag_); request_ptr_ = &request_; state_ = connection_state::sent_data; @@ -321,14 +297,8 @@ namespace hpx::parcelset::policies::mpi { HPX_ASSERT(request_ptr_ == nullptr); { - util::mpi_environment::scoped_lock l; - - int const ret = - MPI_Irecv(&ack_, sizeof(ack_), MPI_BYTE, dst_, ack_tag(), - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::irecv( + &ack_, sizeof(ack_), dst_, ack_tag()); request_ptr_ = &request_; } @@ -352,15 +322,8 @@ namespace hpx::parcelset::policies::mpi { return false; } HPX_ASSERT(request_ptr_ == nullptr); - - util::mpi_environment::scoped_lock l; - - int const ret = MPI_Isend(c.data_.cpos_, - static_cast(c.size_), MPI_BYTE, dst_, tag_, - util::mpi_environment::communicator(), &request_); - util::mpi_environment::check_mpi_error( - l, HPX_CURRENT_SOURCE_LOCATION(), ret); - + request_ = util::mpi_environment::isend( + const_cast(c.data()), c.size(), dst_, tag_); request_ptr_ = &request_; } From 07fed692bfa8f0082ba37e9b5df5415c04db26c5 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Sat, 21 Dec 2024 11:30:22 -0600 Subject: [PATCH 4/4] Disable very_big_parcel_test due to limited CircleCI resources --- .../tests/regressions/CMakeLists.txt | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/libs/full/parcelset/tests/regressions/CMakeLists.txt b/libs/full/parcelset/tests/regressions/CMakeLists.txt index 5d01459d7f51..b0a7ef5d7ead 100644 --- a/libs/full/parcelset/tests/regressions/CMakeLists.txt +++ b/libs/full/parcelset/tests/regressions/CMakeLists.txt @@ -29,19 +29,19 @@ foreach(test ${tests}) FOLDER "Tests/Regressions/Modules/Full/Parcelset" ) - add_hpx_regression_test( - "modules.parcelset" ${test} ${${test}_PARAMETERS} TIMEOUT 900 - ) + # Disable the test due to limited CircleCI resources + + # add_hpx_regression_test( "modules.parcelset" ${test} ${${test}_PARAMETERS} + # TIMEOUT 900 ) endforeach() if(HPX_WITH_NETWORKING) + # Disable the test due to limited CircleCI resources + # very_big_parcel with one additional configurations - add_hpx_regression_test( - "modules.parcelset" very_big_parcel_int_max_plus_1 - EXECUTABLE very_big_parcel - PARCELPORTS tcp lci TIMEOUT 900 - PSEUDO_DEPS_NAME very_big_parcel ${very_big_parcel_PARAMETERS} - --nbytes-add=1 - ) + + # add_hpx_regression_test( "modules.parcelset" very_big_parcel_int_max_plus_1 + # EXECUTABLE very_big_parcel TIMEOUT 900 PSEUDO_DEPS_NAME very_big_parcel + # ${very_big_parcel_PARAMETERS} --nbytes-add=1 ) endif()