From d63651699a8fdb28d10b273d35fda26e8be25812 Mon Sep 17 00:00:00 2001 From: Hartmut Kaiser Date: Wed, 10 Apr 2024 14:01:34 -0500 Subject: [PATCH] Fixing test --- .../tests/unit/concurrent_collectives.cpp | 276 +++++++++++++----- 1 file changed, 200 insertions(+), 76 deletions(-) diff --git a/libs/full/collectives/tests/unit/concurrent_collectives.cpp b/libs/full/collectives/tests/unit/concurrent_collectives.cpp index 7e242d743b42..bf0552f88c7a 100644 --- a/libs/full/collectives/tests/unit/concurrent_collectives.cpp +++ b/libs/full/collectives/tests/unit/concurrent_collectives.cpp @@ -9,12 +9,16 @@ #if !defined(HPX_COMPUTE_DEVICE_CODE) #include #include +#include +#include #include +#include #include #include #include #include +#include #include #include #include @@ -29,17 +33,111 @@ constexpr int ITERATIONS = 1000; #endif constexpr std::uint32_t num_sites = 10; -std::atomic generation(0); +//////////////////////////////////////////////////////////////////////////////// +// Each generation of the communicator has to consistently represent a +// particular operation across all localities. For this reason, this test first +// generates generations to be used for the distributed tests and exposes those +// to all participating localities. +struct generations +{ + constexpr static char const* operations[] = {"all_gather", "all_reduce", + "all_to_all", "broadcast", "exclusive_scan", "gather", "inclusive_scan", + "reduce", "scatter"}; + + generations() = default; + + void init(std::mt19937& gen, std::size_t iterations) + { + std::uniform_int_distribution dist( + 0, std::size(operations) - 1); + + for (std::size_t i = 0; i != iterations * std::size(operations); ++i) + { + data[operations[dist(gen)]].generations.push_back(i + 1); + } + + was_initialized = true; + } + + bool get_next_generation(char const* operation, std::size_t& generation) + { + if (auto it = data.find(operation); it != data.end()) + { + if (it->second.current < it->second.generations.size()) + { + generation = it->second.generations[it->second.current++]; + return true; + } + return false; // no more generations available + } + return false; + } + + std::size_t get_iterations(char const* operation) const + { + if (auto it = data.find(operation); it != data.end()) + { + return it->second.generations.size(); + } + return 0; + } + + bool initialized() const + { + return was_initialized; + } + +private: + friend class hpx::serialization::access; + + template + void load(Archive& ar, unsigned) + { + ar & data; + for (auto& [k, v] : data) + { + v.current = 0; + } + } + + template + void save(Archive& ar, unsigned) const + { + ar & data; + } + + HPX_SERIALIZATION_SPLIT_MEMBER(); + + bool was_initialized = false; + struct generation_data + { + std::size_t current = 0; + std::vector generations; + }; + std::map data; +}; + +generations distributed; +generations local; + +generations get_generations(bool local_generations) +{ + generations const& result = local_generations ? local : distributed; + hpx::util::yield_while([&] { return !result.initialized(); }); + return result; +} +HPX_PLAIN_ACTION(get_generations); + +//////////////////////////////////////////////////////////////////////////////// double test_all_gather( communicator const& comm, std::uint32_t num_localities, std::uint32_t here) { hpx::chrono::high_resolution_timer const t; - for (int i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("all_gather", gen); ++i) { - std::size_t const gen = ++generation; - hpx::future> overall_result = all_gather(comm, here + i, generation_arg(gen)); @@ -60,10 +158,9 @@ double test_all_reduce( { hpx::chrono::high_resolution_timer const t; - for (int i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("all_reduce", gen); ++i) { - std::size_t const gen = ++generation; - hpx::future overall_result = all_reduce( comm, here + i, std::plus{}, generation_arg(gen)); @@ -83,10 +180,9 @@ double test_all_to_all( { hpx::chrono::high_resolution_timer const t; - for (int i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("all_to_all", gen); ++i) { - std::size_t const gen = ++generation; - std::vector values(num_localities); std::fill(values.begin(), values.end(), here + i); @@ -110,10 +206,10 @@ double test_broadcast(communicator const& comm, std::uint32_t here) { hpx::chrono::high_resolution_timer const t; - for (std::uint32_t i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; distributed.get_next_generation("broadcast", gen); + ++i) { - std::size_t const gen = ++generation; - if (here == 0) { hpx::future result = @@ -138,10 +234,9 @@ double test_exclusive_scan(communicator const& comm, std::uint32_t here) { hpx::chrono::high_resolution_timer const t; - for (int i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("exclusive_scan", gen); ++i) { - std::size_t const gen = ++generation; - hpx::future overall_result = exclusive_scan(comm, here + i, std::plus<>{}, generation_arg(gen)); @@ -160,10 +255,10 @@ double test_gather(communicator const& comm, std::uint32_t here) { hpx::chrono::high_resolution_timer const t; - for (std::uint32_t i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; distributed.get_next_generation("gather", gen); + ++i) { - std::size_t const gen = ++generation; - if (here == 0) { hpx::future> overall_result = @@ -190,10 +285,10 @@ double test_inclusive_scan(communicator const& comm, std::uint32_t here) { hpx::chrono::high_resolution_timer const t; - for (std::uint32_t i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; + distributed.get_next_generation("inclusive_scan", gen); ++i) { - std::size_t const gen = ++generation; - hpx::future overall_result = inclusive_scan( comm, here + i, std::plus{}, generation_arg(gen)); @@ -213,10 +308,9 @@ double test_reduce( { hpx::chrono::high_resolution_timer const t; - for (std::uint32_t i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("reduce", gen); ++i) { - std::size_t const gen = ++generation; - std::uint32_t value = here + i; if (here == 0) { @@ -246,10 +340,9 @@ double test_scatter( { hpx::chrono::high_resolution_timer const t; - for (std::uint32_t i = 0; i != ITERATIONS; ++i) + std::size_t gen = 0; + for (int i = 0; distributed.get_next_generation("scatter", gen); ++i) { - std::size_t const gen = ++generation; - if (here == 0) { std::vector data(num_localities); @@ -277,13 +370,12 @@ double test_local_all_gather(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("all_gather", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -321,13 +413,12 @@ double test_local_all_reduce(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("all_reduce", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -366,13 +457,12 @@ double test_local_all_to_all(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("all_to_all", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -411,13 +501,12 @@ double test_local_broadcast(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("broadcast", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -460,13 +549,13 @@ double test_local_exclusive_scan(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("exclusive_scan", gen); + ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -503,13 +592,12 @@ double test_local_gather(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("gather", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -553,13 +641,13 @@ double test_local_inclusive_scan(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("inclusive_scan", gen); + ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -596,13 +684,12 @@ double test_local_reduce(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("reduce", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -649,13 +736,12 @@ double test_local_scatter(std::vector const& comms) { double elapsed = 0.; - for (std::uint32_t i = 0; i != 10 * ITERATIONS; ++i) + std::size_t gen = 0; + for (std::uint32_t i = 0; local.get_next_generation("scatter", gen); ++i) { std::vector> sites; sites.reserve(num_sites); - auto const gen = ++generation; - // launch num_sites threads to represent different sites for (std::uint32_t site = 0; site != num_sites; ++site) { @@ -696,10 +782,30 @@ double test_local_scatter(std::vector const& comms) } //////////////////////////////////////////////////////////////////////////////// -int hpx_main() +int hpx_main(hpx::program_options::variables_map& vm) { std::uint32_t const here = hpx::get_locality_id(); + unsigned int seed = std::random_device{}(); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + std::mt19937 gen(seed); + + if (here == 0) + { + distributed.init(gen, ITERATIONS); + local.init(gen, 10 * ITERATIONS); + } + else + { + auto console = hpx::agas::get_console_locality(); + distributed = + hpx::async(get_generations_action(), console, false).get(); + local = hpx::async(get_generations_action(), console, true).get(); + } + #if defined(HPX_HAVE_NETWORKING) if (hpx::get_num_localities(hpx::launch::sync) > 1) { @@ -730,31 +836,38 @@ int hpx_main() if (here == 0) { std::cout << "remote all_gather timing: " - << f1.get() / ITERATIONS << " [s]\n"; + << f1.get() / distributed.get_iterations("all_gather") + << " [s]\n"; std::cout << "remote all_reduce timing: " - << f2.get() / ITERATIONS << " [s]\n"; + << f2.get() / distributed.get_iterations("all_reduce") + << " [s]\n"; std::cout << "remote all_to_all timing: " - << f3.get() / ITERATIONS << " [s]\n"; + << f3.get() / distributed.get_iterations("all_to_all") + << " [s]\n"; std::cout << "remote broadcast timing: " - << f4.get() / ITERATIONS << " [s]\n"; + << f4.get() / distributed.get_iterations("broadcast") + << " [s]\n"; std::cout << "remote exclusive_scan timing: " - << f5.get() / ITERATIONS << " [s]\n"; + << f5.get() / distributed.get_iterations("exclusive_scan") + << " [s]\n"; std::cout << "remote gather timing: " - << f6.get() / ITERATIONS << " [s]\n"; + << f6.get() / distributed.get_iterations("gather") + << " [s]\n"; std::cout << "remote inclusive_scan timing: " - << f7.get() / ITERATIONS << " [s]\n"; + << f7.get() / distributed.get_iterations("inclusive_scan") + << " [s]\n"; std::cout << "remote reduce timing: " - << f8.get() / ITERATIONS << " [s]\n"; + << f8.get() / distributed.get_iterations("reduce") + << " [s]\n"; std::cout << "remote scatter timing: " - << f9.get() / ITERATIONS << " [s]\n"; + << f9.get() / distributed.get_iterations("scatter") + << " [s]\n"; } } #endif if (here == 0) { - generation = 0; - std::vector concurrent_comms; concurrent_comms.reserve(num_sites); @@ -778,23 +891,25 @@ int hpx_main() hpx::wait_all(f1, f2, f3, f4, f5, f6, f7, f8, f9); std::cout << "local all_gather timing: " - << f1.get() / (10 * ITERATIONS) << " [s]\n"; + << f1.get() / local.get_iterations("all_gather") << " [s]\n"; std::cout << "local all_reduce timing: " - << f2.get() / (10 * ITERATIONS) << " [s]\n"; + << f2.get() / local.get_iterations("all_reduce") << " [s]\n"; std::cout << "local all_to_all timing: " - << f3.get() / (10 * ITERATIONS) << " [s]\n"; + << f3.get() / local.get_iterations("all_to_all") << " [s]\n"; std::cout << "local broadcast timing: " - << f4.get() / (10 * ITERATIONS) << " [s]\n"; + << f4.get() / local.get_iterations("broadcast") << " [s]\n"; std::cout << "local exclusive_scan timing: " - << f5.get() / (10 * ITERATIONS) << " [s]\n"; + << f5.get() / local.get_iterations("exclusive_scan") + << " [s]\n"; std::cout << "local gather timing: " - << f6.get() / (10 * ITERATIONS) << " [s]\n"; + << f6.get() / local.get_iterations("gather") << " [s]\n"; std::cout << "local inclusive_scan timing: " - << f7.get() / (10 * ITERATIONS) << " [s]\n"; + << f7.get() / local.get_iterations("inclusive_scan") + << " [s]\n"; std::cout << "local reduce timing: " - << f8.get() / (10 * ITERATIONS) << " [s]\n"; + << f8.get() / local.get_iterations("reduce") << " [s]\n"; std::cout << "local scatter timing: " - << f9.get() / (10 * ITERATIONS) << " [s]\n"; + << f9.get() / local.get_iterations("scatter") << " [s]\n"; } return hpx::finalize(); @@ -802,9 +917,18 @@ int hpx_main() int main(int argc, char* argv[]) { + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + std::vector const cfg = {"hpx.run_hpx_main!=1"}; hpx::init_params init_args; + init_args.desc_cmdline = desc_commandline; init_args.cfg = cfg; HPX_TEST_EQ(hpx::init(argc, argv, init_args), 0);