Modified exception when converting Pyarrow (#1498)
* Modified exception objects being thrown when converting Pyarrow tables

Signed-off-by: Christian Molina <[email protected]>

* Added visit_pyarrow dispatch for pyarrow field

Signed-off-by: Christian Molina <[email protected]>

* Removed unnecessary code and modified tests

Signed-off-by: Christian Molina <[email protected]>

* Fixed integration test

Signed-off-by: Christian Molina <[email protected]>

* Moved UnsupportedPyArrowTypeException to pyarrow.py

Signed-off-by: Christian Molina <[email protected]>

---------

Signed-off-by: Christian Molina <[email protected]>
DevChrisCross authored Jan 21, 2025
1 parent 5a3c346 commit 7f41565
Showing 3 changed files with 121 additions and 14 deletions.
32 changes: 24 additions & 8 deletions pyiceberg/io/pyarrow.py
@@ -189,6 +189,14 @@
 T = TypeVar("T")
 
 
+class UnsupportedPyArrowTypeException(Exception):
+    """Cannot convert PyArrow type to corresponding Iceberg type."""
+
+    def __init__(self, field: pa.Field, *args: Any):
+        self.field = field
+        super().__init__(*args)
+
+
 class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem):
     def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile:
         # In LocalFileSystem, parent directories must be first created before opening an output stream
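For callers, the practical effect is that schema conversion failures now identify the offending column and keep the original error on the exception chain. A minimal sketch of catching the new exception; the `amount` column and its `decimal256` type are illustrative, and field IDs are supplied via Parquet metadata as in the tests below:

import pyarrow as pa

from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, pyarrow_to_schema

# decimal256 is one PyArrow type Iceberg cannot represent.
arrow_schema = pa.schema([pa.field("amount", pa.decimal256(40, 2), nullable=False, metadata={"PARQUET:field_id": "1"})])

try:
    pyarrow_to_schema(arrow_schema)
except UnsupportedPyArrowTypeException as e:
    print(e.field.name, e.field.type)  # amount decimal256(40, 2)
    print(type(e.__cause__))           # <class 'TypeError'> (the original error)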
@@ -952,13 +960,7 @@ def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
 
 @visit_pyarrow.register(pa.StructType)
 def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:
-    results = []
-
-    for field in obj:
-        visitor.before_field(field)
-        result = visit_pyarrow(field.type, visitor)
-        results.append(visitor.field(field, result))
-        visitor.after_field(field)
+    results = [visit_pyarrow(field, visitor) for field in obj]
 
     return visitor.struct(obj, results)

@@ -996,6 +998,20 @@ def _(obj: pa.DictionaryType, visitor: PyArrowSchemaVisitor[T]) -> T:
     return visit_pyarrow(obj.value_type, visitor)
 
 
+@visit_pyarrow.register(pa.Field)
+def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T:
+    field_type = obj.type
+
+    visitor.before_field(obj)
+    try:
+        result = visit_pyarrow(field_type, visitor)
+    except TypeError as e:
+        raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e
+    visitor.after_field(obj)
+
+    return visitor.field(obj, result)
+
+
 @visit_pyarrow.register(pa.DataType)
 def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T:
     if pa.types.is_nested(obj):
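The per-field bookkeeping (`before_field`/`field`/`after_field`) removed from the `pa.StructType` handler above now lives in this dedicated `pa.Field` registration, so it applies uniformly to struct children and top-level schema fields, and any `TypeError` raised while visiting a field's type is re-raised with the column name attached. A small sketch of the dispatch, using the module's internal `_ConvertToIceberg` visitor (the one imported by the tests below); the field names and types are illustrative:

import pyarrow as pa

from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _ConvertToIceberg, visit_pyarrow

# A supported field converts cleanly through the new pa.Field handler.
ok = pa.field("name", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"})
print(visit_pyarrow(ok, _ConvertToIceberg()))  # an Iceberg NestedField, e.g. 1: name: required string

# An unsupported field now surfaces as the wrapped exception, not a bare TypeError.
bad = pa.field("big", pa.decimal256(40, 2), nullable=False, metadata={"PARQUET:field_id": "2"})
try:
    visit_pyarrow(bad, _ConvertToIceberg())
except UnsupportedPyArrowTypeException as e:
    print(e)  # Column 'big' has an unsupported type: decimal256(40, 2)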
@@ -1167,7 +1183,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType:
                     logger.warning("Iceberg does not yet support 'ns' timestamp precision. Downcasting to 'us'.")
                 else:
                     raise TypeError(
-                        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+                        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.",
                     )
             else:
                 raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}")
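The only change in this hunk is a trailing comma, but it sits on the 'ns'-timestamp path the new tests exercise: a `timestamp[ns]` column is rejected unless nanoseconds are downcast to microseconds. A hedged sketch; the `downcast_ns_timestamp_to_us` keyword is assumed from current pyiceberg and is not part of this diff:

import pyarrow as pa

from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, pyarrow_to_schema

schema = pa.schema([pa.field("ts", pa.timestamp("ns"), metadata={"PARQUET:field_id": "1"})])

try:
    pyarrow_to_schema(schema)  # raises: Column 'ts' has an unsupported type: timestamp[ns]
except UnsupportedPyArrowTypeException as e:
    assert isinstance(e.__cause__, TypeError)

# Assumed keyword (not introduced by this commit): opt in to the 'us' downcast instead.
print(pyarrow_to_schema(schema, downcast_ns_timestamp_to_us=True))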
17 changes: 11 additions & 6 deletions tests/integration/test_add_files.py
@@ -30,7 +30,7 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
@@ -616,13 +616,18 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v
 
     # add the parquet files as data files
     with pytest.raises(
-        TypeError,
-        match=re.escape(
-            "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
-        ),
-    ):
+        UnsupportedPyArrowTypeException,
+        match=re.escape("Column 'quux' has an unsupported type: timestamp[ns, tz=UTC]"),
+    ) as exc_info:
         tbl.add_files(file_paths=[file_path])
 
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert (
+        "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write."
+        in exception_cause.args[0]
+    )
+
 
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
86 changes: 86 additions & 0 deletions tests/io/test_pyarrow_visitor.py
@@ -33,6 +33,7 @@
 )
 from pyiceberg.expressions.literals import literal
 from pyiceberg.io.pyarrow import (
+    UnsupportedPyArrowTypeException,
     _ConvertToArrowSchema,
     _ConvertToIceberg,
     _ConvertToIcebergWithoutIDs,
@@ -625,6 +626,91 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa
     assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema
 
 
+def test_pyarrow_schema_unsupported_type() -> None:
+    unsupported_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False, metadata={"PARQUET:field_id": "2"})
+    schema = pa.schema(
+        [
+            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+            pa.field(
+                "location",
+                pa.large_list(
+                    pa.field(
+                        "item",
+                        pa.struct(
+                            [
+                                unsupported_field,
+                                pa.field("longitude", pa.float32(), nullable=False, metadata={"PARQUET:field_id": "3"}),
+                            ]
+                        ),
+                        metadata={"PARQUET:field_id": "4"},
+                    )
+                ),
+                nullable=False,
+                metadata={"PARQUET:field_id": "5"},
+            ),
+        ],
+        metadata={"PARQUET:field_id": "6"},
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)")
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Unsupported type: decimal256(20, 26)" in exception_cause.args[0]
+
+    unsupported_field = pa.field(
+        "quux",
+        pa.map_(
+            pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+            pa.field(
+                "value",
+                pa.map_(
+                    pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "5"}),
+                    pa.field("value", pa.decimal256(2, 3), metadata={"PARQUET:field_id": "6"}),
+                ),
+                nullable=False,
+                metadata={"PARQUET:field_id": "4"},
+            ),
+        ),
+        nullable=False,
+        metadata={"PARQUET:field_id": "3"},
+    )
+    schema = pa.schema(
+        [
+            pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+            unsupported_field,
+        ]
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException,
+        match=re.escape("Column 'quux' has an unsupported type: map<string, map<string, decimal256(2, 3)>>"),
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Unsupported type: decimal256(2, 3)" in exception_cause.args[0]
+
+    unsupported_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False, metadata={"PARQUET:field_id": "1"})
+    schema = pa.schema(
+        [
+            unsupported_field,
+            pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+        ]
+    )
+    with pytest.raises(
+        UnsupportedPyArrowTypeException,
+        match=re.escape("Column 'foo' has an unsupported type: timestamp[ns]"),
+    ) as exc_info:
+        pyarrow_to_schema(schema)
+    assert exc_info.value.field == unsupported_field
+    exception_cause = exc_info.value.__cause__
+    assert isinstance(exception_cause, TypeError)
+    assert "Iceberg does not yet support 'ns' timestamp precision" in exception_cause.args[0]
+
+
 def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None:
     schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids)
     assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids
