From 8afe25940130246b4df3bb435e7b449bf9711760 Mon Sep 17 00:00:00 2001 From: Nora Abi Akar Date: Mon, 21 Jun 2021 12:10:32 +0300 Subject: [PATCH] Fix stack overflow bug in the task system (#1583) * Turn each task queue in the task system into a finite set of queues of increasing priority. The number of queues is a compile-time constant, currently two. * When a task of priority higher than that of the highest priority queue is scheduled, execute it synchronously. * When scheduling tasks in a task group, use a priority one higher than that of the calling task; when waiting on the task group, work only on tasks with this priority or higher. This is sufficient to avoid the deep recursion issue seen in issue #1570. Fixes #1570. Co-authored-by: Sam Yates --- arbor/threading/threading.cpp | 144 +++++++++----- arbor/threading/threading.hpp | 253 ++++++++++++++++++++---- test/ubench/task_system.cpp | 28 ++- test/unit/test_thread.cpp | 358 ++++++++++++++++++++++++++-------- 4 files changed, 609 insertions(+), 174 deletions(-) diff --git a/arbor/threading/threading.cpp b/arbor/threading/threading.cpp index ae251d6e89..552fda90a9 100644 --- a/arbor/threading/threading.cpp +++ b/arbor/threading/threading.cpp @@ -1,49 +1,64 @@ #include -#include "threading.hpp" +#include +#include + +#include "threading/threading.hpp" using namespace arb::threading::impl; using namespace arb::threading; using namespace arb; -task notification_queue::try_pop() { - task tsk; +priority_task notification_queue::try_pop(int priority) { + arb_assert(priority < (int)q_tasks_.size()); lock q_lock{q_mutex_, std::try_to_lock}; - if (q_lock && !q_tasks_.empty()) { - tsk = std::move(q_tasks_.front()); - q_tasks_.pop_front(); + + if (q_lock) { + auto& q = q_tasks_.at(priority); + if (!q.empty()) { + priority_task ptsk(std::move(q.front()), priority); + q.pop_front(); + return ptsk; + } } - return tsk; + + return {}; } -task notification_queue::pop() { - task tsk; +priority_task notification_queue::pop() { lock q_lock{q_mutex_}; - while (q_tasks_.empty() && !quit_) { + + while (empty() && !quit_) { q_tasks_available_.wait(q_lock); } - if (!q_tasks_.empty()) { - tsk = std::move(q_tasks_.front()); - q_tasks_.pop_front(); + for (int pri = n_priority-1; pri>=0; --pri) { + auto& q = q_tasks_.at(pri); + if (!q.empty()) { + priority_task ptsk{std::move(q.front()), pri}; + q.pop_front(); + return ptsk; + } } - return tsk; + return {}; } -bool notification_queue::try_push(task& tsk) { +bool notification_queue::try_push(priority_task& ptsk) { + arb_assert(ptsk.priority < (int)q_tasks_.size()); { lock q_lock{q_mutex_, std::try_to_lock}; if (!q_lock) return false; - q_tasks_.push_back(std::move(tsk)); - tsk = 0; + + q_tasks_.at(ptsk.priority).push_front(ptsk.release()); } q_tasks_available_.notify_all(); return true; } -void notification_queue::push(task&& tsk) { +void notification_queue::push(priority_task&& ptsk) { + arb_assert(ptsk.priority < (int)q_tasks_.size()); { lock q_lock{q_mutex_}; - q_tasks_.push_back(std::move(tsk)); + q_tasks_.at(ptsk.priority).push_front(ptsk.release()); } q_tasks_available_.notify_all(); } @@ -56,31 +71,63 @@ void notification_queue::quit() { q_tasks_available_.notify_all(); } -void task_system::run_tasks_loop(int i){ +bool notification_queue::empty() { + for(const auto& q: q_tasks_) { + if (!q.empty()) return false; + } + return true; +} + +void task_system::run(priority_task ptsk) { + arb_assert(ptsk); + auto guard = util::on_scope_exit([pri = current_task_priority_] { current_task_priority_ = pri; }); + + current_task_priority_ = ptsk.priority; + ptsk.run(); +} + +void task_system::run_tasks_loop(int i) { + auto guard = util::on_scope_exit([] { current_task_queue_ = -1; }); + current_task_queue_ = i; + while (true) { - task tsk; - for (unsigned n = 0; n != count_; n++) { - tsk = q_[(i + n) % count_].try_pop(); - if (tsk) break; + priority_task ptsk; + // Loop over the levels of priority starting from highest to lowest + for (int pri = n_priority-1; pri>=0; --pri) { + // Loop over the threads trying to pop a task of the requested priority. + for (unsigned n = 0; n=0 && i=lowest_priority; --pri) { + // Loop over the threads trying to pop a task of the requested priority. + for (unsigned n = 0; n != count_; n++) { + if (auto ptsk = q_[(i + n) % count_].try_pop(pri)) { + run(std::move(ptsk)); + return; + } } } } +thread_local int task_system::current_task_priority_ = -1; +thread_local unsigned task_system::current_task_queue_ = -1; + // Default construct with one thread. task_system::task_system(): task_system(1) {} @@ -88,9 +135,14 @@ task_system::task_system(int nthreads): count_(nthreads), q_(nthreads) { if (nthreads <= 0) throw std::runtime_error("Non-positive number of threads in thread pool"); + for (unsigned p = 0; p=n_priority) { + run(std::move(ptsk)); } - q_[i % count_].push(std::move(tsk)); -} + else { + arb_assert(ptsk.priority < (int)index_.size()); + auto i = index_[ptsk.priority]++; -int task_system::get_num_threads() const { - return threads_.size() + 1; + for (unsigned n = 0; n != count_; n++) { + if (q_[(i + n) % count_].try_push(ptsk)) return; + } + q_[i % count_].push(std::move(ptsk)); + } } std::unordered_map task_system::get_thread_ids() const { diff --git a/arbor/threading/threading.hpp b/arbor/threading/threading.hpp index 9a3d94c5b2..ad2b1d74a6 100644 --- a/arbor/threading/threading.hpp +++ b/arbor/threading/threading.hpp @@ -17,79 +17,200 @@ namespace arb { namespace threading { -// Forward declare task_group at bottom of this header -class task_group; - using std::mutex; using lock = std::unique_lock; using std::condition_variable; using task = std::function; +// Tasks with priority higher than max_async_task_priority will be run synchronously. +constexpr int max_async_task_priority = 1; + +// Wrap task and priority; provide move/release/reset operations and reset on run() +// to help ensure no wrapped task is run twice. +struct priority_task { + task t; + int priority = -1; + + priority_task() = default; + priority_task(task&& t, int priority): t(std::move(t)), priority(priority) {} + + priority_task(priority_task&& other) noexcept { + std::swap(t, other.t); + priority = other.priority; + } + + priority_task& operator=(priority_task&& other) noexcept { + reset(); + std::swap(t, other.t); + priority = other.priority; + return *this; + } + + priority_task(const priority_task&) = delete; + priority_task& operator=(const priority_task&) = delete; + + explicit operator bool() const noexcept { return static_cast(t); } + + void run() { + release()(); + } + + task release() { + task u = std::move(t); + reset(); + return u; + } + + void reset() noexcept { + t = nullptr; + } +}; + namespace impl { + class notification_queue { + // Number of priority levels in notification queues. + static constexpr int n_priority = max_async_task_priority+1; + +public: + // Tries to acquire the lock to get a task of a requested priority. + // If unsuccessful returns an empty task. If the lock is acquired + // successfully, and the deque containing the tasks of the requested + // priority is not empty; pops a task from that deque and returns it. + // Otherwise returns an empty task. + priority_task try_pop(int priority); + + // Acquires the lock and pops a task from the highest priority deque + // that is not empty. If all deques are empty, it waits for a task to + // be enqueued. If after a task is enqueued, it still can't acquire it + // (because it was popped by another thread), returns an empty task. + // If quit_ is set and the deques are all empty, returns an empty task. + priority_task pop(); + + // Acquires the lock and pushes the task into the deque containing + // tasks of the same priority, then notifies the condition variable to + // awaken waiting threads. + void push(priority_task&&); + + // Tries to acquire the lock: if successful, pushes the task onto the + // deque containing tasks of the same priority, notifies the condition + // variable to awaken waiting threads and returns true. If unsuccessful + // returns false. + bool try_push(priority_task&); + + // Finish popping all waiting tasks on queue then stop trying to pop + // new tasks + void quit(); + + // Check whether the deques are all empty. + bool empty(); + private: - // FIFO of pending tasks. - std::deque q_tasks_; + // deques of pending tasks. Each deque contains tasks of a single priority. + // q_tasks_[i+1] has higher priority than q_tasks_[i] + std::array, n_priority> q_tasks_; - // Lock and signal on task availability change this is the crucial bit. + // Lock and signal on task availability change. This is the crucial bit. mutex q_mutex_; condition_variable q_tasks_available_; // Flag to handle exit from all threads. bool quit_ = false; - -public: - // Pops a task from the task queue returns false when queue is empty. - task try_pop(); - task pop(); - - // Pushes a task into the task queue and increases task group counter. - void push(task&& tsk); // TODO: need to use value? - bool try_push(task& tsk); - - // Finish popping all waiting tasks on queue then stop trying to pop new tasks - void quit(); }; + }// namespace impl class task_system { private: + // Number of notification queues. unsigned count_; + // Worker threads. std::vector threads_; - // queue of tasks + // Local thread storage: used to encode the priority of the task + // currently executed by the thread. + // It is initialized to -1 and reset to -1 in the destructor, + // where a value of -1 => not running a task system task. + static thread_local int current_task_priority_; + + // Queue index for the running thread, if any, + // A value of -1 indicates that the executing thread is not one in + // threads_. + static thread_local unsigned current_task_queue_; + + // Number of priority levels in notification queues. + static constexpr int n_priority = max_async_task_priority+1; + + // Notification queues containing n_priority deques representing + // different priority levels. std::vector q_; - // threads -> index + // Map from thread id to index in the vector of threads. std::unordered_map thread_ids_; - // total number of tasks pushed in all queues - std::atomic index_{0}; + // Total number of tasks pushed in each priority level. + // Used to index which notification queue to enqueue tasks on to, + // to balance the workload among the queues. + std::array, n_priority> index_; public: + // Create zero new threads. Only worker thread is the main thread. task_system(); - // Create nthreads-1 new c std threads + + // Create nthreads-1 new std::threads running run_tasks_loop(tid) task_system(int nthreads); - // task_system is a singleton. task_system(const task_system&) = delete; task_system& operator=(const task_system&) = delete; + // Quits the notification queues. Joins the threads. Resets thread_depth_. + // Won't wait for the existing tasks in the notification queues to be executed. ~task_system(); - // Pushes tasks into notification queue. - void async(task tsk); - - // Runs tasks until quit is true. + // Pushes tasks into a notification queue, on a deque of the requested priority. + // Will first attempt to push on all the notification queues, round-robin, starting + // with the notification queue at index_[priority]. If unsuccessful, forces a push + // onto the notification queue at index_[priority]. + + // Public interface: run task asynchronously if priority <= max_async_task_priority, + // else equivalent to task_system::run(priority_task) below. + void async(priority_task ptsk); + + // Public interface: run task synchronously with current task priority set. + void run(priority_task ptsk); + + // Convenience interfaces with priority parameter: + void async(task t, int priority) { async({std::move(t), priority}); } + void run(task t, int priority) { run({std::move(t), priority}); } + + // The main function that all worker std::threads execute. + // It will try to acquire a task of the highest possible of priority from all + // of the notification queues. If unsuccessful it will force pop any task from + // the thread's personal queue, trying again from highest to lowest priority. + // Note on stack overflow possibility: the force pop can seem like it could cause + // an issue in cases when the personal queue only has low priority tasks that + // spawn higher priority tasks that end up on other queues. In that case the thread + // can end up in a situation where it keeps executing low priority tasks as it + // waits for higher priority tasks to finish, causing a stack overflow. The key + // point here is that while the thread is waiting for other tasks to finish, it + // is not executing the run_tasks_loop, but the task_group::wait() loop which + // doesn't use pop but always try_pop. + // `i` is the thread idx, used to select the thread's personal notification queue. void run_tasks_loop(int i); - // Request that the task_system attempts to find and run a _single_ task. - // Will return without executing a task if no tasks available. - void try_run_task(); + // Public interface: try to dequeue and run a single task with at least the + // requested priority level. Will return without executing a task if no tasks + // are available or if the lock can't be acquired. + // + // Will start with queue corresponding to calling thread, if one exists. + void try_run_task(int lowest_priority); + + // Number of threads in pool, including master thread. + // Equivalently, number of notification queues. + int get_num_threads() const { return (int)count_; } - // Includes master thread. - int get_num_threads() const; + static int get_task_priority() { return current_task_priority_; } // Returns the thread_id map std::unordered_map get_thread_ids() const; @@ -97,6 +218,18 @@ class task_system { class task_group { private: + // For tracking exceptions raised inside the task_system. + // If multiple tasks raise exceptions, any exception can be + // saved and returned. Once an exception has been raised, the + // rest of the tasks don't need to be executed, but if a few are + // executed anyway, that's okay. + // For the reset to work correctly using the relaxed memory order, + // it is necessary that both task_group::run and task_group::wait + // are synchronization points, which they are because they require + // mutex acquisition. The reason behind this requirement is that + // exception_state::reset is called at the end of task_group::wait, + // and we need it to actually reset exception_state::error_ before + // we start running any new tasks in the group. struct exception_state { std::atomic error_{false}; std::exception_ptr exception_; @@ -123,10 +256,10 @@ class task_group { } }; + // Number of tasks that are queued but not yet executed. std::atomic in_flight_{0}; - // Set by run(), cleared by wait(). Used to check task completion status - // in destructor. + // Set by run(), cleared by wait(). Used to check task completion status in destructor. bool running_ = false; // We use a raw pointer here instead of a shared_ptr to avoid a race condition @@ -149,7 +282,7 @@ class task_group { exception_state& exception_status_; public: - // Construct from a compatible function and atomic counter + // Construct from a compatible function, atomic counter, and exception_state. template explicit wrap(F2&& other, std::atomic& c, exception_state& ex): f_(std::forward(other)), @@ -171,8 +304,10 @@ class task_group { exception_status_(other.exception_status_) {} + // This is where tasks of the task_group are actually executed. void operator()() { if (!exception_status_) { + // Execute the task. Save exceptions if they occur. try { f_(); } @@ -180,6 +315,7 @@ class task_group { exception_status_.set(std::current_exception()); } } + // Decrement the atomic counter of the tasks in the task_group; --counter_; } }; @@ -192,17 +328,38 @@ class task_group { return wrap>(std::forward(f), c, ex); } + // Adds new tasks to be executed in the task_group. + // Use priority one higher than that of the task in the currently + // executing thread, if any, so that tasks in nested task groups + // are completed before any peers of the parent task. Returns this + // priority. + template + int run(F&& f) { + int priority = task_system::get_task_priority()+1; + run(std::forward(f), priority); + return priority; + } + + // Adds a new task with a given priority to be executed. template - void run(F&& f) { + void run(F&& f, int priority) { running_ = true; ++in_flight_; - task_system_->async(make_wrapped_function(std::forward(f), in_flight_, exception_status_)); + task_system_->async(priority_task{make_wrapped_function(std::forward(f), in_flight_, exception_status_), priority}); } // Wait till all tasks in this group are done. + // While waiting the thread will participate in executing the tasks. + // It's necessary that the waiting thread participate in execution: + // otherwise, due to nested parallelism, all the threads could become + // stuck waiting forever, while no new tasks get executed. + // To shorten waiting time, and reduce the chances of stack overflow, + // the waiting thread can only execute tasks with a higher priority + // than the task it is currently running. void wait() { + int lowest_priority = task_system::get_task_priority()+1; while (in_flight_) { - task_system_->try_run_task(); + task_system_->try_run_task(lowest_priority); } running_ = false; @@ -220,14 +377,26 @@ class task_group { // algorithms /////////////////////////////////////////////////////////////////////// struct parallel_for { + // Creates a task group, enqueues tasks and waits for their completion. + // If a batching size if not specified, a default batch size of 1 is used. template - static void apply(int left, int right, task_system* ts, F f) { + static void apply(int left, int right, int batch_size, task_system* ts, F f) { task_group g(ts); - for (int i = left; i < right; ++i) { - g.run([=] {f(i);}); + for (int i = left; i < right; i += batch_size) { + g.run([=] { + int r = i + batch_size < right ? i + batch_size : right; + for (int j = i; j < r; ++j) { + f(j); + } + }); } g.wait(); } + + template + static void apply(int left, int right, task_system* ts, F f) { + apply(left, right, 1, ts, std::move(f)); + } }; } // namespace threading diff --git a/test/ubench/task_system.cpp b/test/ubench/task_system.cpp index 873b16ea96..dac7a094fd 100644 --- a/test/ubench/task_system.cpp +++ b/test/ubench/task_system.cpp @@ -21,12 +21,31 @@ void run(unsigned long us_per_task, unsigned tasks, threading::task_system* ts) [&](unsigned i){std::this_thread::sleep_for(duration);}); } +void run_nested(unsigned long us_per_task, unsigned tasks, threading::task_system* ts) { + auto duration = std::chrono::microseconds(us_per_task); + arb::threading::parallel_for::apply(0, tasks, ts, [&](unsigned i) { + arb::threading::parallel_for::apply(0, 1, ts, [&](unsigned i) { + std::this_thread::sleep_for(duration);});}); +} + +void task_test_nested(benchmark::State& state) { + const unsigned us_per_task = state.range(0); + arb::threading::task_system ts; + const auto nthreads = ts.get_num_threads(); + const unsigned total_us = 1000000; + const unsigned num_tasks = nthreads*total_us/us_per_task; + + while (state.KeepRunning()) { + run_nested(us_per_task, num_tasks, &ts); + } +} + void task_test(benchmark::State& state) { const unsigned us_per_task = state.range(0); arb::threading::task_system ts; const auto nthreads = ts.get_num_threads(); - const unsigned us_per_s = 1000000; - const unsigned num_tasks = nthreads*us_per_s/us_per_task; + const unsigned total_us = 1000000; + const unsigned num_tasks = nthreads*total_us/us_per_task; while (state.KeepRunning()) { run(us_per_task, num_tasks, &ts); @@ -34,10 +53,11 @@ void task_test(benchmark::State& state) { } void us_per_task(benchmark::internal::Benchmark *b) { - for (auto ncomps: {100, 250, 500, 1000, 10000}) { - b->Args({ncomps}); + for (auto us_per_task: {10, 100, 250, 500, 1000, 10000}) { + b->Args({us_per_task}); } } BENCHMARK(task_test)->Apply(us_per_task); +BENCHMARK(task_test_nested)->Apply(us_per_task); BENCHMARK_MAIN(); diff --git a/test/unit/test_thread.cpp b/test/unit/test_thread.cpp index e1f0b2ed11..49d46d0539 100644 --- a/test/unit/test_thread.cpp +++ b/test/unit/test_thread.cpp @@ -38,11 +38,13 @@ struct ftor { }; struct ftor_wait { + int duration_us = 100; ftor_wait() {} + ftor_wait(int us): duration_us(us) {} void operator()() const { - auto duration = std::chrono::microseconds(100); + auto duration = std::chrono::microseconds(duration_us); std::this_thread::sleep_for(duration); } }; @@ -63,34 +65,38 @@ struct ftor_parallel_wait { } TEST(task_system, test_copy) { - task_system ts; + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); - ftor f; - ts.async(f); + ftor f; + ts.async(f, 0); - // Copy into new ftor and move ftor into a task (std::function) - EXPECT_EQ(1, nmove); - EXPECT_EQ(1, ncopy); - reset(); + // Copy into new ftor and move ftor into a task (std::function) + EXPECT_EQ(1, nmove); + EXPECT_EQ(1, ncopy); + reset(); + } } TEST(task_system, test_move) { - task_system ts; + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); - ftor f; - ts.async(std::move(f)); + ftor f; + ts.async(std::move(f), 0); - // Move into new ftor and move ftor into a task (std::function) - EXPECT_LE(nmove, 2); - EXPECT_LE(ncopy, 1); - reset(); + // Move into new ftor and move ftor into a task (std::function) + EXPECT_LE(nmove, 2); + EXPECT_LE(ncopy, 1); + reset(); + } } TEST(notification_queue, test_copy) { notification_queue q; ftor f; - q.push(f); + q.push({task(f), 0}); // Copy into new ftor and move ftor into a task (std::function) EXPECT_EQ(1, nmove); @@ -104,113 +110,297 @@ TEST(notification_queue, test_move) { ftor f; // Move into new ftor and move ftor into a task (std::function) - q.push(std::move(f)); + q.push({task(std::move(f)), 1}); EXPECT_LE(nmove, 2); EXPECT_LE(ncopy, 1); reset(); } TEST(task_group, test_copy) { - task_system ts; - task_group g(&ts); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + task_group g(&ts); - ftor f; - g.run(f); - g.wait(); + ftor f; + g.run(f); + g.wait(); - // Copy into "wrap" and move wrap into a task (std::function) - EXPECT_EQ(1, nmove); - EXPECT_EQ(1, ncopy); - reset(); + // Copy into "wrap" and move wrap into a task (std::function) + EXPECT_EQ(1, nmove); + EXPECT_EQ(1, ncopy); + reset(); + } } TEST(task_group, test_move) { - task_system ts; - task_group g(&ts); - - ftor f; - g.run(std::move(f)); - g.wait(); - - // Move into wrap and move wrap into a task (std::function) - EXPECT_LE(nmove, 2); - EXPECT_LE(ncopy, 1); - reset(); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + task_group g(&ts); + + ftor f; + g.run(std::move(f)); + g.wait(); + + // Move into wrap and move wrap into a task (std::function) + EXPECT_LE(nmove, 2); + EXPECT_LE(ncopy, 1); + reset(); + } } TEST(task_group, individual_tasks) { // Simple check for deadlock - task_system ts; - task_group g(&ts); - - auto nthreads = ts.get_num_threads(); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + task_group g(&ts); - ftor_wait f; - for (int i = 0; i < 32 * nthreads; i++) { - g.run(f); + ftor_wait f; + for (int i = 0; i < 32 * nthreads; i++) { + g.run(f); + } + g.wait(); } - g.wait(); } TEST(task_group, parallel_for_sleep) { // Simple check for deadlock for nested parallelism - task_system ts; - auto nthreads = ts.get_num_threads(); - task_group g(&ts); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + task_group g(&ts); - ftor_parallel_wait f(&ts); - for (int i = 0; i < nthreads; i++) { - g.run(f); + ftor_parallel_wait f(&ts); + for (int i = 0; i < nthreads; i++) { + g.run(f); + } + g.wait(); } - g.wait(); } TEST(task_group, parallel_for) { - task_system ts; - for (int n = 0; n < 10000; n=!n?1:2*n) { - std::vector v(n, -1); - parallel_for::apply(0, n, &ts, [&](int i) {v[i] = i;}); - for (int i = 0; i< n; i++) { - EXPECT_EQ(i, v[i]); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + for (int n = 0; n < 10000; n = !n ? 1 : 2 * n) { + std::vector v(n, -1); + parallel_for::apply(0, n, &ts, [&](int i) { v[i] = i; }); + for (int i = 0; i < n; i++) { + EXPECT_EQ(i, v[i]); + } + } + } +} + + +TEST(task_group, manual_nested_parallel_for) { + // Check for deadlock or stack overflow + const int ntasks = 100000; + { + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + std::vector v(ntasks); + task_system ts(nthreads); + + auto nested = [&](int j) { + task_group g1(&ts); + g1.run([&](){v[j] = j;}); + g1.wait(); + }; + + task_group g0(&ts); + for (int i = 0; i < ntasks; i++) { + g0.run([=](){nested(i);}); + } + g0.wait(); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } + } + { + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + std::vector v(ntasks); + task_system ts(nthreads); + + auto double_nested = [&](int i) { + task_group g2(&ts); + g2.run([&](){v[i] = i;}); + g2.wait(); + }; + + auto nested = [&](int i) { + task_group g1(&ts); + g1.run([=](){double_nested(i);}); + g1.wait(); + }; + + task_group g0(&ts); + for (int i = 0; i < ntasks; i++) { + g0.run([=](){nested(i);}); + } + g0.wait(); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } } } } TEST(task_group, nested_parallel_for) { - task_system ts; - for (int m = 1; m < 512; m*=2) { - for (int n = 0; n < 1000; n=!n?1:2*n) { - std::vector> v(n, std::vector(m, -1)); - parallel_for::apply(0, n, &ts, [&](int i) { - auto &w = v[i]; - parallel_for::apply(0, m, &ts, [&](int j) { w[j] = i + j; }); - }); - for (int i = 0; i < n; i++) { - for (int j = 0; j < m; j++) { - EXPECT_EQ(i + j, v[i][j]); + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + for (int m = 1; m < 512; m *= 2) { + for (int n = 0; n < 1000; n = !n ? 1 : 2 * n) { + std::vector> v(n, std::vector(m, -1)); + parallel_for::apply(0, n, &ts, [&](int i) { + auto& w = v[i]; + parallel_for::apply(0, m, &ts, [&](int j) { w[j] = i + j; }); + }); + for (int i = 0; i < n; i++) { + for (int j = 0; j < m; j++) { + EXPECT_EQ(i + j, v[i][j]); + } } } } } } -TEST(enumerable_thread_specific, test) { - task_system_handle ts = task_system_handle(new task_system); - enumerable_thread_specific buffers(ts); - task_group g(ts.get()); - - for (int i = 0; i < 100000; i++) { - g.run([&](){ - auto& buf = buffers.local(); - buf++; - }); +TEST(task_group, multi_nested_parallel_for) { + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system ts(nthreads); + for (int m = 1; m < 512; m *= 2) { + for (int n = 0; n < 128; n = !n ? 1 : 2 * n) { + for (int p = 0; p < 16; p = !p ? 1 : 2 * p) { + std::vector>> v(n, std::vector>(m, std::vector(p, -1))); + parallel_for::apply(0, n, &ts, [&](int i) { + auto& w = v[i]; + parallel_for::apply(0, m, &ts, [&](int j) { + auto& u = w[j]; + parallel_for::apply(0, p, &ts, [&](int k) { + u[k] = i + j + k; + }); + }); + }); + for (int i = 0; i < n; i++) { + for (int j = 0; j < m; j++) { + for (int k = 0; k < p; k++) { + EXPECT_EQ(i + j + k, v[i][j][k]); + } + } + } + } + } + } + } +} + +TEST(task_group, nested_parallel_for_unbalanced) { + // Top level parallel for has many more tasks than lower level + const int ntasks = 100000; + { + // Default batching + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, ntasks, &ts, [&](int i) { + parallel_for::apply(0, 1, &ts, [&](int j) { v[i] = i; }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } + // 128 tasks per batch + const int batch_size = 128; + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, ntasks, batch_size, &ts, [&](int i) { + parallel_for::apply(0, 1, batch_size, &ts, [&](int j) { v[i] = i; }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } + } + // lower level parallel for has many more tasks than top level + { + // Default batching + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, 1, &ts, [&](int i) { + parallel_for::apply(0, ntasks, &ts, [&](int j) { v[j] = j; }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } + // 128 tasks per batch + const int batch_size = 128; + for (int nthreads = 1; nthreads < 20; nthreads *= 4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, 1, batch_size, &ts, [&](int i) { + parallel_for::apply(0, ntasks, batch_size, &ts, [&](int j) { v[j] = j; }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } } - g.wait(); +} - int sum = 0; - for (auto b: buffers) { - sum += b; +TEST(task_group, multi_nested_parallel_for_unbalanced) { + // Top level parallel for has many more tasks than lower level + const int ntasks = 100000; + for (int nthreads = 1; nthreads < 20; nthreads*=4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, ntasks, &ts, [&](int i) { + parallel_for::apply(0, 1, &ts, [&](int j) { + parallel_for::apply(0, 1, &ts, [&](int k) { + v[i] = i; + }); + }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } + } + // lower level parallel for has many more tasks than top level + for (int nthreads = 1; nthreads < 20; nthreads*=4) { + task_system ts(nthreads); + std::vector v(ntasks); + parallel_for::apply(0, 1, &ts, [&](int i) { + parallel_for::apply(0, 1, &ts, [&](int j) { + parallel_for::apply(0, ntasks, &ts, [&](int k) { + v[k] = k; + }); + }); + }); + for (int i = 0; i < ntasks; i++) { + EXPECT_EQ(i, v[i]); + } } +} - EXPECT_EQ(100000, sum); +TEST(enumerable_thread_specific, test) { + for (int nthreads = 1; nthreads < 20; nthreads*=2) { + task_system_handle ts = task_system_handle(new task_system(nthreads)); + enumerable_thread_specific buffers(ts); + task_group g(ts.get()); + + for (int i = 0; i < 100000; i++) { + g.run([&]() { + auto& buf = buffers.local(); + buf++; + }); + } + g.wait(); + + int sum = 0; + for (auto b: buffers) { + sum += b; + } + + EXPECT_EQ(100000, sum); + } }