Skip to content

Commit

Permalink
Code cleanup and WorkTag commit. Bugged.
Browse files Browse the repository at this point in the history
  • Loading branch information
ElisabethGiem committed Oct 16, 2024
1 parent 1741fe8 commit 6d1ce26
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 121 deletions.
47 changes: 17 additions & 30 deletions src/resilience/openMP/OpenMPResParallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace KokkosResilience{

inline void inject_error_duplicates() {

if (KokkosResilience::global_error_settings->error_rate){
if (KokkosResilience::global_error_settings){
//Per kernel, seed the first inject index
KokkosResilience::ErrorInject::global_next_inject = KokkosResilience::global_error_settings->geometric(KokkosResilience::ErrorInject::random_gen); ;

Expand Down Expand Up @@ -114,20 +114,21 @@ class ParallelFor< FunctorType
private:
using Policy = Kokkos::RangePolicy<Traits...>;
using WorkTag = typename Policy::work_tag;
//MiniMD
using LaunchBounds = typename Policy::launch_bounds;
using Member = typename Policy::member_type;

const FunctorType & m_functor;
//const FunctorType & m_functor;
const FunctorType m_functor;
const Policy m_policy;

ParallelFor() = delete ;
ParallelFor & operator = ( const ParallelFor & ) = delete ;

using surrogate_policy = Kokkos::RangePolicy < Kokkos::OpenMP, WorkTag, LaunchBounds>;

auto MakeWrapper (int64_t work_size, int64_t offset){
auto MakeWrapper (int64_t work_size, int64_t offset, const FunctorType &m_functor_0, const FunctorType &m_functor_1) const{
if constexpr (std::is_void_v<WorkTag>){
std::cout << "In MakeWrapper void WorkTag branch." << std::endl;
auto wrapper_functor = [&](auto i){
if (i < work_size)
{
Expand All @@ -144,7 +145,8 @@ class ParallelFor< FunctorType
};
return wrapper_functor;
}else if constexpr (!std::is_void_v<WorkTag>)
{
{
std::cout << "In MakeWrapper WorkTag branch." << std::endl;
auto wrapper_functor = [&](WorkTag work_tag, auto i){
if (i < work_size)
{
Expand All @@ -166,14 +168,15 @@ class ParallelFor< FunctorType
public:
inline void execute() const {
//! The execute() function in this class performs an OpenMP execution of parallel for with
//! triple modular redundancy. Non-constant views equipped with the triggering subscribers are
//! modular redundancy. Non-constant views equipped with the triggering subscribers are
//! duplicated and three concurrent executions divided equally between the available pool
//! of OpenMP threads proceed. Duplicate views are combined back into a single view by calling
//! a combiner to majority vote on the correct values. This process is repeated until
//! a combiner to majority vote on the correct values. This process is optionally repeated until
//! a value is voted correct or a given number of attempts is exceeded.
//! There are some subtleties regarding which views are copied per kernel in the default subscriber
//! See KokkosResilience::ResilientDuplicatesSubscriber::duplicates_cache for details

//TODO: Cmake option reexecution_max off or number, enable spare view in the subscriber
const int max_repeats = 5;
int repeats = max_repeats; //! This integer represents the maximum number of attempts to reach consensus allowed.
bool success = 0; //! This bool indicates that all views successfully reached a consensus.
Expand Down Expand Up @@ -201,7 +204,8 @@ class ParallelFor< FunctorType
auto m_functor_1 = m_functor;
KokkosResilience::ResilientDuplicatesSubscriber::in_resilient_parallel_loop = false;

auto wrapper_functor = MakeWrapper (work_size, offset);
std::cout << "Right before MakeWrapper" <<std::endl;
auto wrapper_functor = MakeWrapper (work_size, offset, m_functor_0, m_functor_1);

/* auto wrapper_functor = [&](m_worktag ,auto i){
if (i < work_size)
Expand Down Expand Up @@ -296,9 +300,9 @@ class ParallelFor< FunctorType
#endif
#ifdef KR_ENABLE_WRAPPER

// TMR with scheduling
// Feed in three-times as long range policy (wrapper-policy)
// With wrapped functor, so that the iterations are bound to the duplicated functors/views
// TMR with kernel fusion
// TODO: separate into fusion cmake option, enabled on dmr or tmr
// Functor is fused, with iterations bound to duplicated functors in 3x range
Impl::ParallelFor< decltype(wrapper_functor) , surrogate_policy, Kokkos::OpenMP > closure( wrapper_functor , wrapper_policy );

closure.execute();
Expand All @@ -311,15 +315,15 @@ class ParallelFor< FunctorType

// Combine the duplicate views and majority vote on correctness
success = KokkosResilience::combine_resilient_duplicates();
// Does not clear the cache map, user must clear cache map before Kokkos::finalize()
KokkosResilience::clear_duplicates_map();
#endif
repeats--;

}// while (!success & repeats left)

if(success==0 && repeats == 0){
// Abort if 5 repeated tries at executing failed to find acceptable match
// Abort if max-reexecutions exceeded
// TODO: cmake option
auto &handler = KokkosResilience::get_unrecoverable_data_corruption_handler();
handler(max_repeats);
}
Expand Down Expand Up @@ -350,10 +354,6 @@ class ParallelFor<FunctorType,
using Policy = typename MDRangePolicy::impl_range_policy;
using WorkTag = typename MDRangePolicy::work_tag;

//TODO: MiniMD may need Launchbounds

using Member = typename Policy::member_type;

using index_type = typename Policy::index_type;
using iterate_type = typename Kokkos::Impl::HostIterateTile<
MDRangePolicy, FunctorType, typename MDRangePolicy::work_tag, void>;
Expand All @@ -366,7 +366,6 @@ class ParallelFor<FunctorType,
ParallelFor() = delete ;
ParallelFor & operator = ( const ParallelFor & ) = delete ;

// TODO: MiniMD may require added LaunchBounds
using surrogate_policy = Kokkos::MDRangePolicy < Kokkos::OpenMP, WorkTag>;

public:
Expand Down Expand Up @@ -420,19 +419,7 @@ class ParallelFor<FunctorType,
namespace Kokkos {
namespace Impl {

//TODO:: This comment invalidated by rework of Kokkos with CombinedFunctorReducerType??
// Will eventually need enable_if to partially specialize on the different reducer types
// This specialization is for view type only, but written as only instantiation for now

// Range policy implementation
#if 0
template <class FunctorType, class ReducerType, class... Traits>
class ParallelReduce< FunctorType
, Kokkos::RangePolicy< Traits... >
, ReducerType
, KokkosResilience::ResOpenMP>{
#endif

template <class CombinedFunctorReducerType, class... Traits>
class ParallelReduce< CombinedFunctorReducerType
, Kokkos::RangePolicy< Traits... >
Expand Down
4 changes: 0 additions & 4 deletions src/resilience/openMP/OpenMPResSubscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ bool ResilientDuplicatesSubscriber::in_resilient_parallel_loop = false;
#ifdef KR_ENABLE_DMR
bool ResilientDuplicatesSubscriber::dmr_failover_to_tmr = false;
#endif

//std::unordered_map< ResilientDuplicatesSubscriber::key_type, CombineDuplicatesBase * > ResilientDuplicatesSubscriber::duplicates_map;
//std::unordered_map< ResilientDuplicatesSubscriber::key_type, std::unique_ptr< CombineDuplicatesBase > > ResilientDuplicatesSubscriber::duplicates_cache;

}

#endif //defined(KOKKOS_ENABLE_OPENMP)
Expand Down
67 changes: 3 additions & 64 deletions src/resilience/openMP/OpenMPResSubscriber.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,9 @@ struct CombineDuplicates: public CombineDuplicatesBase
// Note: 2 copies allocated even in DMR
View copy[2];

//This hack for a bool was replaceable in the CUDA version and may be a c++ <17 artifact
//Attempt to replace hack due to yucky error
//Kokkos::View <bool*> success{"success", 1};
mutable bool success = 1;

static constexpr size_t rank = View::rank();
// The rank of the view is known at compile-time, and there
// is 1 subscriber per view. Therefore it is not templated on the
// instantiation of the original, but on the View itself

void clear() override
{
Expand All @@ -193,7 +187,6 @@ struct CombineDuplicates: public CombineDuplicatesBase

bool execute() override
{
//success(0) = true;
success = true;

#ifdef KR_ENABLE_DMR
Expand All @@ -219,10 +212,8 @@ struct CombineDuplicates: public CombineDuplicatesBase
}else{

Kokkos::parallel_for("SubscriberCombiner1D", original.size(), *this);
//Kokkos::fence();
}
}
//return success(0);
return success;
}

Expand All @@ -236,7 +227,7 @@ struct CombineDuplicates: public CombineDuplicatesBase
#ifdef KR_ENABLE_DMR
//Indicates dmr_failover_to_tmr tripped
if(duplicate_count == 2 ){
//Main Combiner begin, dmr failover has tripped into TMR
//Main combiner begin, dmr failover has tripped into TMR
for (int j = 0; j < 2; j ++) {
if (check_equality.compare(copy[j](std::forward<Args>(its)...), original(std::forward<Args>(its)...))) {
return;
Expand All @@ -247,21 +238,18 @@ struct CombineDuplicates: public CombineDuplicatesBase
return;
}
//No match found, all three executions return different number
//success(0) = false;
success = false;
}
// DMR has not failed over, only 1 copy exists
// Slight correction: 2 copies instantiated, 1 initialized
// DMR has not failed over, 2 copies instantiated, 1 initialized
else{
//DMR combiner begin with no failover
if (check_equality.compare(copy[0](std::forward<Args>(its)...), original(std::forward<Args>(its)...))){
return;
}
//success(0) = false;
success = false;
}
#else
//Main Combiner Begin
//Main combiner begin
for (int j = 0; j < 2; j ++) {
if (check_equality.compare(copy[j](std::forward<Args>(its)...), original(std::forward<Args>(its)...))) {
return;
Expand All @@ -272,7 +260,6 @@ struct CombineDuplicates: public CombineDuplicatesBase
return;
}
//No match found, all three executions return different number
//success(0) = false;
success = false;
#endif
}
Expand Down Expand Up @@ -332,26 +319,6 @@ struct CombineDuplicates: public CombineDuplicatesBase
std::cout << "total_extent is " << total_extent << std::endl;
std::cout << "next_inject is " << next_inject << std::endl;
#endif
#if 0
//Completely closed off print loop. DELETE!
for (int j=0; j<=2; j++){
while (next_inject < total_extent)
{
std::cout << "The value at next_inject translates to array(" << floor(next_inject/original.extent(0)) << ","
<< next_inject - (original.extent(0) * floor(next_inject/original.extent(0))) << ") = "
<< static_cast<typename View::value_type>(original((int)floor(next_inject/original.extent(0)),next_inject - (original.extent(0) * (int)floor(next_inject/original.extent(0)))))
<< "." << std::endl;

ErrorInject::error_counter++;
next_inject = global_error_settings->geometric(ErrorInject::random_gen)+next_inject+1;
std::cout << "next_inject is " << next_inject << std::endl;
}
if(total_extent != 1){
next_inject = next_inject - total_extent;
}
}
#endif


//#if 0
for (int j = 0; j<=2; j++){
Expand All @@ -365,7 +332,6 @@ struct CombineDuplicates: public CombineDuplicatesBase
#endif
next_inject = global_error_settings->geometric(ErrorInject::random_gen)+next_inject+1;
//std::cout << "next_inject is " << next_inject << std::endl;


if (j==0){//Inject in the original if j is 0
original((int)floor(next_inject/original.extent(0)), next_inject % original.extent(0))
Expand Down Expand Up @@ -399,22 +365,18 @@ struct CombineDuplicates: public CombineDuplicatesBase
#ifdef KR_ENABLE_DMR
//Implies dmr_failover_to_tmr
if(duplicate_count == 2) {
//goto main tmr inject
TwoDimTMRInject();
}
else{//Actual DMR error injection with only 1 copy
}//End DMR error injection
#else
//Working not perfect
//std::cout << "We got into the 2d error injector section\n";
TwoDimTMRInject();
#endif
}else{

#ifdef KR_ENABLE_DMR
//Implies dmr_failover_to_tmr has tripped
if(duplicate_count == 2 ){
//goto main_tmr_inject;
oneD_tmr_inject();
}
else{//The actual dmr error inject with only 1 copy
Expand Down Expand Up @@ -556,24 +518,9 @@ struct ResilientDuplicatesSubscriber {
duplicate = View(label_ss.str(), original.layout());
}

/*
// A template argument V (view), a template itself, having at least one parameter
// the first one (T), to determine between the const/non-const copy constructor in overload
// Class because C++14
template<template<typename, typename ...> class V, typename T, typename... Args>
static void copy_constructed( V < const T *, Args...> &self, const V < const T *, Args...> &other) {
// If View is constant do nothing, not triggering the rest of the subscriber.
}
template< template< typename, typename ...> class V, typename T, typename... Args>
static void copy_constructed( V < T *, Args... > &self, const V < T *, Args... > &other)
{
*/

template<typename View>
static void copy_constructed( View &self, const View &other)
{
//if constexpr ( !std::is_const_v< typename V::value_type > )
if constexpr( std::is_same_v< typename View::non_const_data_type, typename View::data_type > )
{
// If view is non-constant and in the parallel loop, cascade the rest of the subscriber
Expand All @@ -583,7 +530,6 @@ struct ResilientDuplicatesSubscriber {
auto res = duplicates_map.emplace(std::piecewise_construct,
std::forward_as_tuple(other.data()),
std::forward_as_tuple(combiner));
//auto &c = dynamic_cast< CombineDuplicates< V<T*, Args...> > & > (*res.first-> second);
auto &c = dynamic_cast< CombineDuplicates< View > & > (*res.first->second);

// The first copy constructor in a parallel_for for the given view
Expand Down Expand Up @@ -611,16 +557,9 @@ struct ResilientDuplicatesSubscriber {

KOKKOS_INLINE_FUNCTION
void print_total_error_time() {
//std::lock_guard<std::mutex> lock(ETimer::global_time_mutex);

ETimer::global_time_mutex.lock();

//actual time (in seconds) of duration d = d.count() * D::period::num / D::period::den
//D::period::num = 1
//D::period::den = 1000000000
//Selected because tick rate of stable_clock
//auto time = ETimer::total_error_time.count() / 1000000000;
//const auto time = std::chrono::duration_cast<std::chrono::seconds>(ETimer::total_error_time);
std::cout << "The value of ETimer::total_error_time.count() is " << ETimer::total_error_time.count() << " nanoseconds." << std::endl;
std::cout << "The total number of errors inserted is " << ErrorInject::error_counter << " errors." << std::endl;
ETimer::global_time_mutex.unlock();
Expand Down
19 changes: 0 additions & 19 deletions src/resilience/openMP/ResOpenMP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,6 @@ ResOpenMP::ResOpenMP()
: OpenMP() {
}

/*void ResOpenMP::print_configuration( std::ostream & s , bool ) const
{
os << "KokkosResilience::ResOpenMP:\n";
const bool is_initialized = Kokkos::Impl::t_openmp_instance != nullptr;
if (is_initialized) {
const int numa_count = 1;
const int core_per_numa = Kokkos::Impl::g_openmp_hardware_max_threads;
const int thread_per_core = 1;
s << " thread_pool_topology[ " << numa_count << " x " << core_per_numa
<< " x " << thread_per_core << " ]" << std::endl;
} else {
s << " not initialized" << std::endl;
}
}*/

// Reports the fixed, human-readable name of the ResOpenMP class.
// The returned pointer refers to a string literal, so the caller must not free it.
const char* ResOpenMP::name() {
  return "ResOpenMP";
}

} // namespace KokkosResilience
Expand Down
4 changes: 0 additions & 4 deletions src/resilience/openMP/ResOpenMP.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ class ResOpenMP : public Kokkos::OpenMP {

/*------------------------------------*/

// Print configuration information to the given output stream.
//static void print_configuration(std::ostream&, const bool verbose = false);
//void print_configuration(std::ostream& os, bool verbose = false) const;

static const char* name();

}; //template class ResOpenMP execution space
Expand Down

0 comments on commit 6d1ce26

Please sign in to comment.