diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 76bb041195..dbb8edfefe 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -1717,54 +1717,55 @@ def fill_parquet_file_metadata(
 
 
 def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
-    task = next(tasks)
-
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
-
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
-
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = file_schema or table.schema()
     arrow_file_schema = schema_to_pyarrow(file_schema)
-
-    fo = table.io.new_output(file_path)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
     row_group_size = PropertyUtil.property_as_int(
         properties=table.properties,
         property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
         default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
     )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
+    data_files = []
+    for task in tasks:
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        fo = table.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
+                for batch in task.record_batches:
+                    writer.write_batch(batch, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(file_schema, table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
+        )
+        data_files.append(data_file)
+    return iter(data_files)
 
-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(file_schema, table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
-    )
-    return iter([data_file])
+def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
+
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return bin_packed
 
 
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
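The new bin_pack_arrow_table helper above determines how many data files a write produces: each bin of record batches it yields becomes one WriteTask, and write_file turns each task into one Parquet file. A minimal sketch of how it behaves, assuming the function is importable as added in this diff; the table contents and sizes below are purely illustrative:

import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table

# Illustrative in-memory table; a real write would typically be much larger.
tbl = pa.table({"id": list(range(1_000_000)), "payload": ["x" * 32] * 1_000_000})

# Each yielded item is a List[pa.RecordBatch]; the PackingIterator groups
# batches so that each bin's combined nbytes stays near the 256 MB target.
for i, batches in enumerate(bin_pack_arrow_table(tbl)):
    print(f"bin {i}: {sum(b.nbytes for b in batches)} bytes, {sum(b.num_rows for b in batches)} rows")
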
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index b07da7e0fc..923cc645c3 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -2330,7 +2330,7 @@ def _generate_snapshot_id() -> int:
 class WriteTask:
     write_uuid: uuid.UUID
     task_id: int
-    df: pa.Table
+    record_batches: List[pa.RecordBatch]
     sort_order_id: Optional[int] = None
 
     # Later to be extended with partition information
@@ -2359,7 +2359,7 @@ def _dataframe_to_data_files(
     Returns:
         An iterable that supplies datafiles that represent the table.
     """
-    from pyiceberg.io.pyarrow import write_file
+    from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file
 
     if len(table.spec().fields) > 0:
         raise ValueError("Cannot write to partitioned tables")
@@ -2369,7 +2369,7 @@ def _dataframe_to_data_files(
 
     # This is an iter, so we don't have to materialize everything every time
    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]), file_schema=file_schema)
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)
 
 
 class _MergingSnapshotProducer:
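On the table side, the one-line fan-out in _dataframe_to_data_files is easier to read expanded. A rough sketch of the equivalent steps, assuming a pyarrow Table named df stands in for the caller-supplied dataframe and the imports added above are available; the uuid/counter setup mirrors the surrounding function:

import itertools
import uuid

import pyarrow as pa

from pyiceberg.io.pyarrow import bin_pack_arrow_table
from pyiceberg.table import WriteTask

df = pa.table({"id": [1, 2, 3]})  # stand-in for the caller-supplied dataframe
write_uuid = uuid.uuid4()
counter = itertools.count(0)

# One WriteTask per bin of record batches; write_file then emits one
# Parquet data file per task instead of a single file for the whole frame.
tasks = [WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]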