From aed9518e10cd1d6802094733758638bac0ffa8d2 Mon Sep 17 00:00:00 2001 From: Jiakun Yan Date: Tue, 12 Mar 2024 17:31:53 -0500 Subject: [PATCH] improve(lci pp): more options to control the behavior of the LCI parcelport - split the comp_type option into comp_type_header and comp_type_followup - add new progress_type option: poll - add enable_sendmc option - add bg_work_max_count, bg_work_when_send for more control of background work invocation - improve completion_manager_sync to better simulate MPI --- libs/full/parcelport_lci/CMakeLists.txt | 4 + .../completion_manager_queue.hpp | 12 +- .../completion_manager_sync.hpp | 38 +----- .../completion_manager_sync_single.hpp | 18 +-- .../completion_manager_sync_single_nolock.hpp | 47 +++++++ .../completion_manager_base.hpp | 4 + .../include/hpx/parcelport_lci/config.hpp | 20 ++- .../include/hpx/parcelport_lci/helper.hpp | 2 + .../hpx/parcelport_lci/parcelport_lci.hpp | 9 +- .../parcelport_lci/sender_connection_base.hpp | 2 +- .../completion_manager_queue.cpp | 15 +++ .../completion_manager_sync.cpp | 37 ++++++ .../completion_manager_sync_single.cpp | 23 ++++ .../completion_manager_sync_single_nolock.cpp | 16 +++ libs/full/parcelport_lci/src/config.cpp | 83 +++++++++++- .../parcelport_lci/src/parcelport_lci.cpp | 120 +++++++++++------- libs/full/parcelport_lci/src/sender_base.cpp | 3 +- .../src/sender_connection_base.cpp | 29 ++++- .../sendrecv/sender_connection_sendrecv.cpp | 70 ++++++++-- 19 files changed, 421 insertions(+), 131 deletions(-) create mode 100644 libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp create mode 100644 libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp create mode 100644 libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp create mode 100644 libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp create mode 100644 libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp diff --git a/libs/full/parcelport_lci/CMakeLists.txt b/libs/full/parcelport_lci/CMakeLists.txt index e899b7dec3ec..7399f3b78f4e 100644 --- a/libs/full/parcelport_lci/CMakeLists.txt +++ b/libs/full/parcelport_lci/CMakeLists.txt @@ -50,6 +50,10 @@ set(parcelport_lci_sources sendrecv/sender_connection_sendrecv.cpp sendrecv/receiver_sendrecv.cpp sendrecv/receiver_connection_sendrecv.cpp + completion_manager/completion_manager_queue.cpp + completion_manager/completion_manager_sync.cpp + completion_manager/completion_manager_sync_single.cpp + completion_manager/completion_manager_sync_single_nolock.cpp ) include(HPX_SetupLCI) diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp index af6404ed0bbc..e5df7a84f59f 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp @@ -10,12 +10,14 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) +#include #include namespace hpx::parcelset::policies::lci { struct completion_manager_queue : public completion_manager_base { - completion_manager_queue() + completion_manager_queue(parcelport* pp) + : completion_manager_base(pp) { // LCI_queue_create(LCI_UR_DEVICE, &queue); // Hack for now @@ -38,13 +40,7 @@ namespace hpx::parcelset::policies::lci { HPX_UNUSED(comp); } - LCI_request_t poll() - { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; - LCI_queue_pop(queue, &request); - return request; - } + LCI_request_t poll(); LCI_comp_t get_completion_object() { diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp index 33ab477cbdef..56f7f82ad9ee 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp @@ -10,14 +10,17 @@ #if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) -#include - #include +#include +#include namespace hpx::parcelset::policies::lci { struct completion_manager_sync : public completion_manager_base { - completion_manager_sync() {} + completion_manager_sync(parcelport* pp) + : completion_manager_base(pp) + { + } ~completion_manager_sync() {} @@ -34,34 +37,7 @@ namespace hpx::parcelset::policies::lci { sync_list.push_back(comp); } - LCI_request_t poll() - { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; - if (sync_list.empty()) - { - return request; - } - { - std::unique_lock l(lock, std::try_to_lock); - if (l.owns_lock() && !sync_list.empty()) - { - LCI_comp_t sync = sync_list.front(); - sync_list.pop_front(); - LCI_error_t ret = LCI_sync_test(sync, &request); - if (ret == LCI_OK) - { - HPX_ASSERT(request.flag == LCI_OK); - LCI_sync_free(&sync); - } - else - { - sync_list.push_back(sync); - } - } - } - return request; - } + LCI_request_t poll(); private: hpx::spinlock lock; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp index bf8ab3f9dc8c..6f5a23c3dab0 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp @@ -15,7 +15,8 @@ namespace hpx::parcelset::policies::lci { struct completion_manager_sync_single : public completion_manager_base { - completion_manager_sync_single() + completion_manager_sync_single(parcelport* pp) + : completion_manager_base(pp) { LCI_sync_create(LCI_UR_DEVICE, 1, &sync); } @@ -36,20 +37,7 @@ namespace hpx::parcelset::policies::lci { lock.unlock(); } - LCI_request_t poll() - { - LCI_request_t request; - request.flag = LCI_ERR_RETRY; - - bool succeed = lock.try_lock(); - if (succeed) - { - LCI_error_t ret = LCI_sync_test(sync, &request); - if (ret == LCI_ERR_RETRY) - lock.unlock(); - } - return request; - } + LCI_request_t poll(); private: hpx::spinlock lock; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp new file mode 100644 index 000000000000..efc372cc3c9a --- /dev/null +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp @@ -0,0 +1,47 @@ +// Copyright (c) 2014-2023 Thomas Heller +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#pragma once + +#include + +#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI) + +#include + +namespace hpx::parcelset::policies::lci { + struct completion_manager_sync_single_nolock + : public completion_manager_base + { + completion_manager_sync_single_nolock(parcelport* pp) + : completion_manager_base(pp) + { + LCI_sync_create(LCI_UR_DEVICE, 1, &sync); + } + + ~completion_manager_sync_single_nolock() + { + LCI_sync_free(&sync); + } + + LCI_comp_t alloc_completion() + { + return sync; + } + + void enqueue_completion(LCI_comp_t comp) + { + HPX_UNUSED(comp); + } + + LCI_request_t poll(); + + private: + LCI_comp_t sync; + }; +} // namespace hpx::parcelset::policies::lci + +#endif diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp index 914e954d18bf..f0d29c0514cf 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/completion_manager_base.hpp @@ -13,8 +13,11 @@ #include namespace hpx::parcelset::policies::lci { + class HPX_EXPORT parcelport; struct completion_manager_base { + completion_manager_base(parcelport* pp) noexcept + : pp_(pp){}; virtual ~completion_manager_base() {} virtual LCI_comp_t alloc_completion() = 0; virtual void enqueue_completion(LCI_comp_t comp) = 0; @@ -23,6 +26,7 @@ namespace hpx::parcelset::policies::lci { { return nullptr; } + parcelport* pp_; }; } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp index 55365eeb9a44..3db9e402c3ea 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp @@ -29,8 +29,19 @@ namespace hpx::parcelset::policies::lci { putsendrecv, }; static protocol_t protocol; - // which completion mechanism to use - static LCI_comp_type_t completion_type; + // Whether sending header requires completion + static bool enable_sendmc; + // which completion mechanism to use for header messages + enum class comp_type_t + { + queue, + sync, + sync_single, + sync_single_nolock, + }; + static comp_type_t completion_type_header; + // which completion mechanism to use for followup messages + static comp_type_t completion_type_followup; // how to run LCI_progress enum class progress_type_t { @@ -38,6 +49,7 @@ namespace hpx::parcelset::policies::lci { pthread, // Normal progress pthread worker, // HPX worker thread pthread_worker, // Normal progress pthread + worker thread + poll, // progress when polling completion }; static progress_type_t progress_type; // How many progress threads to create @@ -57,6 +69,10 @@ namespace hpx::parcelset::policies::lci { static int send_nb_max_retry; // The max retry count of mbuffer_alloc before yield. static int mbuffer_alloc_max_retry; + // The max count of background_work to invoke in a row + static int bg_work_max_count; + // Whether to do background work when sending + static bool bg_work_when_send; static void init_config(util::runtime_configuration const& rtcfg); }; diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/helper.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/helper.hpp index f1e6235c4020..0446328b854e 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/helper.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/helper.hpp @@ -20,7 +20,9 @@ namespace hpx::parcelset::policies::lci { { k = 0; if (hpx::threads::get_self_id() != hpx::threads::invalid_thread_id) + { hpx::this_thread::yield(); + } } } } // namespace hpx::parcelset::policies::lci diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp index 967cc2c6acf1..4e0e879b0eb4 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/parcelport_lci.hpp @@ -261,7 +261,8 @@ namespace hpx::traits { "backlog_queue = 0\n" "prg_thread_num = 1\n" "protocol = putsendrecv\n" - "comp_type = queue\n" + "comp_type_header = queue\n" + "comp_type_followup = queue\n" "progress_type = rp\n" "prepost_recv_num = 1\n" "reg_mem = 1\n" @@ -269,7 +270,11 @@ namespace hpx::traits { "ncomps = 1\n" "enable_in_buffer_assembly = 1\n" "send_nb_max_retry = 32\n" - "mbuffer_alloc_max_retry = 32\n"; + "mbuffer_alloc_max_retry = 32\n" + "bg_work_max_count = 32\n" + "bg_work_when_send = 0\n" + "enable_sendmc = 0\n" + "comp_type = deprecated\n"; } }; } // namespace hpx::traits diff --git a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp index 4a5a5ede9842..fa178d50ddd4 100644 --- a/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp +++ b/libs/full/parcelport_lci/include/hpx/parcelport_lci/sender_connection_base.hpp @@ -73,7 +73,7 @@ namespace hpx::parcelset::policies::lci { postprocess_handler_type&& parcel_postprocess); virtual void load(handler_type&& handler, postprocess_handler_type&& parcel_postprocess) = 0; - return_t send(); + return_t send(bool in_bg_work); virtual return_t send_nb() = 0; virtual void done() = 0; virtual bool tryMerge( diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp new file mode 100644 index 000000000000..0823304fd43d --- /dev/null +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_queue.cpp @@ -0,0 +1,15 @@ +#include +#include + +namespace hpx::parcelset::policies::lci { + LCI_request_t completion_manager_queue::poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + LCI_queue_pop(queue, &request); + if (request.flag == LCI_ERR_RETRY) + if (config_t::progress_type == config_t::progress_type_t::poll) + pp_->do_progress_local(); + return request; + } +} // namespace hpx::parcelset::policies::lci \ No newline at end of file diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp new file mode 100644 index 000000000000..2e9db982d0fa --- /dev/null +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync.cpp @@ -0,0 +1,37 @@ +#include +#include + +namespace hpx::parcelset::policies::lci { + LCI_request_t completion_manager_sync::poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + + LCI_comp_t sync = nullptr; + { + std::unique_lock l(lock, std::try_to_lock); + if (l.owns_lock() && !sync_list.empty()) + { + sync = sync_list.front(); + sync_list.pop_front(); + } + } + if (sync) + { + LCI_error_t ret = LCI_sync_test(sync, &request); + if (ret == LCI_OK) + { + HPX_ASSERT(request.flag == LCI_OK); + LCI_sync_free(&sync); + } + else + { + if (config_t::progress_type == config_t::progress_type_t::poll) + pp_->do_progress_local(); + std::unique_lock l(lock); + sync_list.push_back(sync); + } + } + return request; + } +} // namespace hpx::parcelset::policies::lci \ No newline at end of file diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp new file mode 100644 index 000000000000..97421a3d1008 --- /dev/null +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single.cpp @@ -0,0 +1,23 @@ +#include +#include + +namespace hpx::parcelset::policies::lci { + LCI_request_t completion_manager_sync_single::poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + + bool succeed = lock.try_lock(); + if (succeed) + { + LCI_error_t ret = LCI_sync_test(sync, &request); + if (ret == LCI_ERR_RETRY) + { + if (config_t::progress_type == config_t::progress_type_t::poll) + pp_->do_progress_local(); + lock.unlock(); + } + } + return request; + } +} // namespace hpx::parcelset::policies::lci \ No newline at end of file diff --git a/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp new file mode 100644 index 000000000000..dd9b5d122892 --- /dev/null +++ b/libs/full/parcelport_lci/src/completion_manager/completion_manager_sync_single_nolock.cpp @@ -0,0 +1,16 @@ +#include +#include + +namespace hpx::parcelset::policies::lci { + LCI_request_t completion_manager_sync_single_nolock::poll() + { + LCI_request_t request; + request.flag = LCI_ERR_RETRY; + + LCI_sync_test(sync, &request); + if (request.flag == LCI_ERR_RETRY) + if (config_t::progress_type == config_t::progress_type_t::poll) + pp_->do_progress_local(); + return request; + } +} // namespace hpx::parcelset::policies::lci \ No newline at end of file diff --git a/libs/full/parcelport_lci/src/config.cpp b/libs/full/parcelport_lci/src/config.cpp index ec4c6725c26f..82a259bd7fcb 100644 --- a/libs/full/parcelport_lci/src/config.cpp +++ b/libs/full/parcelport_lci/src/config.cpp @@ -21,7 +21,8 @@ namespace hpx::parcelset::policies::lci { bool config_t::enable_send_immediate; bool config_t::enable_lci_backlog_queue; config_t::protocol_t config_t::protocol; - LCI_comp_type_t config_t::completion_type; + config_t::comp_type_t config_t::completion_type_header; + config_t::comp_type_t config_t::completion_type_followup; config_t::progress_type_t config_t::progress_type; int config_t::progress_thread_num; int config_t::prepost_recv_num; @@ -31,6 +32,9 @@ namespace hpx::parcelset::policies::lci { bool config_t::enable_in_buffer_assembly; int config_t::send_nb_max_retry; int config_t::mbuffer_alloc_max_retry; + int config_t::bg_work_max_count; + bool config_t::bg_work_when_send; + bool config_t::enable_sendmc; void config_t::init_config(util::runtime_configuration const& rtcfg) { @@ -64,18 +68,51 @@ namespace hpx::parcelset::policies::lci { // set completion mechanism to use std::string completion_str = util::get_entry_as( rtcfg, "hpx.parcel.lci.comp_type", ""); - if (completion_str == "queue") + if (completion_str != "deprecated") { - completion_type = LCI_COMPLETION_QUEUE; + fprintf(stderr, "hpx.parcel.lci.comp_type is deprecated!\n"); } - else if (completion_str == "sync") + // set completion mechanism to use for header messages + std::string header_completion_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.comp_type_header", ""); + if (header_completion_str == "queue") { - completion_type = LCI_COMPLETION_SYNC; + completion_type_header = comp_type_t::queue; + } + else if (header_completion_str == "sync") + { + completion_type_header = comp_type_t::sync; + } + else if (header_completion_str == "sync_single") + { + completion_type_header = comp_type_t::sync_single; + } + else if (header_completion_str == "sync_single_nolock") + { + completion_type_header = comp_type_t::sync_single_nolock; + } + else + { + throw std::runtime_error( + "Unknown completion type for header messages" + + header_completion_str); + } + // set completion mechanism to use for follow-up messages + std::string followup_completion_str = util::get_entry_as( + rtcfg, "hpx.parcel.lci.comp_type_followup", ""); + if (followup_completion_str == "queue") + { + completion_type_followup = comp_type_t::queue; + } + else if (followup_completion_str == "sync") + { + completion_type_followup = comp_type_t::sync; } else { throw std::runtime_error( - "Unknown completion type " + completion_str); + "Unknown completion type for followup messages " + + followup_completion_str); } // set the way to run LCI_progress std::string progress_type_str = util::get_entry_as( @@ -96,6 +133,10 @@ namespace hpx::parcelset::policies::lci { { progress_type = progress_type_t::pthread_worker; } + else if (progress_type_str == "poll") + { + progress_type = progress_type_t::poll; + } else { throw std::runtime_error( @@ -117,6 +158,12 @@ namespace hpx::parcelset::policies::lci { rtcfg, "hpx.parcel.lci.send_nb_max_retry", 0 /* Does not matter*/); mbuffer_alloc_max_retry = util::get_entry_as(rtcfg, "hpx.parcel.lci.mbuffer_alloc_max_retry", 0 /* Does not matter*/); + bg_work_max_count = util::get_entry_as( + rtcfg, "hpx.parcel.lci.bg_work_max_count", 0 /* Does not matter*/); + bg_work_when_send = util::get_entry_as( + rtcfg, "hpx.parcel.lci.bg_work_when_send", 0 /* Does not matter*/); + enable_sendmc = util::get_entry_as( + rtcfg, "hpx.parcel.lci.enable_sendmc", 0 /* Does not matter*/); if (!enable_send_immediate && enable_lci_backlog_queue) { @@ -147,9 +194,31 @@ namespace hpx::parcelset::policies::lci { fprintf(stderr, "WARNING: the number of completion managers (%d) " "cannot exceed the number of devices (%d). " - "ncomps is adjusted accordingly (%d).", + "ncomps is adjusted accordingly (%d).\n", old_ncomps, ndevices, ncomps); } + if (protocol != protocol_t::sendrecv && + completion_type_header != comp_type_t::queue) + { + fprintf(stderr, + "WARNING: we have to use completion type `queue` " + "for putsendrecv/putva protocol. comp_type_header " + "is adjusted accordingly\n"); + completion_type_header = comp_type_t::queue; + } + if (protocol == protocol_t::sendrecv && + (prepost_recv_num > 1 || ndevices > ncomps) && + !(completion_type_header == comp_type_t::queue || + completion_type_header == comp_type_t::sync)) + { + fprintf(stderr, + "WARNING: we have to use completion type `queue` " + "or `sync` for sendrecv protocol with more than " + "one preposted recvs or devices sharing completion" + "managers. comp_type_header " + "is adjusted accordingly\n"); + completion_type_header = comp_type_t::queue; + } } } // namespace hpx::parcelset::policies::lci #endif diff --git a/libs/full/parcelport_lci/src/parcelport_lci.cpp b/libs/full/parcelport_lci/src/parcelport_lci.cpp index 3a296db159f8..73533f189ccb 100644 --- a/libs/full/parcelport_lci/src/parcelport_lci.cpp +++ b/libs/full/parcelport_lci/src/parcelport_lci.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -197,7 +198,13 @@ namespace hpx::parcelset::policies::lci { } else { - has_work = base_type::do_background_work(num_thread, mode); + for (int i = 0; i < config_t::bg_work_max_count; ++i) + { + bool ret = base_type::do_background_work(num_thread, mode); + has_work = ret || has_work; + if (!ret) + break; + } } return has_work; } @@ -209,13 +216,19 @@ namespace hpx::parcelset::policies::lci { return false; bool has_work = false; + if (config_t::progress_type == config_t::progress_type_t::worker || + config_t::progress_type == + config_t::progress_type_t::pthread_worker) + { + has_work = do_progress_local() || has_work; + } + if (mode & parcelport_background_mode::receive) + { + has_work = receiver_p->background_work() || has_work; + } if (mode & parcelport_background_mode::send) { - has_work = sender_p->background_work(num_thread); - if (config_t::progress_type == config_t::progress_type_t::worker || - config_t::progress_type == - config_t::progress_type_t::pthread_worker) - do_progress_local(); + has_work = sender_p->background_work(num_thread) || has_work; if (config_t::enable_lci_backlog_queue) // try to send pending messages has_work = @@ -224,14 +237,6 @@ namespace hpx::parcelset::policies::lci { num_thread) || has_work; } - if (mode & parcelport_background_mode::receive) - { - has_work = receiver_p->background_work() || has_work; - if (config_t::progress_type == config_t::progress_type_t::worker || - config_t::progress_type == - config_t::progress_type_t::pthread_worker) - do_progress_local(); - } return has_work; } @@ -312,39 +317,41 @@ namespace hpx::parcelset::policies::lci { completion_managers.resize(config_t::ncomps); for (auto& completion_manager : completion_managers) { - if (config_t::protocol == config_t::protocol_t::sendrecv && - config_t::completion_type == LCI_COMPLETION_SYNC) - { - if (config_t::prepost_recv_num == 1 && - config_t::ndevices == config_t::ncomps) - { - completion_manager.recv_new = - std::make_shared(); - } - else - { - completion_manager.recv_new = - std::make_shared(); - } - } - else + switch (config_t::completion_type_header) { + case config_t::comp_type_t::queue: + completion_manager.recv_new = + std::make_shared(this); + break; + case config_t::comp_type_t::sync: completion_manager.recv_new = - std::make_shared(); + std::make_shared(this); + break; + case config_t::comp_type_t::sync_single: + completion_manager.recv_new = + std::make_shared(this); + break; + case config_t::comp_type_t::sync_single_nolock: + completion_manager.recv_new = + std::make_shared( + this); + break; + default: + throw std::runtime_error("Unknown completion type!"); } - switch (config_t::completion_type) + switch (config_t::completion_type_followup) { - case LCI_COMPLETION_QUEUE: + case config_t::comp_type_t::queue: completion_manager.send = - std::make_shared(); + std::make_shared(this); completion_manager.recv_followup = - std::make_shared(); + std::make_shared(this); break; - case LCI_COMPLETION_SYNC: + case config_t::comp_type_t::sync: completion_manager.send = - std::make_shared(); + std::make_shared(this); completion_manager.recv_followup = - std::make_shared(); + std::make_shared(this); break; default: throw std::runtime_error("Unknown completion type!"); @@ -371,21 +378,40 @@ namespace hpx::parcelset::policies::lci { // Create the LCI endpoint LCI_plist_t plist_; LCI_plist_create(&plist_); - LCI_plist_set_comp_type( - plist_, LCI_PORT_COMMAND, config_t::completion_type); - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, config_t::completion_type); + switch (config_t::completion_type_followup) + { + case config_t::comp_type_t::queue: + LCI_plist_set_comp_type( + plist_, LCI_PORT_COMMAND, LCI_COMPLETION_QUEUE); + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); + break; + case config_t::comp_type_t::sync: + LCI_plist_set_comp_type( + plist_, LCI_PORT_COMMAND, LCI_COMPLETION_SYNC); + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); + break; + default: + throw std::runtime_error("Unknown completion type!"); + } LCI_endpoint_init(&device.endpoint_followup, device.device, plist_); LCI_plist_set_default_comp(plist_, device.completion_manager_p->recv_new->get_completion_object()); - if (config_t::protocol == config_t::protocol_t::sendrecv && - config_t::completion_type == LCI_COMPLETION_SYNC) - LCI_plist_set_comp_type( - plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); - else + switch (config_t::completion_type_header) { + case config_t::comp_type_t::queue: LCI_plist_set_comp_type( plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_QUEUE); + break; + case config_t::comp_type_t::sync: + case config_t::comp_type_t::sync_single: + case config_t::comp_type_t::sync_single_nolock: + LCI_plist_set_comp_type( + plist_, LCI_PORT_MESSAGE, LCI_COMPLETION_SYNC); + break; + default: + throw std::runtime_error("Unknown completion type!"); } if (config_t::protocol == config_t::protocol_t::sendrecv) LCI_plist_set_match_type(plist_, LCI_MATCH_TAG); diff --git a/libs/full/parcelport_lci/src/sender_base.cpp b/libs/full/parcelport_lci/src/sender_base.cpp index 5eef8fb8c4c9..5887eb594a20 100644 --- a/libs/full/parcelport_lci/src/sender_base.cpp +++ b/libs/full/parcelport_lci/src/sender_base.cpp @@ -41,7 +41,8 @@ namespace hpx::parcelset::policies::lci { auto useful_bg_start = util::lci_environment::pcounter_now(); did_some_work = true; auto* sharedPtr_p = (connection_ptr*) request.user_context; - sender_connection_base::return_t ret = (*sharedPtr_p)->send(); + HPX_ASSERT(sharedPtr_p->get()); + sender_connection_base::return_t ret = (*sharedPtr_p)->send(true); if (ret.status == sender_connection_base::return_status_t::done) { (*sharedPtr_p)->done(); diff --git a/libs/full/parcelport_lci/src/sender_connection_base.cpp b/libs/full/parcelport_lci/src/sender_connection_base.cpp index 48ce02cb2697..9c5a42efa4ee 100644 --- a/libs/full/parcelport_lci/src/sender_connection_base.cpp +++ b/libs/full/parcelport_lci/src/sender_connection_base.cpp @@ -36,7 +36,7 @@ namespace hpx::parcelset::policies::lci { device_p = &pp_->get_tls_device(); load(HPX_FORWARD(handler_type, handler), HPX_FORWARD(postprocess_handler_type, parcel_postprocess)); - return_t ret = send(); + return_t ret = send(false); if (ret.status == return_status_t::done) { done(); @@ -51,8 +51,12 @@ namespace hpx::parcelset::policies::lci { util::lci_environment::pcounter_since(async_write_start_time)); } - sender_connection_base::return_t sender_connection_base::send() + sender_connection_base::return_t sender_connection_base::send( + bool in_bg_work) { + // FIXME: set it properly in the future + // if (HPX_LIKELY(pp_->is_initialized)) + // in_bg_work = false; auto start_time = util::lci_environment::pcounter_now(); return_t ret; if (!config_t::enable_lci_backlog_queue || @@ -66,12 +70,21 @@ namespace hpx::parcelset::policies::lci { ret = send_nb(); if (ret.status == return_status_t::retry) { - if (config_t::progress_type == + if (config_t::bg_work_when_send) + { + pp_->do_background_work(0, + in_bg_work ? parcelport_background_mode::receive : + parcelport_background_mode::all); + } + else if (config_t::progress_type == config_t::progress_type_t::worker || config_t::progress_type == - config_t::progress_type_t::pthread_worker) - while (pp_->do_progress_local()) - continue; + config_t::progress_type_t::pthread_worker || + config_t::progress_type == + config_t::progress_type_t::poll) + { + pp_->do_progress_local(); + } yield_k(retry_count, config_t::send_nb_max_retry); } } while (ret.status == return_status_t::retry); @@ -92,6 +105,10 @@ namespace hpx::parcelset::policies::lci { } } } + if (config_t::bg_work_when_send) + pp_->do_background_work(0, + in_bg_work ? parcelport_background_mode::receive : + parcelport_background_mode::all); util::lci_environment::pcounter_add(util::lci_environment::send_timer, util::lci_environment::pcounter_since(start_time)); return ret; diff --git a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp index e18427e58967..b9e428e4a2f8 100644 --- a/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp +++ b/libs/full/parcelport_lci/src/sendrecv/sender_connection_sendrecv.cpp @@ -50,7 +50,11 @@ namespace hpx::parcelset::policies::lci { int retry_count = 0; while ( LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK) + { + if (config_t::bg_work_when_send) + pp_->do_background_work(0, parcelport_background_mode::all); yield_k(retry_count, config_t::mbuffer_alloc_max_retry); + } HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE); header_ = header( buffer_, (char*) header_buffer.address, header_buffer.length); @@ -86,12 +90,13 @@ namespace hpx::parcelset::policies::lci { } tag = 0; // If no need to post send, then tag can be ignored. sharedPtr_p = nullptr; - if (num_send > 0) + if (config_t::enable_sendmc || num_send > 0) { - tag = next_tag.fetch_add(num_send) % LCI_MAX_TAG; sharedPtr_p = new std::shared_ptr( std::dynamic_pointer_cast( shared_from_this())); + if (num_send > 0) + tag = next_tag.fetch_add(num_send) % LCI_MAX_TAG; } if ((int) tag <= LCI_MAX_TAG && (int) tag + num_send > LCI_MAX_TAG) util::lci_environment::log( @@ -104,7 +109,11 @@ namespace hpx::parcelset::policies::lci { int retry_count = 0; while ( LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK) + { + if (config_t::bg_work_when_send) + pp_->do_background_work(0, parcelport_background_mode::all); yield_k(retry_count, config_t::mbuffer_alloc_max_retry); + } memcpy(header_buffer.address, header_buffer_vector.data(), header_buffer_vector.size()); header_buffer.length = header_buffer_vector.size(); @@ -165,16 +174,26 @@ namespace hpx::parcelset::policies::lci { HPX_ASSERT(state.load(std::memory_order_acquire) == current_state); HPX_UNUSED(current_state); LCI_error_t ret; + if (config_t::enable_sendmc) + { + if (completion == nullptr) + { + completion = + device_p->completion_manager_p->send->alloc_completion(); + } + state.store(connection_state::locked, std::memory_order_relaxed); + } if (config_t::protocol == config_t::protocol_t::putsendrecv) { - ret = LCI_putmna(device_p->endpoint_new, header_buffer, dst_rank, 0, - LCI_DEFAULT_COMP_REMOTE); + ret = LCI_putmac(device_p->endpoint_new, header_buffer, dst_rank, 0, + LCI_DEFAULT_COMP_REMOTE, + config_t::enable_sendmc ? completion : nullptr, sharedPtr_p); } else { HPX_ASSERT(config_t::protocol == config_t::protocol_t::sendrecv); - ret = - LCI_sendmn(device_p->endpoint_new, header_buffer, dst_rank, 0); + ret = LCI_sendmc(device_p->endpoint_new, header_buffer, dst_rank, 0, + config_t::enable_sendmc ? completion : nullptr, sharedPtr_p); } if (ret == LCI_OK) { @@ -185,12 +204,24 @@ namespace hpx::parcelset::policies::lci { "LCI_putmna" : "LCI_sendmn", LCI_RANK, dst_rank, tag, header_buffer.length); - state.store(next_state, std::memory_order_release); - return send_transmission_chunks(); + if (config_t::enable_sendmc) + { + auto ret_comp = completion; + completion = nullptr; + state.store(next_state, std::memory_order_release); + return {return_status_t::wait, ret_comp}; + } + else + { + state.store(next_state, std::memory_order_release); + return send_transmission_chunks(); + } } else { HPX_ASSERT(ret == LCI_ERR_RETRY); + if (config_t::enable_sendmc) + state.store(current_state, std::memory_order_release); return {return_status_t::retry, nullptr}; } } @@ -203,8 +234,14 @@ namespace hpx::parcelset::policies::lci { LCI_mbuffer_t buffer; buffer.address = address; buffer.length = length; - LCI_error_t ret = - LCI_sendm(device_p->endpoint_followup, buffer, dst_rank, tag); + if (config_t::enable_sendmc && completion == nullptr) + { + completion = + device_p->completion_manager_p->send->alloc_completion(); + } + LCI_error_t ret = LCI_sendmc(device_p->endpoint_followup, buffer, + dst_rank, tag, config_t::enable_sendmc ? completion : nullptr, + sharedPtr_p); if (ret == LCI_OK) { util::lci_environment::log( @@ -212,7 +249,14 @@ namespace hpx::parcelset::policies::lci { "sendm (%d, %d, %d) device %d tag %d size %d\n", LCI_RANK, dst_rank, original_tag, device_p->idx, tag, length); tag = (tag + 1) % LCI_MAX_TAG; - return {return_status_t::done, nullptr}; + if (config_t::enable_sendmc) + { + auto ret_comp = completion; + completion = nullptr; + return {return_status_t::wait, ret_comp}; + } + else + return {return_status_t::done, nullptr}; } else { @@ -393,6 +437,10 @@ namespace hpx::parcelset::policies::lci { LCI_memory_deregister(&segment_used); segment_used = LCI_SEGMENT_ALL; } + if (config_t::enable_sendmc) + { + LCI_mbuffer_free(header_buffer); + } HPX_ASSERT(completion == nullptr); HPX_ASSERT(segment_to_use == LCI_SEGMENT_ALL); buffer_.clear();