diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d288e4f2f1..e367aa586c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -189,6 +189,14 @@ T = TypeVar("T") +class UnsupportedPyArrowTypeException(Exception): + """Cannot convert PyArrow type to corresponding Iceberg type.""" + + def __init__(self, field: pa.Field, *args: Any): + self.field = field + super().__init__(*args) + + class PyArrowLocalFileSystem(pyarrow.fs.LocalFileSystem): def open_output_stream(self, path: str, *args: Any, **kwargs: Any) -> pyarrow.NativeFile: # In LocalFileSystem, parent directories must be first created before opening an output stream @@ -952,13 +960,7 @@ def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T: @visit_pyarrow.register(pa.StructType) def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T: - results = [] - - for field in obj: - visitor.before_field(field) - result = visit_pyarrow(field.type, visitor) - results.append(visitor.field(field, result)) - visitor.after_field(field) + results = [visit_pyarrow(field, visitor) for field in obj] return visitor.struct(obj, results) @@ -996,6 +998,20 @@ def _(obj: pa.DictionaryType, visitor: PyArrowSchemaVisitor[T]) -> T: return visit_pyarrow(obj.value_type, visitor) +@visit_pyarrow.register(pa.Field) +def _(obj: pa.Field, visitor: PyArrowSchemaVisitor[T]) -> T: + field_type = obj.type + + visitor.before_field(obj) + try: + result = visit_pyarrow(field_type, visitor) + except TypeError as e: + raise UnsupportedPyArrowTypeException(obj, f"Column '{obj.name}' has an unsupported type: {field_type}") from e + visitor.after_field(obj) + + return visitor.field(obj, result) + + @visit_pyarrow.register(pa.DataType) def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> T: if pa.types.is_nested(obj): @@ -1167,7 +1183,7 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: logger.warning("Iceberg does not yet support 'ns' timestamp precision. 
Downcasting to 'us'.") else: raise TypeError( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write.", ) else: raise TypeError(f"Unsupported precision for timestamp type: {primitive.unit}") diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index c1d916e0e0..8713615218 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -30,7 +30,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.io import FileIO -from pyiceberg.io.pyarrow import _pyarrow_schema_ensure_large_types +from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, _pyarrow_schema_ensure_large_types from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table @@ -616,13 +616,18 @@ def test_add_files_with_timestamp_tz_ns_fails(session_catalog: Catalog, format_v # add the parquet files as data files with pytest.raises( - TypeError, - match=re.escape( - "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." - ), - ): + UnsupportedPyArrowTypeException, + match=re.escape("Column 'quux' has an unsupported type: timestamp[ns, tz=UTC]"), + ) as exc_info: tbl.add_files(file_paths=[file_path]) + exception_cause = exc_info.value.__cause__ + assert isinstance(exception_cause, TypeError) + assert ( + "Iceberg does not yet support 'ns' timestamp precision. Use 'downcast-ns-timestamp-to-us-on-write' configuration property to automatically downcast 'ns' to 'us' on write." 
+ in exception_cause.args[0] + ) + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/io/test_pyarrow_visitor.py b/tests/io/test_pyarrow_visitor.py index 027fccae7c..d13822f5ce 100644 --- a/tests/io/test_pyarrow_visitor.py +++ b/tests/io/test_pyarrow_visitor.py @@ -33,6 +33,7 @@ ) from pyiceberg.expressions.literals import literal from pyiceberg.io.pyarrow import ( + UnsupportedPyArrowTypeException, _ConvertToArrowSchema, _ConvertToIceberg, _ConvertToIcebergWithoutIDs, @@ -625,6 +626,91 @@ def test_pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids: pa assert _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) == expected_schema +def test_pyarrow_schema_unsupported_type() -> None: + unsupported_field = pa.field("latitude", pa.decimal256(20, 26), nullable=False, metadata={"PARQUET:field_id": "2"}) + schema = pa.schema( + [ + pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}), + pa.field( + "location", + pa.large_list( + pa.field( + "item", + pa.struct( + [ + unsupported_field, + pa.field("longitude", pa.float32(), nullable=False, metadata={"PARQUET:field_id": "3"}), + ] + ), + metadata={"PARQUET:field_id": "4"}, + ) + ), + nullable=False, + metadata={"PARQUET:field_id": "5"}, + ), + ], + metadata={"PARQUET:field_id": "6"}, + ) + with pytest.raises( + UnsupportedPyArrowTypeException, match=re.escape("Column 'latitude' has an unsupported type: decimal256(20, 26)") + ) as exc_info: + pyarrow_to_schema(schema) + assert exc_info.value.field == unsupported_field + exception_cause = exc_info.value.__cause__ + assert isinstance(exception_cause, TypeError) + assert "Unsupported type: decimal256(20, 26)" in exception_cause.args[0] + + unsupported_field = pa.field( + "quux", + pa.map_( + pa.field("key", pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), + pa.field( + "value", + pa.map_( + pa.field("key", pa.string(), nullable=False, 
metadata={"PARQUET:field_id": "5"}), + pa.field("value", pa.decimal256(2, 3), metadata={"PARQUET:field_id": "6"}), + ), + nullable=False, + metadata={"PARQUET:field_id": "4"}, + ), + ), + nullable=False, + metadata={"PARQUET:field_id": "3"}, + ) + schema = pa.schema( + [ + pa.field("foo", pa.string(), nullable=False, metadata={"PARQUET:field_id": "1"}), + unsupported_field, + ] + ) + with pytest.raises( + UnsupportedPyArrowTypeException, + match=re.escape("Column 'quux' has an unsupported type: map<string, map<string, decimal256(2, 3)>>"), + ) as exc_info: + pyarrow_to_schema(schema) + assert exc_info.value.field == unsupported_field + exception_cause = exc_info.value.__cause__ + assert isinstance(exception_cause, TypeError) + assert "Unsupported type: decimal256(2, 3)" in exception_cause.args[0] + + unsupported_field = pa.field("foo", pa.timestamp(unit="ns"), nullable=False, metadata={"PARQUET:field_id": "1"}) + schema = pa.schema( + [ + unsupported_field, + pa.field("bar", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "2"}), + ] + ) + with pytest.raises( + UnsupportedPyArrowTypeException, + match=re.escape("Column 'foo' has an unsupported type: timestamp[ns]"), + ) as exc_info: + pyarrow_to_schema(schema) + assert exc_info.value.field == unsupported_field + exception_cause = exc_info.value.__cause__ + assert isinstance(exception_cause, TypeError) + assert "Iceberg does not yet support 'ns' timestamp precision" in exception_cause.args[0] + + def test_pyarrow_schema_round_trip_ensure_large_types_and_then_small_types(pyarrow_schema_nested_without_ids: pa.Schema) -> None: schema_with_large_types = _pyarrow_schema_ensure_large_types(pyarrow_schema_nested_without_ids) assert _pyarrow_schema_ensure_small_types(schema_with_large_types) == pyarrow_schema_nested_without_ids