From bc4a0d672d6a78fccc4b48da977187ed1ecfeca4 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 6 Feb 2024 23:30:58 -0800 Subject: [PATCH 1/3] add TableProperties and PropertyUtil --- pyiceberg/catalog/hive.py | 4 +-- pyiceberg/io/pyarrow.py | 64 +++++++++++++++++---------------- pyiceberg/table/__init__.py | 50 ++++++++++++++++++++++++-- pyiceberg/table/name_mapping.py | 2 -- 4 files changed, 82 insertions(+), 38 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 8069321095..d81404f77a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -67,7 +67,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT @@ -155,7 +155,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str]) PROP_TABLE_TYPE = "table_type" PROP_METADATA_LOCATION = "metadata_location" PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location" -DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'} +DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT} def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]: diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 91d8452eab..69e2af3dc1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -124,7 +124,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import WriteTask +from pyiceberg.table import PropertyUtil, TableProperties, WriteTask from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform from pyiceberg.typedef import EMPTY_DICT, Properties, Record @@ -1384,10 +1384,6 @@ class MetricModeTypes(Enum): FULL = "full" -DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default" -COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column" - - @dataclass(frozen=True) class MetricsMode(Singleton): type: MetricModeTypes @@ -1430,12 +1426,14 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector] _field_id: int = 0 _schema: Schema _properties: Dict[str, str] - _default_mode: Optional[str] + _default_mode: str def __init__(self, schema: Schema, properties: Dict[str, str]): self._schema = schema self._properties = properties - self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY) + self._default_mode = self._properties.get( + TableProperties.DEFAULT_WRITE_METRICS_MODE, TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT + ) def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]: return struct_result() @@ -1470,12 +1468,9 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]: if column_name is None: return [] - metrics_mode = _DEFAULT_METRICS_MODE + metrics_mode = match_metrics_mode(self._default_mode) - if self._default_mode: - metrics_mode = match_metrics_mode(self._default_mode) - - col_mode = self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}") + col_mode = self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}") if col_mode: metrics_mode = match_metrics_mode(col_mode) @@ -1762,33 +1757,40 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: return iter([data_file]) -def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: - def _get_int(key: str, default: Optional[int] = None) -> Optional[int]: - if value := table_properties.get(key): - try: - return int(value) - except ValueError as e: - raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e - else: - return default +ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" +PYARROW_UNCOMPRESSED_CODEC = "none" + +def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: for key_pattern in [ - "write.parquet.row-group-size-bytes", - "write.parquet.page-row-limit", - "write.parquet.bloom-filter-max-bytes", - "write.parquet.bloom-filter-enabled.column.*", + TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, + TableProperties.PARQUET_PAGE_ROW_LIMIT, + TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES, + f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*", ]: if unsupported_keys := fnmatch.filter(table_properties, key_pattern): raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") - compression_codec = table_properties.get("write.parquet.compression-codec", "zstd") - compression_level = _get_int("write.parquet.compression-level") - if compression_codec == "uncompressed": - compression_codec = "none" + compression_codec = table_properties.get(TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT) + compression_level = PropertyUtil.property_as_int( + properties=table_properties, + property_name=TableProperties.PARQUET_COMPRESSION_LEVEL, + default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT, + ) + if compression_codec == ICEBERG_UNCOMPRESSED_CODEC: + compression_codec = PYARROW_UNCOMPRESSED_CODEC return { "compression": compression_codec, "compression_level": compression_level, - "data_page_size": _get_int("write.parquet.page-size-bytes"), - "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", default=2 * 1024 * 1024), + "data_page_size": PropertyUtil.property_as_int( + properties=table_properties, + property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES, + default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT, + ), + "dictionary_pagesize_limit": PropertyUtil.property_as_int( + properties=table_properties, + property_name=TableProperties.PARQUET_DICT_SIZE_BYTES, + default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT, + ), } diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b9d44b7c4d..8dbafc05fb 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -85,7 +85,6 @@ TableMetadataUtil, ) from pyiceberg.table.name_mapping import ( - SCHEMA_NAME_MAPPING_DEFAULT, NameMapping, create_mapping_from_schema, parse_mapping_from_json, @@ -134,6 +133,50 @@ _JAVA_LONG_MAX = 9223372036854775807 +class TableProperties: + PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes" + PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024 # 128 MB + + PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes" + PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024 # 1 MB + + PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit" + PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000 + + PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes" + PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024 # 2 MB + + PARQUET_COMPRESSION = "write.parquet.compression-codec" + PARQUET_COMPRESSION_DEFAULT = "zstd" + + PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level" + PARQUET_COMPRESSION_LEVEL_DEFAULT = None + + PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes" + PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024 + + PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column" + + DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default" + DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)" + + METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column" + + DEFAULT_NAME_MAPPING = "schema.name-mapping.default" + + +class PropertyUtil: + @staticmethod + def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]: + if value := properties.get(property_name): + try: + return int(value) + except ValueError as e: + raise ValueError(f"Could not parse table property {property_name} to an integer: {value}") from e + else: + return default + + class Transaction: _table: Table _updates: Tuple[TableUpdate, ...] @@ -921,7 +964,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive def name_mapping(self) -> NameMapping: """Return the table's field-id NameMapping.""" - if name_mapping_json := self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT): + if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING): return parse_mapping_from_json(name_mapping_json) else: return create_mapping_from_schema(self.schema()) @@ -1493,7 +1536,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema: visit_with_partner( Catalog._convert_schema_if_needed(new_schema), -1, - UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore + UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), + # type: ignore PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive), ) return self diff --git a/pyiceberg/table/name_mapping.py b/pyiceberg/table/name_mapping.py index 84a295f5e4..ffe96359a8 100644 --- a/pyiceberg/table/name_mapping.py +++ b/pyiceberg/table/name_mapping.py @@ -34,8 +34,6 @@ from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, StructType -SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default" - class MappedField(IcebergBaseModel): field_id: int = Field(alias="field-id") From 2bcbe462792fc433bf3b94ba408e4cae1ab2ed39 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 6 Feb 2024 23:32:49 -0800 Subject: [PATCH 2/3] fix lint --- pyiceberg/io/pyarrow.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 69e2af3dc1..5867843f1d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1390,9 +1390,6 @@ class MetricsMode(Singleton): length: Optional[int] = None -_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, DEFAULT_TRUNCATION_LENGTH) - - def match_metrics_mode(mode: str) -> MetricsMode: sanitized_mode = mode.strip().lower() if sanitized_mode.startswith("truncate"): From c0e4316961c5438dccea1b9d0b46f9501db749eb Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 7 Feb 2024 10:19:34 +0100 Subject: [PATCH 3/3] Revert unrelated change --- pyiceberg/table/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8dbafc05fb..8c0a08363a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1536,8 +1536,7 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema: visit_with_partner( Catalog._convert_schema_if_needed(new_schema), -1, - UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), - # type: ignore + UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive), ) return self