Skip to content

Commit

Permalink
Modified exception objects being thrown when converting Pyarrow tables
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Molina <[email protected]>
  • Loading branch information
DevChrisCross committed Jan 11, 2025
1 parent cad0ad7 commit f1b8f50
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 35 deletions.
19 changes: 19 additions & 0 deletions pyiceberg/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Any

import pyarrow as pa


class TableAlreadyExistsError(Exception):
Expand Down Expand Up @@ -122,3 +125,19 @@ class CommitStateUnknownException(RESTError):

class WaitingForLockException(Exception):
"""Need to wait for a lock, try again."""


class UnsupportedPyArrowTypeException(Exception):
"""Cannot convert PyArrow type to corresponding Iceberg type."""

def __init__(self, field: pa.Field, *args: Any):
self.field = field
super().__init__(*args)


class UnsupportedPyArrowIntegerTypeException(UnsupportedPyArrowTypeException):
"""Cannot convert PyArrow integer type to corresponding Iceberg type."""


class UnsupportedPyArrowTimestampTypeException(UnsupportedPyArrowTypeException):
"""Cannot convert PyArrow timestamp type to corresponding Iceberg type."""
33 changes: 27 additions & 6 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@
from sortedcontainers import SortedList

from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ResolveError
from pyiceberg.exceptions import (
ResolveError,
UnsupportedPyArrowIntegerTypeException,
UnsupportedPyArrowTimestampTypeException,
UnsupportedPyArrowTypeException,
)
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or
from pyiceberg.expressions.literals import Literal
from pyiceberg.expressions.visitors import (
Expand Down Expand Up @@ -1099,8 +1104,10 @@ class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
"""Converts PyArrowSchema to Iceberg Schema. Applies the IDs from name_mapping if provided."""

_field_names: List[str]
_field: Optional[pa.Field]

def __init__(self, downcast_ns_timestamp_to_us: bool = False) -> None:
self._field = None
self._field_names = []
self._downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us

Expand Down Expand Up @@ -1141,6 +1148,12 @@ def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: Icebe
return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)

def primitive(self, primitive: pa.DataType) -> PrimitiveType:
field_name = None
unsupported_prefix = "Unsupported"
if len(self._field_names) > 0:
field_name = self._field_names[-1]
unsupported_prefix = f"Column '{field_name}' has an unsupported"

if pa.types.is_boolean(primitive):
return BooleanType()
elif pa.types.is_integer(primitive):
Expand All @@ -1151,7 +1164,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
return LongType()
else:
# Does not exist (yet)
raise TypeError(f"Unsupported integer type: {primitive}")
raise UnsupportedPyArrowIntegerTypeException(self._field, f"{unsupported_prefix} integer type: {primitive}")
elif pa.types.is_float32(primitive):
return FloatType()
elif pa.types.is_float64(primitive):
Expand All @@ -1174,11 +1187,17 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
if self._downcast_ns_timestamp_to_us:
logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
else:
raise TypeError(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
column_name_message = ""
if field_name:
column_name_message = f", making the column '{field_name}' unsupported"
raise UnsupportedPyArrowTimestampTypeException(
self._field,
f"Iceberg does not yet support 'ns' timestamp precision{column_name_message}. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
)
else:
raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
raise UnsupportedPyArrowTimestampTypeException(
self._field, f"{unsupported_prefix} precision for timestamp type: {primitive.unit}"
)

if primitive.tz in UTC_ALIASES:
return TimestamptzType()
Expand All @@ -1191,13 +1210,15 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
primitive = cast(pa.FixedSizeBinaryType, primitive)
return FixedType(primitive.byte_width)

raise TypeError(f"Unsupported type: {primitive}")
raise UnsupportedPyArrowTypeException(self._field, f"{unsupported_prefix} type: {primitive}")

def before_field(self, field: pa.Field) -> None:
self._field_names.append(field.name)
self._field = field

def after_field(self, field: pa.Field) -> None:
self._field_names.pop()
self._field = None

def before_list_element(self, element: pa.Field) -> None:
self._field_names.append(LIST_ELEMENT_NAME)
Expand Down
70 changes: 41 additions & 29 deletions tests/io/test_pyarrow_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import pyarrow as pa
import pytest

from pyiceberg.exceptions import UnsupportedPyArrowTimestampTypeException, UnsupportedPyArrowTypeException
from pyiceberg.expressions import (
And,
BoundEqualTo,
Expand Down Expand Up @@ -90,7 +91,7 @@ def test_pyarrow_decimal256_to_iceberg() -> None:
precision = 26
scale = 20
pyarrow_type = pa.decimal256(precision, scale)
with pytest.raises(TypeError, match=re.escape("Unsupported type: decimal256(26, 20)")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: decimal256(26, 20)")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


Expand Down Expand Up @@ -150,16 +151,16 @@ def test_pyarrow_date32_to_iceberg() -> None:

def test_pyarrow_date64_to_iceberg() -> None:
pyarrow_type = pa.date64()
with pytest.raises(TypeError, match=re.escape("Unsupported type: date64")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: date64")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_time32_to_iceberg() -> None:
pyarrow_type = pa.time32("ms")
with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[ms]")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time32[ms]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())
pyarrow_type = pa.time32("s")
with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[s]")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time32[s]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


Expand All @@ -172,7 +173,7 @@ def test_pyarrow_time64_us_to_iceberg() -> None:

def test_pyarrow_time64_ns_to_iceberg() -> None:
pyarrow_type = pa.time64("ns")
with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: time64[ns]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


Expand All @@ -188,7 +189,7 @@ def test_pyarrow_timestamp_to_iceberg(precision: str) -> None:
def test_pyarrow_timestamp_invalid_units() -> None:
pyarrow_type = pa.timestamp(unit="ns")
with pytest.raises(
TypeError,
UnsupportedPyArrowTimestampTypeException,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
),
Expand All @@ -210,7 +211,7 @@ def test_pyarrow_timestamp_tz_to_iceberg() -> None:
def test_pyarrow_timestamp_tz_invalid_units() -> None:
pyarrow_type = pa.timestamp(unit="ns", tz="UTC")
with pytest.raises(
TypeError,
UnsupportedPyArrowTimestampTypeException,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
),
Expand All @@ -220,7 +221,7 @@ def test_pyarrow_timestamp_tz_invalid_units() -> None:

def test_pyarrow_timestamp_tz_invalid_tz() -> None:
pyarrow_type = pa.timestamp(unit="us", tz="US/Pacific")
with pytest.raises(TypeError, match=re.escape("Unsupported type: timestamp[us, tz=US/Pacific]")):
with pytest.raises(UnsupportedPyArrowTypeException, match=re.escape("Unsupported type: timestamp[us, tz=US/Pacific]")):
visit_pyarrow(pyarrow_type, _ConvertToIceberg())


Expand Down Expand Up @@ -583,21 +584,35 @@ def test_pyarrow_schema_to_schema_fresh_ids_nested_schema(
assert visit_pyarrow(pyarrow_schema_nested_without_ids, _ConvertToIcebergWithoutIDs()) == iceberg_schema_nested_no_ids


def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
expected_schema = pa.schema(
def test_pyarrow_schema_unsupported_type() -> None:
lat_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False)
schema = pa.schema(
[
pa.field("foo", pa.large_string(), nullable=False),
pa.field("bar", pa.int32(), nullable=False),
pa.field("baz", pa.bool_(), nullable=True),
pa.field("qux", pa.large_list(pa.large_string()), nullable=False),
pa.field("foo", pa.string(), nullable=False),
pa.field(
"quux",
pa.map_(
pa.large_string(),
pa.map_(pa.large_string(), pa.int32()),
"location",
pa.large_list(
pa.struct(
[
lat_field,
pa.field("longitude", pa.float32(), nullable=False),
]
),
),
nullable=False,
),
]
)
with pytest.raises(
UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)")
) as exc_info:
visit_pyarrow(schema, _ConvertToIcebergWithoutIDs())
assert exc_info.value.field == lat_field

foo_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False)
schema = pa.schema(
[
foo_field,
pa.field(
"location",
pa.large_list(
Expand All @@ -610,19 +625,16 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
),
nullable=False,
),
pa.field(
"person",
pa.struct(
[
pa.field("name", pa.large_string(), nullable=True),
pa.field("age", pa.int32(), nullable=False),
]
),
nullable=True,
),
]
)
assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema
with pytest.raises(
UnsupportedPyArrowTypeException,
match=re.escape(
"Iceberg does not yet support 'ns' timestamp precision, making the column 'foo' unsupported. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
),
) as exc_info:
visit_pyarrow(schema, _ConvertToIcebergWithoutIDs())
assert exc_info.value.field == foo_field


def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
Expand Down

0 comments on commit f1b8f50

Please sign in to comment.