diff --git a/.circleci/tests.unit1.algorithms b/.circleci/tests.unit1.algorithms index d35e591525da..337b63ab6a42 100644 --- a/.circleci/tests.unit1.algorithms +++ b/.circleci/tests.unit1.algorithms @@ -48,13 +48,18 @@ tests.unit.modules.algorithms.algorithms.foreachn_bad_alloc tests.unit.modules.algorithms.algorithms.for_loop tests.unit.modules.algorithms.algorithms.for_loop_exception tests.unit.modules.algorithms.algorithms.for_loop_induction +tests.unit.modules.algorithms.algorithms.for_loop_induction_sender tests.unit.modules.algorithms.algorithms.for_loop_induction_async tests.unit.modules.algorithms.algorithms.for_loop_n +tests.unit.modules.algorithms.algorithms.for_loop_n_sender tests.unit.modules.algorithms.algorithms.for_loop_n_strided +tests.unit.modules.algorithms.algorithms.for_loop_n_strided_sender tests.unit.modules.algorithms.algorithms.for_loop_reduction +tests.unit.modules.algorithms.algorithms.for_loop_reduction_sender tests.unit.modules.algorithms.algorithms.for_loop_reduction_async tests.unit.modules.algorithms.algorithms.for_loop_sender tests.unit.modules.algorithms.algorithms.for_loop_strided +tests.unit.modules.algorithms.algorithms.for_loop_strided_sender tests.unit.modules.algorithms.algorithms.generate tests.unit.modules.algorithms.algorithms.generaten tests.unit.modules.algorithms.algorithms.is_heap diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp index bcb1b62c2b4a..991ac3e9f385 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop.hpp @@ -1154,13 +1154,13 @@ namespace hpx::parallel { template - static auto parallel(ExPolicy&& policy, IterOrR iter_or_r, + static decltype(auto) parallel(ExPolicy&& policy, IterOrR iter_or_r, Size size, F&& f, Ts&&... ts) { - constexpr bool is_scheduler_policy = + constexpr bool has_scheduler_executor = hpx::execution_policy_has_scheduler_executor_v; - if constexpr (!is_scheduler_policy) + if constexpr (!has_scheduler_executor) { if (size == 0) { @@ -1171,7 +1171,7 @@ namespace hpx::parallel { if constexpr (sizeof...(Ts) == 0) { if constexpr (hpx::is_async_execution_policy_v || - is_scheduler_policy) + has_scheduler_executor) { return util::detail::algorithm_result::get( util::partitioner::call( @@ -1190,14 +1190,6 @@ namespace hpx::parallel { } else { - // any of the induction or reduction operations prevent us - // from sharing the part_iteration between threads - decltype(auto) hinted_policy = - parallel::util::adapt_sharing_mode( - HPX_FORWARD(ExPolicy, policy), - hpx::threads::thread_sharing_hint:: - do_not_share_function); - using policy_type = std::decay_t; // we need to decay copy here to properly transport @@ -1209,10 +1201,10 @@ namespace hpx::parallel { return util::detail::algorithm_result::get( util::partitioner::call_with_index( - hinted_policy, iter_or_r, size, 1, + HPX_FORWARD(ExPolicy, policy), iter_or_r, size, 1, part_iterations{ HPX_FORWARD(F, f), args}, - [=](auto&&) mutable { + [=](auto&&...) mutable { auto pack = hpx::util::make_index_pack_t(); @@ -1335,10 +1327,10 @@ namespace hpx::parallel { static auto parallel(ExPolicy&& policy, B first, Size size, S stride, F&& f, Ts&&... ts) { - constexpr bool is_scheduler_policy = + constexpr bool has_scheduler_executor = hpx::execution_policy_has_scheduler_executor_v; - if constexpr (!is_scheduler_policy) + if constexpr (!has_scheduler_executor) { if (size == 0) { @@ -1348,7 +1340,7 @@ namespace hpx::parallel { if constexpr (sizeof...(Ts) == 0) { - if constexpr (!is_scheduler_policy) + if constexpr (!has_scheduler_executor) { if (stride == 1) { @@ -1357,12 +1349,14 @@ namespace hpx::parallel { HPX_FORWARD(ExPolicy, policy), first, size, part_iterations{ HPX_FORWARD(F, f)}, - [](auto&&) { return hpx::util::unused; })); + [](auto&&...) { + return hpx::util::unused; + })); } } if constexpr (hpx::is_async_execution_policy_v || - is_scheduler_policy) + has_scheduler_executor) { return util::detail::algorithm_result::get( util::partitioner::call_with_index( @@ -1370,7 +1364,7 @@ namespace hpx::parallel { stride, part_iterations{ HPX_FORWARD(F, f), stride}, - [](auto&&) { return hpx::util::unused; })); + [](auto&&...) { return hpx::util::unused; })); } else { @@ -1384,14 +1378,6 @@ namespace hpx::parallel { } else { - // any of the induction or reduction operations prevent us - // from sharing the part_iteration between threads - decltype(auto) hinted_policy = - parallel::util::adapt_sharing_mode( - HPX_FORWARD(ExPolicy, policy), - hpx::threads::thread_sharing_hint:: - do_not_share_function); - using policy_type = std::decay_t; // we need to decay copy here to properly transport @@ -1403,10 +1389,10 @@ namespace hpx::parallel { return util::detail::algorithm_result::get( util::partitioner::call_with_index( - hinted_policy, first, size, stride, + HPX_FORWARD(ExPolicy, policy), first, size, stride, part_iterations{ HPX_FORWARD(F, f), stride, args}, - [=](auto&&) mutable { + [=](auto&&...) mutable { auto pack = hpx::util::make_index_pack_t(); @@ -1564,9 +1550,8 @@ namespace hpx::parallel { // reshuffle arguments, last argument is function object, will go first template - util::detail::algorithm_result_t for_loop_n(ExPolicy&& policy, - B first, Size size, S stride, hpx::util::index_pack, - Args&&... args) + decltype(auto) for_loop_n(ExPolicy&& policy, B first, Size size, + S stride, hpx::util::index_pack, Args&&... args) { // stride shall not be zero HPX_ASSERT(stride != 0); @@ -1653,10 +1638,9 @@ namespace hpx::experimental { (hpx::traits::is_iterator_v || std::is_integral_v) )> // clang-format on - friend hpx::parallel::util::detail::algorithm_result_t - tag_fallback_invoke(hpx::experimental::for_loop_strided_t, - ExPolicy&& policy, std::decay_t first, I last, S stride, - Args&&... args) + friend decltype(auto) tag_fallback_invoke( + hpx::experimental::for_loop_strided_t, ExPolicy&& policy, + std::decay_t first, I last, S stride, Args&&... args) { static_assert(sizeof...(Args) >= 1, "for_loop_strided must be called with at least a function " @@ -1704,9 +1688,9 @@ namespace hpx::experimental { (hpx::traits::is_iterator_v || std::is_integral_v) )> // clang-format on - friend hpx::parallel::util::detail::algorithm_result_t - tag_fallback_invoke(hpx::experimental::for_loop_n_t, ExPolicy&& policy, - I first, Size size, Args&&... args) + friend decltype(auto) tag_fallback_invoke( + hpx::experimental::for_loop_n_t, ExPolicy&& policy, I first, + Size size, Args&&... args) { static_assert(sizeof...(Args) >= 1, "for_loop_n must be called with at least a function object"); @@ -1753,9 +1737,9 @@ namespace hpx::experimental { (hpx::traits::is_iterator_v || std::is_integral_v) )> // clang-format on - friend hpx::parallel::util::detail::algorithm_result_t - tag_fallback_invoke(hpx::experimental::for_loop_n_strided_t, - ExPolicy&& policy, I first, Size size, S stride, Args&&... args) + friend decltype(auto) tag_fallback_invoke( + hpx::experimental::for_loop_n_strided_t, ExPolicy&& policy, I first, + Size size, S stride, Args&&... args) { static_assert(sizeof...(Args) >= 1, "for_loop_n_strided must be called with at least a function " diff --git a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop_induction.hpp b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop_induction.hpp index 9fa50d3ce26f..6547e62f9efc 100644 --- a/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop_induction.hpp +++ b/libs/core/algorithms/include/hpx/parallel/algorithms/for_loop_induction.hpp @@ -11,16 +11,84 @@ #pragma once #include - +#include #include +#include +#include + +#if !defined(HPX_HAVE_CXX17_SHARED_PTR_ARRAY) +#include +#else +#include +#endif +#include #include #include #include +#include "hpx/concepts/concepts.hpp" + namespace hpx::parallel::detail { /// \cond NOINTERNAL + template + struct hpx_thread_local + { + private: + using element_type = hpx::util::cache_line_data; + +#if defined(HPX_HAVE_CXX17_SHARED_PTR_ARRAY) + using array_type = std::shared_ptr; +#else + using array_type = boost::shared_array; +#endif + + public: + constexpr explicit hpx_thread_local(T const& init) + { + const std::size_t threads = + hpx::parallel::execution::detail::get_os_thread_count(); + data_.reset(new element_type[threads]); + std::fill_n(data_.get(), threads, element_type{init}); + } + + // clang-format off + template + )> + // clang-format on + constexpr hpx_thread_local& operator=(O const& other) + { + data_[hpx::get_worker_thread_num()].data_ = other; + return *this; + } + + // clang-format off + constexpr operator T const&() const + // clang-format on + { + return data_[hpx::get_worker_thread_num()].data_; + } + + constexpr operator T&() + { + return data_[hpx::get_worker_thread_num()].data_; + } + + private: + array_type data_; + }; + + template + HPX_HOST_DEVICE HPX_FORCEINLINE constexpr Iterable next( + const hpx_thread_local& val, Stride offset) + { + return hpx::parallel::detail::next( + static_cast(val), offset); + } + /////////////////////////////////////////////////////////////////////// template struct induction_helper @@ -54,7 +122,7 @@ namespace hpx::parallel::detail { private: std::decay_t var_; - T curr_; + hpx_thread_local curr_; }; template @@ -94,7 +162,7 @@ namespace hpx::parallel::detail { private: T& live_out_var_; T var_; - T curr_; + hpx_thread_local curr_; }; /////////////////////////////////////////////////////////////////////// @@ -123,6 +191,8 @@ namespace hpx::parallel::detail { HPX_HOST_DEVICE constexpr void next_iteration() noexcept { + /*for (std::size_t i{}; i < stride_; ++i) + ++curr_;*/ curr_ = parallel::detail::next(curr_, stride_); } @@ -131,7 +201,7 @@ namespace hpx::parallel::detail { private: std::decay_t var_; - T curr_; + hpx_thread_local curr_; std::size_t stride_; }; @@ -174,7 +244,7 @@ namespace hpx::parallel::detail { private: T& live_out_var_; T var_; - T curr_; + hpx_thread_local curr_; std::size_t stride_; }; diff --git a/libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt b/libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt index 8b5426b9b544..755895dd21c8 100644 --- a/libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt +++ b/libs/core/algorithms/tests/unit/algorithms/CMakeLists.txt @@ -60,15 +60,20 @@ set(tests foreachn_exception foreachn_bad_alloc for_loop + for_loop_sender for_loop_exception for_loop_induction + for_loop_induction_sender for_loop_induction_async for_loop_n + for_loop_n_sender for_loop_n_strided + for_loop_n_strided_sender for_loop_reduction + for_loop_reduction_sender for_loop_reduction_async - for_loop_sender for_loop_strided + for_loop_strided_sender generate generaten is_heap diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_induction_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_induction_sender.cpp new file mode 100644 index 000000000000..cb1bf032ee42 --- /dev/null +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_induction_sender.cpp @@ -0,0 +1,355 @@ +// Copyright (c) 2016 Hartmut Kaiser +// Copyright (c) 2024 Tobias Wukovitsch +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_utils.hpp" + +/////////////////////////////////////////////////////////////////////////////// +int seed = std::random_device{}(); +std::mt19937 gen(seed); + +template +void test_for_loop_induction_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_execution_policy::value, + "hpx::is_execution_policy::value"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::vector d(10007); + std::iota(std::begin(c), std::end(c), gen()); + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::induction(0), + [&d](iterator it, std::size_t i) { + *it = 42; + d[i] = 42; + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, static_cast(42)); + ++count; + }); + std::for_each(std::begin(d), std::end(d), [](std::size_t v) -> void { + HPX_TEST_EQ(v, static_cast(42)); + }); + HPX_TEST_EQ(count, c.size()); +} + +template +void test_for_loop_induction_stride_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_execution_policy::value, + "hpx::is_execution_policy::value"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::vector d(10007); + std::iota(std::begin(c), std::end(c), gen()); + + tt::sync_wait( + ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::induction(0), hpx::experimental::induction(0, 2), + [&d](iterator it, std::size_t i, std::size_t j) { + *it = 42; + d[i] = 42; + HPX_TEST_EQ(2 * i, j); + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + std::for_each(std::begin(d), std::end(d), + [](std::size_t v) -> void { HPX_TEST_EQ(v, std::size_t(42)); }); + HPX_TEST_EQ(count, c.size()); +} + +template +void test_for_loop_induction_life_out_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_execution_policy::value, + "hpx::is_execution_policy::value"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::vector d(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t curr = 0; + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::induction(curr), + [&d](iterator it, std::size_t i) { + *it = 42; + d[i] = 42; + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + HPX_TEST_EQ(curr, c.size()); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + std::for_each(std::begin(d), std::end(d), + [](std::size_t v) -> void { HPX_TEST_EQ(v, std::size_t(42)); }); + HPX_TEST_EQ(count, c.size()); +} + +template +void test_for_loop_induction_stride_life_out_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_execution_policy::value, + "hpx::is_execution_policy::value"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::vector d(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t curr1 = 0; + std::size_t curr2 = 0; + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::induction(curr1), + hpx::experimental::induction(curr2, 2), + [&d](iterator it, std::size_t i, std::size_t j) { + *it = 42; + d[i] = 42; + HPX_TEST_EQ(2 * i, j); + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + HPX_TEST_EQ(curr1, c.size()); + HPX_TEST_EQ(curr2, 2 * c.size()); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + std::for_each(std::begin(d), std::end(d), + [](std::size_t v) -> void { HPX_TEST_EQ(v, std::size_t(42)); }); + HPX_TEST_EQ(count, c.size()); +} + +/////////////////////////////////////////////////////////////////////////////// +template +void test_for_loop_induction_sender() +{ + using namespace hpx::execution; + const auto sync = hpx::launch::sync; + const auto async = hpx::launch::async; + + test_for_loop_induction_sender(sync, seq(task), IteratorTag()); + test_for_loop_induction_sender(async, par(task), IteratorTag()); + test_for_loop_induction_sender(async, par_unseq(task), IteratorTag()); + + test_for_loop_induction_stride_sender(sync, seq(task), IteratorTag()); + test_for_loop_induction_stride_sender(async, par(task), IteratorTag()); + test_for_loop_induction_stride_sender( + async, par_unseq(task), IteratorTag()); + + test_for_loop_induction_life_out_sender(sync, seq(task), IteratorTag()); + test_for_loop_induction_life_out_sender(async, par(task), IteratorTag()); + test_for_loop_induction_life_out_sender( + async, par_unseq(task), IteratorTag()); + + test_for_loop_induction_stride_life_out_sender( + sync, seq(task), IteratorTag()); + test_for_loop_induction_stride_life_out_sender( + async, par(task), IteratorTag()); + test_for_loop_induction_stride_life_out_sender( + async, par_unseq(task), IteratorTag()); +} + +void for_loop_induction_test_sender() +{ + test_for_loop_induction_sender(); + test_for_loop_induction_sender(); +} + +/////////////////////////////////////////////////////////////////////////////// +template +void test_for_loop_induction_idx_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + tt::sync_wait(ex::just(0, c.size(), hpx::experimental::induction(0), + [&c](std::size_t i, std::size_t j) { + c[i] = 42; + HPX_TEST_EQ(i, j); + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + HPX_TEST_EQ(count, c.size()); +} + +template +void test_for_loop_induction_stride_idx_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + tt::sync_wait(ex::just(0, c.size(), hpx::experimental::induction(0), + hpx::experimental::induction(0, 2), + [&c](std::size_t i, std::size_t j, std::size_t k) { + c[i] = 42; + HPX_TEST_EQ(i, j); + HPX_TEST_EQ(2 * i, k); + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + HPX_TEST_EQ(count, c.size()); +} + +void for_loop_induction_test_idx_sender() +{ + using namespace hpx::execution; + const auto sync = hpx::launch::sync; + const auto async = hpx::launch::async; + + test_for_loop_induction_idx_sender(sync, seq(task)); + test_for_loop_induction_idx_sender(async, par(task)); + test_for_loop_induction_idx_sender(async, par_unseq(task)); + + test_for_loop_induction_stride_idx_sender(sync, seq(task)); + test_for_loop_induction_stride_idx_sender(async, par(task)); + test_for_loop_induction_stride_idx_sender(async, par_unseq(task)); +} + +/////////////////////////////////////////////////////////////////////////////// +int hpx_main(hpx::program_options::variables_map& vm) +{ + unsigned int seed = (unsigned int) std::time(nullptr); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + gen.seed(seed); + + for_loop_induction_test_sender(); + for_loop_induction_test_idx_sender(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + + // By default this test should run on all available cores + std::vector const cfg = {"hpx.os_threads=all"}; + + // Initialize and run HPX + hpx::local::init_params init_args; + init_args.desc_cmdline = desc_commandline; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_n_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_n_sender.cpp new file mode 100644 index 000000000000..3a114884df39 --- /dev/null +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_n_sender.cpp @@ -0,0 +1,106 @@ +// Copyright (c) 2024 Tobias Wukovitsch +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_utils.hpp" + +/////////////////////////////////////////////////////////////////////////////// +int seed = std::random_device{}(); +std::mt19937 gen(seed); + +template +void test_for_loop_n_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + tt::sync_wait(ex::just(iterator(std::begin(c)), c.size(), [](iterator it) { + *it = 42; + }) | hpx::experimental::for_loop_n(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + std::for_each(std::begin(c), std::end(c), [&count](std::size_t v) -> void { + HPX_TEST_EQ(v, std::size_t(42)); + ++count; + }); + HPX_TEST_EQ(count, c.size()); +} + +template +void for_loop_n_sender_test() +{ + using namespace hpx::execution; + test_for_loop_n_sender(hpx::launch::sync, seq(task), IteratorTag()); + test_for_loop_n_sender(hpx::launch::sync, unseq(task), IteratorTag()); + + test_for_loop_n_sender(hpx::launch::async, par(task), IteratorTag()); + test_for_loop_n_sender(hpx::launch::async, par_unseq(task), IteratorTag()); +} + +int hpx_main(hpx::program_options::variables_map& vm) +{ + unsigned int seed = (unsigned int) std::time(nullptr); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + std::srand(seed); + + for_loop_n_sender_test(); + for_loop_n_sender_test(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + + // By default this test should run on all available cores + std::vector const cfg = {"hpx.os_threads=all"}; + + // Initialize and run HPX + hpx::local::init_params init_args; + init_args.desc_cmdline = desc_commandline; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_n_strided_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_n_strided_sender.cpp new file mode 100644 index 000000000000..f056272352fe --- /dev/null +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_n_strided_sender.cpp @@ -0,0 +1,125 @@ +// Copyright (c) 2024 Tobias Wukovitsch +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_utils.hpp" + +/////////////////////////////////////////////////////////////////////////////// +int seed = std::random_device{}(); +std::mt19937 gen(seed); +std::uniform_int_distribution<> dis(1, 10006); + +template +void test_for_loop_n_strided_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::for_each(std::begin(c), std::end(c), [](std::size_t& v) -> void { + if (v == 42) + v = 43; + }); + + int stride = dis(gen); //-V103 + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + tt::sync_wait(ex::just(iterator(std::begin(c)), c.size(), stride, + [](iterator it) { *it = 42; }) | + hpx::experimental::for_loop_n_strided(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + for (std::size_t i = 0; i != c.size(); ++i) + { + if (i % stride == 0) //-V104 + { + HPX_TEST_EQ(c[i], std::size_t(42)); + } + else + { + HPX_TEST_NEQ(c[i], std::size_t(42)); + } + ++count; + } + HPX_TEST_EQ(count, c.size()); +} + +template +void for_loop_n_strided_sender_test() +{ + using namespace hpx::execution; + test_for_loop_n_strided_sender(hpx::launch::sync, seq(task), IteratorTag()); + test_for_loop_n_strided_sender( + hpx::launch::sync, unseq(task), IteratorTag()); + + test_for_loop_n_strided_sender( + hpx::launch::async, par(task), IteratorTag()); + test_for_loop_n_strided_sender( + hpx::launch::async, par_unseq(task), IteratorTag()); +} + +int hpx_main(hpx::program_options::variables_map& vm) +{ + unsigned int seed = (unsigned int) std::time(nullptr); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + std::srand(seed); + + for_loop_n_strided_sender_test(); + for_loop_n_strided_sender_test(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + + // By default this test should run on all available cores + std::vector const cfg = {"hpx.os_threads=all"}; + + // Initialize and run HPX + hpx::local::init_params init_args; + init_args.desc_cmdline = desc_commandline; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_reduction_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_reduction_sender.cpp new file mode 100644 index 000000000000..2fb7e183dc1b --- /dev/null +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_reduction_sender.cpp @@ -0,0 +1,266 @@ +// Copyright (c) 2016 Hartmut Kaiser +// Copyright (c) 2024 Tobias Wukovitsch +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_utils.hpp" + +/////////////////////////////////////////////////////////////////////////////// +int seed = std::random_device{}(); +std::mt19937 gen(seed); + +template +void test_for_loop_reduction_plus_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t sum = 0; + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::reduction_plus(sum), + [](iterator it, std::size_t& sum) { sum += *it; }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t sum2 = + std::accumulate(std::begin(c), std::end(c), std::size_t(0)); + HPX_TEST_EQ(sum, sum2); +} + +template +void test_for_loop_reduction_multiplies_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t prod = 0; + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::reduction_multiplies(prod), + [](iterator it, std::size_t& prod) { prod *= *it; }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t prod2 = std::accumulate(std::begin(c), std::end(c), + std::size_t(1), std::multiplies()); + HPX_TEST_EQ(prod, prod2); +} + +template +void test_for_loop_reduction_min_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t minval = c[0]; + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + hpx::experimental::reduction_min(minval), + [](iterator it, std::size_t& minval) { + minval = (std::min)(minval, *it); + }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t minval2 = std::accumulate(std::begin(c), std::end(c), c[0], + hpx::parallel::detail::min_of()); + HPX_TEST_EQ(minval, minval2); +} + +/////////////////////////////////////////////////////////////////////////////// +template +void test_for_loop_reduction_sender() +{ + using namespace hpx::execution; + const auto sync = hpx::launch::sync; + const auto async = hpx::launch::async; + + test_for_loop_reduction_plus_sender(sync, seq(task), IteratorTag()); + test_for_loop_reduction_plus_sender(async, par(task), IteratorTag()); + test_for_loop_reduction_plus_sender(async, par_unseq(task), IteratorTag()); + + test_for_loop_reduction_multiplies_sender(sync, seq(task), IteratorTag()); + test_for_loop_reduction_multiplies_sender(async, par(task), IteratorTag()); + test_for_loop_reduction_multiplies_sender( + async, par_unseq(task), IteratorTag()); + + test_for_loop_reduction_min_sender(sync, seq(task), IteratorTag()); + test_for_loop_reduction_min_sender(async, par(task), IteratorTag()); + test_for_loop_reduction_min_sender(async, par_unseq(task), IteratorTag()); +} + +void for_loop_reduction_test_sender() +{ + test_for_loop_reduction_sender(); + test_for_loop_reduction_sender(); +} + +/////////////////////////////////////////////////////////////////////////////// +template +void test_for_loop_reduction_bit_and_idx_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t bits = ~std::size_t(0); + + tt::sync_wait( + ex::just(0, c.size(), hpx::experimental::reduction_bit_and(bits), + [&c](std::size_t i, std::size_t& bits) { bits &= c[i]; }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t bits2 = std::accumulate(std::begin(c), std::end(c), + ~std::size_t(0), std::bit_and()); + HPX_TEST_EQ(bits, bits2); +} + +template +void test_for_loop_reduction_bit_or_idx_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::size_t bits = 0; + + tt::sync_wait( + ex::just(0, c.size(), hpx::experimental::reduction_bit_or(bits), + [&c](std::size_t i, std::size_t& bits) { bits |= c[i]; }) | + hpx::experimental::for_loop(ex_policy.on(exec))); + + // verify values + std::size_t bits2 = std::accumulate( + std::begin(c), std::end(c), std::size_t(0), std::bit_or()); + HPX_TEST_EQ(bits, bits2); +} + +void for_loop_reduction_test_idx_sender() +{ + using namespace hpx::execution; + const auto sync = hpx::launch::sync; + const auto async = hpx::launch::async; + + test_for_loop_reduction_bit_and_idx_sender(sync, seq(task)); + test_for_loop_reduction_bit_and_idx_sender(async, par(task)); + test_for_loop_reduction_bit_and_idx_sender(async, par_unseq(task)); + + test_for_loop_reduction_bit_or_idx_sender(sync, seq(task)); + test_for_loop_reduction_bit_or_idx_sender(async, par(task)); + test_for_loop_reduction_bit_or_idx_sender(async, par_unseq(task)); +} + +/////////////////////////////////////////////////////////////////////////////// +int hpx_main(hpx::program_options::variables_map& vm) +{ + unsigned int seed = (unsigned int) std::time(nullptr); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + gen.seed(seed); + + for_loop_reduction_test_sender(); + for_loop_reduction_test_idx_sender(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + + // By default this test should run on all available cores + std::vector const cfg = {"hpx.os_threads=all"}; + + // Initialize and run HPX + hpx::local::init_params init_args; + init_args.desc_cmdline = desc_commandline; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +} diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_sender.cpp index c09defd81c9f..86d0abb0c769 100644 --- a/libs/core/algorithms/tests/unit/algorithms/for_loop_sender.cpp +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_sender.cpp @@ -70,9 +70,15 @@ void test_for_loop_sender_direct_async(Policy l, ExPolicy&& policy, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); +#ifdef HPX_HAVE_STDEXEC + tt::sync_wait( + hpx::experimental::for_loop(policy.on(exec), iterator(std::begin(c)), + iterator(std::end(c)), [](iterator it) { *it = 42; })); +#else hpx::experimental::for_loop(policy.on(exec), iterator(std::begin(c)), iterator(std::end(c)), [](iterator it) { *it = 42; }) | tt::sync_wait(); +#endif // verify values std::size_t count = 0; @@ -129,8 +135,13 @@ void test_for_loop_sender(Policy l, ExPolicy&& policy, IteratorTag) using scheduler_t = ex::thread_pool_policy_scheduler; auto exec = ex::explicit_scheduler_executor(scheduler_t(l)); +#ifdef HPX_HAVE_STDEXEC + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), f) | + hpx::experimental::for_loop(policy.on(exec))); +#else ex::just(iterator(std::begin(c)), iterator(std::end(c)), f) | hpx::experimental::for_loop(policy.on(exec)) | tt::sync_wait(); +#endif // verify values std::size_t count = 0; diff --git a/libs/core/algorithms/tests/unit/algorithms/for_loop_strided_sender.cpp b/libs/core/algorithms/tests/unit/algorithms/for_loop_strided_sender.cpp new file mode 100644 index 000000000000..e42c31bda50a --- /dev/null +++ b/libs/core/algorithms/tests/unit/algorithms/for_loop_strided_sender.cpp @@ -0,0 +1,123 @@ +// Copyright (c) 2024 Tobias Wukovitsch +// +// SPDX-License-Identifier: BSL-1.0 +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "test_utils.hpp" + +/////////////////////////////////////////////////////////////////////////////// +int seed = std::random_device{}(); +std::mt19937 gen(seed); +std::uniform_int_distribution<> dis(1, 10006); + +template +void test_for_loop_strided_sender( + LnPolicy ln_policy, ExPolicy&& ex_policy, IteratorTag) +{ + static_assert(hpx::is_async_execution_policy_v, + "hpx::is_async_execution_policy_v"); + + using base_iterator = std::vector::iterator; + using iterator = test::test_iterator; + + namespace ex = hpx::execution::experimental; + namespace tt = hpx::this_thread::experimental; + using scheduler_t = ex::thread_pool_policy_scheduler; + + std::vector c(10007); + std::iota(std::begin(c), std::end(c), gen()); + + std::for_each(std::begin(c), std::end(c), [](std::size_t& v) -> void { + if (v == 42) + v = 43; + }); + + int stride = dis(gen); //-V103 + + auto exec = ex::explicit_scheduler_executor(scheduler_t(ln_policy)); + + tt::sync_wait(ex::just(iterator(std::begin(c)), iterator(std::end(c)), + stride, [](iterator it) { *it = 42; }) | + hpx::experimental::for_loop_strided(ex_policy.on(exec))); + + // verify values + std::size_t count = 0; + for (std::size_t i = 0; i != c.size(); ++i) + { + if (i % stride == 0) //-V104 + { + HPX_TEST_EQ(c[i], std::size_t(42)); + } + else + { + HPX_TEST_NEQ(c[i], std::size_t(42)); + } + ++count; + } + HPX_TEST_EQ(count, c.size()); +} + +template +void for_loop_strided_sender_test() +{ + using namespace hpx::execution; + test_for_loop_strided_sender(hpx::launch::sync, seq(task), IteratorTag()); + test_for_loop_strided_sender(hpx::launch::sync, unseq(task), IteratorTag()); + + test_for_loop_strided_sender(hpx::launch::async, par(task), IteratorTag()); + test_for_loop_strided_sender( + hpx::launch::async, par_unseq(task), IteratorTag()); +} + +int hpx_main(hpx::program_options::variables_map& vm) +{ + unsigned int seed = (unsigned int) std::time(nullptr); + if (vm.count("seed")) + seed = vm["seed"].as(); + + std::cout << "using seed: " << seed << std::endl; + std::srand(seed); + + for_loop_strided_sender_test(); + for_loop_strided_sender_test(); + + return hpx::local::finalize(); +} + +int main(int argc, char* argv[]) +{ + // add command line option which controls the random number generator seed + using namespace hpx::program_options; + options_description desc_commandline( + "Usage: " HPX_APPLICATION_STRING " [options]"); + + desc_commandline.add_options()("seed,s", value(), + "the random number generator seed to use for this run"); + + // By default this test should run on all available cores + std::vector const cfg = {"hpx.os_threads=all"}; + + // Initialize and run HPX + hpx::local::init_params init_args; + init_args.desc_cmdline = desc_commandline; + init_args.cfg = cfg; + + HPX_TEST_EQ_MSG(hpx::local::init(hpx_main, argc, argv, init_args), 0, + "HPX main exited with non-zero status"); + + return hpx::util::report_errors(); +}