diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3a116dec3a..d4b28fbdda 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2572,13 +2572,11 @@ def _write_delete_manifest() -> List[ManifestFile]: def _summary(self) -> Summary: ssc = SnapshotSummaryCollector() - partition_summary_limit = self._transaction.table_metadata.properties.get( - TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT - ) - if isinstance(partition_summary_limit, str): - raise ValueError( - f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be int but get str: {partition_summary_limit}" + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT ) + ) ssc.set_partition_summary_limit(partition_summary_limit) for data_file in self._added_data_files: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index b7a321a6c2..f74ac4b7d4 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -23,7 +23,7 @@ from pyiceberg.io import FileIO from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list -from pyiceberg.partitioning import PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import IcebergBaseModel @@ -272,20 +272,16 @@ def __init__(self) -> None: def set_partition_summary_limit(self, limit: int) -> None: self.max_changed_partitions_for_summaries = limit - def add_file( - self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None - ) -> None: + def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None: self.metrics.add_file(data_file) - if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: - if partition_spec is None or schema is None: - raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + if len(data_file.partition.record_fields()) != 0: self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema) - def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None: + def remove_file( + self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC + ) -> None: self.metrics.remove_file(data_file) - if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: - if partition_spec is None or schema is None: - raise ValueError("remove data file with partition but without specifying the partiton_spec and schema") + if len(data_file.partition.record_fields()) != 0: self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema) def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None: @@ -303,12 +299,12 @@ def build(self) -> Dict[str, str]: set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP) if changed_partitions_size <= self.max_changed_partitions_for_summaries: for partition_path, update_metrics_partition in self.partition_metrics.items(): - if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0: + if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0: properties[CHANGED_PARTITION_PREFIX + partition_path] = summary return properties - def partition_summary(self, update_metrics: UpdateMetrics) -> str: + def _partition_summary(self, update_metrics: UpdateMetrics) -> str: return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()]) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 7c4fe11841..e85ecce506 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -147,21 +147,13 @@ def manifest_file() -> ManifestFile: ) -@pytest.fixture -def data_file() -> DataFile: - return DataFile( - content=DataFileContent.DATA, - record_count=100, - file_size_in_bytes=1234, - ) - - -def test_snapshot_summary_collector(data_file: DataFile) -> None: +@pytest.mark.integration +def test_snapshot_summary_collector(table_schema_simple: Schema) -> None: ssc = SnapshotSummaryCollector() assert ssc.build() == {} - - ssc.add_file(data_file) + data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) + ssc.add_file(data_file, schema=table_schema_simple) assert ssc.build() == { 'added-data-files': '1', @@ -170,6 +162,7 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None: } +@pytest.mark.integration def test_snapshot_summary_collector_with_partition() -> None: # Given