diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 61cb87e3d8..26f11792c0 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -717,6 +717,10 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List if not discard_deleted or entry.status != ManifestEntryStatus.DELETED ] + def __hash__(self) -> int: + """Return the hash of the file path.""" + return hash(self.manifest_path) + @cached(cache=LRUCache(maxsize=128), key=lambda io, manifest_list: hashkey(manifest_list)) def _manifests(io: FileIO, manifest_list: str) -> Tuple[ManifestFile, ...]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 78676a774a..1dc20f3d80 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -115,11 +115,7 @@ update_table_metadata, ) from pyiceberg.table.update.schema import UpdateSchema -from pyiceberg.table.update.snapshot import ( - ManageSnapshots, - UpdateSnapshot, - _FastAppendFiles, -) +from pyiceberg.table.update.snapshot import ManageSnapshots, RewriteManifestsResult, UpdateSnapshot, _FastAppendFiles from pyiceberg.table.update.spec import UpdateSpec from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform @@ -439,6 +435,14 @@ def update_snapshot(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> U """ return UpdateSnapshot(self, io=self._table.io, snapshot_properties=snapshot_properties) + def rewrite_manifests(self) -> RewriteManifestsResult: + if self._table.current_snapshot() is None: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + with self.update_snapshot().rewrite() as rewrite: + rewritten = rewrite.rewrite_manifests() + return rewritten + def update_statistics(self) -> UpdateStatistics: """ Create a new UpdateStatistics to update the statistics of the table. @@ -1233,6 +1237,10 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() + def rewrite_manifests(self) -> RewriteManifestsResult: + with self.transaction() as tx: + return tx.rewrite_manifests() + def upsert( self, df: pa.Table, diff --git a/pyiceberg/table/snapshots.py b/pyiceberg/table/snapshots.py index 8d1a24c420..683ab1f3dc 100644 --- a/pyiceberg/table/snapshots.py +++ b/pyiceberg/table/snapshots.py @@ -353,7 +353,7 @@ def get_prop(prop: str) -> int: def update_snapshot_summaries( summary: Summary, previous_summary: Optional[Mapping[str, str]] = None, truncate_full_table: bool = False ) -> Summary: - if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE}: + if summary.operation not in {Operation.APPEND, Operation.OVERWRITE, Operation.DELETE, Operation.REPLACE}: raise ValueError(f"Operation not implemented: {summary.operation}") if truncate_full_table and summary.operation == Operation.OVERWRITE and previous_summary is not None: diff --git a/pyiceberg/table/update/snapshot.py b/pyiceberg/table/update/snapshot.py index a82167744d..87f0481f2e 100644 --- a/pyiceberg/table/update/snapshot.py +++ b/pyiceberg/table/update/snapshot.py @@ -22,8 +22,9 @@ from abc import abstractmethod from collections import defaultdict from concurrent.futures import Future +from dataclasses import dataclass, field from functools import cached_property -from typing import TYPE_CHECKING, Callable, Dict, Generic, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, List, Optional, Set, Tuple from sortedcontainers import SortedList @@ -82,7 +83,7 @@ from pyiceberg.utils.properties import property_as_bool, property_as_int if TYPE_CHECKING: - from pyiceberg.table import Transaction + from pyiceberg.table import Table, Transaction def _new_manifest_file_name(num: int, commit_uuid: uuid.UUID) -> str: @@ -473,6 +474,12 @@ def _deleted_entries(self) -> List[ManifestEntry]: return [] +@dataclass(frozen=True) +class RewriteManifestsResult: + rewritten_manifests: List[ManifestFile] = field(default_factory=list) + added_manifests: List[ManifestFile] = field(default_factory=list) + + class _MergeAppendFiles(_FastAppendFiles): _target_size_bytes: int _min_count_to_merge: int @@ -524,6 +531,153 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile return data_manifest_merge_manager.merge_manifests(unmerged_data_manifests) + unmerged_deletes_manifests +class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]): + _table: Table + _spec_id: int + _target_size_bytes: int + _min_count_to_merge: int + _merge_enabled: bool + rewritten_manifests: List[ManifestFile] = [] + added_manifests: List[ManifestFile] = [] + kept_manifests: List[ManifestFile] = [] + + def __init__( + self, + table: Table, + transaction: Transaction, + io: FileIO, + spec_id: Optional[int] = None, + snapshot_properties: Dict[str, str] = EMPTY_DICT, + ): + from pyiceberg.table import TableProperties + + super().__init__(Operation.REPLACE, transaction, io, snapshot_properties=snapshot_properties) + + self._table = table + self._spec_id = spec_id or table.spec().spec_id + + snapshot = self._table.current_snapshot() + if self._spec_id and self._spec_id not in self._table.specs(): + raise ValueError(f"Cannot find spec with id: {self._spec_id}") + + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + self._target_size_bytes = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_TARGET_SIZE_BYTES, + TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT, + ) # type: ignore + self._min_count_to_merge = property_as_int( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MIN_MERGE_COUNT, + TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT, + ) # type: ignore + self._merge_enabled = property_as_bool( + self._transaction.table_metadata.properties, + TableProperties.MANIFEST_MERGE_ENABLED, + TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, + ) + + def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary: + from pyiceberg.table import TableProperties + + ssc = SnapshotSummaryCollector( + int( + self._transaction.table_metadata.properties.get( + TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT + ) + ) + ) + + props = { + "manifests-kept": "0", + "manifests-created": str(len(self.added_manifests)), + "manifests-replaced": str(len(self.rewritten_manifests)), + "entries-processed": "0", + } + previous_snapshot = ( + self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id) + if self._parent_snapshot_id is not None + else None + ) + + return update_snapshot_summaries( + summary=Summary(operation=self._operation, **ssc.build(), **props), + previous_summary=previous_snapshot.summary if previous_snapshot is not None else None, + truncate_full_table=False, + ) + + def rewrite_manifests(self) -> RewriteManifestsResult: + snapshot = self._table.current_snapshot() + if not snapshot: + raise ValueError("Cannot rewrite manifests without a current snapshot") + + data_result = self._find_matching_manifests(snapshot, ManifestContent.DATA) + + self.rewritten_manifests.extend(data_result.rewritten_manifests) + self.added_manifests.extend(data_result.added_manifests) + + deletes_result = self._find_matching_manifests(snapshot, ManifestContent.DELETES) + self.rewritten_manifests.extend(deletes_result.rewritten_manifests) + self.added_manifests.extend(deletes_result.added_manifests) + + if len(self.rewritten_manifests) == 0: + return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[]) + + return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests) + + def _find_matching_manifests(self, snapshot: Snapshot, content: ManifestContent) -> RewriteManifestsResult: + manifests = [ + manifest + for manifest in snapshot.manifests(io=self._io) + if manifest.partition_spec_id == self._spec_id and manifest.content == content + ] + + data_manifest_merge_manager = _ManifestMergeManager( + target_size_bytes=self._target_size_bytes, + min_count_to_merge=self._min_count_to_merge, + merge_enabled=self._merge_enabled, + snapshot_producer=self, + ) + new_manifests = data_manifest_merge_manager.merge_manifests(manifests=manifests) + + return RewriteManifestsResult(rewritten_manifests=manifests, added_manifests=new_manifests) + + def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile: + return ManifestFile.from_args( + _table_format_version=self._transaction.table_metadata.format_version, + manifest_path=manifest_file.manifest_path, + manifest_length=manifest_file.manifest_length, + partition_spec_id=manifest_file.partition_spec_id, + content=manifest_file.content, + sequence_number=manifest_file.sequence_number, + min_sequence_number=manifest_file.min_sequence_number, + added_snapshot_id=snapshot_id, # Using the new snapshot ID here + added_files_count=manifest_file.added_files_count, + existing_files_count=manifest_file.existing_files_count, + deleted_files_count=manifest_file.deleted_files_count, + added_rows_count=manifest_file.added_rows_count, + existing_rows_count=manifest_file.existing_rows_count, + deleted_rows_count=manifest_file.deleted_rows_count, + partitions=manifest_file.partitions, + key_metadata=manifest_file.key_metadata, + ) + + def __exit__(self, _: Any, value: Any, traceback: Any) -> None: + """Commit only if we have rewritten any manifests.""" + if self.rewritten_manifests: + self.commit() + + def _existing_manifests(self) -> List[ManifestFile]: + """Determine if there are any existing manifest files.""" + return [self._copy_manifest_file(manifest, self.snapshot_id) for manifest in self.added_manifests] + + def _deleted_entries(self) -> List[ManifestEntry]: + """To determine if we need to record any deleted manifest entries.""" + return [] + + class _OverwriteFiles(_SnapshotProducer["_OverwriteFiles"]): """Overwrites data from the table. This will produce an OVERWRITE snapshot. @@ -621,6 +775,14 @@ def merge_append(self) -> _MergeAppendFiles: operation=Operation.APPEND, transaction=self._transaction, io=self._io, snapshot_properties=self._snapshot_properties ) + def rewrite(self) -> _RewriteManifests: + return _RewriteManifests( + table=self._transaction._table, + transaction=self._transaction, + io=self._io, + snapshot_properties=self._snapshot_properties, + ) + def overwrite(self, commit_uuid: Optional[uuid.UUID] = None) -> _OverwriteFiles: return _OverwriteFiles( commit_uuid=commit_uuid, diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index e81050a81c..875b1f69ad 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -1101,3 +1101,45 @@ def test_inspect_files_partitioned(spark: SparkSession, session_catalog: Catalog .reset_index() ) assert_frame_equal(lhs, rhs, check_dtype=False) + + +def test_inspect_all_example(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + from pandas.testing import assert_frame_equal + + identifier = "default.table_metadata_all_manifests" + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + spark.sql( + f""" + CREATE TABLE {identifier} ( + id int, + data string + ) + PARTITIONED BY (data) + TBLPROPERTIES ('write.update.mode'='merge-on-read', + 'write.delete.mode'='merge-on-read') + """ + ) + tbl = session_catalog.load_table(identifier) + # check all_manifests when there are no snapshots + lhs = tbl.inspect.all_manifests().to_pandas() + rhs = spark.table(f"{identifier}.all_manifests").toPandas() + assert_frame_equal(lhs, rhs, check_dtype=False) + + spark.sql(f"INSERT INTO {identifier} VALUES (1, 'a')") + + spark.sql(f"INSERT INTO {identifier} VALUES (2, 'b')") + + spark.sql(f"UPDATE {identifier} SET data = 'c' WHERE id = 1") + + spark.sql(f"DELETE FROM {identifier} WHERE id = 2") + + spark.sql(f"INSERT OVERWRITE {identifier} VALUES (1, 'a')") + + tbl.refresh() + + tbl.rewrite_manifests() + print("efd") diff --git a/tests/integration/test_writes/test_rewrite_manifests.py b/tests/integration/test_writes/test_rewrite_manifests.py new file mode 100644 index 0000000000..b4fd6ec37f --- /dev/null +++ b/tests/integration/test_writes/test_rewrite_manifests.py @@ -0,0 +1,271 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# pylint:disable=redefined-outer-name +from typing import List + +import pyarrow as pa +import pytest + +from pyiceberg.catalog import Catalog +from pyiceberg.manifest import ManifestFile +from pyiceberg.table import TableProperties +from utils import _create_table + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v1_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: + identifier = "default.arrow_table_v2_without_data" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: + identifier = "default.arrow_table_v2_with_only_nulls" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + +@pytest.fixture(scope="session", autouse=True) +def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v2_appended_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + + +@pytest.mark.integration +def test_rewrite_v1_v2_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_rewrite_v1_v2_manifests" + # Create a v1 table and append data + tbl = _create_table( + session_catalog, + identifier, + {"format-version": "1"}, + [arrow_table_with_null], + ) + assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" + + # Upgrade to v2 and append more data + with tbl.transaction() as tx: + tx.upgrade_table_version(format_version=2) + + tbl.append(arrow_table_with_null) + assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" + + with tbl.transaction() as tx: # type: ignore[unreachable] + tx.set_properties({TableProperties.MANIFEST_MERGE_ENABLED: "true", TableProperties.MANIFEST_MIN_MERGE_COUNT: "2"}) + + # Get initial manifest state + manifests = tbl.inspect.manifests() + assert len(manifests) == 2, "Should have 2 manifests before rewrite" + + # Execute rewrite manifests + result = tbl.rewrite_manifests() + + assert len(result.rewritten_manifests) == 2, "Action should rewrite 2 manifests" + assert len(result.added_manifests) == 1, "Action should add 1 manifest" + + tbl.refresh() + + # Verify final state + current_snapshot = tbl.current_snapshot() + if not current_snapshot: + raise AssertionError("Expected a current snapshot") + + new_manifests = current_snapshot.manifests(tbl.io) + assert len(new_manifests) == 1, "Should have 1 manifest after rewrite" + assert new_manifests[0].existing_files_count == 2, "Should have 2 existing files in the new manifest" + assert new_manifests[0].added_files_count == 0, "Should have no added files in the new manifest" + assert new_manifests[0].deleted_files_count == 0, "Should have no deleted files in the new manifest" + assert new_manifests[0].sequence_number is not None, "Should have a sequence number in the new manifest" + + # Validate the data is intact + expected_records_count = arrow_table_with_null.shape[0] * 2 + result_df = tbl.scan().to_pandas() + actual_records_count = result_df.shape[0] + assert expected_records_count == actual_records_count, "Record count must match" + + +@pytest.mark.integration +def test_rewrite_manifests_empty_table(session_catalog: Catalog) -> None: + # Create an unpartitioned table + identifier = "default.test_rewrite_manifests_empty_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + + assert tbl.current_snapshot() is None, "Table must be empty" + + # Execute rewrite manifests action + tbl.rewrite_manifests() + + tbl.refresh() + assert tbl.current_snapshot() is None, "Table must stay empty" + + +@pytest.mark.integration +def test_rewrite_small_manifests_non_partitioned_table(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}) + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + tbl.transaction().set_properties( + {TableProperties.MANIFEST_MERGE_ENABLED: "true", TableProperties.MANIFEST_MIN_MERGE_COUNT: "2"} + ).commit_transaction() + tbl = tbl.refresh() + manifests = tbl.inspect.manifests() + assert len(manifests) == 2, "Should have 2 manifests before rewrite" + + result = tbl.rewrite_manifests() + + assert len(result.rewritten_manifests) == 2, "Action should rewrite 2 manifests" + assert len(result.added_manifests) == 1, "Action should add 1 manifest" + + tbl.refresh() + + current_snapshot = tbl.current_snapshot() + if not current_snapshot: + raise AssertionError + new_manifests = current_snapshot.manifests(tbl.io) + assert len(new_manifests) == 1, "Should have 1 manifest after rewrite" + assert new_manifests[0].existing_files_count == 2, "Should have 4 files in the new manifest" + assert new_manifests[0].added_files_count == 0, "Should have no added files in the new manifest" + assert new_manifests[0].deleted_files_count == 0, "Should have no deleted files in the new manifest" + + # Validate the records + expected_records_count = arrow_table_with_null.shape[0] * 2 + result_df = tbl.scan().to_pandas() + actual_records_count = result_df.shape[0] + assert expected_records_count == actual_records_count, "Rows must match" + + +def compute_manifest_entry_size_bytes(manifests: List[ManifestFile]) -> float: + total_size = 0 + num_entries = 0 + + for manifest in manifests: + total_size += manifest.manifest_length + num_entries += ( + (manifest.added_files_count or 0) + (manifest.existing_files_count or 0) + (manifest.deleted_files_count or 0) + ) + return total_size / num_entries if num_entries > 0 else 0 + + +def test_rewrite_small_manifests_partitioned_table(session_catalog: Catalog) -> None: + records1 = pa.Table.from_pydict({"c1": [1, 1], "c2": [None, "BBBBBBBBBB"], "c3": ["AAAA", "BBBB"]}) + + records2 = records2 = pa.Table.from_pydict({"c1": [2, 2], "c2": ["CCCCCCCCCC", "DDDDDDDDDD"], "c3": ["CCCC", "DDDD"]}) + + records3 = records3 = pa.Table.from_pydict({"c1": [3, 3], "c2": ["EEEEEEEEEE", "FFFFFFFFFF"], "c3": ["EEEE", "FFFF"]}) + + records4 = records4 = pa.Table.from_pydict({"c1": [4, 4], "c2": ["GGGGGGGGGG", "HHHHHHHHHG"], "c3": ["GGGG", "HHHH"]}) + + schema = pa.schema( + [ + ("c1", pa.int64()), + ("c2", pa.string()), + ("c3", pa.string()), + ] + ) + + identifier = "default.test_rewrite_small_manifests_non_partitioned_table" + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, schema=schema) + + tbl.append(records1) + tbl.append(records2) + tbl.append(records3) + tbl.append(records4) + tbl.refresh() + + tbl = tbl.refresh() + manifests = tbl.current_snapshot().manifests(tbl.io) # type: ignore + assert len(manifests) == 4, "Should have 4 manifests before rewrite" + + # manifest_entry_size_bytes = compute_manifest_entry_size_bytes(manifests) + target_manifest_size_bytes = 5200 * 2 + 100 + + tbl = ( + tbl.transaction() + .set_properties( + { + TableProperties.MANIFEST_TARGET_SIZE_BYTES: str(target_manifest_size_bytes), + TableProperties.MANIFEST_MERGE_ENABLED: "true", + TableProperties.MANIFEST_MIN_MERGE_COUNT: "2", + } + ) + .commit_transaction() + ) + + result = tbl.rewrite_manifests() + + tbl.refresh() + assert len(result.rewritten_manifests) == 4, "Action should rewrite 4 manifests" + assert len(result.added_manifests) == 2, "Action should add 2 manifests" + + new_manifests = tbl.current_snapshot().manifests(tbl.io) # type: ignore + assert len(new_manifests) == 2, "Should have 2 manifests after rewrite" + + assert new_manifests[0].existing_files_count == 2 + assert new_manifests[0].added_files_count == 0 + assert new_manifests[0].deleted_files_count == 0 + + assert new_manifests[1].existing_files_count == 2 + assert new_manifests[1].added_files_count == 0 + assert new_manifests[1].deleted_files_count == 0 + + sorted_df = tbl.scan().to_pandas().sort_values(["c1", "c2"], ascending=[False, False]) + expectedRecords = ( + pa.concat_tables([records1, records2, records3, records4]).to_pandas().sort_values(["c1", "c2"], ascending=[False, False]) + ) + from pandas.testing import assert_frame_equal + + assert_frame_equal(sorted_df.reset_index(drop=True), expectedRecords.reset_index(drop=True))