Skip to content

Rewrite manifests #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
]

def __hash__(self) -> int:
"""Return the hash of the file path."""
return hash(self.manifest_path)


@cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list))
def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]:
Expand Down
18 changes: 13 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@
update_table_metadata,
)
from pyiceberg.table.update.schema import UpdateSchema
from pyiceberg.table.update.snapshot import (
ManageSnapshots,
UpdateSnapshot,
_FastAppendFiles,
)
from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles
from pyiceberg.table.update.spec import UpdateSpec
from pyiceberg.table.update.statistics import UpdateStatistics
from pyiceberg.transforms import IdentityTransform
Expand Down Expand Up @@ -439,6 +435,14 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U
"""
return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties)

def rewrite_manifests(self) -> RewriteManifestsResult:
if self._table.current_snapshot() is None:
return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])

with self.update_snapshot().rewrite() as rewrite:
rewritten = rewrite.rewrite_manifests()
return rewritten

def update_statistics(self) -> UpdateStatistics:
"""
Create a new UpdateStatistics to update the statistics of the table.
Expand Down Expand Up @@ -1233,6 +1237,10 @@ def name_mapping(self) -> Optional[NameMapping]:
"""Return the table's field-id NameMapping."""
return self.metadata.name_mapping()

def rewrite_manifests(self) -> None:
with self.transaction() as tx:
tx.rewrite_manifests()

def upsert(
self,
df: pa.Table,
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def get_prop(prop: str) -> int:
def update_snapshot_summaries(
summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False
) -> Summary:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}:
if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}:
raise ValueError(f"Operation not implemented: {summary.operation}")

if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None:
Expand Down
166 changes: 164 additions & 2 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import Future
from dataclasses import dataclass, field
from functools import cached_property
from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple

from sortedcontainers import SortedList

Expand Down Expand Up @@ -82,7 +83,7 @@
from pyiceberg.utils.properties import property_as_bool, property_as_int

if TYPE_CHECKING:
from pyiceberg.table import Transaction
from pyiceberg.table import Table, Transaction


def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str:
Expand Down Expand Up @@ -473,6 +474,12 @@ def _deleted_entries(self) -> List[ManifestEntry]:
return []


@dataclass(frozen=True)
class RewriteManifestsResult:
rewritten_manifests: List[ManifestFile] = field(default_factory=list)
added_manifests: List[ManifestFile] = field(default_factory=list)


class _MergeAppendFiles(_FastAppendFiles):
_target_size_bytes: int
_min_count_to_merge: int
Expand Down Expand Up @@ -524,6 +531,153 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests


class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]):
_table: Table
_spec_id: int
_target_size_bytes: int
_min_count_to_merge: int
_merge_enabled: bool
rewritten_manifests: List[ManifestFile] = []
added_manifests: List[ManifestFile] = []
kept_manifests: List[ManifestFile] = []

def __init__(
self,
table: Table,
transaction: Transaction,
io: FileIO,
spec_id: Optional[int] = None,
snapshot_properties: Dict[str, str] = EMPTY_DICT,
):
from pyiceberg.table import TableProperties

super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties)

snapshot = self._table.current_snapshot()
if self._spec_id and self._spec_id not in self._table.specs():
raise ValueError(f"Cannot find spec with id: {self._spec_id}")

if not snapshot:
raise ValueError("Cannot rewrite manifests without a current snapshot")

self._target_size_bytes = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_TARGET_SIZE_BYTES,
TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
) # type: ignore
self._table = table
self._spec_id = spec_id or table.spec().spec_id

self._min_count_to_merge = property_as_int(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_MIN_MERGE_COUNT,
TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
) # type: ignore
self._merge_enabled = property_as_bool(
self._transaction.table_metadata.properties,
TableProperties.MANIFEST_MERGE_ENABLED,
TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
)

def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
from pyiceberg.table import TableProperties

ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
Comment on lines +586 to +590
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we're cloning this logic, I've created an issue to resolve this in another PR: #1779

ssc.set_partition_summary_limit(partition_summary_limit)
Comment on lines +585 to +591
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ssc = SnapshotSummaryCollector()
partition_summary_limit = int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
)
ssc.set_partition_summary_limit(partition_summary_limit)
ssc = SnapshotSummaryCollector(int(
self._transaction.table_metadata.properties.get(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT
)
))


props = {
"manifests-kept": "0",
"manifests-created": str(len(self.added_manifests)),
"manifests-replaced": str(len(self.rewritten_manifests)),
"entries-processed": "0",
}
previous_snapshot = (
self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
if self._parent_snapshot_id is not None
else None
)

return update_snapshot_summaries(
summary=Summary(operation=self._operation, **ssc.build(), **props),
previous_summary=previous_snapshot.summary if previous_snapshot is not None else None,
truncate_full_table=False,
)

def rewrite_manifests(self) -> RewriteManifestsResult:
snapshot = self._table.current_snapshot()
if not snapshot:
raise ValueError("Cannot rewrite manifests without a current snapshot")

data_result = self._find_matching_manifests(snapshot, ManifestContent.DATA)

self.rewritten_manifests.extend(data_result.rewritten_manifests)
self.added_manifests.extend(data_result.added_manifests)

deletes_result = self._find_matching_manifests(snapshot, ManifestContent.DELETES)
self.rewritten_manifests.extend(deletes_result.rewritten_manifests)
self.added_manifests.extend(deletes_result.added_manifests)

if len(self.rewritten_manifests) == 0:
return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])

return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests)

def _find_matching_manifests(self, snapshot: Snapshot, content: ManifestContent) -> RewriteManifestsResult:
manifests = [
manifest
for manifest in snapshot.manifests(io=self._io)
if manifest.partition_spec_id == self._spec_id and manifest.content == content
]
Comment on lines +631 to +635
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a followup it might be worthwile to see if we can cache this result, since we're going over the manifests twice (once for data, once for delete).


data_manifest_merge_manager = _ManifestMergeManager(
target_size_bytes=self._target_size_bytes,
min_count_to_merge=self._min_count_to_merge,
merge_enabled=self._merge_enabled,
snapshot_producer=self,
)
new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests)

return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests)

def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile:
return ManifestFile.from_args(
_table_format_version=self._transaction.table_metadata.format_version,
manifest_path=manifest_file.manifest_path,
manifest_length=manifest_file.manifest_length,
partition_spec_id=manifest_file.partition_spec_id,
content=manifest_file.content,
sequence_number=manifest_file.sequence_number,
min_sequence_number=manifest_file.min_sequence_number,
added_snapshot_id=snapshot_id, # Using the new snapshot ID here
added_files_count=manifest_file.added_files_count,
existing_files_count=manifest_file.existing_files_count,
deleted_files_count=manifest_file.deleted_files_count,
added_rows_count=manifest_file.added_rows_count,
existing_rows_count=manifest_file.existing_rows_count,
deleted_rows_count=manifest_file.deleted_rows_count,
partitions=manifest_file.partitions,
key_metadata=manifest_file.key_metadata,
)

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
"""Commit only if we have rewritten any manifests."""
if self.rewritten_manifests:
self.commit()

def _existing_manifests(self) -> List[ManifestFile]:
"""Determine if there are any existing manifest files."""
return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests]

def _deleted_entries(self) -> List[ManifestEntry]:
"""To determine if we need to record any deleted manifest entries."""
return []


class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]):
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.

Expand Down Expand Up @@ -621,6 +775,14 @@ def merge_append(self) -> _MergeAppendFiles:
operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties
)

def rewrite(self) -> _RewriteManifests:
return _RewriteManifests(
table=self._transaction._table,
transaction=self._transaction,
io=self._io,
snapshot_properties=self._snapshot_properties,
)

def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles:
return _OverwriteFiles(
commit_uuid=commit_uuid,
Expand Down
42 changes: 42 additions & 0 deletions tests/integration/test_inspect_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,45 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog
.reset_index()
)
assert_frame_equal(lhs, rhs, check_dtype=False)


def test_inspect_all_example(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
from pandas.testing import assert_frame_equal

identifier = "default.table_metadata_all_manifests"
try:
session_catalog.drop_table(identifier=identifier)
except NoSuchTableError:
pass

spark.sql(
f"""
CREATE TABLE {identifier} (
id int,
data string
)
PARTITIONED BY (data)
TBLPROPERTIES ('write.update.mode'='merge-on-read',
'write.delete.mode'='merge-on-read')
"""
)
tbl = session_catalog.load_table(identifier)
# check all_manifests when there are no snapshots
lhs = tbl.inspect.all_manifests().to_pandas()
rhs = spark.table(f"{identifier}.all_manifests").toPandas()
assert_frame_equal(lhs, rhs, check_dtype=False)

spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')")

spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')")

spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1")

spark.sql(f"DELETE FROM {identifier} WHERE id = 2")

spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')")

tbl.refresh()

tbl.rewrite_manifests()
print("efd")
Loading
Loading