Skip to content

Commit

Permalink
Merge pull request #6580 from uiuc-hpc/fix-bigmsg-mpi
Browse files Browse the repository at this point in the history
mpi pp: fix messages larger than INT_MAX
  • Loading branch information
hkaiser authored Jan 15, 2025
2 parents a04cd6f + 07fed69 commit ef4f92e
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 142 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ if(HPX_WITH_NETWORKING)
ADVANCED
)
hpx_option(
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.7"
HPX_WITH_LCI_TAG STRING "LCI repository tag or branch" "v1.7.8"
CATEGORY "Build Targets"
ADVANCED
)
Expand Down
5 changes: 5 additions & 0 deletions libs/core/mpi_base/include/hpx/mpi_base/mpi_environment.hpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2013-2015 Thomas Heller
// Copyright (c) 2024 Jiakun Yan
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -42,6 +43,10 @@ namespace hpx::util {

static std::string get_processor_name();

static MPI_Datatype type_contiguous(size_t nbytes);
static MPI_Request isend(void* address, size_t size, int rank, int tag);
static MPI_Request irecv(void* address, size_t size, int rank, int tag);

struct HPX_CORE_EXPORT scoped_lock
{
scoped_lock();
Expand Down
92 changes: 92 additions & 0 deletions libs/core/mpi_base/src/mpi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2020 Google
// Copyright (c) 2022 Patrick Diehl
// Copyright (c) 2023 Hartmut Kaiser
// Copyright (c) 2024 Jiakun Yan
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
Expand Down Expand Up @@ -467,6 +468,97 @@ namespace hpx::util {

report_error(sl, error_code);
}

// Acknowledgement: code adapted from github.com/jeffhammond/BigMPI
MPI_Datatype mpi_environment::type_contiguous(size_t nbytes)
{
size_t int_max = (std::numeric_limits<int>::max)();

size_t c = nbytes / int_max;
size_t r = nbytes % int_max;

HPX_ASSERT(c < int_max);
HPX_ASSERT(r < int_max);

MPI_Datatype chunks;
MPI_Type_vector(c, int_max, int_max, MPI_BYTE, &chunks);

MPI_Datatype remainder;
MPI_Type_contiguous(r, MPI_BYTE, &remainder);

MPI_Aint remdisp = (MPI_Aint) c * int_max;
int blocklengths[2] = {1, 1};
MPI_Aint displacements[2] = {0, remdisp};
MPI_Datatype types[2] = {chunks, remainder};
MPI_Datatype newtype;
MPI_Type_create_struct(2, blocklengths, displacements, types, &newtype);

MPI_Type_free(&chunks);
MPI_Type_free(&remainder);

return newtype;
}

MPI_Request mpi_environment::isend(
void* address, size_t size, int rank, int tag)
{
MPI_Request request;
MPI_Datatype datatype;
int length;
if (size > static_cast<size_t>((std::numeric_limits<int>::max)()))
{
datatype = type_contiguous(size);
MPI_Type_commit(&datatype);
length = 1;
}
else
{
datatype = MPI_BYTE;
length = static_cast<int>(size);
}

{
scoped_lock l;
int const ret = MPI_Isend(
address, length, datatype, rank, tag, communicator(), &request);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);
}

if (datatype != MPI_BYTE)
MPI_Type_free(&datatype);
return request;
}

MPI_Request mpi_environment::irecv(
void* address, size_t size, int rank, int tag)
{
MPI_Request request;
MPI_Datatype datatype;
int length;
if (size > static_cast<size_t>((std::numeric_limits<int>::max)()))
{
datatype = type_contiguous(size);
MPI_Type_commit(&datatype);
length = 1;
}
else
{
datatype = MPI_BYTE;
length = static_cast<int>(size);
}

{
scoped_lock l;
int const ret = MPI_Irecv(
address, length, datatype, rank, tag, communicator(), &request);
check_mpi_error(l, HPX_CURRENT_SOURCE_LOCATION(), ret);
}

if (datatype != MPI_BYTE)
MPI_Type_free(&datatype);

return request;
}
} // namespace hpx::util

#endif
18 changes: 8 additions & 10 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,10 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
if (tchunk_size <= int(max_header_size - current_header_size))
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
if (tchunk_size <= max_header_size - current_header_size)
{
current_header_size += tchunk_size;
}
Expand Down Expand Up @@ -118,12 +117,11 @@ namespace hpx::parcelset::policies::lci {
{
HPX_ASSERT(buffer.transmission_chunks_.size() ==
size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
int tchunk_size =
static_cast<int>(buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type));
size_t tchunk_size = buffer.transmission_chunks_.size() *
sizeof(typename parcel_buffer<buffer_type,
ChunkType>::transmission_chunk_type);
set<pos_numbytes_tchunk>(static_cast<value_type>(tchunk_size));
if (tchunk_size <= int(max_header_size - current_header_size))
if (tchunk_size <= max_header_size - current_header_size)
{
data_[pos_piggy_back_flag_tchunk] = 1;
std::memcpy(&data_[current_header_size],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand All @@ -178,8 +178,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_.resize(num_zero_copy_chunks);
for (int j = 0; j < num_zero_copy_chunks; ++j)
{
std::size_t chunk_size =
buffer.transmission_chunks_[j].second;
size_t chunk_size = buffer.transmission_chunks_[j].second;
HPX_ASSERT(iovec.lbuffers[i].length == chunk_size);
buffer.chunks_[j] = serialization::create_pointer_chunk(
iovec.lbuffers[i].address, chunk_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
rcvd_chunks,
locked
};
LCI_comp_t unified_recv(void* address, int length);
LCI_comp_t unified_recv(void* address, size_t length);
return_t receive_transmission_chunks();
return_t receive_data();
return_t receive_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace hpx::parcelset::policies::lci {
locked,
};
return_t send_header();
return_t unified_followup_send(void* address, int length);
return_t unified_followup_send(void* address, size_t length);
return_t send_transmission_chunks();
return_t send_data();
return_t send_chunks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ namespace hpx::parcelset::policies::lci {
std::vector<
typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
iovec.lbuffers[i].address = tchunks.data();
iovec.lbuffers[i].length = tchunks_length;
if (config_t::reg_mem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ namespace hpx::parcelset::policies::lci {
buffer.num_chunks_.second = num_non_zero_copy_chunks;
auto& tchunks = buffer.transmission_chunks_;
tchunks.resize(num_zero_copy_chunks + num_non_zero_copy_chunks);
int tchunks_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunks_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
char* piggy_back_tchunk = header_.piggy_back_tchunk();
if (piggy_back_tchunk)
{
Expand Down Expand Up @@ -135,11 +135,11 @@ namespace hpx::parcelset::policies::lci {
}

LCI_comp_t receiver_connection_sendrecv::unified_recv(
void* address, int length)
void* address, size_t length)
{
LCI_comp_t completion =
device_p->completion_manager_p->recv_followup->alloc_completion();
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t mbuffer;
mbuffer.address = address;
Expand Down Expand Up @@ -197,8 +197,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_tchunks)
{
auto& tchunks = buffer.transmission_chunks_;
int tchunk_length = static_cast<int>(tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type));
size_t tchunk_length = tchunks.size() *
sizeof(receiver_base::buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(tchunks.data(), tchunk_length);
state.store(next_state, std::memory_order_release);
Expand All @@ -221,8 +221,8 @@ namespace hpx::parcelset::policies::lci {
if (need_recv_data)
{
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion = unified_recv(
buffer.data_.data(), static_cast<int>(buffer.data_.size()));
LCI_comp_t completion =
unified_recv(buffer.data_.data(), buffer.data_.size());
state.store(next_state, std::memory_order_release);
return {false, completion};
}
Expand Down Expand Up @@ -316,8 +316,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(chunk_size);

state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand All @@ -344,8 +343,7 @@ namespace hpx::parcelset::policies::lci {
buffer.chunks_[idx] =
serialization::create_pointer_chunk(chunk.data(), chunk.size());
state.store(connection_state::locked, std::memory_order_relaxed);
LCI_comp_t completion =
unified_recv(chunk.data(), static_cast<int>(chunk.size()));
LCI_comp_t completion = unified_recv(chunk.data(), chunk.size());
state.store(current_state, std::memory_order_release);
return {false, completion};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ namespace hpx::parcelset::policies::lci {
}

sender_connection_sendrecv::return_t
sender_connection_sendrecv::unified_followup_send(void* address, int length)
sender_connection_sendrecv::unified_followup_send(
void* address, size_t length)
{
if (length <= LCI_MEDIUM_SIZE)
if (length <= (size_t) LCI_MEDIUM_SIZE)
{
LCI_mbuffer_t buffer;
buffer.address = address;
Expand Down Expand Up @@ -323,7 +324,7 @@ namespace hpx::parcelset::policies::lci {

std::vector<typename parcel_buffer_type::transmission_chunk_type>&
tchunks = buffer_.transmission_chunks_;
int tchunks_size = (int) tchunks.size() *
size_t tchunks_size = tchunks.size() *
sizeof(parcel_buffer_type::transmission_chunk_type);
state.store(connection_state::locked, std::memory_order_relaxed);
auto ret = unified_followup_send(tchunks.data(), tchunks_size);
Expand Down Expand Up @@ -389,9 +390,8 @@ namespace hpx::parcelset::policies::lci {
{
state.store(
connection_state::locked, std::memory_order_relaxed);
auto ret =
unified_followup_send(const_cast<void*>(chunk.data_.cpos_),
static_cast<int>(chunk.size_));
auto ret = unified_followup_send(
const_cast<void*>(chunk.data_.cpos_), chunk.size_);
if (ret.status == return_status_t::done)
{
++send_chunks_idx;
Expand Down
Loading

0 comments on commit ef4f92e

Please sign in to comment.