From c7e4095535289c49f93437ba7d18306480ddd6f6 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 10 Jul 2024 00:03:21 -0700 Subject: [PATCH] fix snapshot_id issue --- pyiceberg/manifest.py | 12 +++++---- tests/integration/test_writes/test_writes.py | 26 ++++++++++++++++++++ 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index b555b0493f..6148d9a69a 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -407,7 +407,7 @@ def __init__(self, *data: Any, **named_data: Any) -> None: def _wrap( self, new_status: ManifestEntryStatus, - new_snapshot_id: int, + new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, @@ -419,12 +419,14 @@ def _wrap( self.data_file = new_file return self - def _wrap_append(self, new_snapshot_id: int, new_data_sequence_number: Optional[int], new_file: DataFile) -> ManifestEntry: + def _wrap_append( + self, new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file: DataFile + ) -> ManifestEntry: return self._wrap(ManifestEntryStatus.ADDED, new_snapshot_id, new_data_sequence_number, None, new_file) def _wrap_delete( self, - new_snapshot_id: int, + new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, @@ -435,7 +437,7 @@ def _wrap_delete( def _wrap_existing( self, - new_snapshot_id: int, + new_snapshot_id: Optional[int], new_data_sequence_number: Optional[int], new_file_sequence_number: Optional[int], new_file: DataFile, @@ -838,7 +840,7 @@ def delete(self, entry: ManifestEntry) -> ManifestWriter: def existing(self, entry: ManifestEntry) -> ManifestWriter: self.add_entry( self._reused_entry_wrapper._wrap_existing( - self._snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file + entry.snapshot_id, entry.data_sequence_number, entry.file_sequence_number, entry.data_file ) ) return self diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index ae2def1fb4..2542fbdb38 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -1084,6 +1084,29 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab # tbl_b and tbl_c should contain the same data assert tbl_b.scan().to_arrow().equals(tbl_c.scan().to_arrow()) + +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +def test_merge_manifests_file_content(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None: + tbl_a = _create_table( + session_catalog, + "default.merge_manifest_a", + {"commit.manifest-merge.enabled": "true", "commit.manifest.min-count-to-merge": "1", "format-version": format_version}, + [], + ) + + # tbl_a should merge all manifests into 1 + tbl_a.append(arrow_table_with_null) + + tbl_a_first_entries = tbl_a.inspect.entries().to_pydict() + first_snapshot_id = tbl_a_first_entries["snapshot_id"][0] + first_data_file_path = tbl_a_first_entries["data_file"][0]["file_path"] + + tbl_a.append(arrow_table_with_null) + tbl_a.append(arrow_table_with_null) + + assert len(tbl_a.current_snapshot().manifests(tbl_a.io)) == 1 # type: ignore + # verify the sequence number of tbl_a's only manifest file tbl_a_manifest = tbl_a.current_snapshot().manifests(tbl_a.io)[0] # type: ignore assert tbl_a_manifest.sequence_number == (3 if format_version == 2 else 0) @@ -1114,6 +1137,9 @@ def test_merge_manifests(session_catalog: Catalog, arrow_table_with_null: pa.Tab assert tbl_a_data_file["equality_ids"] is None assert tbl_a_data_file["file_format"] == "PARQUET" assert tbl_a_data_file["file_path"].startswith("s3://warehouse/default/merge_manifest_a/data/") + if tbl_a_data_file["file_path"] == first_data_file_path: + # verify that the snapshot id recorded should be the one where the file was added + assert tbl_a_entries["snapshot_id"][i] == first_snapshot_id assert tbl_a_data_file["key_metadata"] is None assert tbl_a_data_file["lower_bounds"] == [ (1, b"\x00"),