addressing the case when output region for repeat operation is too big #386

Status: Open. Wants to merge 2 commits into base branch branch-24.03.
68 changes: 57 additions & 11 deletions cunumeric/module.py
@@ -2046,7 +2046,6 @@ def repeat(a, repeats, axis=None):
--------
Multiple GPUs, Multiple CPUs
"""

# when array is a scalar
if np.ndim(a) == 0:
if np.ndim(repeats) == 0:
@@ -2075,7 +2074,7 @@ def repeat(a, repeats, axis=None):
axis = np.int32(axis)

if axis >= array.ndim:
- return ValueError("axis exceeds dimension of the input array")
+ raise ValueError("axis exceeds dimension of the input array")

# If repeats is on a zero sized axis, then return the array.
if array.shape[axis] == 0:
@@ -2100,11 +2099,36 @@
category=UserWarning,
)
repeats = np.int64(repeats)
- result = array._thunk.repeat(
- repeats=repeats,
- axis=axis,
- scalar_repeats=True,
- )
+ if repeats < 0:
+ raise ValueError(
+ "'repeats' should not be negative: {}".format(repeats)
+ )

Contributor comment: Please use f-strings in new code

Contributor comment: and use raise to raise an exception instead of return
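Folding both review comments into the new check gives the following shape (a sketch of the suggested form, with an illustrative function name, not the PR's current code):

```python
def validate_repeats(repeats):
    # Raise (not return) the exception, and format with an f-string.
    if repeats < 0:
        raise ValueError(f"'repeats' should not be negative: {repeats}")
```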

# check the output shape (whether it will fit on the GPU or not)
out_shape = list(array.shape)
out_shape[axis] *= repeats
out_shape = tuple(out_shape)
size = np.prod(out_shape) * array.itemsize
# check if the size of the output array is less than 8GB. In this case
# we can use output regions, otherwise we will use a statically
# allocated array
if size < 8589934592 / 2:
Contributor comment:
A bunch of comments about this, going from lower- to higher-level:

This seems to be testing for 4GB, not 8GB?

This should be a named constant, and ideally written as 8 * (2 ** 30), so it's obvious we're talking about 8GB.

This limit is not considering the available memory. 8GB may be too large or too little depending on the memory. This number should probably be a percentage of the relevant available memory.

This is considering the full size of the array, not the size of each chunk. E.g. 16GB may be totally fine if split across 8 GPUs.

It seems to me that the only real decision we're making here is whether to perform the operation using an eager output or a deferred output. Therefore, we want to also be querying the (relative) sizes of the eager and deferred pools. Ideally we would also consider the current/projected load on each pool, which is not possible right now, but might be possible in the future, if legate.core takes over more instance management responsibilities.

Finally, AFAIK the unification of eager and deferred pools is on the Legion roadmap. If that happens, then we could safely always use the more efficient eager implementation. @lightsighter how far in the future do you think this is? If nobody has complained about this issue, we may want to wait until unification lands.
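The reviewer's first two points can be sketched as follows (the constant and function names are illustrative, not part of the PR):

```python
import numpy as np

# Named constant written so the magnitude is obvious (8 GiB), as the
# reviewer suggests; dividing it by 2 at the comparison site would
# silently turn this into a 4 GiB test.
OUTPUT_REGION_LIMIT = 8 * (2 ** 30)

def fits_in_output_region(shape, itemsize):
    # The element count is the product of the dimensions.
    size_bytes = int(np.prod(shape)) * itemsize
    return size_bytes < OUTPUT_REGION_LIMIT
```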


result = array._thunk.repeat(
repeats=repeats, axis=axis, scalar_repeats=True
)
else:
# this implementation is taken from CuPy
result = ndarray(shape=out_shape, dtype=array.dtype)
a_index = [slice(None)] * len(out_shape)
res_index = list(a_index)
offset = 0
for i in range(array.shape[axis]):
a_index[axis] = slice(i, i + 1)
res_index[axis] = slice(offset, offset + repeats)
result[res_index] = array[a_index]
offset += repeats
Contributor comment on lines +2126 to +2130:
I don't love this. We are emitting a separate operation for each slice. It would be more efficient if we could manually partition the result array on axis using a custom coloring, following a partitioning of a on the same axis:

p_a = axis_equal_partition(a, axis, colors)
for c in colors:
  result_coloring[c] = Rect(
    lo=p_a[c].domain.lo * repeats,
    hi=p_a[c].domain.hi * repeats
  )
p_result = partition_by_coloring(result, result_coloring)
for c in colors:
  repeat(p_result[c], p_a[c])  # index launch

However, this would require support for manual-coloring partitioning from the core. @magnatelee is this something that's reasonable?
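In 1-D terms, the coloring sketched above reduces to scaling each input chunk's bounds by `repeats`. A minimal sketch of that rectangle arithmetic, assuming Legion-style inclusive bounds (where the upper end needs `(hi + 1) * repeats - 1` rather than `hi * repeats` to keep the colored rectangles contiguous):

```python
def result_coloring_bounds(chunk_bounds, repeats):
    # chunk_bounds: inclusive (lo, hi) index ranges, one per color.
    # Each input element expands into `repeats` consecutive outputs, so
    # the chunk [lo, hi] maps to [lo * repeats, (hi + 1) * repeats - 1].
    return [(lo * repeats, (hi + 1) * repeats - 1) for lo, hi in chunk_bounds]
```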

Contributor comment:

I don't think anything that does coloring would be any more scalable than the original code, and it is in some sense worse, as it takes away the core's ability to reason about the partitioning. For example, if the core saw multiple tasks operating on disjoint parts of the same array, it could potentially partition and map them in a way that those tasks are distributed in a balanced manner.

return result
# repeats is an array
else:
# repeats should be integer type
@@ -2115,10 +2139,32 @@
)
repeats = repeats.astype(np.int64)
if repeats.shape[0] != array.shape[axis]:
- return ValueError("incorrect shape of repeats array")
- result = array._thunk.repeat(
- repeats=repeats._thunk, axis=axis, scalar_repeats=False
- )
+ raise ValueError("incorrect shape of repeats array")

# check the output shape (whether it will fit on the GPU or not)
out_shape = list(array.shape)
n_repeats = sum(repeats)
out_shape[axis] = n_repeats
out_shape = tuple(out_shape)
size = np.prod(out_shape) * array.itemsize
# check if the size of the output array is less than 8GB. In this case
# we can use output regions, otherwise we will use a statically
# allocated array
if size < 8589934592 / 2:
result = array._thunk.repeat(
repeats=repeats._thunk, axis=axis, scalar_repeats=False
)
else: # this implementation is taken from CuPy
result = ndarray(shape=out_shape, dtype=array.dtype)
a_index = [slice(None)] * len(out_shape)
res_index = list(a_index)
offset = 0
for i in range(array.shape[axis]):
a_index[axis] = slice(i, i + 1)
res_index[axis] = slice(offset, offset + repeats[i])
result[res_index] = array[a_index]
offset += repeats[i]
return result
return ndarray(shape=result.shape, thunk=result)
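The CuPy-style fallback used in both branches above can be sketched in plain NumPy; the helper name is illustrative (cunumeric's real fast path goes through `_thunk`), and `repeats` may be either a scalar or a per-slice array of counts:

```python
import numpy as np

def repeat_by_slices(arr, repeats, axis):
    # Normalize `repeats` to one count per input slice along `axis`.
    counts = np.broadcast_to(
        np.asarray(repeats, dtype=np.int64), (arr.shape[axis],)
    )
    out_shape = list(arr.shape)
    out_shape[axis] = int(counts.sum())
    result = np.empty(tuple(out_shape), dtype=arr.dtype)
    a_index = [slice(None)] * arr.ndim
    res_index = list(a_index)
    offset = 0
    for i in range(arr.shape[axis]):
        a_index[axis] = slice(i, i + 1)
        res_index[axis] = slice(offset, offset + counts[i])
        # Each input slice broadcasts across its block of output slices.
        result[tuple(res_index)] = arr[tuple(a_index)]
        offset += counts[i]
    return result
```

This emits one slice assignment per input slice, which is exactly the per-slice operation cost the reviewer objects to above.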


4 changes: 4 additions & 0 deletions src/cunumeric/index/repeat.cc
@@ -69,6 +69,8 @@ struct RepeatImplBody<VariantKind::CPU, CODE, DIM> {
int64_t out_idx = 0;
for (size_t in_idx = 0; in_idx < volume; ++in_idx) {
auto p = in_pitches.unflatten(in_idx, in_rect.lo);
// TODO replace assert with Legate exception handling interface when available
assert(repeats[p] >= 0);
for (size_t r = 0; r < repeats[p]; r++) out[out_idx++] = in[p];
}
}
@@ -88,6 +90,8 @@
for (int64_t idx = in_rect.lo[axis]; idx <= in_rect.hi[axis]; ++idx) {
p[axis] = idx;
offsets[off_idx++] = sum;
// TODO replace assert with Legate exception handling interface when available
assert(repeats[p] >= 0);
sum += repeats[p];
}

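The offsets computed in the hunk above form an exclusive prefix sum of the repeat counts; the same logic expressed in Python terms:

```python
import numpy as np

def repeat_offsets(repeats):
    # Exclusive prefix sum: offsets[i] is where the copies of slice i
    # start in the output; the running total ends up as the output extent.
    offsets = np.empty(len(repeats), dtype=np.int64)
    total = 0
    for i, r in enumerate(repeats):
        # Mirrors the assert guarding against negative counts in the C++.
        assert r >= 0
        offsets[i] = total
        total += r
    return offsets, total
```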
2 changes: 2 additions & 0 deletions src/cunumeric/index/repeat.cu
@@ -41,6 +41,8 @@ static __global__ void __launch_bounds__(THREADS_PER_BLOCK, MIN_CTAS_PER_SM)
if (offset < extent) {
auto p = origin;
p[axis] += offset;
// TODO replace assert with Legate exception handling interface when available
assert(repeats[p] >= 0);
auto val = repeats[p];
offsets[offset] = val;
SumReduction<int64_t>::fold<true>(value, val);
4 changes: 3 additions & 1 deletion src/cunumeric/index/repeat_omp.cc
@@ -77,7 +77,9 @@ struct RepeatImplBody<VariantKind::OMP, CODE, DIM> {
int64_t axis_lo = p[axis];
#pragma omp for schedule(static) private(p)
for (int64_t idx = 0; idx < axis_extent; ++idx) {
p[axis] = axis_lo + idx;
// TODO replace assert with Legate exception handling interface when available
assert(repeats[p] >= 0);
auto val = repeats[p];
offsets[idx] = val;
local_sums[tid] += val;