From 7e131bb3c57dda6bcea3ed2909bf3f992a450c1d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 24 Mar 2024 15:44:33 -0700 Subject: [PATCH] future return row count --- pyiceberg/io/pyarrow.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 72de14880a..4bfd1fd508 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -946,13 +946,9 @@ def _task_to_table( projected_field_ids: Set[int], positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, - row_counts: List[int], limit: Optional[int] = None, name_mapping: Optional[NameMapping] = None, ) -> Optional[pa.Table]: - if limit and sum(row_counts) >= limit: - return None - _, _, path = PyArrowFileIO.parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) with fs.open_input_file(path) as fin: @@ -1015,11 +1011,6 @@ def _task_to_table( if len(arrow_table) < 1: return None - if limit is not None and sum(row_counts) >= limit: - return None - - row_counts.append(len(arrow_table)) - return to_requested_schema(projected_schema, file_project_schema, arrow_table) @@ -1085,7 +1076,6 @@ def project_table( id for id in projected_schema.field_ids if not isinstance(projected_schema.find_type(id), (MapType, ListType)) }.union(extract_field_ids(bound_row_filter)) - row_counts: List[int] = [] deletes_per_file = _read_all_delete_files(fs, tasks) executor = ExecutorFactory.get_or_create() futures = [ @@ -1098,21 +1088,21 @@ def project_table( projected_field_ids, deletes_per_file.get(task.file.file_path), case_sensitive, - row_counts, limit, table.name_mapping(), ) for task in tasks ] - + total_row_count = 0 # for consistent ordering, we need to maintain future order futures_index = {f: i for i, f in enumerate(futures)} completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f]) for future in concurrent.futures.as_completed(futures): completed_futures.add(future) - + if table_result := future.result(): + total_row_count += len(table_result) # stop early if limit is satisfied - if limit is not None and sum(row_counts) >= limit: + if limit is not None and total_row_count >= limit: break # by now, we've either completed all tasks or satisfied the limit