
Commit 9969926

Move to PyArrow 17
1 parent d8b5c17 commit 9969926

1 file changed (+15 -17 lines)

pyiceberg/io/pyarrow.py

Lines changed: 15 additions & 17 deletions
@@ -1197,7 +1197,7 @@ def _task_to_record_batches(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
+    use_large_types: Optional[bool] = None,
 ) -> Iterator[pa.RecordBatch]:
     _, _, path = PyArrowFileIO.parse_location(task.file.file_path)
     arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
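For context (not part of the commit): `use_large_types` selects between Arrow's regular 32-bit-offset types and their 64-bit-offset "large" variants. A minimal PyArrow illustration of the two families the flag chooses between, not pyiceberg code:

    import pyarrow as pa

    # 32-bit offsets vs. 64-bit offsets for variable-length columns
    small = pa.schema([("name", pa.string()), ("blob", pa.binary())])
    large = pa.schema([("name", pa.large_string()), ("blob", pa.large_binary())])

    print(small)
    print(large)

With the default now `Optional[bool] = None`, neither coercion is requested unless the caller asks for one, as the next hunk implements.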
@@ -1220,15 +1220,16 @@ def _task_to_record_batches(
     if file_schema is None:
         raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}")

+    projected_file_schema = None
+    if use_large_types is not None:
+        if use_large_types is True:
+            projected_file_schema = _pyarrow_schema_ensure_large_types(physical_schema)
+        else:
+            projected_file_schema = _pyarrow_schema_ensure_small_types(physical_schema)
+
     fragment_scanner = ds.Scanner.from_fragment(
         fragment=fragment,
-        # With PyArrow 16.0.0 there is an issue with casting record-batches:
-        # https://github.com/apache/arrow/issues/41884
-        # https://github.com/apache/arrow/issues/43183
-        # Would be good to remove this later on
-        schema=_pyarrow_schema_ensure_large_types(physical_schema)
-        if use_large_types
-        else (_pyarrow_schema_ensure_small_types(physical_schema)),
+        schema=projected_file_schema,
         # This will push down the query to Arrow.
         # But in case there are positional deletes, we have to apply them first
         filter=pyarrow_filter if not positional_deletes else None,
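A self-contained sketch of the tri-state selection introduced above. The two `_ensure_*` helpers are simplified stand-ins for pyiceberg's `_pyarrow_schema_ensure_large_types` / `_pyarrow_schema_ensure_small_types` named in the diff and only handle string/binary fields for illustration:

    from typing import Optional

    import pyarrow as pa

    def _ensure_large(schema: pa.Schema) -> pa.Schema:
        # Stand-in: widen string/binary to their 64-bit-offset variants.
        swap = {pa.string(): pa.large_string(), pa.binary(): pa.large_binary()}
        return pa.schema([f.with_type(swap.get(f.type, f.type)) for f in schema])

    def _ensure_small(schema: pa.Schema) -> pa.Schema:
        # Stand-in: narrow large_string/large_binary back to 32-bit offsets.
        swap = {pa.large_string(): pa.string(), pa.large_binary(): pa.binary()}
        return pa.schema([f.with_type(swap.get(f.type, f.type)) for f in schema])

    def choose_scan_schema(physical_schema: pa.Schema, use_large_types: Optional[bool]) -> Optional[pa.Schema]:
        # None -> hand schema=None to the scanner and keep the file's own types.
        if use_large_types is None:
            return None
        return _ensure_large(physical_schema) if use_large_types else _ensure_small(physical_schema)

Passing `schema=None` to `ds.Scanner.from_fragment` leaves the fragment's physical schema untouched, which is what the new `None` default achieves; the PyArrow 16 casting workaround comments are dropped along with the unconditional coercion.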
@@ -1246,14 +1247,9 @@ def _task_to_record_batches(
             batch = batch.take(indices)
         # Apply the user filter
         if pyarrow_filter is not None:
-            # we need to switch back and forth between RecordBatch and Table
-            # as Expression filter isn't yet supported in RecordBatch
-            # https://github.com/apache/arrow/issues/39220
-            arrow_table = pa.Table.from_batches([batch])
-            arrow_table = arrow_table.filter(pyarrow_filter)
-            if len(arrow_table) == 0:
+            batch = batch.filter(pyarrow_filter)
+            if len(batch) == 0:
                 continue
-            batch = arrow_table.to_batches()[0]
         yield _to_requested_schema(
             projected_schema, file_project_schema, batch, downcast_ns_timestamp_to_us=True, use_large_types=use_large_types
         )
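The deleted Table round-trip existed because `RecordBatch.filter` did not accept a compute Expression (apache/arrow#39220). A minimal sketch, assuming the newer PyArrow this commit targets supports Expression filters on record batches directly, as the replacement code relies on:

    import pyarrow as pa
    import pyarrow.compute as pc

    batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "qty": [10, 0, 7]})
    expr = pc.field("qty") > 0       # a compute Expression, not a boolean mask

    filtered = batch.filter(expr)    # previously: pa.Table.from_batches([batch]).filter(expr)
    print(filtered.num_rows)         # 2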
@@ -1268,7 +1264,7 @@ def _task_to_table(
     positional_deletes: Optional[List[ChunkedArray]],
     case_sensitive: bool,
     name_mapping: Optional[NameMapping] = None,
-    use_large_types: bool = True,
+    use_large_types: Optional[bool] = None,
 ) -> Optional[pa.Table]:
     batches = list(
         _task_to_record_batches(
@@ -1348,7 +1344,9 @@ def project_table(
         # When FsSpec is not installed
         raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e

-    use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+    use_large_types = None
+    if PYARROW_USE_LARGE_TYPES_ON_READ in io.properties:
+        use_large_types = property_as_bool(io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)

     bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
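A rough sketch of the new lookup behavior in `project_table`: coercion is only requested when the property is set explicitly, otherwise `use_large_types` stays `None`. This uses a plain dict as a stand-in for pyiceberg's `property_as_bool`, and the property string shown is illustrative only:

    from typing import Optional

    PYARROW_USE_LARGE_TYPES_ON_READ = "pyarrow.use-large-types-on-read"  # illustrative key, not verified

    def resolve_use_large_types(properties: dict) -> Optional[bool]:
        if PYARROW_USE_LARGE_TYPES_ON_READ not in properties:
            return None                                   # unset: leave the file's types alone
        value = str(properties[PYARROW_USE_LARGE_TYPES_ON_READ]).strip().lower()
        return value in ("true", "1")                     # crude stand-in for property_as_bool

    print(resolve_use_large_types({}))                                           # None
    print(resolve_use_large_types({PYARROW_USE_LARGE_TYPES_ON_READ: "false"}))   # False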
