Use write.parquet.compression-{codec,level} (#358)
* Use `write.parquet.compression-{codec,level}`

* Cleanup

* Review feedback

* Review feedback

* Review feedback

* Update pyiceberg/io/pyarrow.py

Co-authored-by: Fokko Driesprong <[email protected]>

* Fixup

* Fixup

* Fixup

---------

Co-authored-by: Fokko Driesprong <[email protected]>
jonashaag and Fokko authored Feb 5, 2024
1 parent a3d529b commit 9e4ed29
Showing 3 changed files with 173 additions and 131 deletions.
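
As context for the diff below, a minimal sketch of how a writer would opt into these settings from the user side; the catalog name, namespace, schema, and data are illustrative placeholders that assume a catalog already configured for pyiceberg, not anything introduced by this commit:

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Hypothetical catalog; assumes connection details are configured elsewhere (e.g. ~/.pyiceberg.yaml).
catalog = load_catalog("default")

schema = Schema(NestedField(field_id=1, name="n", field_type=LongType(), required=False))

table = catalog.create_table(
    "default.compressed_example",  # hypothetical namespace and table name
    schema=schema,
    properties={
        "write.parquet.compression-codec": "zstd",  # also the default when the property is unset
        "write.parquet.compression-level": "6",
    },
)

# Data files written by this append are compressed according to the table properties above.
table.append(pa.table({"n": pa.array([1, 2, 3], type=pa.int64())}))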
pyiceberg/io/pyarrow.py (44 changes: 37 additions & 7 deletions)

@@ -26,6 +26,7 @@
 from __future__ import annotations
 
 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())
 
-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)
 
     data_file = DataFile(
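
To show what the expanded keyword arguments amount to on the pyarrow side, here is a standalone sketch using pyarrow.parquet directly; the output path and data are placeholders, and only the keyword arguments mirror what _get_parquet_writer_kwargs produces:

import pyarrow as pa
import pyarrow.parquet as pq

t = pa.table({"n": [1, 2, 3]})
with pq.ParquetWriter(
    "/tmp/example.parquet",          # placeholder output path
    schema=t.schema,
    version="1.0",
    compression="zstd",              # from write.parquet.compression-codec
    compression_level=6,             # from write.parquet.compression-level
    data_page_size=None,             # from write.parquet.page-size-bytes (unset here)
    dictionary_pagesize_limit=None,  # from write.parquet.dict-size-bytes (unset here)
) as writer:
    writer.write_table(t)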
@@ -1745,14 +1747,42 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         key_metadata=None,
     )
 
-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return None
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+
+    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
+    }
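
To illustrate the property-to-kwarg mapping the new helper performs, a small sketch that calls it directly; it is a private function, so this is for illustration only and assumes a pyiceberg build that includes this commit:

from pyiceberg.io.pyarrow import _get_parquet_writer_kwargs

# "uncompressed" is translated to pyarrow's "none"; unset integer properties stay None.
print(_get_parquet_writer_kwargs({"write.parquet.compression-codec": "uncompressed"}))
# {'compression': 'none', 'compression_level': None, 'data_page_size': None, 'dictionary_pagesize_limit': None}

print(_get_parquet_writer_kwargs({
    "write.parquet.compression-codec": "gzip",
    "write.parquet.compression-level": "9",
    "write.parquet.page-size-bytes": "1048576",
}))
# {'compression': 'gzip', 'compression_level': 9, 'data_page_size': 1048576, 'dictionary_pagesize_limit': None}

# Properties the writer does not implement yet are rejected loudly rather than silently ignored.
_get_parquet_writer_kwargs({"write.parquet.bloom-filter-enabled.column.n": "true"})
# raises NotImplementedError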
tests/integration/test_reads.py (6 changes: 5 additions & 1 deletion)

@@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None:
 @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
     table_test_all_types = catalog.load_table("default.test_all_types")
-    fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
+    fs = S3FileSystem(
+        endpoint_override=catalog.properties["s3.endpoint"],
+        access_key=catalog.properties["s3.access-key-id"],
+        secret_key=catalog.properties["s3.secret-access-key"],
+    )
     data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()]
     for data_file_path in data_file_paths:
         uri = urlparse(data_file_path)
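The test now derives the S3 connection details from the catalog rather than hard-coding them. A sketch of a catalog configuration that exposes those properties; the endpoint and credentials mirror the local MinIO-style values the old test used and are placeholders, assuming a REST catalog reachable at the given URI:

from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",  # hypothetical REST catalog endpoint
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# The same values are then available to the test via catalog.properties.
print(catalog.properties["s3.endpoint"])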
