diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 74692f85b8..67ebaa810f 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -122,7 +122,6 @@
     pre_order_visit,
     promote,
     prune_columns,
-    sanitize_column_names,
     visit,
     visit_with_partner,
 )
@@ -966,12 +965,7 @@ def _task_to_table(
     with fs.open_input_file(path) as fin:
         fragment = arrow_format.make_fragment(fin)
         physical_schema = fragment.physical_schema
-        schema_raw = None
-        if metadata := physical_schema.metadata:
-            schema_raw = metadata.get(ICEBERG_SCHEMA)
-        file_schema = (
-            Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
-        )
+        file_schema = pyarrow_to_schema(physical_schema, name_mapping)

         pyarrow_filter = None
         if bound_row_filter is not AlwaysTrue():
@@ -979,7 +973,7 @@ def _task_to_table(
             bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
             pyarrow_filter = expression_to_pyarrow(bound_file_filter)

-        file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False))
+        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

         if file_schema is None:
             raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
diff --git a/tests/conftest.py b/tests/conftest.py
index 7da0a0a85a..e0d829105a 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1931,9 +1931,11 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str:
     import pyarrow as pa
     from pyarrow import parquet as pq

+    from pyiceberg.io.pyarrow import schema_to_pyarrow
+
     table = pa.table(
         {"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]},
-        metadata={"iceberg.schema": table_schema_simple.model_dump_json()},
+        schema=schema_to_pyarrow(table_schema_simple),
     )

     file_path = f"{tmp_path}/0000-data.parquet"
diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py
index e1526d2a5e..775a6f9d42 100644
--- a/tests/integration/test_writes/test_writes.py
+++ b/tests/integration/test_writes/test_writes.py
@@ -271,6 +271,28 @@ def get_current_snapshot_id(identifier: str) -> int:
     assert tbl.current_snapshot().snapshot_id == get_current_snapshot_id(identifier)  # type: ignore


+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_python_writes_special_character_column_with_spark_reads(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = "default.python_writes_special_character_column_with_spark_reads"
+    column_name_with_special_character = "letter/abc"
+    TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN = {
+        column_name_with_special_character: ['a', None, 'z'],
+    }
+    pa_schema = pa.schema([
+        (column_name_with_special_character, pa.string()),
+    ])
+    arrow_table_with_special_character_column = pa.Table.from_pydict(TEST_DATA_WITH_SPECIAL_CHARACTER_COLUMN, schema=pa_schema)
+    tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, schema=pa_schema)
+
+    tbl.overwrite(arrow_table_with_special_character_column)
+    spark_df = spark.sql(f"SELECT * FROM {identifier}").toPandas()
+    pyiceberg_df = tbl.scan().to_pandas()
+    assert spark_df.equals(pyiceberg_df)
+
+
 @pytest.mark.integration
 def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.write_bin_pack_data_files"
diff --git a/tests/integration/test_writes/utils.py b/tests/integration/test_writes/utils.py
index 792e25185d..742b1e14fc 100644
--- a/tests/integration/test_writes/utils.py
+++ b/tests/integration/test_writes/utils.py
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 # pylint:disable=redefined-outer-name
-from typing import List, Optional
+from typing import List, Optional, Union

 import pyarrow as pa

@@ -65,6 +65,7 @@ def _create_table(
     properties: Properties,
     data: Optional[List[pa.Table]] = None,
     partition_spec: Optional[PartitionSpec] = None,
+    schema: Union[Schema, "pa.Schema"] = TABLE_SCHEMA,
 ) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
@@ -73,10 +74,10 @@ def _create_table(

     if partition_spec:
         tbl = session_catalog.create_table(
-            identifier=identifier, schema=TABLE_SCHEMA, properties=properties, partition_spec=partition_spec
+            identifier=identifier, schema=schema, properties=properties, partition_spec=partition_spec
         )
     else:
-        tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
+        tbl = session_catalog.create_table(identifier=identifier, schema=schema, properties=properties)

     if data:
         for d in data:
diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py
index 46ece77880..ef2c4cecbb 100644
--- a/tests/io/test_pyarrow.py
+++ b/tests/io/test_pyarrow.py
@@ -1373,7 +1373,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
         str(with_deletes)
         == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","c"]]
@@ -1411,7 +1411,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
         str(with_deletes)
         == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","c"]]
@@ -1442,7 +1442,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
     str(projection)
     == """pyarrow.Table
 foo: string
-bar: int64 not null
+bar: int32 not null
 baz: bool
 ----
 foo: [["a","b","c"]]
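
For context on why dropping the embedded "iceberg.schema" blob is safe, below is a minimal sketch (not part of the patch) of the round trip the changed read path relies on. It assumes schema_to_pyarrow attaches Iceberg field IDs to the Arrow field metadata and that pyarrow_to_schema can rebuild the schema from those IDs without a name mapping; the example schema and assertion are illustrative only.

# Illustrative sketch, not part of the patch above.
from pyiceberg.io.pyarrow import pyarrow_to_schema, schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# A column whose name would previously have been sanitized on read.
iceberg_schema = Schema(
    NestedField(field_id=1, name="letter/abc", field_type=StringType(), required=False),
)

# schema_to_pyarrow() records the Iceberg field IDs in the Arrow field metadata ...
arrow_schema = schema_to_pyarrow(iceberg_schema)

# ... and pyarrow_to_schema() rebuilds the Iceberg schema from those IDs,
# keeping the original column name rather than a sanitized form.
round_tripped = pyarrow_to_schema(arrow_schema)
assert round_tripped.find_field(1).name == "letter/abc"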