From f1b8f507905b2ad8d19541e9ea21e9638886c6fe Mon Sep 17 00:00:00 2001 From: Christian Molina Date: Wed, 8 Jan 2025 14:58:54 +0800 Subject: [PATCH] Modified exception objects being thrown when converting Pyarrow tables Signed-off-by: Christian Molina --- pyiceberg/exceptions.py | 19 +++++++++ pyiceberg/io/pyarrow.py | 33 ++++++++++++--- tests/io/test_pyarrow_visitor.py | 70 +++++++++++++++++++------------- 3 files changed, 87 insertions(+), 35 deletions(-) diff --git a/pyiceberg/exceptions.py b/pyiceberg/exceptions.py index 56574ff471..7ee3c9cced 100644 --- a/pyiceberg/exceptions.py +++ b/pyiceberg/exceptions.py @@ -14,6 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +from typing import Any + +import pyarrow as pa class TableAlreadyExistsError(Exception): @@ -122,3 +125,19 @@ class CommitStateUnknownException(RESTError): class WaitingForLockException(Exception): """Need to wait for a lock, try again.""" + + +class UnsupportedPyArrowTypeException(Exception): + """Cannot convert PyArrow type to corresponding Iceberg type.""" + + def __init__(self, field: pa.Field, *args: Any): + self.field = field + super().__init__(*args) + + +class UnsupportedPyArrowIntegerTypeException(UnsupportedPyArrowTypeException): + """Cannot convert PyArrow integer type to corresponding Iceberg type.""" + + +class UnsupportedPyArrowTimestampTypeException(UnsupportedPyArrowTypeException): + """Cannot convert PyArrow timestamp type to corresponding Iceberg type.""" diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 1ce0842844..7f5c5d85bc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -72,7 +72,12 @@ from sortedcontainers import SortedList from pyiceberg.conversions import to_bytes -from pyiceberg.exceptions import ResolveError +from pyiceberg.exceptions import ( + ResolveError, + UnsupportedPyArrowIntegerTypeException, + UnsupportedPyArrowTimestampTypeException, + UnsupportedPyArrowTypeException, +) from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or from pyiceberg.expressions.literals import Literal from pyiceberg.expressions.visitors import ( @@ -1099,8 +1104,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]): """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided.""" _field_names: List[str] + _field: Optional[pa.Field] def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None: + self._field = None self._field_names = [] self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us @@ -1141,6 +1148,12 @@ def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: Icebe return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable) def primitive(self, primitive: pa.DataType) -> PrimitiveType: + field_name = None + unsupported_prefix = "Unsupported" + if len(self._field_names) > 0: + field_name = self._field_names[-1] + unsupported_prefix = f"Column '{field_name}' has an unsupported" + if pa.types.is_boolean(primitive): return BooleanType() elif pa.types.is_integer(primitive): @@ -1151,7 +1164,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: return LongType() else: # Does not exist (yet) - raise TypeError(f"Unsupported integer type: {primitive}") + raise UnsupportedPyArrowIntegerTypeException(self._field, f"{unsupported_prefix} integer type: {primitive}") elif pa.types.is_float32(primitive): return FloatType() elif pa.types.is_float64(primitive): @@ -1174,11 +1187,17 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: if self._downcast_ns_timestamp_to_us: logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.") else: - raise TypeError( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + column_name_message = "" + if field_name: + column_name_message = f", making the column '{field_name}' unsupported" + raise UnsupportedPyArrowTimestampTypeException( + self._field, + f"Iceberg does not yet support 'ns' timestamp precision{column_name_message}. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.", ) else: - raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") + raise UnsupportedPyArrowTimestampTypeException( + self._field, f"{unsupported_prefix} precision for timestamp type: {primitive.unit}" + ) if primitive.tz in UTC_ALIASES: return TimestamptzType() @@ -1191,13 +1210,15 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: primitive = cast(pa.FixedSizeBinaryType, primitive) return FixedType(primitive.byte_width) - raise TypeError(f"Unsupported type: {primitive}") + raise UnsupportedPyArrowTypeException(self._field, f"{unsupported_prefix} type: {primitive}") def before_field(self, field: pa.Field) -> None: self._field_names.append(field.name) + self._field = field def after_field(self, field: pa.Field) -> None: self._field_names.pop() + self._field = None def before_list_element(self, element: pa.Field) -> None: self._field_names.append(LIST_ELEMENT_NAME) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 027fccae7c..f1ce61228c 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -21,6 +21,7 @@ import pyarrow as pa import pytest +from pyiceberg.exceptions import UnsupportedPyArrowTimestampTypeException, UnsupportedPyArrowTypeException from pyiceberg.expressions import ( And, BoundEqualTo, @@ -90,7 +91,7 @@ def test_pyarrow_decimal256_to_iceberg() -> None: precision = 26 scale = 20 pyarrow_type = pa.decimal256(precision, scale) - with pytest.raises(TypeError, match=re.escape("Unsupported type: decimal256(26, 20)")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: decimal256(26, 20)")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -150,16 +151,16 @@ def test_pyarrow_date32_to_iceberg() -> None: def test_pyarrow_date64_to_iceberg() -> None: pyarrow_type = pa.date64() - with pytest.raises(TypeError, match=re.escape("Unsupported type: date64")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: date64")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) def test_pyarrow_time32_to_iceberg() -> None: pyarrow_type = pa.time32("ms") - with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[ms]")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time32[ms]")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) pyarrow_type = pa.time32("s") - with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[s]")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time32[s]")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -172,7 +173,7 @@ def test_pyarrow_time64_us_to_iceberg() -> None: def test_pyarrow_time64_ns_to_iceberg() -> None: pyarrow_type = pa.time64("ns") - with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time64[ns]")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -188,7 +189,7 @@ def test_pyarrow_timestamp_to_iceberg(precision: str) -> None: def test_pyarrow_timestamp_invalid_units() -> None: pyarrow_type = pa.timestamp(unit="ns") with pytest.raises( - TypeError, + UnsupportedPyArrowTimestampTypeException, match=re.escape( "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." ), @@ -210,7 +211,7 @@ def test_pyarrow_timestamp_tz_to_iceberg() -> None: def test_pyarrow_timestamp_tz_invalid_units() -> None: pyarrow_type = pa.timestamp(unit="ns", tz="UTC") with pytest.raises( - TypeError, + UnsupportedPyArrowTimestampTypeException, match=re.escape( "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." ), @@ -220,7 +221,7 @@ def test_pyarrow_timestamp_tz_invalid_units() -> None: def test_pyarrow_timestamp_tz_invalid_tz() -> None: pyarrow_type = pa.timestamp(unit="us", tz="US/Pacific") - with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[us, tz=US/Pacific]")): + with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: timestamp[us, tz=US/Pacific]")): visit_pyarrow(pyarrow_type, _ConvertToIceberg()) @@ -583,21 +584,35 @@ def test_pyarrow_schema_to_schema_fresh_ids_nested_schema( assert visit_pyarrow(pyarrow_schema_nested_without_ids, _ConvertToIcebergWithoutIDs()) == iceberg_schema_nested_no_ids -def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: - expected_schema = pa.schema( +def test_pyarrow_schema_unsupported_type() -> None: + lat_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False) + schema = pa.schema( [ - pa.field("foo", pa.large_string(), nullable=False), - pa.field("bar", pa.int32(), nullable=False), - pa.field("baz", pa.bool_(), nullable=True), - pa.field("qux", pa.large_list(pa.large_string()), nullable=False), + pa.field("foo", pa.string(), nullable=False), pa.field( - "quux", - pa.map_( - pa.large_string(), - pa.map_(pa.large_string(), pa.int32()), + "location", + pa.large_list( + pa.struct( + [ + lat_field, + pa.field("longitude", pa.float32(), nullable=False), + ] + ), ), nullable=False, ), + ] + ) + with pytest.raises( + UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)") + ) as exc_info: + visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) + assert exc_info.value.field == lat_field + + foo_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False) + schema = pa.schema( + [ + foo_field, pa.field( "location", pa.large_list( @@ -610,19 +625,16 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa ), nullable=False, ), - pa.field( - "person", - pa.struct( - [ - pa.field("name", pa.large_string(), nullable=True), - pa.field("age", pa.int32(), nullable=False), - ] - ), - nullable=True, - ), ] ) - assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema + with pytest.raises( + UnsupportedPyArrowTypeException, + match=re.escape( + "Iceberg does not yet support 'ns' timestamp precision, making the column 'foo' unsupported. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + ), + ) as exc_info: + visit_pyarrow(schema, _ConvertToIcebergWithoutIDs()) + assert exc_info.value.field == foo_field def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: