Skip to content

Commit

Permalink
dash::copy: Pass local_chunks by the caller
Browse files Browse the repository at this point in the history
This way one can call copy_impl multiple times, before triggering local
copies.
  • Loading branch information
bertwesarg committed Mar 3, 2020
1 parent cd90860 commit 64a9f8d
Showing 1 changed file with 36 additions and 24 deletions.
60 changes: 36 additions & 24 deletions dash/include/dash/algorithm/Copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,13 @@ template <
typename ValueType,
typename GlobInputIt >
ValueType * copy_impl(
GlobInputIt begin,
GlobInputIt end,
ValueType * out_first,
std::vector<dart_handle_t> * handles)
GlobInputIt begin,
GlobInputIt end,
ValueType * out_first,
std::vector<dart_handle_t> * handles,
local_copy_chunks<
typename GlobInputIt::value_type,
ValueType> & local_chunks)
{
DASH_LOG_TRACE("dash::internal::copy_impl() global -> local",
"in_first:", begin.pos(),
Expand All @@ -171,8 +174,6 @@ ValueType * copy_impl(

ContiguousRangeSet<GlobInputIt> range_set{begin, end};

local_copy_chunks<input_value_type, output_value_type> local_chunks;

//
// Copy elements from every unit:
//
Expand Down Expand Up @@ -216,8 +217,6 @@ ValueType * copy_impl(
num_elem_copied += num_copy_elem;
}

do_local_copies(local_chunks);

DASH_ASSERT_EQ(num_elem_copied, num_elem_total,
"Failed to find all contiguous subranges in range");

Expand All @@ -238,10 +237,13 @@ template <
typename ValueType,
typename GlobOutputIt >
GlobOutputIt copy_impl(
ValueType * begin,
ValueType * end,
GlobOutputIt out_first,
std::vector<dart_handle_t> * handles)
ValueType * begin,
ValueType * end,
GlobOutputIt out_first,
std::vector<dart_handle_t> * handles,
local_copy_chunks<
ValueType,
typename GlobOutputIt::value_type> & local_chunks)
{
DASH_LOG_TRACE("dash::copy_impl() local -> global",
"in_first:", begin,
Expand Down Expand Up @@ -271,8 +273,6 @@ GlobOutputIt copy_impl(

ContiguousRangeSet<GlobOutputIt> range_set{out_first, out_last};

local_copy_chunks<input_value_type, output_value_type> local_chunks;

auto in_first = begin;

//
Expand Down Expand Up @@ -317,8 +317,6 @@ GlobOutputIt copy_impl(
num_elem_copied += num_copy_elem;
}

do_local_copies(local_chunks);

DASH_ASSERT_EQ(num_elem_copied, num_elem_total,
"Failed to find all contiguous subranges in range");

Expand Down Expand Up @@ -355,9 +353,10 @@ dash::Future<ValueType *> copy_async(
}

auto handles = std::make_shared<std::vector<dart_handle_t>>();

auto out_last = dash::internal::copy_impl(in_first, in_last,
out_first, handles.get());
dash::internal::local_copy_chunks<typename GlobInputIt::value_type, ValueType> local_chunks;
auto out_last = dash::internal::copy_impl(in_first, in_last, out_first,
handles.get(), local_chunks);
dash::internal::do_local_copies(local_chunks);

if (handles->empty()) {
DASH_LOG_TRACE("dash::copy_async", "all transfers completed");
Expand Down Expand Up @@ -439,24 +438,29 @@ ValueType * copy(
}

ValueType *out_last;
dash::internal::local_copy_chunks<typename GlobInputIt::value_type, ValueType> local_chunks;
if (UseHandles) {
std::vector<dart_handle_t> handles;
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
&handles);
&handles,
local_chunks);
if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
"num_handles: ", handles.size());
dart_waitall_local(handles.data(), handles.size());
}
dash::internal::do_local_copies(local_chunks);

} else {
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
nullptr);
nullptr,
local_chunks);
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete");
dash::internal::do_local_copies(local_chunks);
dart_flush_local_all(in_first.dart_gptr());
}

Expand Down Expand Up @@ -489,10 +493,13 @@ dash::Future<GlobOutputIt> copy_async(
}

auto handles = std::make_shared<std::vector<dart_handle_t>>();
dash::internal::local_copy_chunks<ValueType, typename GlobOutputIt::value_type> local_chunks;
auto out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
handles.get());
handles.get(),
local_chunks);
dash::internal::do_local_copies(local_chunks);

if (handles->empty()) {
return dash::Future<GlobOutputIt>(out_last);
Expand Down Expand Up @@ -561,12 +568,15 @@ GlobOutputIt copy(
DASH_LOG_TRACE("dash::copy()", "blocking, local to global");
// handles to wait on at the end
GlobOutputIt out_last;
dash::internal::local_copy_chunks<ValueType, typename GlobOutputIt::value_type> local_chunks;
if (UseHandles) {
std::vector<dart_handle_t> handles;
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
&handles);
&handles,
local_chunks);
dash::internal::do_local_copies(local_chunks);

if (!handles.empty()) {
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete,",
Expand All @@ -577,8 +587,10 @@ GlobOutputIt copy(
out_last = dash::internal::copy_impl(in_first,
in_last,
out_first,
nullptr);
nullptr,
local_chunks);
DASH_LOG_TRACE("dash::copy", "Waiting for remote transfers to complete");
dash::internal::do_local_copies(local_chunks);
dart_flush_all(out_first.dart_gptr());
}
return out_last;
Expand Down

0 comments on commit 64a9f8d

Please sign in to comment.