Skip to content

Commit

Permalink
clean up for review
Browse files: browse the repository at this point in the history
  • Loading branch information
jqin61 committed Mar 14, 2024
1 parent cbf1afd commit b5ce265
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 31 deletions.
10 changes: 4 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2572,13 +2572,11 @@ def _write_delete_manifest() -> List[ManifestFile]:

def _summary(self) -> Summary:
ssc = SnapshotSummaryCollector()
partition_summary_limit = self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
if isinstance(partition_summary_limit, str):
raise ValueError(
f"WRITE_PARTITION_SUMMARY_LIMIT in table properties should be int but get str: {partition_summary_limit}"
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc.set_partition_summary_limit(partition_summary_limit)

for data_file in self._added_data_files:
Expand Down
22 changes: 9 additions & 13 deletions pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile, DataFileContent, ManifestFile, read_manifest_list
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import IcebergBaseModel

Expand Down Expand Up @@ -272,20 +272,16 @@ def __init__(self) -> None:
def set_partition_summary_limit(self, limit: int) -> None:
self.max_changed_partitions_for_summaries = limit

def add_file(
self, data_file: DataFile, partition_spec: Optional[PartitionSpec] = None, schema: Optional[Schema] = None
) -> None:
def add_file(self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC) -> None:
self.metrics.add_file(data_file)
if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0:
if partition_spec is None or schema is None:
raise ValueError("add data file with partition but without specifying the partiton_spec and schema")
if len(data_file.partition.record_fields()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=True, schema=schema)

def remove_file(self, data_file: DataFile, partition_spec: Optional[PartitionSpec], schema: Optional[Schema]) -> None:
def remove_file(
self, data_file: DataFile, schema: Schema, partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC
) -> None:
self.metrics.remove_file(data_file)
if getattr(data_file, "partition", None) is not None and len(data_file.partition.record_fields()) != 0:
if partition_spec is None or schema is None:
raise ValueError("remove data file with partition but without specifying the partiton_spec and schema")
if len(data_file.partition.record_fields()) != 0:
self.update_partition_metrics(partition_spec=partition_spec, file=data_file, is_add_file=False, schema=schema)

def update_partition_metrics(self, partition_spec: PartitionSpec, file: DataFile, is_add_file: bool, schema: Schema) -> None:
Expand All @@ -303,12 +299,12 @@ def build(self) -> Dict[str, str]:
set_when_positive(properties, changed_partitions_size, CHANGED_PARTITION_COUNT_PROP)
if changed_partitions_size <= self.max_changed_partitions_for_summaries:
for partition_path, update_metrics_partition in self.partition_metrics.items():
if (summary := self.partition_summary(update_metrics_partition)) and len(summary) != 0:
if (summary := self._partition_summary(update_metrics_partition)) and len(summary) != 0:
properties[CHANGED_PARTITION_PREFIX + partition_path] = summary

return properties

def partition_summary(self, update_metrics: UpdateMetrics) -> str:
def _partition_summary(self, update_metrics: UpdateMetrics) -> str:
return ",".join([f"{prop}={val}" for prop, val in update_metrics.to_dict().items()])


Expand Down
17 changes: 5 additions & 12 deletions tests/table/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,13 @@ def manifest_file() -> ManifestFile:
)


@pytest.fixture
def data_file() -> DataFile:
return DataFile(
content=DataFileContent.DATA,
record_count=100,
file_size_in_bytes=1234,
)


def test_snapshot_summary_collector(data_file: DataFile) -> None:
@pytest.mark.integration
def test_snapshot_summary_collector(table_schema_simple: Schema) -> None:
ssc = SnapshotSummaryCollector()

assert ssc.build() == {}

ssc.add_file(data_file)
data_file = DataFile(content=DataFileContent.DATA, record_count=100, file_size_in_bytes=1234, partition=Record())
ssc.add_file(data_file, schema=table_schema_simple)

assert ssc.build() == {
'added-data-files': '1',
Expand All @@ -170,6 +162,7 @@ def test_snapshot_summary_collector(data_file: DataFile) -> None:
}


@pytest.mark.integration
def test_snapshot_summary_collector_with_partition() -> None:
# Given

Expand Down

0 comments on commit b5ce265

Please sign in to comment.