Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor refactoring and fixes to the LCI parcelport and pingpong_performance2 benchmark #6411

Merged
Merged 4 commits into the base branch from the feature branch on
Jan 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ namespace hpx::parcelset::policies::lci {
static int ndevices;
// How many completion managers to use
static int ncomps;
// Whether to enable in-buffer assembly for the header messages.
static bool enable_in_buffer_assembly;

static void init_config(util::runtime_configuration const& rtcfg);
};
Expand Down
31 changes: 31 additions & 0 deletions libs/full/parcelport_lci/include/hpx/parcelport_lci/header.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,37 @@ namespace hpx::parcelset::policies::lci {
pos_piggy_back_address = 8 * sizeof(value_type) + 2
};

// Computes the number of bytes the header will occupy for the given
// parcel buffer, without building the header itself.
//
// The header always carries the fixed-size fields up to
// pos_piggy_back_address; the non-zero-copy data and the transmission
// chunks are piggybacked into the header only when they fit in the
// remaining space (mirroring the decisions made by the header
// constructor below).
//
// \param buffer           the parcel buffer the header will describe
// \param max_header_size  the maximum size the header may occupy
// \returns the exact header size in bytes (<= max_header_size)
template <typename buffer_type, typename ChunkType>
static size_t get_header_size(
    parcel_buffer<buffer_type, ChunkType> const& buffer,
    size_t max_header_size) noexcept
{
    HPX_ASSERT(max_header_size >= pos_piggy_back_address);

    size_t current_header_size = pos_piggy_back_address;
    // Piggyback the non-zero-copy data if it fits into the header.
    if (buffer.data_.size() <= (max_header_size - current_header_size))
    {
        current_header_size += buffer.data_.size();
    }
    int num_zero_copy_chunks = buffer.num_chunks_.first;
    [[maybe_unused]] int num_non_zero_copy_chunks =
        buffer.num_chunks_.second;
    if (num_zero_copy_chunks != 0)
    {
        HPX_ASSERT(buffer.transmission_chunks_.size() ==
            size_t(num_zero_copy_chunks + num_non_zero_copy_chunks));
        // Keep the byte count in size_t: the previous narrowing cast to
        // int could overflow for very large transmission-chunk lists,
        // and the unsigned/unsigned comparison avoids sign-mismatch.
        size_t tchunk_size = buffer.transmission_chunks_.size() *
            sizeof(typename parcel_buffer<buffer_type,
                ChunkType>::transmission_chunk_type);
        // Piggyback the transmission chunks if they fit as well.
        if (tchunk_size <= max_header_size - current_header_size)
        {
            current_header_size += tchunk_size;
        }
    }
    return current_header_size;
}

template <typename buffer_type, typename ChunkType>
header(parcel_buffer<buffer_type, ChunkType> const& buffer,
char* header_buffer, size_t max_header_size) noexcept
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,29 +210,30 @@ namespace hpx::traits {
hpx::threads::policies::scheduler_mode::
do_background_work_only);

size_t ncores_to_add =
size_t npus_to_add =
parcelset::policies::lci::config_t::progress_thread_num;
std::vector<const hpx::resource::core*> cores;
std::vector<const hpx::resource::pu*> pus;
for (auto& numa_domain : rp.numa_domains())
{
for (auto& core : numa_domain.cores())
{
cores.push_back(&core);
for (auto& pu : core.pus())
pus.push_back(&pu);
}
}
if (cores.size() <= 1)
if (pus.size() <= 1)
{
fprintf(stderr, "We don't have enough cores!\n");
fprintf(stderr, "We don't have enough pus!\n");
exit(1);
}
if ((size_t) ncores_to_add > cores.size() / 2)
if ((size_t) npus_to_add > pus.size() / 2)
{
ncores_to_add = cores.size() / 2;
npus_to_add = pus.size() / 2;
}
for (size_t i = 0; i < ncores_to_add; ++i)
for (size_t i = 0; i < npus_to_add; ++i)
{
size_t next_core = i * cores.size() / ncores_to_add;
rp.add_resource(*cores[next_core], "lci-progress-pool");
size_t next_pu = i * pus.size() / npus_to_add;
rp.add_resource(*pus[next_pu], "lci-progress-pool");
}
}
}
Expand Down Expand Up @@ -264,8 +265,9 @@ namespace hpx::traits {
"progress_type = rp\n"
"prepost_recv_num = 1\n"
"reg_mem = 1\n"
"ndevices = 2\n"
"ncomps = 1\n";
"ndevices = 1\n"
"ncomps = 1\n"
"enable_in_buffer_assembly = 1\n";
}
};
} // namespace hpx::traits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ namespace hpx::parcelset::policies::lci {
hpx::chrono::high_resolution_timer timer_;
header header_;
LCI_mbuffer_t header_buffer;
std::vector<char> header_buffer_vector;
bool need_send_data;
bool need_send_tchunks;
LCI_tag_t tag;
Expand Down
3 changes: 3 additions & 0 deletions libs/full/parcelport_lci/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace hpx::parcelset::policies::lci {
bool config_t::reg_mem;
int config_t::ndevices;
int config_t::ncomps;
bool config_t::enable_in_buffer_assembly;

void config_t::init_config(util::runtime_configuration const& rtcfg)
{
Expand Down Expand Up @@ -105,6 +106,8 @@ namespace hpx::parcelset::policies::lci {
reg_mem = util::get_entry_as(rtcfg, "hpx.parcel.lci.reg_mem", 1);
ndevices = util::get_entry_as(rtcfg, "hpx.parcel.lci.ndevices", 1);
ncomps = util::get_entry_as(rtcfg, "hpx.parcel.lci.ncomps", 1);
enable_in_buffer_assembly = util::get_entry_as(
rtcfg, "hpx.parcel.lci.enable_in_buffer_assembly", 1);

if (!enable_send_immediate && enable_lci_backlog_queue)
{
Expand Down
17 changes: 8 additions & 9 deletions libs/full/parcelport_lci/src/parcelport_lci.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,14 @@ namespace hpx::parcelset::policies::lci {
if (hpx::this_thread::get_pool() ==
&hpx::resource::get_thread_pool("lci-progress-pool"))
{
std::size_t prg_thread_id =
hpx::get_local_worker_thread_num();
double rate = (double) config_t::ndevices /
config_t::progress_thread_num;
for (int i = prg_thread_id * rate;
i < (prg_thread_id + 1) * rate; ++i)
int prg_thread_id =
static_cast<int>(hpx::get_local_worker_thread_num());
HPX_ASSERT(prg_thread_id < config_t::progress_thread_num);
for (int i = prg_thread_id * config_t::ndevices /
config_t::progress_thread_num;
i < (prg_thread_id + 1) * config_t::ndevices /
config_t::progress_thread_num;
++i)
{
devices_to_progress.push_back(&devices[i]);
}
Expand Down Expand Up @@ -459,9 +461,6 @@ namespace hpx::parcelset::policies::lci {
hpx::threads::get_self_id() == hpx::threads::invalid_thread_id))
{
static thread_local unsigned int tls_rand_seed = rand();
util::lci_environment::log(
util::lci_environment::log_level_t::debug, "device",
"Rank %d unusual phase\n", LCI_RANK);
return devices[rand_r(&tls_rand_seed) % devices.size()];
}
if (tls_device_idx == std::size_t(-1))
Expand Down
6 changes: 3 additions & 3 deletions libs/full/parcelport_lci/src/sender_connection_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ namespace hpx::parcelset::policies::lci {
char buf[1024];
size_t consumed = 0;
consumed += snprintf(buf + consumed, sizeof(buf) - consumed,
"%d:%lf:send_connection(%p) start:%d:%d:%d:[", LCI_RANK,
"%d:%lf:send_connection(%p) start:%d:%d:%d:%d:[", LCI_RANK,
hpx::chrono::high_resolution_clock::now() / 1e9, (void*) this,
header_.numbytes_nonzero_copy(), header_.numbytes_tchunk(),
header_.num_zero_copy_chunks());
dst_rank, header_.numbytes_nonzero_copy(),
header_.numbytes_tchunk(), header_.num_zero_copy_chunks());
HPX_ASSERT(sizeof(buf) > consumed);
for (int i = 0; i < header_.num_zero_copy_chunks(); ++i)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,24 @@ namespace hpx::parcelset::policies::lci {
postprocess_handler_ = HPX_MOVE(parcel_postprocess);

// build header
while (LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
if (config_t::enable_in_buffer_assembly)
{
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
HPX_ASSERT(header_buffer.length == (size_t) LCI_MEDIUM_SIZE);
header_ = header(
buffer_, (char*) header_buffer.address, header_buffer.length);
header_buffer.length = header_.size();
}
else
{
header_buffer_vector.resize(
header::get_header_size(buffer_, LCI_MEDIUM_SIZE));
header_ =
header(buffer_, static_cast<char*>(header_buffer_vector.data()),
header_buffer_vector.size());
}
HPX_ASSERT((header_.num_zero_copy_chunks() == 0) ==
buffer_.transmission_chunks_.empty());
need_send_data = false;
Expand Down Expand Up @@ -85,6 +97,15 @@ namespace hpx::parcelset::policies::lci {
"Rank %d Wrap around!\n", LCI_RANK);
header_.set_device_idx(device_p->idx);
header_.set_tag(tag);
if (!config_t::enable_in_buffer_assembly)
{
while (
LCI_mbuffer_alloc(device_p->device, &header_buffer) != LCI_OK)
continue;
memcpy(header_buffer.address, header_buffer_vector.data(),
header_buffer_vector.size());
header_buffer.length = header_buffer_vector.size();
}
send_chunks_idx = 0;
completion = nullptr;
segment_to_use = LCI_SEGMENT_ALL;
Expand Down
61 changes: 44 additions & 17 deletions tests/performance/network/pingpong_performance2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const std::size_t nsteps_default = 1;
const std::size_t window_default = 10000;
const std::size_t inject_rate_default = 0;
const std::size_t batch_size_default = 10;
const std::size_t nwarmups_default = 1;
const std::size_t niters_default = 1;

size_t window;
size_t inject_rate;
Expand Down Expand Up @@ -79,6 +81,7 @@ void on_recv(hpx::id_type to, std::vector<char> const& in, std::size_t counter)
if (result + 1 == window)
{
hpx::post<on_done_action>(hpx::find_root_locality());
done_counter = 0;
}
return;
}
Expand All @@ -104,13 +107,21 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
window = b_arg["window"].as<std::size_t>();
inject_rate = b_arg["inject-rate"].as<std::size_t>();
batch_size = b_arg["batch-size"].as<std::size_t>();
std::size_t const nwarmups = b_arg["nwarmups"].as<std::size_t>();
std::size_t const niters = b_arg["niters"].as<std::size_t>();

if (nsteps == 0)
{
std::cout << "nsteps is 0!" << std::endl;
return 0;
}

if (window == 0)
{
std::cout << "window is 0!" << std::endl;
return 0;
}

std::vector<hpx::id_type> localities = hpx::find_remote_localities();

hpx::id_type to;
Expand All @@ -126,32 +137,43 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
set_window_action act;
act(to, window);

hpx::chrono::high_resolution_timer timer_total;

for (size_t i = 0; i < window; i += batch_size)
double inject_time = 0;
double time = 0;
for (size_t j = 0; j < nwarmups + niters; ++j)
{
while (inject_rate > 0 &&
static_cast<double>(i) / timer_total.elapsed() >
static_cast<double>(inject_rate))
hpx::chrono::high_resolution_timer timer_total;

for (size_t i = 0; i < window; i += batch_size)
{
continue;
while (inject_rate > 0 &&
static_cast<double>(i) / timer_total.elapsed() >
static_cast<double>(inject_rate))
{
continue;
}
hpx::post<on_inject_action>(hpx::find_here(), to, nbytes, nsteps);
}
hpx::post<on_inject_action>(hpx::find_here(), to, nbytes, nsteps);
}
double achieved_inject_rate =
static_cast<double>(window) / timer_total.elapsed() / 1e3;
if (j >= nwarmups)
inject_time += timer_total.elapsed();

semaphore.wait();
semaphore.wait();
if (j >= nwarmups)
time += timer_total.elapsed();
}

double time = timer_total.elapsed();
double latency = time * 1e6 / static_cast<double>(nsteps);
double msg_rate = static_cast<double>(nsteps * window) / time / 1e3;
double achieved_inject_rate =
static_cast<double>(window * niters) / inject_time / 1e3;
double latency = time * 1e6 / static_cast<double>(nsteps * niters);
double msg_rate =
static_cast<double>(nsteps * window * niters) / time / 1e3;
double bandwidth =
static_cast<double>(nbytes * nsteps * window) / time / 1e6;
static_cast<double>(nbytes * nsteps * window * niters) / time / 1e6;
if (verbose)
{
std::cout << "[hpx_pingpong]" << std::endl
<< "total_time(secs)=" << time << std::endl
<< "nwarmups=" << nwarmups << std::endl
<< "niters=" << niters << std::endl
<< "nbytes=" << nbytes << std::endl
<< "window=" << window << std::endl
<< "latency(us)=" << latency << std::endl
Expand All @@ -165,6 +187,7 @@ int hpx_main(hpx::program_options::variables_map& b_arg)
{
std::cout << "[hpx_pingpong]"
<< ":total_time(secs)=" << time << ":nbytes=" << nbytes
<< ":nwarmups=" << nwarmups << ":niters=" << niters
<< ":window=" << window << ":latency(us)=" << latency
<< ":inject_rate(K/s)=" << achieved_inject_rate
<< ":msg_rate(M/s)=" << msg_rate
Expand Down Expand Up @@ -192,7 +215,11 @@ int main(int argc, char* argv[])
po::value<std::size_t>()->default_value(inject_rate_default),
"the rate of injecting the first message of ping-pong")("batch-size",
po::value<std::size_t>()->default_value(batch_size_default),
"the number of messages to inject per inject thread")("verbose",
"the number of messages to inject per inject thread")("nwarmups",
po::value<std::size_t>()->default_value(nwarmups_default),
"the iteration count of warmup runs")("niters",
po::value<std::size_t>()->default_value(niters_default),
"the iteration count of measurement iterations.")("verbose",
po::value<bool>()->default_value(true),
"verbosity of output,if false output is for awk");

Expand Down
Loading