From 44645a0d0f21a14daefb83b1a1e4351b11daf053 Mon Sep 17 00:00:00 2001 From: Matthew Whitlock Date: Thu, 12 Oct 2023 11:53:50 -0700 Subject: [PATCH] Cleaned up Jacobi example --- examples/CMakeLists.txt | 4 +- examples/jacobi.cpp | 287 ----------- examples/jacobi.hpp | 447 ------------------ examples/jacobi/config.cpp | 122 +++++ examples/jacobi/config.hpp | 91 ++++ examples/{ => jacobi}/config_jacobi.json | 0 examples/{ => jacobi}/config_jacobi_1.json | 0 .../{ => jacobi}/config_jacobi_async.json | 0 .../config_jacobi_more_async.json | 0 examples/jacobi/main.cpp | 129 +++++ examples/jacobi/solver.cpp | 233 +++++++++ examples/jacobi/solver.hpp | 144 ++++++ 12 files changed, 721 insertions(+), 736 deletions(-) delete mode 100644 examples/jacobi.cpp delete mode 100644 examples/jacobi.hpp create mode 100644 examples/jacobi/config.cpp create mode 100644 examples/jacobi/config.hpp rename examples/{ => jacobi}/config_jacobi.json (100%) rename examples/{ => jacobi}/config_jacobi_1.json (100%) rename examples/{ => jacobi}/config_jacobi_async.json (100%) rename examples/{ => jacobi}/config_jacobi_more_async.json (100%) create mode 100644 examples/jacobi/main.cpp create mode 100644 examples/jacobi/solver.cpp create mode 100644 examples/jacobi/solver.hpp diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 91ac8fe..229dc74 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -50,6 +50,6 @@ if (KR_ENABLE_STDFILE) endif() if(KR_ENABLE_VT) - add_example(jacobi_checkpoint SOURCES jacobi.cpp - RESOURCES config_jacobi.json config_jacobi_async.json config_jacobi_1.json config_jacobi_more_async.json) + add_example(jacobi_checkpoint SOURCES jacobi/main.cpp jacobi/solver.cpp jacobi/config.cpp + RESOURCES jacobi/config_jacobi.json jacobi/config_jacobi_async.json jacobi/config_jacobi_1.json jacobi/config_jacobi_more_async.json) endif() diff --git a/examples/jacobi.cpp b/examples/jacobi.cpp deleted file mode 100644 index 429b405..0000000 --- a/examples/jacobi.cpp +++ /dev/null @@ -1,287 +0,0 @@ -/* -//@HEADER -// ***************************************************************************** -// -// jacobi3d_vt.cc -// DARMA/vt => Virtual Transport -// -// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC -// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. -// Government retains certain rights in this software. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright notice, -// this list of conditions and the following disclaimer in the documentation -// and/or other materials provided with the distribution. -// -// * Neither the name of the copyright holder nor the names of its -// contributors may be used to endorse or promote products derived from this -// software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -// POSSIBILITY OF SUCH DAMAGE. -// -// Questions? Contact darma@sandia.gov -// -// ***************************************************************************** -//@HEADER -*/ - -#include -#include -#include -#include - -#include "jacobi.hpp" - -const std::string MAIN_LOOP = "Jacobi main loop"; - -//Manage options for how to checkpoint, when to inject failures -struct ResilienceConfig { - //See below main for configuration flags - ResilienceConfig(int argc, char** argv); - - //Test recovery by exiting if on correct iteration and rank - void try_kill(int current_iteration){ - if(kill_iter == current_iteration && - kill_rank == vt::theContext()->getNode()){ - fmt::print(stderr, "Rank {} simulating failure on iteration {}\n", - kill_rank, kill_iter); - exit(1); - } - }; - - - //Enable treating this object just like you would the context unique_ptr - kr::ContextBase* operator->(){ return context.get(); } - void reset() { context.reset(); } - - //Which type of context to use for consistency-enforcement - std::string context_mode = "VT"; - - //How often we should start the next VT phase (requiring an epoch boundary) - int iters_per_phase = 30; - - //How often we should set an epoch boundary - //0 = set based on mode; -1 = never - int iters_per_epoch = 0; - - //Tells context when to checkpoint. - std::function checkpoint_filter; - -private: - int kill_iter = -1, kill_rank = 0; - std::unique_ptr context; -}; - -//See jacobi_impl.hpp for command-line configuration options of jacobi. -using JacobiConfig = Jacobi::Config; - -int main(int argc, char** argv) { - using namespace Jacobi; - - vt::initialize(argc, argv); - - JacobiConfig app_cfg(argc, argv); - - Kokkos::initialize(argc, argv); - { - - ResilienceConfig res_cfg(argc, argv); - - const int this_node = vt::theContext()->getNode(); - - - - // Object group of all nodes that take part in computation - // Used to determine whether the computation is finished - auto grp_proxy = vt::theObjGroup()->makeCollective("notify"); - //Register on all nodes or just one. Registering on all = a few more init messages. - res_cfg->register_to(MAIN_LOOP, grp_proxy); - - // Create the decomposition into objects - auto col_proxy = vt::makeCollection("jacobi") - .bounds(app_cfg.colRange) - .bulkInsert() - .wait(); - res_cfg->register_to(MAIN_LOOP, col_proxy); - - - - //Initialize application, unless recovering. - int recover_iter = res_cfg->latest_version(MAIN_LOOP); - bool recovering = recover_iter >= 1; - if(recovering && this_node == 0) fmt::print("Recovering to iteration {}\n", recover_iter); - if(!recovering) { - auto cfg_copy = app_cfg; - cfg_copy.objProxy = grp_proxy; - vt::runInEpochCollective([&]{ - col_proxy.broadcastCollective<&Solver::init>(cfg_copy); - }); - } - - - size_t iter_count = recovering ? recover_iter : 1; - //Accounting for count being 1-based. - const size_t max_iter = app_cfg.maxIter + 1; - - //Manage app/phase boundaries accoring to resilience configs. - //Phase boundaries supercede and restart app boundary. - size_t next_phase_boundary = iter_count + res_cfg.iters_per_phase; - if(res_cfg.iters_per_phase == -1) next_phase_boundary = max_iter+1; - - size_t next_epoch_boundary = iter_count + res_cfg.iters_per_epoch; - if(res_cfg.iters_per_epoch == -1) next_epoch_boundary = max_iter+1; - -double loop_begin_s = vt::timing::getCurrentTime(); - //Iteratively solve - while (!isWorkDone(grp_proxy) && iter_count < max_iter) { - const size_t next_boundary = std::min(std::min(next_phase_boundary, next_epoch_boundary), max_iter); - - if(this_node == 0) fmt::print(stderr, "Launching iterations [{},{}]\n", iter_count, next_boundary-1); - - vt::runInEpochCollective(fmt::format("Jacobi iters [{}-{}]", iter_count, next_boundary-1), [&]{ - while( !isWorkDone(grp_proxy) && iter_count < next_boundary ){ - res_cfg->run(MAIN_LOOP, iter_count, [&]{ - if(this_node == 0){ - res_cfg->register_to_active(col_proxy); - res_cfg->register_to_active(grp_proxy); - } - - col_proxy.broadcastCollective<&Solver::iterate>(next_boundary-1); - iter_count++; - - }, res_cfg.checkpoint_filter); - } - }); - - vt::runInEpochCollective(fmt::format("Jacobi reduce {}", iter_count), [&]{ - col_proxy.broadcastCollective<&Solver::reduce>(); - }); - - - //Update boundaries as necessary. - if(iter_count == next_phase_boundary) { - next_phase_boundary = iter_count + res_cfg.iters_per_phase; - vt::thePhase()->nextPhaseCollective(); - } - if(iter_count == next_epoch_boundary) { - next_epoch_boundary = iter_count + res_cfg.iters_per_epoch; - } - } - -double loop_end_s = vt::timing::getCurrentTime(); -if(this_node == 0) fmt::print("Loop took {}s\n", loop_end_s - loop_begin_s); - - - vt::runInEpochCollective("Cleanup objects", [&]{ - vt::theCollection()->destroy(col_proxy); - vt::theObjGroup()->destroyCollective(grp_proxy); - res_cfg.reset(); - }); - //Done! Free the resilient context for cleanup. - } - Kokkos::finalize(); - - vt::finalize(); -} - - -ResilienceConfig::ResilienceConfig(int argc, char** argv){ - //Where to find .json configuration file for KokkosResilience. - // --config - //NOTE: safest by far to give an absolute path (as well as within the config file) - std::string config_filename = "config_jacobi.json"; - - //Which context to use for consistency management - // --mode - std::string context_mode = "VT"; - - //How often to checkpoint, in iterations. Two special cases: - //negative: Do not checkpoint - // 0: Use the default checkpoint filter, settable within the config file - // --freq - int checkpoint_frequency = 0; - - for(int i = 0; i < argc; i++){ - std::string arg = argv[i]; - if(arg == "--config") - config_filename = argv[++i]; - else if(arg == "--mode") - context_mode = argv[++i]; - else if(arg == "--freq") - checkpoint_frequency = std::stoi(argv[++i]); - else if(arg == "--kill") - kill_iter = std::stoi(argv[++i]); - else if(arg == "--kill-rank") - kill_rank = std::stoi(argv[++i]); - else if(arg == "--iters-per-phase") - iters_per_phase = std::stoi(argv[++i]); - else if(arg == "--iters-per-epoch") - iters_per_epoch = std::stoi(argv[++i]); - } - - - if(context_mode == "VT") { - //Let VTContext manage epoch boundaries (apart from phases) - if(iters_per_epoch == 0) iters_per_epoch = -1; - context = kr::make_context(vt::theContext(), config_filename); - } else if(context_mode == "MPI"){ - //App must put boundary before checkpoint, as MPIContext is unaware of VT tasks - if(iters_per_epoch == 0) iters_per_epoch = checkpoint_frequency; - context = kr::make_context(MPI_COMM_WORLD, config_filename); - } else throw std::invalid_argument("Valid modes are [VT, MPI]. Provided mode: " + context_mode); - - std::string freq_str; - if(checkpoint_frequency < 0) { - freq_str = "never"; - checkpoint_filter = [](int iter){ return false; }; - } else if(checkpoint_frequency == 0){ - freq_str = "according to json"; - checkpoint_filter = context->default_filter(); - } else { - freq_str = fmt::format("every {} iterations", checkpoint_frequency); - checkpoint_filter = kr::Filter::NthIterationFilter(checkpoint_frequency); - } - - //Must be positive, or -1 for no phase/epoch bounding - assert(iters_per_phase > 0 || iters_per_phase == -1); - assert(iters_per_epoch > 0 || iters_per_epoch == -1); - - if(vt::theContext()->getNode() == 0) { - fmt::print("kr:: {} Context configured against {}\n", context_mode, config_filename); - fmt::print("kr:: Checkpointing {}\n", freq_str); - if(kill_iter > 0 && kill_rank > 0){ - fmt::print("Generating failure at iteration {} on rank {}\n", kill_iter, kill_rank); - if(kill_rank >= vt::theContext()->getNumNodes()){ - fmt::print("WARNING: kill_rank {} does not exist!\n", kill_rank); - } - } - - if(iters_per_epoch == -1){ - fmt::print("kr:: instructing app not to bound iterations\n"); - } else { - fmt::print("kr:: instructing app to bound every {} iterations\n", iters_per_epoch); - } - - if(iters_per_phase == -1){ - fmt::print("kr:: instructing app not to use phases\n"); - } else { - fmt::print("kr:: instructing app to phase every {} iterations\n", iters_per_phase); - } - } -} diff --git a/examples/jacobi.hpp b/examples/jacobi.hpp deleted file mode 100644 index 9b45383..0000000 --- a/examples/jacobi.hpp +++ /dev/null @@ -1,447 +0,0 @@ -#include -#include - -// -// This code applies a few steps of the Jacobi iteration to -// the linear system A x = 0 -// where is a banded symmetric positive definite matrix. -// The initial guess for x is a made-up non-zero vector. -// The exact solution is the vector 0. -// -// The matrix A is square and invertible. -// The number of rows is ((number of objects) * (number of rows per object)) -// -// Such a matrix A is obtained when using 2nd-order finite difference -// for discretizing -// -// -d^2 u / dx^2 -d^2 u / dy^2 - -d^2 u / dz^2 = f on [0, 1] x [0, 1] x [0, 1] -// -// with homogeneous Dirichlet condition -// -// u = 0 on the boundary of [0, 1] x [0, 1] x [0, 1] -// -// using a uniform grid with grid size -// -// 1 / ((number of objects) * (number of rows per object) + 1) -// - -namespace Jacobi { - - -struct Detector { - bool finished = false; - - template - void serialize(Serializer& s) { - s | finished; - } - - bool isWorkFinished(){ - return finished; - } - - void workFinished(){ - finished = true; - } -}; -using DetectorProxy = vt::objgroup::proxy::Proxy; - -bool isWorkDone( DetectorProxy const& proxy) { - return proxy.get()->isWorkFinished(); -}; - -struct Config { - vt::Index3D colRange; - vt::Index3D dataRange; - - double tolerance = 1e-2; - int maxIter = 100; - - bool asyncCheckpoint = false; - bool debug = false; - - DetectorProxy objProxy; - - Config() = default; - - template - void serialize(SerT& s){ - s | colRange | dataRange | tolerance | maxIter | asyncCheckpoint | objProxy | debug; - } - - Config(int argc, char** argv){ - int numXObjs = 4; - int numYObjs = 4; - int numZObjs = 4; - - int numXElms = 50; - int numYElms = 50; - int numZElms = 50; - - for(int i = 0; i < argc; i++){ - std::string arg = argv[i]; - //Set input decomposition (i.e. dimenions of col_proxy) - if( arg == "--decomp"){ - numXObjs = std::stoi(argv[++i]); - numYObjs = std::stoi(argv[++i]); - numZObjs = std::stoi(argv[++i]); - - //Set input size per element of col_proxy - //Currently just uses x and makes a cube. - } else if(arg == "--input"){ - numXElms = std::stoi(argv[++i]); - numYElms = std::stoi(argv[++i]); - numZElms = std::stoi(argv[++i]); - - } else if(arg == "--max-iters") { - maxIter = std::stoi(argv[++i]); - - } else if(arg == "--tolerance") { - tolerance = std::stod(argv[++i]); - } else if(arg == "--async-serialize") { - asyncCheckpoint = true; - } else if(arg == "--jacobi-debug"){ - debug = true; - } - } - - //using BaseIndexType = typename vt::Index3D::DenseIndexType; - colRange = vt::Index3D(numXObjs, numYObjs, numZObjs); - dataRange = vt::Index3D(numXElms, numYElms, numZElms); - - /* --- Print information about the simulation */ - if(vt::theContext()->getNode() == 0){ - fmt::print( - stdout, "\n - Solve the linear system for the Laplacian with homogeneous Dirichlet" - " on [0, 1] x [0, 1] x [0, 1]\n" - ); - fmt::print(" - Second-order centered finite difference\n"); - fmt::print(" - {} elements decomposed onto {} objects.\n", dataRange.toString(), colRange.toString()); - fmt::print(" - Maximum number of iterations {}\n", maxIter); - fmt::print(" - Convergence tolerance {}\n", tolerance); - fmt::print("\n"); - } - } -}; - -struct Solver : vt::Collection { - Config cfg; - - //Lower for edges - int nNeighbors = 6; - - std::array nElms; - - //Previous iteration's data, current result, input right hand side. - Kokkos::View previous, result, rhs; - - int iter = 0, nextLaunchIter = 0, reduceIter = -1; - std::deque epochQueue; - - int nRecv = 0, nEarlyRecv = 0; - - using Range = Kokkos::MDRangePolicy, Kokkos::IndexType>; - -public: - Solver() = default; - - template - void serialize(Serializer& s) { - vt::EpochType chkpt_epoch = vt::no_epoch; - if(s.hasTraits(vt::vrt::CheckpointInternalTrait()) && cfg.asyncCheckpoint && !s.isSizing()){ - //Checkpointing waits for enqueued iterations to finish - if(!epochQueue.empty()) { - vt::EpochType last_iter_epoch = epochQueue.back(); - - //if(vt::theContext()->getNode() == 0) fmt::print(stderr, "{}@{} delaying on epoch {} @{}\n", getIndex().toString(), iter, epochQueue.back(), vt::timing::getCurrentTime()); - chkpt_epoch = vt::theTerm()->makeEpochRooted(fmt::format("{} checkpointing!\n", getIndex().toString())); - vt::theTerm()->addDependency(last_iter_epoch, chkpt_epoch); - epochQueue.push_back(chkpt_epoch); - - kr::Util::VT::delaySerializeUntil(last_iter_epoch); - } - } - - if(!s.isSizing() && iter%150 != 0) fmt::print(stderr, "Warning: {} checkpointing iter {}\n", getIndex().toString(), iter); - //auto begin_time = vt::timing::getCurrentTime(); - //if(!s.isSizing() && vt::theContext()->getNode() == 0) fmt::print("{}@{} beginning serialization @{}\n", getIndex().toString(), iter, begin_time); - - vt::trace::TraceScopedNote trace_obj( - fmt::format("{} {}@{}", s.isSizing()?"Sizing":"Serializing", getIndex().toString(), iter), - kr::Context::VT::VTContext::serialize_proxy - ); - - vt::Collection::serialize(s); - s | cfg | nNeighbors | nElms | previous | result | rhs | iter | nextLaunchIter | reduceIter; - trace_obj.end(); - - if(chkpt_epoch != vt::no_epoch) { - epochQueue.pop_front(); - vt::theTerm()->finishedEpoch(chkpt_epoch); - } - //auto end_time = vt::timing::getCurrentTime(); - //if(!s.isSizing() && vt::theContext()->getNode() == 0) fmt::print("{}@{} finished serialization in {}s @{}\n", getIndex().toString(), iter, end_time-begin_time, end_time); - } - - - void init(Config cfg_){ - cfg = cfg_; - - auto idx = getIndex(); - for(int dim = 0; dim < 3; dim++){ - nElms[dim] = cfg.dataRange[dim]/cfg.colRange[dim]; - if(idx[dim] < (cfg.dataRange[dim]%cfg.colRange[dim])) - nElms[dim]++; - if(nElms[dim] <= 1){ - fmt::print(stderr, "{} running with only {} elements in dimension {}\n", getIndex().toString(), nElms[dim], dim); - assert(nElms[dim] > 0); - } - - if(idx[dim] == 0) nNeighbors--; - if(idx[dim] == cfg.colRange[dim]-1) nNeighbors--; - } - - - //previous/result views are swapped, so keep labels generic. - previous = Kokkos::View("ViewA", nElms[0], nElms[1], nElms[2]); - result = Kokkos::View("ViewB", nElms[0], nElms[1], nElms[2]); - rhs = Kokkos::View("RHS", nElms[0], nElms[1], nElms[2]); - Kokkos::deep_copy(previous, 0); - Kokkos::deep_copy(result, 0); - Kokkos::deep_copy(rhs, 0); - - - // - // Set the initial vector to the values of - // a "high-frequency" function - // - double hx = 1.0 / (cfg.dataRange.x()+1); - double hy = 1.0 / (cfg.dataRange.y()+1); - double hz = 1.0 / (cfg.dataRange.z()+1); - - int maxDim = std::max(std::max(cfg.dataRange[0], cfg.dataRange[1]), cfg.dataRange[2]); - int nf = 3 * int(maxDim+1) / 6; - - std::array offsets; - for(int dim = 0; dim < 3; dim++){ - offsets[dim] = idx[dim] * (cfg.dataRange[dim]/cfg.colRange[dim]); - offsets[dim] += std::min(idx[dim], (cfg.dataRange[dim]%cfg.colRange[dim])); - } - - Kokkos::parallel_for(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), - KOKKOS_LAMBDA (const int x, const int y, const int z){ - double val = pow((offsets[0]+x)*hx, 2); - val += pow((offsets[1]+y)*hy, 2); - val += pow((offsets[2]+z)*hz, 2); - - result(x,y,z) = sin(nf * M_PI * val); - }); - } - - //Wait for any queued iterations to finish, then iterate. - void iterate(int in_reduceIter){ - reduceIter = in_reduceIter; - vt::EpochType iterEpoch = vt::theTerm()->makeEpochRooted( - fmt::format("{} iteration {}", getIndex().toString(), nextLaunchIter) - ); - - vt::EpochType predEpoch = epochQueue.empty() ? vt::no_epoch : epochQueue.back(); - if(predEpoch != vt::no_epoch) { - vt::theTerm()->addDependency(predEpoch, iterEpoch); - } - - epochQueue.push_back(iterEpoch); - - int launchIter = ++nextLaunchIter; - - if(predEpoch == vt::no_epoch){ - _iterate(launchIter); - } else { -if(cfg.debug){ - fmt::print("{}@{}, delaying iterate@{} until previous finished. Predecessor epoch: {}, iter epoch: {}\n", - getIndex().toString(), iter, launchIter, predEpoch, iterEpoch); -} - vt::EpochType parent_epoch = vt::theTerm()->getEpoch(); - vt::theTerm()->addLocalDependency(parent_epoch); - - vt::theTerm()->addAction(predEpoch, [launchIter, this, parent_epoch]{ - vt::theTerm()->pushEpoch(parent_epoch); - getCollectionProxy()[getIndex()].send<&Solver::_iterate>(launchIter); - vt::theTerm()->popEpoch(parent_epoch); - - vt::theTerm()->releaseLocalDependency(parent_epoch); - }); - } - } - - //Iterate once. - void _iterate(int target) { -if(target != iter+1) fmt::print(stderr, "{}@{} expected target iteration {}\n", getIndex().toString(), iter, target-1); - assert(target == iter+1); - - //Early recvs are just recvs now - nRecv = nEarlyRecv; - bool already_recvd = nRecv == nNeighbors; - nEarlyRecv = 0; - iter++; - - - //Swap previous and result, will overwrite result w/ new - auto tmp = result; - result = previous; - previous = tmp; - - //Send edge values to neighbors - auto proxy = getCollectionProxy(); - auto idx = getIndex(); - - //vt::theMsg()->pushEpoch(epochQueue.front()); - for(int dim = 0; dim < 3; dim++){ - std::array dir = {0,0,0}; - - if(idx[dim] > 0){ - dir[dim] = -1; - proxy[idx + dir].send<&Solver::exchange>(idx, getPlane(previous, dir), iter); - } - if(idx[dim] < cfg.colRange[dim]-1){ - dir[dim] = 1; - proxy[idx + dir].send<&Solver::exchange>(idx, getPlane(previous, dir), iter); - } - } - //vt::theMsg()->popEpoch(epochQueue.front()); - - if(already_recvd) compute(); - }; - - //Reduce the global error - void reduce() { -/* while(iter != reduceIter){ - assert(!epochQueue.empty()); -if(cfg.debug){ - fmt::print("{}@{}, delaying reduce until reaching reduceIter of {}\n", - getIndex().toString(), iter, reduceIter); -} - //Retry until whatever chunk of queued up iterations finishes. - vt::theTerm()->addDependency(epochQueue.back(), vt::theTerm()->getEpoch()); - vt::theTerm()->addAction(epochQueue.back(), [&]{ - reduce(); - }); - return; - }*/ - - using ValT = typename Kokkos::MinMax::value_type; - ValT minMax; - - Kokkos::parallel_reduce(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), - KOKKOS_LAMBDA (const int x, const int y, const int z, ValT& l_minMax){ - auto& val = result(x,y,z); - l_minMax.min_val = std::min(l_minMax.min_val, val); - l_minMax.max_val = std::max(l_minMax.max_val, val); - }, Kokkos::MinMax(minMax)); - - double max = std::max(minMax.min_val*-1, minMax.max_val); - -if(cfg.debug){ - fmt::print("{}@{}, local max: {}\n", getIndex().toString(), iter, max); -} - - auto proxy = getCollectionProxy(); - proxy.reduce<&Solver::checkCompleted, vt::collective::MaxOp>(proxy(0,0,0), max); - }; - - void checkCompleted(double maxNorm) { - bool within_tolerance = maxNorm < cfg.tolerance; - bool timed_out = iter == cfg.maxIter; - bool done = within_tolerance || timed_out; - - if(done){ - if(within_tolerance) - fmt::print("\n # Jacobi reached tolerance threshold ({}<{}) in {} iterations\n\n", maxNorm, cfg.tolerance, iter); - else if(timed_out) - fmt::print("\n # Jacobi reached maximum iterations ({}) with while above tolerance ({}>{})\n\n", iter, maxNorm, cfg.tolerance); - cfg.objProxy.broadcast<&Detector::workFinished>(); - } else { - fmt::print(" # Iteration {} reached with maxNorm {}\n", iter, maxNorm); - } - }; - - void exchange(vt::Index3D sender, Kokkos::View ghost, int in_iter) { - bool early = in_iter != iter; - if(early) assert(in_iter == iter+1); - - vt::Index3D dir = getIndex() - sender; - auto dest = getGhostPlane(early ? result : previous, dir); - Kokkos::deep_copy(dest, ghost); - - if(early) nEarlyRecv++; - else nRecv++; - -if(cfg.debug){ - fmt::print("{}: Received from {} for iter {}. My iter: {}. Early? {}. Recvs: {}/{}, w/ {} early.\n", - getIndex().toString(), ghost.extent(0), ghost.extent(1), sender.toString(), in_iter, iter, early, nRecv+1, - nNeighbors, nEarlyRecv); -} - - if(!early && nRecv == nNeighbors){ - auto iter_epoch = epochQueue.front(); - vt::theTerm()->pushEpoch(iter_epoch); - getCollectionProxy()[getIndex()].send<&Solver::compute>(); - vt::theTerm()->popEpoch(iter_epoch); - vt::theTerm()->finishedEpoch(iter_epoch); - } - }; - -private: - void compute() { - // - //---- Jacobi iteration step for - //---- A banded matrix for the 8-point stencil - //---- [ 0.0 -1.0 0.0] - //---- [-1.0] - //---- [-1.0 6.0 -1.0] - //---- [-1.0] - //---- [ 0.0 -1.0 0.0] - //---- rhs_ right hand side vector - // - Kokkos::parallel_for(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), - KOKKOS_LAMBDA (const int x, const int y, const int z){ - result(x,y,z) = (1.0/6.0) * ( - rhs(x,y,x) + previous(x-1,y,z) + previous(x+1,y,z) + - previous(x,y-1,z) + previous(x,y+1,z) + previous(x,y,z-1) + - previous(x,y,z+1)); - }); - - //if(iter%150 == 0 && vt::theContext()->getNode() == 0) fmt::print(stderr, "{}@{} finished @{}\n", getIndex().toString(), iter, vt::timing::getCurrentTime()); - //Now finalize this iteration. - assert(!epochQueue.empty()); - epochQueue.pop_front(); - }; - - - Kokkos::View - getGhostPlane(Kokkos::View in, vt::Index3D dir){ - return getPlane(in, dir, true); - } - Kokkos::View - getPlane(Kokkos::View in, vt::Index3D dir, bool ghost = false){ - using Dir = vt::Index3D; - if(dir == Dir(-1,0,0)) - return Kokkos::subview(in, ghost ? 0 : 1, Kokkos::ALL(), Kokkos::ALL()); - if(dir == Dir(1,0,0)) - return Kokkos::subview(in, ghost ? nElms[0]-1 : nElms[0]-2, Kokkos::ALL(), Kokkos::ALL()); - if(dir == Dir(0,-1,0)) - return Kokkos::subview(in, Kokkos::ALL(), ghost ? 0 : 1, Kokkos::ALL()); - if(dir == Dir(0,1,0)) - return Kokkos::subview(in, Kokkos::ALL(), ghost ? nElms[0]-1 : nElms[0]-2, Kokkos::ALL()); - if(dir == Dir(0,0,-1)) - return Kokkos::subview(in, Kokkos::ALL(), Kokkos::ALL(), ghost ? 0 : 1); - if(dir == Dir(0,0,1)) - return Kokkos::subview(in, Kokkos::ALL(), Kokkos::ALL(), ghost ? nElms[0]-1 : nElms[0]-2); - - assert(false); - return Kokkos::subview(in, 0, Kokkos::ALL(), Kokkos::ALL()); - } -}; -using SolverProxy = vt::vrt::collection::CollectionProxy; - -} diff --git a/examples/jacobi/config.cpp b/examples/jacobi/config.cpp new file mode 100644 index 0000000..f79b04a --- /dev/null +++ b/examples/jacobi/config.cpp @@ -0,0 +1,122 @@ +#include "config.hpp" +#include + +namespace Jacobi { +Config::Config(int argc, char** argv){ + for(int i = 0; i < argc; i++){ + std::string arg = argv[i]; + if( arg == "--decomp"){ + int x = std::stoi(argv[++i]); + int y = std::stoi(argv[++i]); + int z = std::stoi(argv[++i]); + colRange = vt::Index3D(x,y,z); + } else if(arg == "--input"){ + int x = std::stoi(argv[++i]); + int y = std::stoi(argv[++i]); + int z = std::stoi(argv[++i]); + dataRange = vt::Index3D(x,y,z); + } else if(arg == "--max-iters") { + maxIter = std::stoi(argv[++i]); + } else if(arg == "--tolerance") { + tolerance = std::stod(argv[++i]); + } else if(arg == "--async-serialize") { + asyncCheckpoint = true; + } + } + + /* --- Print information about the simulation */ + if(vt::theContext()->getNode() == 0){ + fmt::print( + stdout, "\n - Solve the linear system for the Laplacian with homogeneous Dirichlet" + " on [0, 1] x [0, 1] x [0, 1]\n" + ); + fmt::print(" - Second-order centered finite difference\n"); + fmt::print(" - {} elements decomposed onto {} objects.\n", dataRange.toString(), colRange.toString()); + fmt::print(" - Maximum number of iterations {}\n", maxIter); + fmt::print(" - Convergence tolerance {}\n", tolerance); + fmt::print("\n"); + } +} +} + +ResilienceConfig::ResilienceConfig(int argc, char** argv, Jacobi::Config app_cfg){ + for(int i = 0; i < argc; i++){ + std::string arg = argv[i]; + if(arg == "--config") + config_filename = argv[++i]; + else if(arg == "--mode") + context_mode = argv[++i]; + else if(arg == "--freq") + checkpoint_frequency = std::stoi(argv[++i]); + else if(arg == "--kill") + kill_iter = std::stoi(argv[++i]); + else if(arg == "--kill-rank") + kill_rank = std::stoi(argv[++i]); + else if(arg == "--iters-per-phase") + iters_per_phase = std::stoi(argv[++i]); + else if(arg == "--iters-per-epoch") + iters_per_epoch = std::stoi(argv[++i]); + } + + + if(context_mode == "VT") { + if(iters_per_epoch == 0) iters_per_epoch = -1; + context = kr::make_context(vt::theContext(), config_filename); + } else if(context_mode == "MPI"){ + if(iters_per_epoch == 0){ + iters_per_epoch = checkpoint_frequency; + //Can't infer both iters_per_epoch and checkpoint_frequency + assert(checkpoint_frequency != 0); + } + context = kr::make_context(MPI_COMM_WORLD, config_filename); + } else throw std::invalid_argument("Valid --mode values are VT or MPI"); + + std::string freq_str; + if(checkpoint_frequency < 0) { + freq_str = "never"; + checkpoint_filter = [](int iter){ return false; }; + } else if(checkpoint_frequency == 0){ + freq_str = "according to json"; + checkpoint_filter = context->default_filter(); + } else { + freq_str = fmt::format("every {} iterations", checkpoint_frequency); + checkpoint_filter = kr::Filter::NthIterationFilter(checkpoint_frequency); + } + + + if(iters_per_phase < 1) iters_per_phase = app_cfg.maxIter+1; + if(iters_per_epoch < 1) iters_per_epoch = app_cfg.maxIter+1; + + + if(vt::theContext()->getNode() == 0) { + fmt::print("kr:: {} Context configured against {}\n", context_mode, config_filename); + fmt::print("kr:: Checkpointing {}\n", freq_str); + if(kill_iter > 0 && kill_rank > 0){ + fmt::print("Generating failure at iteration {} on rank {}\n", kill_iter, kill_rank); + if(kill_rank >= vt::theContext()->getNumNodes()){ + fmt::print("WARNING: kill_rank {} does not exist!\n", kill_rank); + } + } + + if(iters_per_epoch == -1){ + fmt::print("kr:: instructing app not to bound iterations\n"); + } else { + fmt::print("kr:: instructing app to bound every {} iterations\n", iters_per_epoch); + } + + if(iters_per_phase == -1){ + fmt::print("kr:: instructing app not to use phases\n"); + } else { + fmt::print("kr:: instructing app to phase every {} iterations\n", iters_per_phase); + } + } +} + +void ResilienceConfig::try_kill(int current_iteration){ + if(kill_iter == current_iteration && + kill_rank == vt::theContext()->getNode()){ + fmt::print(stderr, "Rank {} simulating failure on iteration {}\n", + kill_rank, kill_iter); + exit(1); + } +}; diff --git a/examples/jacobi/config.hpp b/examples/jacobi/config.hpp new file mode 100644 index 0000000..ae727b2 --- /dev/null +++ b/examples/jacobi/config.hpp @@ -0,0 +1,91 @@ +#ifndef JACOBI_CONFIG_HPP +#define JACOBI_CONFIG_HPP + +#include +#include + +namespace Jacobi { +//Manage solver parameters +struct Config { + //Number of solver objects to decompose the work into. + // --decomp + vt::Index3D colRange = vt::Index3D(4,4,4); + + //Input size per solver object + // --input + vt::Index3D dataRange = vt::Index3D(50,50,50); + + //Solver stops running after either maxIter iterations, or + //once tolerance reached. + // --tolerance + // --max-iters + double tolerance = 1e-2; + int maxIter = 100; + + //Whether solver ought to manage asynchronous checkpointing manually. + // --async-serialize + bool asyncCheckpoint = false; + + Config() = default; + + template + void serialize(SerT& s){ + s | colRange | dataRange | tolerance | maxIter | asyncCheckpoint; + } + + Config(int argc, char** argv); +}; +} + +//Manage resilience parameters +struct ResilienceConfig { + //Path to JSON config file for KokkosResilience + // --config + std::string config_filename = "config_jacobi.json"; + + //Which type of context to use for consistency-enforcement + // --mode + std::string context_mode = "VT"; + + //How often to checkpoint, in iterations. + // --freq + // 0 = from config file + //-1 = never + int checkpoint_frequency = 0; + + //Where and when to insert a failure. + // --kill + int kill_iter = -1; + // --kill-rank + int kill_rank = 0; + + //How often we should start the next VT phase (requiring an epoch boundary) + // --iters-per-phase + int iters_per_phase = 30; + + //How often we should arbitrarily insert an epoch boundary to test w/ + //some forced synchrony or to ensure correctness with "MPI" context_mode + // --iters-per-epoch + // 0 = matching checkpoint_frequency if "MPI" context_mode, else never + //-1 = never + int iters_per_epoch = 0; + + //Tells context when to checkpoint. + std::function checkpoint_filter; + + ResilienceConfig(int argc, char** argv, Jacobi::Config app_cfg); + + //Test recovery by exiting if on correct iteration and rank + void try_kill(int current_iteration); + + //Enable treating this object just like you would the context unique_ptr + kr::ContextBase* operator->(){ return context.get(); } + void reset() { context.reset(); } + +private: + std::unique_ptr context; +}; + + + +#endif diff --git a/examples/config_jacobi.json b/examples/jacobi/config_jacobi.json similarity index 100% rename from examples/config_jacobi.json rename to examples/jacobi/config_jacobi.json diff --git a/examples/config_jacobi_1.json b/examples/jacobi/config_jacobi_1.json similarity index 100% rename from examples/config_jacobi_1.json rename to examples/jacobi/config_jacobi_1.json diff --git a/examples/config_jacobi_async.json b/examples/jacobi/config_jacobi_async.json similarity index 100% rename from examples/config_jacobi_async.json rename to examples/jacobi/config_jacobi_async.json diff --git a/examples/config_jacobi_more_async.json b/examples/jacobi/config_jacobi_more_async.json similarity index 100% rename from examples/config_jacobi_more_async.json rename to examples/jacobi/config_jacobi_more_async.json diff --git a/examples/jacobi/main.cpp b/examples/jacobi/main.cpp new file mode 100644 index 0000000..80dc77c --- /dev/null +++ b/examples/jacobi/main.cpp @@ -0,0 +1,129 @@ +/* +//@HEADER +// ***************************************************************************** +// +// jacobi3d_vt.cc +// DARMA/vt => Virtual Transport +// +// Copyright 2019-2021 National Technology & Engineering Solutions of Sandia, LLC +// (NTESS). Under the terms of Contract DE-NA0003525 with NTESS, the U.S. +// Government retains certain rights in this software. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// * Neither the name of the copyright holder nor the names of its +// contributors may be used to endorse or promote products derived from this +// software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +// POSSIBILITY OF SUCH DAMAGE. +// +// Questions? Contact darma@sandia.gov +// +// ***************************************************************************** +//@HEADER +*/ + +#include +#include +#include +#include + +#include "config.hpp" +#include "solver.hpp" + +//Label for the resilience region. +const std::string MAIN_LOOP = "Jacobi main loop"; + +int main(int argc, char** argv) { + Kokkos::initialize(argc, argv); + { + vt::initialize(argc, argv); + const int this_node = vt::theContext()->getNode(); + + Jacobi::Config app_cfg(argc, argv); + ResilienceConfig res_cfg(argc, argv, app_cfg); + + int recover_iter = res_cfg->latest_version(MAIN_LOOP); + bool recovering = recover_iter >= 1; + if(recovering && this_node == 0) + fmt::print("Recovering to iteration {}\n", recover_iter); + + + //Object group of all nodes that take part in computation + // Used to determine whether the computation is finished + auto grp_proxy = vt::theObjGroup()->makeCollective("notify"); + + //Collection of Solver objects that perform the work. + auto col_proxy = vt::makeCollection("jacobi") + .bounds(app_cfg.colRange).bulkInsert().wait(); + if(!recovering) { + vt::runInEpochCollective([=]{ + col_proxy.broadcastCollective<&Jacobi::Solver::init>(app_cfg, grp_proxy); + }); + } + + //Register our objects, labels are pulled from the VT labels + res_cfg->register_to(MAIN_LOOP, grp_proxy); + res_cfg->register_to(MAIN_LOOP, col_proxy); + + + size_t iter = 1; + if(recovering) iter = recover_iter; + + const size_t max_iter = app_cfg.maxIter + 1; //Our iter is 1-based + + size_t next_phase_boundary = res_cfg.iters_per_phase + iter; + size_t next_epoch_boundary = res_cfg.iters_per_epoch + iter; + size_t next_boundary = std::min(std::min(next_phase_boundary, next_epoch_boundary), max_iter); + + while (!isWorkDone(grp_proxy) && iter < max_iter) { + vt::runInEpochCollective(fmt::format("Jacobi iters [{}-{}]", iter, next_boundary-1), [&]{ + if(this_node == 0) fmt::print(stderr, "Running iterations [{},{}]\n", iter, next_boundary-1); + + for( ; iter < next_boundary && !isWorkDone(grp_proxy); iter++){ + res_cfg->run(MAIN_LOOP, iter, [&]{ + res_cfg->register_to_active(col_proxy); + res_cfg->register_to_active(grp_proxy); + + col_proxy.broadcastCollective<&Jacobi::Solver::iterate>(); + }, res_cfg.checkpoint_filter); + } + }); + + vt::runInEpochCollective(fmt::format("Jacobi reduce {}", iter), [&]{ + col_proxy.broadcastCollective<&Jacobi::Solver::reduce>(); + }); + + //Update boundaries as necessary. + if(iter == next_epoch_boundary) next_epoch_boundary += res_cfg.iters_per_epoch; + if(iter == next_phase_boundary) { + next_phase_boundary += res_cfg.iters_per_phase; + vt::thePhase()->nextPhaseCollective(); + } + next_boundary = std::min(std::min(next_phase_boundary, next_epoch_boundary), max_iter); + } + + vt::finalize(); + } + Kokkos::finalize(); +} + + diff --git a/examples/jacobi/solver.cpp b/examples/jacobi/solver.cpp new file mode 100644 index 0000000..de1d7f5 --- /dev/null +++ b/examples/jacobi/solver.cpp @@ -0,0 +1,233 @@ +#include "solver.hpp" +namespace Jacobi { +bool Detector::isWorkFinished(){return finished;} +void Detector::workFinished(){finished = true;} +bool isWorkDone( DetectorProxy const& proxy) { + return proxy.get()->isWorkFinished(); +}; + +void Solver::init(Config cfg_, DetectorProxy detector_){ + cfg = cfg_; + detector = detector_; + + auto idx = getIndex(); + for(int dim = 0; dim < 3; dim++){ + nElms[dim] = cfg.dataRange[dim]/cfg.colRange[dim]; + if(idx[dim] < (cfg.dataRange[dim]%cfg.colRange[dim])) + nElms[dim]++; + if(nElms[dim] <= 1){ + fmt::print(stderr, "{} running with only {} elements in dimension {}\n", getIndex().toString(), nElms[dim], dim); + assert(nElms[dim] > 0); + } + + if(idx[dim] == 0) nNeighbors--; + if(idx[dim] == cfg.colRange[dim]-1) nNeighbors--; + } + + + //previous/result views are swapped, so keep labels generic. + previous = Kokkos::View("ViewA", nElms[0], nElms[1], nElms[2]); + result = Kokkos::View("ViewB", nElms[0], nElms[1], nElms[2]); + rhs = Kokkos::View("RHS", nElms[0], nElms[1], nElms[2]); + Kokkos::deep_copy(previous, 0); + Kokkos::deep_copy(result, 0); + Kokkos::deep_copy(rhs, 0); + + + // + // Set the initial vector to the values of + // a "high-frequency" function + // + double hx = 1.0 / (cfg.dataRange.x()+1); + double hy = 1.0 / (cfg.dataRange.y()+1); + double hz = 1.0 / (cfg.dataRange.z()+1); + + int maxDim = std::max(std::max(cfg.dataRange[0], cfg.dataRange[1]), cfg.dataRange[2]); + int nf = 3 * int(maxDim+1) / 6; + + std::array offsets; + for(int dim = 0; dim < 3; dim++){ + offsets[dim] = idx[dim] * (cfg.dataRange[dim]/cfg.colRange[dim]); + offsets[dim] += std::min(idx[dim], (cfg.dataRange[dim]%cfg.colRange[dim])); + } + + Kokkos::parallel_for(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), + KOKKOS_LAMBDA (const int x, const int y, const int z){ + double val = pow((offsets[0]+x)*hx, 2); + val += pow((offsets[1]+y)*hy, 2); + val += pow((offsets[2]+z)*hz, 2); + + result(x,y,z) = sin(nf * M_PI * val); + }); +} + + +void Solver::iterate(){ + //Grab the last iteration's epoch + vt::EpochType predEpoch = epochQueue.empty() ? vt::no_epoch : epochQueue.back(); + + //Make an epoch for this iteration, and add to the queue. + vt::EpochType iterEpoch = vt::theTerm()->makeEpochRooted( + fmt::format("{} iteration {}", getIndex().toString(), nextLaunchIter) + ); + epochQueue.push_back(iterEpoch); + + if(predEpoch == vt::no_epoch){ + //Just launch up the next iteration + _iterate(); + } else { + //Wait until prior iteration finishes to launch this one. + + //Grab up current epoch so this iteration's messages are correctly assigned + vt::EpochType parentEpoch = vt::theTerm()->getEpoch(); + vt::theTerm()->addLocalDependency(parentEpoch); + + //Add an action to run once prior iteration finishes + vt::theTerm()->addAction(predEpoch, [this, parentEpoch]{ + vt::theTerm()->pushEpoch(parentEpoch); + getCollectionProxy()[getIndex()].send<&Solver::_iterate>(); + vt::theTerm()->popEpoch(parentEpoch); + + vt::theTerm()->releaseLocalDependency(parentEpoch); + }); + + //Make the epoch dependency explicit, so VT can reduce termination detection messages. + vt::theTerm()->addDependency(predEpoch, iterEpoch); + } +} + +void Solver::_iterate() { + iter++; + + //Early recvs are just recvs now + nRecv = nEarlyRecv; + nEarlyRecv = 0; + + //Swap previous and result, will overwrite result w/ new + std::swap(result, previous); + + //Send ghost values to neighbors + auto proxy = getCollectionProxy(); + auto idx = getIndex(); + + for(int dim = 0; dim < 3; dim++){ + std::array dir = {0,0,0}; + + if(idx[dim] > 0){ + dir[dim] = -1; + proxy[idx + dir].send<&Solver::exchange>(idx, getPlane(previous, dir), iter); + } + if(idx[dim] < cfg.colRange[dim]-1){ + dir[dim] = 1; + proxy[idx + dir].send<&Solver::exchange>(idx, getPlane(previous, dir), iter); + } + } + + //We may have gotten all our ghost values while still working on last iteration + if(nRecv == nNeighbors) compute(); +}; + +void Solver::reduce() { + using ValT = typename Kokkos::MinMax::value_type; + ValT minMax; + + Kokkos::parallel_reduce(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), + KOKKOS_LAMBDA (const int x, const int y, const int z, ValT& l_minMax){ + auto& val = result(x,y,z); + l_minMax.min_val = std::min(l_minMax.min_val, val); + l_minMax.max_val = std::max(l_minMax.max_val, val); + }, Kokkos::MinMax(minMax)); + + double max = std::max(minMax.min_val*-1, minMax.max_val); + + auto proxy = getCollectionProxy(); + proxy.reduce<&Solver::checkCompleted, vt::collective::MaxOp>(proxy(0,0,0), max); +}; + +void Solver::checkCompleted(double maxNorm) { + bool within_tolerance = maxNorm < cfg.tolerance; + bool timed_out = iter == cfg.maxIter; + bool done = within_tolerance || timed_out; + + if(done){ + if(within_tolerance) + fmt::print("\n # Jacobi reached tolerance threshold ({}<{}) in {} iterations\n\n", maxNorm, cfg.tolerance, iter); + else if(timed_out) + fmt::print("\n # Jacobi reached maximum iterations ({}) while above tolerance ({}>{})\n\n", iter, maxNorm, cfg.tolerance); + detector.broadcast<&Detector::workFinished>(); + } else { + fmt::print(" # Iteration {} reached with maxNorm {}\n", iter, maxNorm); + } +}; + +void Solver::exchange(vt::Index3D sender, Kokkos::View ghost, int in_iter) { + bool early = in_iter != iter; + if(early) assert(in_iter == iter+1); + + vt::Index3D dir = sender - getIndex(); + auto dest = getGhostPlane(early ? result : previous, dir); + Kokkos::deep_copy(dest, ghost); + + if(early) nEarlyRecv++; + else nRecv++; + + if(!early && nRecv == nNeighbors){ + auto iter_epoch = epochQueue.front(); + + vt::theTerm()->pushEpoch(iter_epoch); + getCollectionProxy()[getIndex()].send<&Solver::compute>(); + vt::theTerm()->popEpoch(iter_epoch); + + vt::theTerm()->finishedEpoch(iter_epoch); + } +} + +void Solver::compute() { + // + //---- Jacobi iteration step for + //---- A banded matrix for the 8-point stencil + //---- [ 0.0 -1.0 0.0] + //---- [-1.0] + //---- [-1.0 6.0 -1.0] + //---- [-1.0] + //---- [ 0.0 -1.0 0.0] + //---- rhs_ right hand side vector + // + Kokkos::parallel_for(Range({1,1,1}, {nElms[0]-1, nElms[1]-1, nElms[2]-1}), + KOKKOS_LAMBDA (const int x, const int y, const int z){ + result(x,y,z) = (1.0/6.0) * ( + rhs(x,y,x) + previous(x-1,y,z) + previous(x+1,y,z) + + previous(x,y-1,z) + previous(x,y+1,z) + previous(x,y,z-1) + + previous(x,y,z+1)); + }); + + //No longer waiting on this iteration + assert(!epochQueue.empty()); + epochQueue.pop_front(); +}; + +Kokkos::View +Solver::getGhostPlane(Kokkos::View in, vt::Index3D dir){ + return getPlane(in, dir, true); +} + +Kokkos::View +Solver::getPlane(Kokkos::View in, vt::Index3D dir, bool ghost){ + using Dir = vt::Index3D; + if(dir == Dir(-1,0,0)) + return Kokkos::subview(in, ghost ? 0 : 1, Kokkos::ALL(), Kokkos::ALL()); + if(dir == Dir(1,0,0)) + return Kokkos::subview(in, ghost ? nElms[0]-1 : nElms[0]-2, Kokkos::ALL(), Kokkos::ALL()); + if(dir == Dir(0,-1,0)) + return Kokkos::subview(in, Kokkos::ALL(), ghost ? 0 : 1, Kokkos::ALL()); + if(dir == Dir(0,1,0)) + return Kokkos::subview(in, Kokkos::ALL(), ghost ? nElms[0]-1 : nElms[0]-2, Kokkos::ALL()); + if(dir == Dir(0,0,-1)) + return Kokkos::subview(in, Kokkos::ALL(), Kokkos::ALL(), ghost ? 0 : 1); + if(dir == Dir(0,0,1)) + return Kokkos::subview(in, Kokkos::ALL(), Kokkos::ALL(), ghost ? nElms[0]-1 : nElms[0]-2); + + assert(false); + return Kokkos::subview(in, 0, Kokkos::ALL(), Kokkos::ALL()); +} +} diff --git a/examples/jacobi/solver.hpp b/examples/jacobi/solver.hpp new file mode 100644 index 0000000..996dab5 --- /dev/null +++ b/examples/jacobi/solver.hpp @@ -0,0 +1,144 @@ +#ifndef JACOBI_SOLVER_HPP +#define JACOBI_SOLVER_HPP + +#include +#include +#include "config.hpp" + +// +// This code applies a few steps of the Jacobi iteration to +// the linear system A x = 0 +// where is a banded symmetric positive definite matrix. +// The initial guess for x is a made-up non-zero vector. +// The exact solution is the vector 0. +// +// The matrix A is square and invertible. +// The number of rows is ((number of objects) * (number of rows per object)) +// +// Such a matrix A is obtained when using 2nd-order finite difference +// for discretizing +// +// -d^2 u / dx^2 -d^2 u / dy^2 - -d^2 u / dz^2 = f on [0, 1] x [0, 1] x [0, 1] +// +// with homogeneous Dirichlet condition +// +// u = 0 on the boundary of [0, 1] x [0, 1] x [0, 1] +// +// using a uniform grid with grid size +// +// 1 / ((number of objects) * (number of rows per object) + 1) +// + +namespace Jacobi { + +struct Detector { + bool finished = false; + + template + void serialize(Serializer& s) { + s | finished; + } + + bool isWorkFinished(); + void workFinished(); +}; +using DetectorProxy = vt::objgroup::proxy::Proxy; + +bool isWorkDone( DetectorProxy const& proxy); + +struct Solver : vt::Collection { + Config cfg; + DetectorProxy detector; + + //Lower for edges + int nNeighbors = 6; + + std::array nElms; + + //Previous iteration's data, current result, input right hand side. + Kokkos::View previous, result, rhs; + + //iter tracks the iteration locally completed or currently in progress. + //nextLaunchIter is just used for sanity checking. + int iter = 0, nextLaunchIter = 0; + std::deque epochQueue; + + //Count ghost messages received. We might get some "early" messages for our next iteration + int nRecv = 0, nEarlyRecv = 0; + + //3D range Kokkos execution policy + using Range = Kokkos::MDRangePolicy, Kokkos::IndexType>; + +public: + Solver() = default; + + template + void serialize(Serializer& s) { + vt::EpochType chkpt_epoch = vt::no_epoch; + if(s.hasTraits(vt::vrt::CheckpointInternalTrait()) && cfg.asyncCheckpoint && !s.isSizing()){ + //Checkpointing waits for enqueued iterations to finish + if(!epochQueue.empty()) { + vt::EpochType last_iter_epoch = epochQueue.back(); + + chkpt_epoch = vt::theTerm()->makeEpochRooted(fmt::format("{} checkpointing!\n", getIndex().toString())); + vt::theTerm()->addDependency(last_iter_epoch, chkpt_epoch); + epochQueue.push_back(chkpt_epoch); + + kr::Util::VT::delaySerializeUntil(last_iter_epoch); + } + } + + vt::trace::TraceScopedNote trace_obj( + fmt::format("{} {}@{}", s.isSizing()?"Sizing":"Serializing", getIndex().toString(), iter), + kr::Context::VT::VTContext::serialize_proxy + ); + + vt::Collection::serialize(s); + s | cfg | detector | nNeighbors | nElms | previous | result | rhs | iter; + trace_obj.end(); + + if(chkpt_epoch != vt::no_epoch) { + epochQueue.pop_front(); + vt::theTerm()->finishedEpoch(chkpt_epoch); + } + } + + + void init(Config cfg_, DetectorProxy detector_); + + //Requests another iteration be launched. + //Manages waiting on any outstanding iterations to finish. + void iterate(); + + //Internal. Perform the actual iteration steps. + void _iterate(); + + //Reduce the global error. Not currently asynchronously safe. + void reduce(); + + //Internal. Gets reduced global error and notifies of completion if finished. + void checkCompleted(double maxNorm); + + //Internal. Handles incoming ghost values. + void exchange(vt::Index3D sender, Kokkos::View ghost, int in_iter); + +private: + void compute(); + + //Get a Kokkos subview of the edge plane of input view. + // Which edge to get is defined by dir, which should have a single non-zero dimension + // Choose low or high edge of that dimension with a -1 or 1 value. + //(Essentially, dir=neighborIndex-myIndex gives you the ghost plane in the direction of neighbor) + Kokkos::View + getGhostPlane(Kokkos::View in, vt::Index3D dir); + + //As getGhostPlane, but by default gets the edge plane of local data not the ghost values. + Kokkos::View + getPlane(Kokkos::View in, vt::Index3D dir, bool ghost = false); +}; + +using SolverProxy = vt::vrt::collection::CollectionProxy; + +} + +#endif