Use write.parquet.compression-{codec,level} #358
Changes from 8 commits
@@ -26,6 +26,7 @@
 from __future__ import annotations

 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass

+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())

-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)

     data_file = DataFile(
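
To illustrate the change above: the keyword arguments derived from the table properties are expanded directly into PyArrow's ParquetWriter. A minimal self-contained sketch, assuming zstd at level 3 and a local file path instead of the Iceberg FileIO output stream (both assumptions, not part of this PR):

```python
import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = pa.table({"id": [1, 2, 3]})

# A subset of the kwargs that _get_parquet_writer_kwargs can produce (values assumed here).
parquet_writer_kwargs = {"compression": "zstd", "compression_level": 3}

# Mirrors the pq.ParquetWriter(..., **parquet_writer_kwargs) call in write_file.
with pq.ParquetWriter(
    "/tmp/example.parquet", schema=arrow_table.schema, version="1.0", **parquet_writer_kwargs
) as writer:
    writer.write_table(arrow_table)
```
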
@@ -1745,14 +1747,42 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         key_metadata=None,
     )

-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return None
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+

Review thread on this block:
Comment: Do we want to blow up if one of the properties isn't set?
Reply: We want to raise if one of the properties is set. But I guess we should check for None.
Reply: None is not allowed, reverted my changes.

+    compression_codec = table_properties.get("write.parquet.compression-codec")

Review comment on this line: How about adding the default value here? The RestCatalog backend and HiveCatalog explicitly set the default codec at the catalog level (iceberg-python/pyiceberg/catalog/hive.py, line 158 in 02e6430). But other catalogs, such as … A small sketch of this suggestion follows the diff below.

+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
+    }
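
A sketch of how the new helper behaves, assuming it and its imports are in scope; the property values and the zstd fallback are illustrative (the fallback reflects the reviewer's suggestion above, not this diff):

```python
# Hypothetical string-valued table properties, as a catalog would supply them.
table_properties = {
    "write.parquet.compression-codec": "zstd",
    "write.parquet.compression-level": "3",
}

kwargs = _get_parquet_writer_kwargs(table_properties)
# {'compression': 'zstd', 'compression_level': 3,
#  'data_page_size': None, 'dictionary_pagesize_limit': None}
# Integer properties that are unset come back as None; PyArrow then falls back
# to its own defaults for those writer options.

# Properties from the unsupported list fail fast:
# _get_parquet_writer_kwargs({"write.parquet.page-row-limit": "20000"})
# -> NotImplementedError

# The catalog-level default suggested in the review comment could be emulated
# with a fallback (hypothetical; "zstd" is assumed, not taken from this diff):
compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
```
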
Comment: I checked this through the debugger, and this looks good. Nice change @jonashaag 👍
Reply: You can also tell from the PyArrow code that it's identical :)
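
A small sketch of why the two metadata sources agree: PyArrow's ParquetWriter.close() appends the underlying writer's metadata to the metadata_collector list, so reading writer.writer.metadata directly yields the same FileMetaData. The file path and sample data below are made up:

```python
import pyarrow as pa
import pyarrow.parquet as pq

arrow_table = pa.table({"id": [1, 2, 3]})

collected_metrics = []
with pq.ParquetWriter(
    "/tmp/example.parquet", schema=arrow_table.schema, metadata_collector=collected_metrics
) as writer:
    writer.write_table(arrow_table)

# Once the writer is closed, both expressions describe the same written file.
assert collected_metrics[0].num_rows == writer.writer.metadata.num_rows
```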