Skip to content

Commit

Permalink
Read: fetch file_schema directly from pyarrow_to_schema (#597)
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX authored and Fokko committed Apr 14, 2024
1 parent 20093c3 commit bc6bea1
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
10 changes: 2 additions & 8 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@
pre_order_visit,
promote,
prune_columns,
- sanitize_column_names,
visit,
visit_with_partner,
)
Expand Down Expand Up @@ -950,20 +949,15 @@ def _task_to_table(
with fs.open_input_file(path) as fin:
fragment = arrow_format.make_fragment(fin)
physical_schema = fragment.physical_schema
- schema_raw = None
- if metadata := physical_schema.metadata:
- schema_raw = metadata.get(ICEBERG_SCHEMA)
- file_schema = (
- Schema.model_validate_json(schema_raw) if schema_raw is not None else pyarrow_to_schema(physical_schema, name_mapping)
- )
+ file_schema = pyarrow_to_schema(physical_schema, name_mapping)

pyarrow_filter = None
if bound_row_filter is not AlwaysTrue():
translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive)
bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive)
pyarrow_filter = expression_to_pyarrow(bound_file_filter)

- file_project_schema = sanitize_column_names(prune_columns(file_schema, projected_field_ids, select_full_types=False))
+ file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

if file_schema is None:
raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")
Expand Down
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1913,9 +1913,11 @@ def data_file(table_schema_simple: Schema, tmp_path: str) -> str:
import pyarrow as pa
from pyarrow import parquet as pq

+ from pyiceberg.io.pyarrow import schema_to_pyarrow
+
table = pa.table(
{"foo": ["a", "b", "c"], "bar": [1, 2, 3], "baz": [True, False, None]},
- metadata={"iceberg.schema": table_schema_simple.model_dump_json()},
+ schema=schema_to_pyarrow(table_schema_simple),
)

file_path = f"{tmp_path}/0000-data.parquet"
Expand Down
6 changes: 3 additions & 3 deletions tests/io/test_pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ def test_delete(deletes_file: str, example_task: FileScanTask, table_schema_simp
str(with_deletes)
== """pyarrow.Table
foo: string
- bar: int64 not null
+ bar: int32 not null
baz: bool
----
foo: [["a","c"]]
Expand Down Expand Up @@ -1426,7 +1426,7 @@ def test_delete_duplicates(deletes_file: str, example_task: FileScanTask, table_
str(with_deletes)
== """pyarrow.Table
foo: string
- bar: int64 not null
+ bar: int32 not null
baz: bool
----
foo: [["a","c"]]
Expand Down Expand Up @@ -1462,7 +1462,7 @@ def test_pyarrow_wrap_fsspec(example_task: FileScanTask, table_schema_simple: Sc
str(projection)
== """pyarrow.Table
foo: string
- bar: int64 not null
+ bar: int32 not null
baz: bool
----
foo: [["a","b","c"]]
Expand Down

0 comments on commit bc6bea1

Please sign in to comment.