Centralized table properties management #388

Merged Feb 8, 2024 (3 commits)

Changes from 2 commits
pyiceberg/catalog/hive.py (4 changes: 2 additions & 2 deletions)

@@ -67,7 +67,7 @@
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
 from pyiceberg.schema import Schema, SchemaVisitor, visit
 from pyiceberg.serializers import FromInputFile
-from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
+from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
 from pyiceberg.table.metadata import new_table_metadata
 from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
 from pyiceberg.typedef import EMPTY_DICT
@@ -155,7 +155,7 @@ def _construct_hive_storage_descriptor(schema: Schema, location: Optional[str])
 PROP_TABLE_TYPE = "table_type"
 PROP_METADATA_LOCATION = "metadata_location"
 PROP_PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
-DEFAULT_PROPERTIES = {'write.parquet.compression-codec': 'zstd'}
+DEFAULT_PROPERTIES = {TableProperties.PARQUET_COMPRESSION: TableProperties.PARQUET_COMPRESSION_DEFAULT}


 def _construct_parameters(metadata_location: str, previous_metadata_location: Optional[str] = None) -> Dict[str, Any]:
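These defaults are merged with user-supplied properties at table creation, so a caller can still override the codec. A minimal sketch of that override, assuming a Hive metastore; the catalog URI and table identifier are placeholders, not part of this PR:

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField

# Hypothetical Hive metastore connection; adjust to your environment.
catalog = load_catalog("hive", uri="thrift://localhost:9083")

schema = Schema(NestedField(field_id=1, name="id", field_type=LongType(), required=True))

# An explicit property overrides the zstd default from DEFAULT_PROPERTIES.
table = catalog.create_table(
    identifier="db.events",
    schema=schema,
    properties={"write.parquet.compression-codec": "snappy"},
)
```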
pyiceberg/io/pyarrow.py (67 changes: 33 additions & 34 deletions)

@@ -124,7 +124,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import WriteTask
+from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
 from pyiceberg.typedef import EMPTY_DICT, Properties, Record
@@ -1384,19 +1384,12 @@ class MetricModeTypes(Enum):
FULL = "full"


DEFAULT_METRICS_MODE_KEY = "write.metadata.metrics.default"
COLUMN_METRICS_MODE_KEY_PREFIX = "write.metadata.metrics.column"


@dataclass(frozen=True)
class MetricsMode(Singleton):
type: MetricModeTypes
length: Optional[int] = None


_DEFAULT_METRICS_MODE = MetricsMode(MetricModeTypes.TRUNCATE, DEFAULT_TRUNCATION_LENGTH)


def match_metrics_mode(mode: str) -> MetricsMode:
sanitized_mode = mode.strip().lower()
if sanitized_mode.startswith("truncate"):
@@ -1430,12 +1423,14 @@ class PyArrowStatisticsCollector(PreOrderSchemaVisitor[List[StatisticsCollector]
     _field_id: int = 0
     _schema: Schema
     _properties: Dict[str, str]
-    _default_mode: Optional[str]
+    _default_mode: str
 
     def __init__(self, schema: Schema, properties: Dict[str, str]):
         self._schema = schema
         self._properties = properties
-        self._default_mode = self._properties.get(DEFAULT_METRICS_MODE_KEY)
+        self._default_mode = self._properties.get(
+            TableProperties.DEFAULT_WRITE_METRICS_MODE, TableProperties.DEFAULT_WRITE_METRICS_MODE_DEFAULT
+        )
 
     def schema(self, schema: Schema, struct_result: Callable[[], List[StatisticsCollector]]) -> List[StatisticsCollector]:
         return struct_result()
@@ -1470,12 +1465,9 @@ def primitive(self, primitive: PrimitiveType) -> List[StatisticsCollector]:
         if column_name is None:
             return []
 
-        metrics_mode = _DEFAULT_METRICS_MODE
-
-        if self._default_mode:
-            metrics_mode = match_metrics_mode(self._default_mode)
+        metrics_mode = match_metrics_mode(self._default_mode)
 
-        col_mode = self._properties.get(f"{COLUMN_METRICS_MODE_KEY_PREFIX}.{column_name}")
+        col_mode = self._properties.get(f"{TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX}.{column_name}")
         if col_mode:
             metrics_mode = match_metrics_mode(col_mode)
 
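To make the new resolution order concrete: the table-wide default property now always resolves (falling back to "truncate(16)"), and a per-column property still takes precedence. A self-contained sketch of that order, using plain strings rather than the pyiceberg internals:

```python
from typing import Dict

# Property keys matching the TableProperties constants introduced in this PR.
DEFAULT_KEY = "write.metadata.metrics.default"
COLUMN_PREFIX = "write.metadata.metrics.column"

def resolve_metrics_mode(properties: Dict[str, str], column_name: str) -> str:
    # Table-wide default, falling back to the documented "truncate(16)".
    mode = properties.get(DEFAULT_KEY, "truncate(16)")
    # A per-column setting wins over the table-wide default.
    return properties.get(f"{COLUMN_PREFIX}.{column_name}", mode)

props = {"write.metadata.metrics.column.id": "full"}
assert resolve_metrics_mode(props, "id") == "full"
assert resolve_metrics_mode(props, "payload") == "truncate(16)"
```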
@@ -1762,33 +1754,40 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     return iter([data_file])
 
 
-def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
-    def _get_int(key: str, default: Optional[int] = None) -> Optional[int]:
-        if value := table_properties.get(key):
-            try:
-                return int(value)
-            except ValueError as e:
-                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
-        else:
-            return default
+ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
+PYARROW_UNCOMPRESSED_CODEC = "none"
 
 
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
     for key_pattern in [
-        "write.parquet.row-group-size-bytes",
-        "write.parquet.page-row-limit",
-        "write.parquet.bloom-filter-max-bytes",
-        "write.parquet.bloom-filter-enabled.column.*",
+        TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
+        TableProperties.PARQUET_PAGE_ROW_LIMIT,
+        TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
+        f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
     ]:
         if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
             raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
 
-    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
-    compression_level = _get_int("write.parquet.compression-level")
-    if compression_codec == "uncompressed":
-        compression_codec = "none"
+    compression_codec = table_properties.get(TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT)
+    compression_level = PropertyUtil.property_as_int(
+        properties=table_properties,
+        property_name=TableProperties.PARQUET_COMPRESSION_LEVEL,
+        default=TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+    )
+    if compression_codec == ICEBERG_UNCOMPRESSED_CODEC:
+        compression_codec = PYARROW_UNCOMPRESSED_CODEC
 
     return {
         "compression": compression_codec,
         "compression_level": compression_level,
-        "data_page_size": _get_int("write.parquet.page-size-bytes"),
-        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes", default=2 * 1024 * 1024),
+        "data_page_size": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_PAGE_SIZE_BYTES,
+            default=TableProperties.PARQUET_PAGE_SIZE_BYTES_DEFAULT,
+        ),
+        "dictionary_pagesize_limit": PropertyUtil.property_as_int(
+            properties=table_properties,
+            property_name=TableProperties.PARQUET_DICT_SIZE_BYTES,
+            default=TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT,
+        ),
     }
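For reference, with an empty properties dict the function above produces the following kwargs, based on the defaults declared in TableProperties below. The trailing comment sketches how they feed pyarrow; this is an illustration, not part of the diff:

```python
# Writer kwargs produced for table_properties == {}:
kwargs = {
    "compression": "zstd",                         # PARQUET_COMPRESSION_DEFAULT
    "compression_level": None,                     # PARQUET_COMPRESSION_LEVEL_DEFAULT
    "data_page_size": 1024 * 1024,                 # PARQUET_PAGE_SIZE_BYTES_DEFAULT (1 MB)
    "dictionary_pagesize_limit": 2 * 1024 * 1024,  # PARQUET_DICT_SIZE_BYTES_DEFAULT (2 MB)
}

# These names match pyarrow.parquet.ParquetWriter keyword arguments, e.g.:
#   pq.ParquetWriter(path, arrow_schema, **kwargs)
```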
pyiceberg/table/__init__.py (50 changes: 47 additions & 3 deletions)

@@ -85,7 +85,6 @@
     TableMetadataUtil,
 )
 from pyiceberg.table.name_mapping import (
-    SCHEMA_NAME_MAPPING_DEFAULT,
     NameMapping,
     create_mapping_from_schema,
     parse_mapping_from_json,
@@ -134,6 +133,50 @@
 _JAVA_LONG_MAX = 9223372036854775807
 
 
+class TableProperties:
+    PARQUET_ROW_GROUP_SIZE_BYTES = "write.parquet.row-group-size-bytes"
+    PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT = 128 * 1024 * 1024  # 128 MB
+
+    PARQUET_PAGE_SIZE_BYTES = "write.parquet.page-size-bytes"
+    PARQUET_PAGE_SIZE_BYTES_DEFAULT = 1024 * 1024  # 1 MB
+
+    PARQUET_PAGE_ROW_LIMIT = "write.parquet.page-row-limit"
+    PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20000
+
+    PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes"
+    PARQUET_DICT_SIZE_BYTES_DEFAULT = 2 * 1024 * 1024  # 2 MB
+
+    PARQUET_COMPRESSION = "write.parquet.compression-codec"
+    PARQUET_COMPRESSION_DEFAULT = "zstd"
+
+    PARQUET_COMPRESSION_LEVEL = "write.parquet.compression-level"
+    PARQUET_COMPRESSION_LEVEL_DEFAULT = None
+
+    PARQUET_BLOOM_FILTER_MAX_BYTES = "write.parquet.bloom-filter-max-bytes"
+    PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT = 1024 * 1024
+
+    PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX = "write.parquet.bloom-filter-enabled.column"
+
+    DEFAULT_WRITE_METRICS_MODE = "write.metadata.metrics.default"
+    DEFAULT_WRITE_METRICS_MODE_DEFAULT = "truncate(16)"
+
+    METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column"
+
+    DEFAULT_NAME_MAPPING = "schema.name-mapping.default"
+
+
+class PropertyUtil:
+    @staticmethod
+    def property_as_int(properties: Dict[str, str], property_name: str, default: Optional[int] = None) -> Optional[int]:
+        if value := properties.get(property_name):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {property_name} to an integer: {value}") from e
+        else:
+            return default
+
+
 class Transaction:
     _table: Table
     _updates: Tuple[TableUpdate, ...]
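A quick illustration of PropertyUtil.property_as_int as defined above, covering the parse, fallback, and error paths (the property values are illustrative):

```python
props = {"write.parquet.page-size-bytes": "2097152"}

# A present key is parsed from its string value to an int.
assert PropertyUtil.property_as_int(props, "write.parquet.page-size-bytes") == 2097152

# A missing key falls back to the supplied default.
assert PropertyUtil.property_as_int(props, "write.parquet.dict-size-bytes", default=2 * 1024 * 1024) == 2 * 1024 * 1024

# A non-numeric value raises:
#   ValueError: Could not parse table property write.parquet.page-size-bytes to an integer: 2MB
```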
Expand Down Expand Up @@ -921,7 +964,7 @@ def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive

     def name_mapping(self) -> NameMapping:
         """Return the table's field-id NameMapping."""
-        if name_mapping_json := self.properties.get(SCHEMA_NAME_MAPPING_DEFAULT):
+        if name_mapping_json := self.properties.get(TableProperties.DEFAULT_NAME_MAPPING):
             return parse_mapping_from_json(name_mapping_json)
         else:
             return create_mapping_from_schema(self.schema())
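The property consulted here holds a JSON name mapping; when it is absent, the mapping is derived from the current schema instead. A hypothetical property value following the Iceberg name-mapping format (the field IDs and names are illustrative):

```python
import json

# Field-id 1 stays resolvable under either historical column name.
mapping_json = json.dumps([
    {"field-id": 1, "names": ["id", "record_id"]},
    {"field-id": 2, "names": ["payload"]},
])

# Stored under TableProperties.DEFAULT_NAME_MAPPING, i.e. "schema.name-mapping.default".
properties = {"schema.name-mapping.default": mapping_json}
```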
@@ -1493,7 +1536,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
         visit_with_partner(
             Catalog._convert_schema_if_needed(new_schema),
             -1,
-            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),  # type: ignore
+            UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
+            # type: ignore
             PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
         )
         return self
pyiceberg/table/name_mapping.py (2 changes: 0 additions & 2 deletions)

@@ -34,8 +34,6 @@
 from pyiceberg.typedef import IcebergBaseModel, IcebergRootModel
 from pyiceberg.types import ListType, MapType, NestedField, PrimitiveType, StructType
 
-SCHEMA_NAME_MAPPING_DEFAULT = "schema.name-mapping.default"
-
 
 class MappedField(IcebergBaseModel):
     field_id: int = Field(alias="field-id")