Skip to content

Commit

Permalink
improve(lci pp): more options to control the behavior of the LCI parcelport
Browse files Browse the repository at this point in the history

- split the comp_type option into comp_type_header and comp_type_followup
- add new progress_type option: poll
- add enable_sendmc option
- add bg_work_max_count, bg_work_when_send for more control of background work invocation
- improve completion_manager_sync to better simulate MPI
  • Loading branch information
JiakunYan committed Apr 6, 2024
1 parent 3add538 commit aed9518
Show file tree
Hide file tree
Showing 19 changed files with 421 additions and 131 deletions.
4 changes: 4 additions & 0 deletions libs/full/parcelport_lci/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ set(parcelport_lci_sources
sendrecv/sender_connection_sendrecv.cpp
sendrecv/receiver_sendrecv.cpp
sendrecv/receiver_connection_sendrecv.cpp
completion_manager/completion_manager_queue.cpp
completion_manager/completion_manager_sync.cpp
completion_manager/completion_manager_sync_single.cpp
completion_manager/completion_manager_sync_single_nolock.cpp
)

include(HPX_SetupLCI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@

#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)

#include <hpx/parcelport_lci/config.hpp>
#include <hpx/parcelport_lci/completion_manager_base.hpp>

namespace hpx::parcelset::policies::lci {
struct completion_manager_queue : public completion_manager_base
{
completion_manager_queue()
completion_manager_queue(parcelport* pp)
: completion_manager_base(pp)
{
// LCI_queue_create(LCI_UR_DEVICE, &queue);
// Hack for now
Expand All @@ -38,13 +40,7 @@ namespace hpx::parcelset::policies::lci {
HPX_UNUSED(comp);
}

LCI_request_t poll()
{
LCI_request_t request;
request.flag = LCI_ERR_RETRY;
LCI_queue_pop(queue, &request);
return request;
}
LCI_request_t poll();

LCI_comp_t get_completion_object()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@

#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)

#include <hpx/parcelport_lci/completion_manager_base.hpp>

#include <hpx/assert.hpp>
#include <hpx/parcelport_lci/completion_manager_base.hpp>
#include <deque>

namespace hpx::parcelset::policies::lci {
struct completion_manager_sync : public completion_manager_base
{
completion_manager_sync() {}
completion_manager_sync(parcelport* pp)
: completion_manager_base(pp)
{
}

~completion_manager_sync() {}

Expand All @@ -34,34 +37,7 @@ namespace hpx::parcelset::policies::lci {
sync_list.push_back(comp);
}

LCI_request_t poll()
{
LCI_request_t request;
request.flag = LCI_ERR_RETRY;
if (sync_list.empty())
{
return request;
}
{
std::unique_lock l(lock, std::try_to_lock);
if (l.owns_lock() && !sync_list.empty())
{
LCI_comp_t sync = sync_list.front();
sync_list.pop_front();
LCI_error_t ret = LCI_sync_test(sync, &request);
if (ret == LCI_OK)
{
HPX_ASSERT(request.flag == LCI_OK);
LCI_sync_free(&sync);
}
else
{
sync_list.push_back(sync);
}
}
}
return request;
}
LCI_request_t poll();

private:
hpx::spinlock lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
namespace hpx::parcelset::policies::lci {
struct completion_manager_sync_single : public completion_manager_base
{
completion_manager_sync_single()
completion_manager_sync_single(parcelport* pp)
: completion_manager_base(pp)
{
LCI_sync_create(LCI_UR_DEVICE, 1, &sync);
}
Expand All @@ -36,20 +37,7 @@ namespace hpx::parcelset::policies::lci {
lock.unlock();
}

LCI_request_t poll()
{
LCI_request_t request;
request.flag = LCI_ERR_RETRY;

bool succeed = lock.try_lock();
if (succeed)
{
LCI_error_t ret = LCI_sync_test(sync, &request);
if (ret == LCI_ERR_RETRY)
lock.unlock();
}
return request;
}
LCI_request_t poll();

private:
hpx::spinlock lock;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2014-2023 Thomas Heller
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#pragma once

#include <hpx/config.hpp>

#if defined(HPX_HAVE_NETWORKING) && defined(HPX_HAVE_PARCELPORT_LCI)

#include <hpx/parcelport_lci/completion_manager_base.hpp>

namespace hpx::parcelset::policies::lci {
// Completion manager backed by one LCI synchronizer that is created when the
// manager is constructed and released when it is destroyed. Every call to
// alloc_completion() hands out that same synchronizer, and this type holds no
// lock member of its own (compare completion_manager_sync_single).
struct completion_manager_sync_single_nolock
  : public completion_manager_base
{
    // Creates the synchronizer owned by this manager.
    // pp is forwarded to completion_manager_base, which stores it as pp_.
    completion_manager_sync_single_nolock(parcelport* pp)
      : completion_manager_base(pp)
    {
        LCI_sync_create(LCI_UR_DEVICE, 1, &sync);
    }

    // Releases the synchronizer created by the constructor.
    ~completion_manager_sync_single_nolock()
    {
        LCI_sync_free(&sync);
    }

    // This type owns a raw LCI handle that the destructor frees. A copy would
    // hold the same handle and free it a second time, so copying is disabled.
    completion_manager_sync_single_nolock(
        completion_manager_sync_single_nolock const&) = delete;
    completion_manager_sync_single_nolock& operator=(
        completion_manager_sync_single_nolock const&) = delete;

    // Returns the synchronizer owned by this manager; nothing is allocated
    // per call.
    LCI_comp_t alloc_completion()
    {
        return sync;
    }

    // Intentionally a no-op: the only completion object is the member
    // synchronizer, so there is nothing to record.
    void enqueue_completion(LCI_comp_t comp)
    {
        HPX_UNUSED(comp);
    }

    // Tests the synchronizer once; defined out of line in
    // completion_manager_sync_single_nolock.cpp.
    LCI_request_t poll();

private:
    // The single LCI synchronizer owned by this manager.
    LCI_comp_t sync;
};
} // namespace hpx::parcelset::policies::lci

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
#include <hpx/modules/lci_base.hpp>

namespace hpx::parcelset::policies::lci {
class HPX_EXPORT parcelport;
struct completion_manager_base
{
completion_manager_base(parcelport* pp) noexcept
: pp_(pp){};
virtual ~completion_manager_base() {}
virtual LCI_comp_t alloc_completion() = 0;
virtual void enqueue_completion(LCI_comp_t comp) = 0;
Expand All @@ -23,6 +26,7 @@ namespace hpx::parcelset::policies::lci {
{
return nullptr;
}
parcelport* pp_;
};
} // namespace hpx::parcelset::policies::lci

Expand Down
20 changes: 18 additions & 2 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,27 @@ namespace hpx::parcelset::policies::lci {
putsendrecv,
};
static protocol_t protocol;
// which completion mechanism to use
static LCI_comp_type_t completion_type;
// Whether sending header requires completion
static bool enable_sendmc;
// which completion mechanism to use for header messages
// Selectable completion mechanisms. The enumerator names mirror the
// completion_manager_* classes of this parcelport.
// NOTE(review): presumably each value selects the class of the same name --
// confirm where the completion managers are instantiated.
enum class comp_type_t
{
// presumably completion_manager_queue (polls via LCI_queue_pop)
queue,
// presumably completion_manager_sync (keeps a list of synchronizers)
sync,
// presumably completion_manager_sync_single (one synchronizer plus a lock)
sync_single,
// presumably completion_manager_sync_single_nolock (one synchronizer, no lock)
sync_single_nolock,
};
static comp_type_t completion_type_header;
// which completion mechanism to use for followup messages
static comp_type_t completion_type_followup;
// how to run LCI_progress
// Strategies for driving LCI progress. The value in effect is stored in
// config_t::progress_type.
enum class progress_type_t
{
rp, // HPX resource partitioner
pthread, // Normal progress pthread
worker, // HPX worker thread
pthread_worker, // Normal progress pthread + worker thread
// With poll, the completion managers' poll() functions call
// parcelport::do_progress_local() whenever their completion test did not
// yield a finished request.
poll, // progress when polling completion
};
static progress_type_t progress_type;
// How many progress threads to create
Expand All @@ -57,6 +69,10 @@ namespace hpx::parcelset::policies::lci {
static int send_nb_max_retry;
// The max retry count of mbuffer_alloc before yield.
static int mbuffer_alloc_max_retry;
// The max count of background_work to invoke in a row
static int bg_work_max_count;
// Whether to do background work when sending
static bool bg_work_when_send;

static void init_config(util::runtime_configuration const& rtcfg);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ namespace hpx::parcelset::policies::lci {
{
k = 0;
if (hpx::threads::get_self_id() != hpx::threads::invalid_thread_id)
{
hpx::this_thread::yield();
}
}
}
} // namespace hpx::parcelset::policies::lci
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,20 @@ namespace hpx::traits {
"backlog_queue = 0\n"
"prg_thread_num = 1\n"
"protocol = putsendrecv\n"
"comp_type = queue\n"
"comp_type_header = queue\n"
"comp_type_followup = queue\n"
"progress_type = rp\n"
"prepost_recv_num = 1\n"
"reg_mem = 1\n"
"ndevices = 1\n"
"ncomps = 1\n"
"enable_in_buffer_assembly = 1\n"
"send_nb_max_retry = 32\n"
"mbuffer_alloc_max_retry = 32\n";
"mbuffer_alloc_max_retry = 32\n"
"bg_work_max_count = 32\n"
"bg_work_when_send = 0\n"
"enable_sendmc = 0\n"
"comp_type = deprecated\n";
}
};
} // namespace hpx::traits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace hpx::parcelset::policies::lci {
postprocess_handler_type&& parcel_postprocess);
virtual void load(handler_type&& handler,
postprocess_handler_type&& parcel_postprocess) = 0;
return_t send();
return_t send(bool in_bg_work);
virtual return_t send_nb() = 0;
virtual void done() = 0;
virtual bool tryMerge(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#include <hpx/parcelport_lci/completion_manager/completion_manager_queue.hpp>
#include <hpx/parcelport_lci/parcelport_lci.hpp>

namespace hpx::parcelset::policies::lci {
// Pops at most one entry from the completion queue and returns it.
// The returned request's flag is preset to LCI_ERR_RETRY, so callers see
// LCI_ERR_RETRY unless LCI_queue_pop changed it.
LCI_request_t completion_manager_queue::poll()
{
    LCI_request_t request;
    request.flag = LCI_ERR_RETRY;
    LCI_queue_pop(queue, &request);

    // Nothing was popped: in "poll" progress mode this is where the
    // network gets progressed.
    bool const nothing_popped = (request.flag == LCI_ERR_RETRY);
    if (nothing_popped &&
        config_t::progress_type == config_t::progress_type_t::poll)
    {
        pp_->do_progress_local();
    }
    return request;
}
} // namespace hpx::parcelset::policies::lci
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include <hpx/parcelport_lci/completion_manager/completion_manager_sync.hpp>
#include <hpx/parcelport_lci/parcelport_lci.hpp>

namespace hpx::parcelset::policies::lci {
// Tests the synchronizer at the front of sync_list, if the list lock can be
// taken without blocking.
//
// Returns a request whose flag is LCI_ERR_RETRY when the lock was busy, the
// list was empty, or the tested synchronizer has not completed yet. On
// completion the synchronizer is freed and the filled-in request is returned.
LCI_request_t completion_manager_sync::poll()
{
    LCI_request_t request;
    request.flag = LCI_ERR_RETRY;

    // Take the front synchronizer off the list under a try-lock; the test
    // itself runs after the lock has been released.
    LCI_comp_t candidate = nullptr;
    {
        std::unique_lock guard(lock, std::try_to_lock);
        if (!guard.owns_lock() || sync_list.empty())
        {
            return request;
        }
        candidate = sync_list.front();
        sync_list.pop_front();
    }
    if (!candidate)
    {
        return request;
    }

    if (LCI_sync_test(candidate, &request) == LCI_OK)
    {
        HPX_ASSERT(request.flag == LCI_OK);
        LCI_sync_free(&candidate);
        return request;
    }

    // Not completed yet: optionally progress the network, then put the
    // synchronizer back at the end of the list (blocking lock this time).
    if (config_t::progress_type == config_t::progress_type_t::poll)
    {
        pp_->do_progress_local();
    }
    {
        std::unique_lock guard(lock);
        sync_list.push_back(candidate);
    }
    return request;
}
} // namespace hpx::parcelset::policies::lci
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include <hpx/parcelport_lci/completion_manager/completion_manager_sync_single.hpp>
#include <hpx/parcelport_lci/parcelport_lci.hpp>

namespace hpx::parcelset::policies::lci {
// Tests the single synchronizer if the lock can be acquired without blocking.
//
// Returns a request whose flag is LCI_ERR_RETRY when the lock was busy or the
// synchronizer has not completed. The lock is released here only on the
// retry path.
LCI_request_t completion_manager_sync_single::poll()
{
    LCI_request_t request;
    request.flag = LCI_ERR_RETRY;

    if (!lock.try_lock())
    {
        return request;
    }

    LCI_error_t const status = LCI_sync_test(sync, &request);
    if (status != LCI_ERR_RETRY)
    {
        // The lock deliberately stays held on this path.
        // NOTE(review): presumably released by enqueue_completion() --
        // confirm against the header.
        return request;
    }

    // Nothing completed: optionally progress the network, then give the
    // lock back.
    if (config_t::progress_type == config_t::progress_type_t::poll)
    {
        pp_->do_progress_local();
    }
    lock.unlock();
    return request;
}
} // namespace hpx::parcelset::policies::lci
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include <hpx/parcelport_lci/completion_manager/completion_manager_sync_single_nolock.hpp>
#include <hpx/parcelport_lci/parcelport_lci.hpp>

namespace hpx::parcelset::policies::lci {
// Tests the single synchronizer once, without taking any lock, and returns
// the resulting request. The request's flag is preset to LCI_ERR_RETRY, so
// callers see LCI_ERR_RETRY unless LCI_sync_test changed it.
LCI_request_t completion_manager_sync_single_nolock::poll()
{
    LCI_request_t request;
    request.flag = LCI_ERR_RETRY;
    LCI_sync_test(sync, &request);

    // Nothing completed: in "poll" progress mode this is where the network
    // gets progressed.
    bool const still_pending = (request.flag == LCI_ERR_RETRY);
    if (still_pending &&
        config_t::progress_type == config_t::progress_type_t::poll)
    {
        pp_->do_progress_local();
    }
    return request;
}
} // namespace hpx::parcelset::policies::lci
Loading

0 comments on commit aed9518

Please sign in to comment.