Modify the exception objects thrown when converting PyArrow tables #1498

Open · wants to merge 4 commits into main
11 changes: 11 additions & 0 deletions pyiceberg/exceptions.py
@@ -14,6 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import Any
+
+import pyarrow as pa
Contributor:

Oof, for those that use PyIceberg with just s3fs, this import will be problematic. We should move this into pyarrow.py.

Author:

Okay, I will move this into pyarrow.py. :)
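For illustration, a minimal sketch of one alternative that avoids the hard runtime dependency (hypothetical; the thread above settles on moving the class into pyarrow.py instead): since pyarrow is only needed here for the type annotation, the import can be guarded with typing.TYPE_CHECKING.

# Sketch only: defer the pyarrow import so `import pyiceberg.exceptions`
# keeps working for users who installed PyIceberg without pyarrow.
from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import pyarrow as pa  # resolved by type checkers only, never at runtime


class UnsupportedPyArrowTypeException(Exception):
    """Cannot convert PyArrow type to corresponding Iceberg type."""

    def __init__(self, field: pa.Field, *args: Any):
        self.field = field
        super().__init__(*args)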



class TableAlreadyExistsError(Exception):
@@ -122,3 +125,11 @@ class CommitStateUnknownException(RESTError):

class WaitingForLockException(Exception):
"""Need to wait for a lock, try again."""


+class UnsupportedPyArrowTypeException(Exception):
+    """Cannot convert PyArrow type to corresponding Iceberg type."""
+
+    def __init__(self, field: pa.Field, *args: Any):
+        self.field = field
+        super().__init__(*args)
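As a usage sketch (not part of the diff), the new exception carries the offending pa.Field, so callers can inspect the failing column programmatically instead of parsing the message. Assuming pyarrow is installed:

import pyarrow as pa

from pyiceberg.exceptions import UnsupportedPyArrowTypeException
from pyiceberg.io.pyarrow import pyarrow_to_schema

schema = pa.schema([pa.field("latitude", pa.decimal256(20, 26), metadata={"PARQUET:field_id": "1"})])
try:
    pyarrow_to_schema(schema)
except UnsupportedPyArrowTypeException as e:
    # e.field is the exact pa.Field that failed to convert.
    print(e.field.name, e.field.type)  # latitude decimal256(20, 26)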
29 changes: 20 additions & 9 deletions pyiceberg/io/pyarrow.py
@@ -72,7 +72,10 @@
from sortedcontainers import SortedList

from pyiceberg.conversions import to_bytes
-from pyiceberg.exceptions import ResolveError
+from pyiceberg.exceptions import (
+    ResolveError,
+    UnsupportedPyArrowTypeException,
+)
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or
from pyiceberg.expressions.literals import Literal
from pyiceberg.expressions.visitors import (
@@ -960,13 +963,7 @@ def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:

@visit_pyarrow.register(pa.StructType)
def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
-    results = []
-
-    for field in obj:
-        visitor.before_field(field)
-        result = visit_pyarrow(field.type, visitor)
-        results.append(visitor.field(field, result))
-        visitor.after_field(field)
+    results = [visit_pyarrow(field, visitor) for field in obj]

    return visitor.struct(obj, results)

@@ -1004,6 +1001,20 @@ def _(obj: pa.DictionaryType, visitor: PyArrowSchemaVisitor[T]) -> T:
    return visit_pyarrow(obj.value_type, visitor)


+@visit_pyarrow.register(pa.Field)
+def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T:
+    field_type = obj.type
+
+    visitor.before_field(obj)
+    try:
+        result = visit_pyarrow(field_type, visitor)
+    except TypeError as e:
+        raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e
+    visitor.after_field(obj)
+
+    return visitor.field(obj, result)


@visit_pyarrow.register(pa.DataType)
def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
    if pa.types.is_nested(obj):
@@ -1175,7 +1186,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
            else:
                raise TypeError(
-                    "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+                    "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
                )
        else:
            raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
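For readers unfamiliar with the dispatch mechanism: visit_pyarrow is a functools.singledispatch function, so registering a handler for pa.Field makes fields first-class nodes in the traversal, which is what lets the except clause attach the column name to the error. A standalone sketch of the same pattern (hypothetical names, not pyiceberg code):

from functools import singledispatch

import pyarrow as pa


@singledispatch
def describe(obj: object) -> str:
    raise TypeError(f"unsupported node: {obj!r}")


@describe.register(pa.Field)
def _(obj: pa.Field) -> str:
    # A field wraps a name plus a type; recurse into the type and
    # prefix the column name, mirroring how this PR builds its message.
    return f"{obj.name}: {describe(obj.type)}"


@describe.register(pa.DataType)
def _(obj: pa.DataType) -> str:
    return str(obj)


print(describe(pa.field("latitude", pa.float32())))  # latitude: float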
86 changes: 86 additions & 0 deletions tests/io/test_pyarrow_visitor.py
@@ -21,6 +21,7 @@
import pyarrow as pa
import pytest

+from pyiceberg.exceptions import UnsupportedPyArrowTypeException
from pyiceberg.expressions import (
    And,
    BoundEqualTo,
@@ -625,6 +626,91 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
    assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema


+def test_pyarrow_schema_unsupported_type() -> None:
+    unsupported_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False, metadata={"PARQUET:field_id": "2"})
+    schema = pa.schema(
+        [
+            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+            pa.field(
+                "location",
+                pa.large_list(
+                    pa.field(
+                        "item",
+                        pa.struct(
+                            [
+                                unsupported_field,
+                                pa.field("longitude", pa.float32(), nullable=False, metadata={"PARQUET:field_id": "3"}),
+                            ]
+                        ),
+                        metadata={"PARQUET:field_id": "4"},
+                    )
+                ),
+                nullable=False,
+                metadata={"PARQUET:field_id": "5"},
+            ),
+        ],
+        metadata={"PARQUET:field_id": "6"},
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)")
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Unsupported type: decimal256(20, 26)" in exception_cause.args[0]
+
+    unsupported_field = pa.field(
+        "quux",
+        pa.map_(
+            pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+            pa.field(
+                "value",
+                pa.map_(
+                    pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "5"}),
+                    pa.field("value", pa.decimal256(2, 3), metadata={"PARQUET:field_id": "6"}),
+                ),
+                nullable=False,
+                metadata={"PARQUET:field_id": "4"},
+            ),
+        ),
+        nullable=False,
+        metadata={"PARQUET:field_id": "3"},
+    )
+    schema = pa.schema(
+        [
+            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+            unsupported_field,
+        ]
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException,
+        match=re.escape("Column 'quux' has an unsupported type: map<string, map<string, decimal256(2, 3)>>"),
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Unsupported type: decimal256(2, 3)" in exception_cause.args[0]
+
+    unsupported_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False, metadata={"PARQUET:field_id": "1"})
+    schema = pa.schema(
+        [
+            unsupported_field,
+            pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+        ]
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException,
+        match=re.escape("Column 'foo' has an unsupported type: timestamp[ns]"),
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Iceberg does not yet support 'ns' timestamp precision" in exception_cause.args[0]
+
+
def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
    schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
    assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids
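The __cause__ assertions above rely on standard Python exception chaining: `raise X from e` stores e as X.__cause__, which is how the original TypeError stays inspectable after being wrapped. A minimal reminder of the mechanism (generic names, not pyiceberg code):

try:
    try:
        raise TypeError("Unsupported type: decimal256(2, 3)")
    except TypeError as inner:
        raise RuntimeError("Column 'quux' has an unsupported type") from inner
except RuntimeError as wrapped:
    # `raise ... from inner` set the chained exception as __cause__.
    assert isinstance(wrapped.__cause__, TypeError)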