Skip to content

Commit

Permalink
GH-43728: [Python] ChunkedArray fails gracefully on non-cpu devices (#…
Browse files Browse the repository at this point in the history
…43795)

### Rationale for this change

ChunkedArrays backed by non-CPU memory should not segfault when the user invokes an API that requires CPU-accessible data.

### What changes are included in this PR?

* Add is_cpu() to ChunkedArray
* Raise a Python exception for known incompatible APIs on non-CPU devices

### Are these changes tested?

Unit tests

### Are there any user-facing changes?

Users should no longer see segfaults when calling certain APIs on non-CPU data; a Python exception is raised instead.
* GitHub Issue: #43728

Authored-by: Dane Pitkin <[email protected]>
Signed-off-by: Dane Pitkin <[email protected]>
  • Loading branch information
danepitkin authored Sep 4, 2024
1 parent 6382c0a commit 50219ef
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 8 deletions.
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:

CResult[vector[shared_ptr[CChunkedArray]]] Flatten(CMemoryPool* pool)

c_bool is_cpu() const

CStatus Validate() const
CStatus ValidateFull() const

Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ cdef class ChunkedArray(_PandasConvertible):
cdef:
shared_ptr[CChunkedArray] sp_chunked_array
CChunkedArray* chunked_array
c_bool _is_cpu
c_bool _init_is_cpu

cdef readonly:
# To allow Table to propagate metadata to pandas.Series
Expand Down
45 changes: 44 additions & 1 deletion python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cdef class ChunkedArray(_PandasConvertible):

def __cinit__(self):
    # Start with the is_cpu cache marked as "not computed yet"; the
    # is_cpu property fills it in on first access.
    self._init_is_cpu = False
    # No underlying C++ object is attached at construction time.
    self.chunked_array = NULL

def __init__(self):
raise TypeError("Do not call ChunkedArray's constructor directly, use "
Expand All @@ -69,6 +70,7 @@ cdef class ChunkedArray(_PandasConvertible):
self.chunked_array = chunked_array.get()

def __reduce__(self):
    # Refuse early (NotImplementedError) when the data is not on CPU.
    self._assert_cpu()
    # Rebuild through the module-level chunked_array() factory from the
    # individual chunks and the array type.
    factory_args = (self.chunks, self.type)
    return chunked_array, factory_args

@property
Expand Down Expand Up @@ -198,6 +200,7 @@ cdef class ChunkedArray(_PandasConvertible):
ArrowInvalid
"""
if full:
self._assert_cpu()
with nogil:
check_status(self.sp_chunked_array.get().ValidateFull())
else:
Expand All @@ -220,6 +223,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.null_count
1
"""
self._assert_cpu()
return self.chunked_array.null_count()

@property
Expand All @@ -245,6 +249,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.nbytes
49
"""
self._assert_cpu()
cdef:
CResult[int64_t] c_res_buffer

Expand All @@ -271,6 +276,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.get_total_buffer_size()
49
"""
self._assert_cpu()
cdef:
int64_t total_buffer_size

Expand Down Expand Up @@ -299,13 +305,14 @@ cdef class ChunkedArray(_PandasConvertible):
-------
value : Scalar (index) or ChunkedArray (slice)
"""

self._assert_cpu()
if isinstance(key, slice):
return _normalize_slice(self, key)

return self.getitem(_normalize_index(key, self.chunked_array.length()))

cdef getitem(self, int64_t i):
# Return the element at position ``i`` wrapped as a Scalar.
#
# Raises NotImplementedError (via _assert_cpu) when the data is not
# CPU-accessible.
self._assert_cpu()
# NOTE(review): GetResultValue presumably unwraps the C++ Result and
# raises on error -- confirm against its definition.
return Scalar.wrap(GetResultValue(self.chunked_array.GetScalar(i)))

def is_null(self, *, nan_is_null=False):
Expand Down Expand Up @@ -338,6 +345,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
options = _pc().NullOptions(nan_is_null=nan_is_null)
return _pc().call_function('is_null', [self], options)

Expand All @@ -363,6 +371,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().is_nan(self)

def is_valid(self):
Expand All @@ -388,6 +397,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().is_valid(self)

def __eq__(self, other):
Expand Down Expand Up @@ -430,6 +440,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().fill_null(self, fill_value)

def equals(self, ChunkedArray other):
Expand Down Expand Up @@ -458,6 +469,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.equals(animals)
False
"""
self._assert_cpu()
if other is None:
return False

Expand All @@ -472,6 +484,7 @@ cdef class ChunkedArray(_PandasConvertible):
return result

def _to_pandas(self, options, types_mapper=None, **kwargs):
    # Only CPU-accessible data is supported, so check before converting.
    # Extra keyword arguments are accepted but not forwarded.
    self._assert_cpu()
    converted = _array_like_to_pandas(
        self, options, types_mapper=types_mapper)
    return converted

def to_numpy(self, zero_copy_only=False):
Expand All @@ -495,6 +508,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.to_numpy()
array([ 2, 2, 4, 4, 5, 100])
"""
self._assert_cpu()
if np is None:
raise ImportError(
"Cannot return a numpy.ndarray if NumPy is not present")
Expand Down Expand Up @@ -529,6 +543,7 @@ cdef class ChunkedArray(_PandasConvertible):
return values

def __array__(self, dtype=None, copy=None):
self._assert_cpu()
if copy is False:
raise ValueError(
"Unable to avoid a copy while creating a numpy array as requested "
Expand Down Expand Up @@ -574,6 +589,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs_seconds.type
DurationType(duration[s])
"""
self._assert_cpu()
return _pc().cast(self, target_type, safe=safe, options=options)

def dictionary_encode(self, null_encoding='mask'):
Expand Down Expand Up @@ -636,6 +652,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
options = _pc().DictionaryEncodeOptions(null_encoding)
return _pc().call_function('dictionary_encode', [self], options)

Expand Down Expand Up @@ -700,6 +717,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.type
DataType(int64)
"""
self._assert_cpu()
cdef:
vector[shared_ptr[CChunkedArray]] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
Expand Down Expand Up @@ -751,6 +769,7 @@ cdef class ChunkedArray(_PandasConvertible):
100
]
"""
self._assert_cpu()
if self.num_chunks == 0:
return array([], type=self.type)
else:
Expand Down Expand Up @@ -791,6 +810,7 @@ cdef class ChunkedArray(_PandasConvertible):
100
]
"""
self._assert_cpu()
return _pc().call_function('unique', [self])

def value_counts(self):
Expand Down Expand Up @@ -837,6 +857,7 @@ cdef class ChunkedArray(_PandasConvertible):
1
]
"""
self._assert_cpu()
return _pc().call_function('value_counts', [self])

def slice(self, offset=0, length=None):
Expand Down Expand Up @@ -959,6 +980,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().filter(self, mask, null_selection_behavior)

def index(self, value, start=None, end=None, *, memory_pool=None):
Expand Down Expand Up @@ -1006,6 +1028,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.index(4, start=3)
<pyarrow.Int64Scalar: 3>
"""
self._assert_cpu()
return _pc().index(self, value, start, end, memory_pool=memory_pool)

def take(self, object indices):
Expand Down Expand Up @@ -1052,6 +1075,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().take(self, indices)

def drop_null(self):
Expand Down Expand Up @@ -1091,6 +1115,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
return _pc().drop_null(self)

def sort(self, order="ascending", **kwargs):
Expand All @@ -1110,6 +1135,7 @@ cdef class ChunkedArray(_PandasConvertible):
-------
result : ChunkedArray
"""
self._assert_cpu()
indices = _pc().sort_indices(
self,
options=_pc().SortOptions(sort_keys=[("", order)], **kwargs)
Expand Down Expand Up @@ -1209,6 +1235,7 @@ cdef class ChunkedArray(_PandasConvertible):
]
]
"""
self._assert_cpu()
cdef:
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
shared_ptr[CChunkedArray] c_result
Expand Down Expand Up @@ -1333,6 +1360,7 @@ cdef class ChunkedArray(_PandasConvertible):
>>> n_legs.to_pylist()
[2, 2, 4, 4, None, 100]
"""
self._assert_cpu()
result = []
for i in range(self.num_chunks):
result += self.chunk(i).to_pylist()
Expand All @@ -1354,6 +1382,7 @@ cdef class ChunkedArray(_PandasConvertible):
PyCapsule
A capsule containing a C ArrowArrayStream struct.
"""
self._assert_cpu()
cdef:
ChunkedArray chunked
ArrowArrayStream* c_stream = NULL
Expand Down Expand Up @@ -1410,6 +1439,20 @@ cdef class ChunkedArray(_PandasConvertible):
self.init(c_chunked_array)
return self

@property
def is_cpu(self):
    """
    Whether all chunks in the ChunkedArray are CPU-accessible.

    The value is queried from the underlying C++ ChunkedArray on first
    access and cached for subsequent accesses.
    """
    if self._init_is_cpu:
        # Already computed on an earlier access; reuse the cached flag.
        return self._is_cpu
    self._is_cpu = self.chunked_array.is_cpu()
    self._init_is_cpu = True
    return self._is_cpu

def _assert_cpu(self):
    """
    Raise NotImplementedError unless the data is CPU-accessible.
    """
    if self.is_cpu:
        return
    raise NotImplementedError("Implemented only for data on CPU device")


def chunked_array(arrays, type=None):
"""
Expand Down
Loading

0 comments on commit 50219ef

Please sign in to comment.