bin pack write
kevinjqliu committed Feb 23, 2024
1 parent 44948cd commit 7b4f3ce
Showing 2 changed files with 44 additions and 43 deletions.
81 changes: 41 additions & 40 deletions pyiceberg/io/pyarrow.py
@@ -1717,54 +1717,55 @@ def fill_parquet_file_metadata(


def write_file(table: Table, tasks: Iterator[WriteTask], file_schema: Optional[Schema] = None) -> Iterator[DataFile]:
-    task = next(tasks)
-
-    try:
-        _ = next(tasks)
-        # If there are more tasks, raise an exception
-        raise NotImplementedError("Only unpartitioned writes are supported: https://github.com/apache/iceberg-python/issues/208")
-    except StopIteration:
-        pass
-
-    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
-
-    file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
    file_schema = file_schema or table.schema()
    arrow_file_schema = schema_to_pyarrow(file_schema)

-    fo = table.io.new_output(file_path)
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
    row_group_size = PropertyUtil.property_as_int(
        properties=table.properties,
        property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
        default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
    )
-    with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
-            writer.write_table(task.df, row_group_size=row_group_size)
-
-    data_file = DataFile(
-        content=DataFileContent.DATA,
-        file_path=file_path,
-        file_format=FileFormat.PARQUET,
-        partition=Record(),
-        file_size_in_bytes=len(fo),
-        # After this has been fixed:
-        # https://github.com/apache/iceberg-python/issues/271
-        # sort_order_id=task.sort_order_id,
-        sort_order_id=None,
-        # Just copy these from the table for now
-        spec_id=table.spec().spec_id,
-        equality_ids=None,
-        key_metadata=None,
-    )
+    data_files = []
+    for task in tasks:
+        file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
+        fo = table.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=arrow_file_schema, **parquet_writer_kwargs) as writer:
+                for batch in task.record_batches:
+                    writer.write_batch(batch, row_group_size=row_group_size)
+
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=file_path,
+            file_format=FileFormat.PARQUET,
+            partition=Record(),
+            file_size_in_bytes=len(fo),
+            # After this has been fixed:
+            # https://github.com/apache/iceberg-python/issues/271
+            # sort_order_id=task.sort_order_id,
+            sort_order_id=None,
+            # Just copy these from the table for now
+            spec_id=table.spec().spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=writer.writer.metadata,
+            stats_columns=compute_statistics_plan(file_schema, table.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
+        )
+        data_files.append(data_file)
+    return iter(data_files)

-    fill_parquet_file_metadata(
-        data_file=data_file,
-        parquet_metadata=writer.writer.metadata,
-        stats_columns=compute_statistics_plan(file_schema, table.properties),
-        parquet_column_mapping=parquet_path_to_id_mapping(file_schema),
-    )
-    return iter([data_file])
+def bin_pack_arrow_table(tbl: pa.Table) -> Iterator[List[pa.RecordBatch]]:
+    # bin-pack the table into 256 MB chunks
+    from pyiceberg.utils.bin_packing import PackingIterator
+
+    splits = tbl.to_batches()
+    target_weight = 2 << 27  # 256 MB
+    bin_packed = PackingIterator(splits, target_weight, lookback=2, weight_func=lambda x: x.nbytes, largest_bin_first=True)
+    return bin_packed


ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
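For context, 2 << 27 is 268,435,456 bytes, i.e. 256 MiB per bin. The sketch below is a minimal, self-contained illustration of the approach using plain pyarrow: a simplified greedy packer stands in for PyIceberg's lookback-based PackingIterator, and each bin is then streamed into its own Parquet file with ParquetWriter.write_batch, mirroring the per-task loop added to write_file. The demo file names and the 2 MiB target are illustrative only, not part of the commit.

import pyarrow as pa
import pyarrow.parquet as pq


def naive_bin_pack(batches, target_weight):
    # Simplified greedy packing: accumulate whole record batches into a bin
    # until adding the next batch would exceed the target byte weight.
    # (PyIceberg's PackingIterator also uses lookback and largest_bin_first
    # to pack bins more evenly; that is omitted here.)
    bin_, weight = [], 0
    for batch in batches:
        if bin_ and weight + batch.nbytes > target_weight:
            yield bin_
            bin_, weight = [], 0
        bin_.append(batch)
        weight += batch.nbytes
    if bin_:
        yield bin_


tbl = pa.Table.from_pydict({"n": list(range(1_000_000))})  # ~8 MB of int64
batches = tbl.to_batches(max_chunksize=100_000)            # ~0.8 MB per batch
for task_id, record_batches in enumerate(naive_bin_pack(batches, target_weight=2 << 20)):
    # One Parquet file per bin, written batch by batch like the new write_file loop.
    with pq.ParquetWriter(f"demo-{task_id}.parquet", schema=tbl.schema) as writer:
        for batch in record_batches:
            writer.write_batch(batch)
    print(task_id, sum(b.nbytes for b in record_batches))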
6 changes: 3 additions & 3 deletions pyiceberg/table/__init__.py
@@ -2330,7 +2330,7 @@ def _generate_snapshot_id() -> int:
class WriteTask:
    write_uuid: uuid.UUID
    task_id: int
-    df: pa.Table
+    record_batches: List[pa.RecordBatch]
    sort_order_id: Optional[int] = None

    # Later to be extended with partition information
@@ -2359,7 +2359,7 @@ def _dataframe_to_data_files(
    Returns:
        An iterable that supplies datafiles that represent the table.
    """
-    from pyiceberg.io.pyarrow import write_file
+    from pyiceberg.io.pyarrow import bin_pack_arrow_table, write_file

    if len(table.spec().fields) > 0:
        raise ValueError("Cannot write to partitioned tables")
@@ -2369,7 +2369,7 @@

    # This is an iter, so we don't have to materialize everything every time
    # This will be more relevant when we start doing partitioned writes
-    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), df)]), file_schema=file_schema)
+    yield from write_file(table, iter([WriteTask(write_uuid, next(counter), batches) for batches in bin_pack_arrow_table(df)]), file_schema=file_schema)


class _MergingSnapshotProducer:
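As a hedged illustration of the fan-out that the new comprehension in _dataframe_to_data_files performs (one task per bin, numbered from itertools.count and sharing a single write UUID), here is a runnable stand-in. DemoWriteTask is a hypothetical substitute for pyiceberg.table.WriteTask so the snippet needs no configured table, and the two bins are hand-built rather than produced by bin_pack_arrow_table.

import itertools
import uuid
from dataclasses import dataclass
from typing import List

import pyarrow as pa


@dataclass
class DemoWriteTask:  # stand-in for pyiceberg.table.WriteTask
    write_uuid: uuid.UUID
    task_id: int
    record_batches: List[pa.RecordBatch]


df = pa.Table.from_pydict({"n": list(range(300_000))})
all_batches = df.to_batches(max_chunksize=100_000)  # 3 batches
bins = [all_batches[:2], all_batches[2:]]           # pretend output of bin_pack_arrow_table

write_uuid = uuid.uuid4()
counter = itertools.count(0)
tasks = [DemoWriteTask(write_uuid, next(counter), batches) for batches in bins]
print([(t.task_id, len(t.record_batches)) for t in tasks])  # [(0, 2), (1, 1)]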
