From 3a2d3f397fba215bb53d901f94fdf0eb757d019d Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 12 Mar 2024 17:50:45 +0000 Subject: [PATCH 1/4] add partition stats in snapshot summary --- pyiceberg/table/__init__.py | 17 +- pyiceberg/table/snapshots.py | 290 +++++++++++++++++++++---------- tests/integration/test_writes.py | 11 +- tests/table/test_snapshots.py | 41 +++++ 4 files changed, 259 insertions(+), 100 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b244dfb16d..3a116dec3a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -212,6 +212,9 @@ class TableProperties: METRICS_MODE_COLUMN_CONF_PREFIX = "write.metadata.metrics.column" + WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit" + WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0 + DEFAULT_NAME_MAPPING = "schema.name-mapping.default" FORMAT_VERSION = "format-version" DEFAULT_FORMAT_VERSION = 2 @@ -2569,9 +2572,21 @@ def _write_delete_manifest() -> List[ManifestFile]: def _summary(self) -> Summary: ssc = SnapshotSummaryCollector() + partition_summary_limit = self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + if isinstance(partition_summary_limit, str): + raise ValueError( + f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be int but get str: {partition_summary_limit}" + ) + ssc.set_partition_summary_limit(partition_summary_limit) for data_file in self._added_data_files: - ssc.add_file(data_file=data_file) + ssc.add_file( + data_file=data_file, + partition_spec=self._transaction.table_metadata.spec(), + schema=self._transaction.table_metadata.schema(), + ) previous_snapshot = ( self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index a2f15d4405..48f3041b02 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -15,19 +15,16 @@ # specific language governing permissions and limitations # under the License. import time +from collections import defaultdict from enum import Enum -from typing import ( - Any, - Dict, - List, - Mapping, - Optional, -) +from typing import Any, DefaultDict, Dict, List, Mapping, Optional from pydantic import Field, PrivateAttr, model_serializer from pyiceberg.io import FileIO from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list +from pyiceberg.partitioning import PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.typedef import IcebergBaseModel ADDED_DATA_FILES = 'added-data-files' @@ -52,8 +49,8 @@ TOTAL_DELETE_FILES = 'total-delete-files' TOTAL_RECORDS = 'total-records' TOTAL_FILE_SIZE = 'total-files-size' - - +CHANGED_PARTITION_COUNT_PROP = 'changed-partition-count' +CHANGED_PARTITION_PREFIX = "partitions." OPERATION = "operation" @@ -77,6 +74,147 @@ def __repr__(self) -> str: return f"Operation.{self.name}" +class UpdateMetrics: + added_file_size: int + removed_file_size: int + added_data_files: int + removed_data_files: int + added_eq_delete_files: int + removed_eq_delete_files: int + added_pos_delete_files: int + removed_pos_delete_files: int + added_delete_files: int + removed_delete_files: int + added_records: int + deleted_records: int + added_pos_deletes: int + removed_pos_deletes: int + added_eq_deletes: int + removed_eq_deletes: int + + # def clear() { + # self.added_file_size = 0 + # self.removed_file_size = 0 + # self.added_data_files = 0 + # self.removed_data_files = 0 + # self.added_eq_delete_files = 0 + # self.removed_eq_delete_files = 0 + # self.added_pos_delete_files = 0 + # self.removed_pos_delete_files = 0 + # self.added_delete_files = 0 + # self.removed_delete_files = 0 + # self.added_records = 0 + # self.deleted_records = 0 + # self.added_pos_deletes = 0 + # self.removed_pos_deletes = 0 + # self.added_eq_deletes = 0 + # self.removed_eq_deletes = 0 + # } + def __init__(self) -> None: + self.added_file_size = 0 + self.removed_file_size = 0 + self.added_data_files = 0 + self.removed_data_files = 0 + self.added_eq_delete_files = 0 + self.removed_eq_delete_files = 0 + self.added_pos_delete_files = 0 + self.removed_pos_delete_files = 0 + self.added_delete_files = 0 + self.removed_delete_files = 0 + self.added_records = 0 + self.deleted_records = 0 + self.added_pos_deletes = 0 + self.removed_pos_deletes = 0 + self.added_eq_deletes = 0 + self.removed_eq_deletes = 0 + + # def added_file(file: DataFile) -> None: + # self.added_file_size += file.file_size_in_bytes + # if file.content == DataFileContent.DATA: + # self.added_data_files += 1 + # self.added_records += file.record_count + # elif file.content == DataFileContent.POSITION_DELETES: + # self.added_delete_files += 1 + # self.added_pos_delete_files += 1 + # self.added_pos_deletes += file.record_count + # elif file.content == DataFileContent.EQUALITY_DELETES: + # self.added_delete_files += 1 + # self.added_eq_delete_files += 1 + # self.added_eq_deletes += file.record_count + # else: + # raise ValueError("Unsupported file content type: " + file.content()) + + # def removed_file(file: DataFile) -> None: + # self.removed_file_size += file.file_size_in_bytes + # if file.content == DataFileContent.DATA: + # self.removed_data_files += 1 + # self.deleted_records += file.record_count + # elif file.content == DataFileContent.POSITION_DELETES: + # self.removed_delete_files += 1 + # self.removed_pos_delete_files += 1 + # self.removed_pos_deletes += file.record_count + # elif file.content == DataFileContent.EQUALITY_DELETES + # self.removed_delete_files += 1 + # self.removed_eq_delete_files += 1 + # self.removed_eq_deletes += file.record_count + # else: + # raise ValueError("Unsupported file content type: " + file.content()) + + def add_file(self, data_file: DataFile) -> None: + self.added_file_size += data_file.file_size_in_bytes + + if data_file.content == DataFileContent.DATA: + self.added_data_files += 1 + self.added_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.added_delete_files += 1 + self.added_pos_delete_files += 1 + self.added_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.added_delete_files += 1 + self.added_eq_delete_files += 1 + self.added_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def remove_file(self, data_file: DataFile) -> None: + self.removed_file_size += data_file.file_size_in_bytes + + if data_file.content == DataFileContent.DATA: + self.removed_data_files += 1 + self.deleted_records += data_file.record_count + elif data_file.content == DataFileContent.POSITION_DELETES: + self.removed_delete_files += 1 + self.removed_pos_delete_files += 1 + self.removed_pos_deletes += data_file.record_count + elif data_file.content == DataFileContent.EQUALITY_DELETES: + self.removed_delete_files += 1 + self.removed_eq_delete_files += 1 + self.removed_eq_deletes += data_file.record_count + else: + raise ValueError(f"Unknown data file content: {data_file.content}") + + def to_dict(self) -> Dict[str, str]: + properties: Dict[str, str] = {} + set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE) + set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE) + set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES) + set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES) + set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES) + set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) + set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) + set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) + set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES) + set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES) + set_when_positive(properties, self.added_records, ADDED_RECORDS) + set_when_positive(properties, self.deleted_records, DELETED_RECORDS) + set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES) + set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES) + set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES) + set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES) + return properties + + class Summary(IcebergBaseModel, Mapping[str, str]): """A class that stores the summary information for a Snapshot. @@ -172,100 +310,57 @@ class SnapshotLogEntry(IcebergBaseModel): class SnapshotSummaryCollector: - added_file_size: int - removed_file_size: int - added_data_files: int - removed_data_files: int - added_eq_delete_files: int - removed_eq_delete_files: int - added_pos_delete_files: int - removed_pos_delete_files: int - added_delete_files: int - removed_delete_files: int - added_records: int - deleted_records: int - added_pos_deletes: int - removed_pos_deletes: int - added_eq_deletes: int - removed_eq_deletes: int + metrics: UpdateMetrics + partition_metrics: DefaultDict[str, UpdateMetrics] + max_changed_partitions_for_summaries: int def __init__(self) -> None: - self.added_file_size = 0 - self.removed_file_size = 0 - self.added_data_files = 0 - self.removed_data_files = 0 - self.added_eq_delete_files = 0 - self.removed_eq_delete_files = 0 - self.added_pos_delete_files = 0 - self.removed_pos_delete_files = 0 - self.added_delete_files = 0 - self.removed_delete_files = 0 - self.added_records = 0 - self.deleted_records = 0 - self.added_pos_deletes = 0 - self.removed_pos_deletes = 0 - self.added_eq_deletes = 0 - self.removed_eq_deletes = 0 - - def add_file(self, data_file: DataFile) -> None: - self.added_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.added_data_files += 1 - self.added_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.added_delete_files += 1 - self.added_pos_delete_files += 1 - self.added_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.added_delete_files += 1 - self.added_eq_delete_files += 1 - self.added_eq_deletes += data_file.record_count - else: - raise ValueError(f"Unknown data file content: {data_file.content}") - - def remove_file(self, data_file: DataFile) -> None: - self.removed_file_size += data_file.file_size_in_bytes - - if data_file.content == DataFileContent.DATA: - self.removed_data_files += 1 - self.deleted_records += data_file.record_count - elif data_file.content == DataFileContent.POSITION_DELETES: - self.removed_delete_files += 1 - self.removed_pos_delete_files += 1 - self.removed_pos_deletes += data_file.record_count - elif data_file.content == DataFileContent.EQUALITY_DELETES: - self.removed_delete_files += 1 - self.removed_eq_delete_files += 1 - self.removed_eq_deletes += data_file.record_count + self.metrics = UpdateMetrics() + self.partition_metrics = defaultdict(UpdateMetrics) + self.max_changed_partitions_for_summaries = 0 + + def set_partition_summary_limit(self, limit: int) -> None: + self.max_changed_partitions_for_summaries = limit + + def add_file( + self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None + ) -> None: + self.metrics.add_file(data_file) + if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: + if partition_spec is None or schema is None: + raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema) + + def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None: + self.metrics.remove_file(data_file) + if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: + if partition_spec is None or schema is None: + raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema) + + def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None: + partition_path = partition_spec.partition_to_path(file.partition, schema) + partition_metrics: UpdateMetrics = self.partition_metrics[partition_path] + + if is_add_file: + partition_metrics.add_file(file) else: - raise ValueError(f"Unknown data file content: {data_file.content}") + partition_metrics.remove_file(file) def build(self) -> Dict[str, str]: - def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: - if num > 0: - properties[property_name] = str(num) - - properties: Dict[str, str] = {} - set_when_positive(properties, self.added_file_size, ADDED_FILE_SIZE) - set_when_positive(properties, self.removed_file_size, REMOVED_FILE_SIZE) - set_when_positive(properties, self.added_data_files, ADDED_DATA_FILES) - set_when_positive(properties, self.removed_data_files, DELETED_DATA_FILES) - set_when_positive(properties, self.added_eq_delete_files, ADDED_EQUALITY_DELETE_FILES) - set_when_positive(properties, self.removed_eq_delete_files, REMOVED_EQUALITY_DELETE_FILES) - set_when_positive(properties, self.added_pos_delete_files, ADDED_POSITION_DELETE_FILES) - set_when_positive(properties, self.removed_pos_delete_files, REMOVED_POSITION_DELETE_FILES) - set_when_positive(properties, self.added_delete_files, ADDED_DELETE_FILES) - set_when_positive(properties, self.removed_delete_files, REMOVED_DELETE_FILES) - set_when_positive(properties, self.added_records, ADDED_RECORDS) - set_when_positive(properties, self.deleted_records, DELETED_RECORDS) - set_when_positive(properties, self.added_pos_deletes, ADDED_POSITION_DELETES) - set_when_positive(properties, self.removed_pos_deletes, REMOVED_POSITION_DELETES) - set_when_positive(properties, self.added_eq_deletes, ADDED_EQUALITY_DELETES) - set_when_positive(properties, self.removed_eq_deletes, REMOVED_EQUALITY_DELETES) + properties = self.metrics.to_dict() + changed_partitions_size = len(self.partition_metrics) + set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP) + if changed_partitions_size <= self.max_changed_partitions_for_summaries: + for partition_path, update_metrics_partition in self.partition_metrics.items(): + if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0: + properties[CHANGED_PARTITION_PREFIX + partition_path] = summary return properties + def partition_summary(self, update_metrics: UpdateMetrics) -> str: + return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()]) + def _truncate_table_summary(summary: Summary, previous_summary: Mapping[str, str]) -> Summary: for prop in { @@ -366,3 +461,8 @@ def _update_totals(total_property: str, added_property: str, removed_property: s ) return summary + + +def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None: + if num > 0: + properties[property_name] = str(num) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 82c4ace711..c7095cb71b 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -525,12 +525,15 @@ def test_summaries_with_only_nulls( 'total-records': '2', } - assert summaries[0] == { - 'total-data-files': '0', - 'total-delete-files': '0', + assert summaries[2] == { + 'removed-files-size': '4239', 'total-equality-deletes': '0', - 'total-files-size': '0', 'total-position-deletes': '0', + 'deleted-data-files': '1', + 'total-delete-files': '0', + 'total-files-size': '0', + 'deleted-records': '2', + 'total-data-files': '0', 'total-records': '0', } diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 3591847ad6..35ac3e280b 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -18,7 +18,17 @@ import pytest from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile +from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation, Snapshot, SnapshotSummaryCollector, Summary, update_snapshot_summaries +from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import Record +from pyiceberg.types import ( + BooleanType, + IntegerType, + NestedField, + StringType, +) @pytest.fixture @@ -146,6 +156,11 @@ def data_file() -> DataFile: ) +@pytest.fixture +def data_file_with_partition() -> DataFile: + return DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) + + def test_snapshot_summary_collector(data_file: DataFile) -> None: ssc = SnapshotSummaryCollector() @@ -160,6 +175,32 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None: } +def test_snapshot_summary_collector_with_partition(data_file_with_partition: DataFile) -> None: + ssc = SnapshotSummaryCollector() + + assert ssc.build() == {} + schema = Schema( + NestedField(field_id=1, name="bool_field", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="string_field", field_type=StringType(), required=False), + NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), + ) + spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name='int_field')) + ssc.set_partition_summary_limit(10) + ssc.add_file(data_file=data_file_with_partition, schema=schema, partition_spec=spec) + ssc.remove_file(data_file=data_file_with_partition, schema=schema, partition_spec=spec) + + assert ssc.build() == { + 'added-files-size': '1234', + 'removed-files-size': '1234', + 'added-data-files': '1', + 'deleted-data-files': '1', + 'added-records': '100', + 'deleted-records': '100', + 'changed-partition-count': '1', + 'partitions.int_field=1': 'added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100', + } + + def test_merge_snapshot_summaries_empty() -> None: assert update_snapshot_summaries(Summary(Operation.APPEND)) == Summary( operation=Operation.APPEND, From dafb0a441c1636023c3e931d2ab699123a2adc36 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 12 Mar 2024 17:58:33 +0000 Subject: [PATCH 2/4] clean up --- pyiceberg/table/snapshots.py | 32 -------------------------------- 1 file changed, 32 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 48f3041b02..769bb7bf8a 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -128,38 +128,6 @@ def __init__(self) -> None: self.added_eq_deletes = 0 self.removed_eq_deletes = 0 - # def added_file(file: DataFile) -> None: - # self.added_file_size += file.file_size_in_bytes - # if file.content == DataFileContent.DATA: - # self.added_data_files += 1 - # self.added_records += file.record_count - # elif file.content == DataFileContent.POSITION_DELETES: - # self.added_delete_files += 1 - # self.added_pos_delete_files += 1 - # self.added_pos_deletes += file.record_count - # elif file.content == DataFileContent.EQUALITY_DELETES: - # self.added_delete_files += 1 - # self.added_eq_delete_files += 1 - # self.added_eq_deletes += file.record_count - # else: - # raise ValueError("Unsupported file content type: " + file.content()) - - # def removed_file(file: DataFile) -> None: - # self.removed_file_size += file.file_size_in_bytes - # if file.content == DataFileContent.DATA: - # self.removed_data_files += 1 - # self.deleted_records += file.record_count - # elif file.content == DataFileContent.POSITION_DELETES: - # self.removed_delete_files += 1 - # self.removed_pos_delete_files += 1 - # self.removed_pos_deletes += file.record_count - # elif file.content == DataFileContent.EQUALITY_DELETES - # self.removed_delete_files += 1 - # self.removed_eq_delete_files += 1 - # self.removed_eq_deletes += file.record_count - # else: - # raise ValueError("Unsupported file content type: " + file.content()) - def add_file(self, data_file: DataFile) -> None: self.added_file_size += data_file.file_size_in_bytes From cbf1afd2eee4a664425b7de6368afb2df22c8eda Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Tue, 12 Mar 2024 18:44:07 +0000 Subject: [PATCH 3/4] enhance unit test --- pyiceberg/table/snapshots.py | 20 +----------------- tests/table/test_snapshots.py | 40 ++++++++++++++++++++++++----------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 769bb7bf8a..b7a321a6c2 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -92,24 +92,6 @@ class UpdateMetrics: added_eq_deletes: int removed_eq_deletes: int - # def clear() { - # self.added_file_size = 0 - # self.removed_file_size = 0 - # self.added_data_files = 0 - # self.removed_data_files = 0 - # self.added_eq_delete_files = 0 - # self.removed_eq_delete_files = 0 - # self.added_pos_delete_files = 0 - # self.removed_pos_delete_files = 0 - # self.added_delete_files = 0 - # self.removed_delete_files = 0 - # self.added_records = 0 - # self.deleted_records = 0 - # self.added_pos_deletes = 0 - # self.removed_pos_deletes = 0 - # self.added_eq_deletes = 0 - # self.removed_eq_deletes = 0 - # } def __init__(self) -> None: self.added_file_size = 0 self.removed_file_size = 0 @@ -303,7 +285,7 @@ def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpe self.metrics.remove_file(data_file) if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: if partition_spec is None or schema is None: - raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + raise ValueError("remove data file with partition but without specifying the partiton_spec and schema") self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema) def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None: diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 35ac3e280b..7c4fe11841 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -156,11 +156,6 @@ def data_file() -> DataFile: ) -@pytest.fixture -def data_file_with_partition() -> DataFile: - return DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) - - def test_snapshot_summary_collector(data_file: DataFile) -> None: ssc = SnapshotSummaryCollector() @@ -175,7 +170,9 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None: } -def test_snapshot_summary_collector_with_partition(data_file_with_partition: DataFile) -> None: +def test_snapshot_summary_collector_with_partition() -> None: + # Given + ssc = SnapshotSummaryCollector() assert ssc.build() == {} @@ -185,19 +182,38 @@ def test_snapshot_summary_collector_with_partition(data_file_with_partition: Dat NestedField(field_id=3, name="int_field", field_type=IntegerType(), required=False), ) spec = PartitionSpec(PartitionField(source_id=3, field_id=1001, transform=IdentityTransform(), name='int_field')) + data_file_1 = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record(int_field=1)) + data_file_2 = DataFile(content=DataFileContent.DATA, record_count=200, file_size_in_bytes=4321, partition=Record(int_field=2)) + # When + ssc.add_file(data_file=data_file_1, schema=schema, partition_spec=spec) + ssc.remove_file(data_file=data_file_1, schema=schema, partition_spec=spec) + ssc.remove_file(data_file=data_file_2, schema=schema, partition_spec=spec) + + # Then + assert ssc.build() == { + 'added-files-size': '1234', + 'removed-files-size': '5555', + 'added-data-files': '1', + 'deleted-data-files': '2', + 'added-records': '100', + 'deleted-records': '300', + 'changed-partition-count': '2', + } + + # When ssc.set_partition_summary_limit(10) - ssc.add_file(data_file=data_file_with_partition, schema=schema, partition_spec=spec) - ssc.remove_file(data_file=data_file_with_partition, schema=schema, partition_spec=spec) + # Then assert ssc.build() == { 'added-files-size': '1234', - 'removed-files-size': '1234', + 'removed-files-size': '5555', 'added-data-files': '1', - 'deleted-data-files': '1', + 'deleted-data-files': '2', 'added-records': '100', - 'deleted-records': '100', - 'changed-partition-count': '1', + 'deleted-records': '300', + 'changed-partition-count': '2', 'partitions.int_field=1': 'added-files-size=1234,removed-files-size=1234,added-data-files=1,deleted-data-files=1,added-records=100,deleted-records=100', + 'partitions.int_field=2': 'removed-files-size=4321,deleted-data-files=1,deleted-records=200', } From b5ce2655e014a5b453edc9936d01f7e8693ba161 Mon Sep 17 00:00:00 2001 From: Adrian Qin <147659252+jqin61@users.noreply.github.com> Date: Thu, 14 Mar 2024 22:08:41 +0000 Subject: [PATCH 4/4] clean up for review --- pyiceberg/table/__init__.py | 10 ++++------ pyiceberg/table/snapshots.py | 22 +++++++++------------- tests/table/test_snapshots.py | 17 +++++------------ 3 files changed, 18 insertions(+), 31 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3a116dec3a..d4b28fbdda 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2572,13 +2572,11 @@ def _write_delete_manifest() -> List[ManifestFile]: def _summary(self) -> Summary: ssc = SnapshotSummaryCollector() - partition_summary_limit = self._transaction.table_metadata.properties.get( - TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT - ) - if isinstance(partition_summary_limit, str): - raise ValueError( - f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be int but get str: {partition_summary_limit}" + partition_summary_limit = int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT ) + ) ssc.set_partition_summary_limit(partition_summary_limit) for data_file in self._added_data_files: diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index b7a321a6c2..f74ac4b7d4 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -23,7 +23,7 @@ from pyiceberg.io import FileIO from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list -from pyiceberg.partitioning import PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.typedef import IcebergBaseModel @@ -272,20 +272,16 @@ def __init__(self) -> None: def set_partition_summary_limit(self, limit: int) -> None: self.max_changed_partitions_for_summaries = limit - def add_file( - self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None - ) -> None: + def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None: self.metrics.add_file(data_file) - if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: - if partition_spec is None or schema is None: - raise ValueError("add data file with partition but without specifying the partiton_spec and schema") + if len(data_file.partition.record_fields()) != 0: self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema) - def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None: + def remove_file( + self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC + ) -> None: self.metrics.remove_file(data_file) - if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0: - if partition_spec is None or schema is None: - raise ValueError("remove data file with partition but without specifying the partiton_spec and schema") + if len(data_file.partition.record_fields()) != 0: self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema) def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None: @@ -303,12 +299,12 @@ def build(self) -> Dict[str, str]: set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP) if changed_partitions_size <= self.max_changed_partitions_for_summaries: for partition_path, update_metrics_partition in self.partition_metrics.items(): - if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0: + if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0: properties[CHANGED_PARTITION_PREFIX + partition_path] = summary return properties - def partition_summary(self, update_metrics: UpdateMetrics) -> str: + def _partition_summary(self, update_metrics: UpdateMetrics) -> str: return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()]) diff --git a/tests/table/test_snapshots.py b/tests/table/test_snapshots.py index 7c4fe11841..e85ecce506 100644 --- a/tests/table/test_snapshots.py +++ b/tests/table/test_snapshots.py @@ -147,21 +147,13 @@ def manifest_file() -> ManifestFile: ) -@pytest.fixture -def data_file() -> DataFile: - return DataFile( - content=DataFileContent.DATA, - record_count=100, - file_size_in_bytes=1234, - ) - - -def test_snapshot_summary_collector(data_file: DataFile) -> None: +@pytest.mark.integration +def test_snapshot_summary_collector(table_schema_simple: Schema) -> None: ssc = SnapshotSummaryCollector() assert ssc.build() == {} - - ssc.add_file(data_file) + data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record()) + ssc.add_file(data_file, schema=table_schema_simple) assert ssc.build() == { 'added-data-files': '1', @@ -170,6 +162,7 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None: } +@pytest.mark.integration def test_snapshot_summary_collector_with_partition() -> None: # Given