@@ -2435,7 +2435,7 @@ def _dataframe_to_data_files(
2435
2435
yield from write_file (table , iter ([WriteTask (write_uuid , next (counter ), df )]), file_schema = file_schema )
2436
2436
2437
2437
2438
- class _SnapshotProducer :
2438
+ class _SnapshotProducer ( ABC ) :
2439
2439
commit_uuid : uuid .UUID
2440
2440
_operation : Operation
2441
2441
_table : Table
@@ -2480,10 +2480,9 @@ def _deleted_entries(self) -> List[ManifestEntry]: ...
2480
2480
@abstractmethod
2481
2481
def _existing_manifests (self ) -> List [ManifestFile ]: ...
2482
2482
2483
- def _combine_manifests (
2484
- self , added_manifests : List [ManifestFile ], delete_manifests : List [ManifestFile ], existing_manifests : List [ManifestFile ]
2485
- ) -> List [ManifestFile ]:
2486
- return added_manifests + delete_manifests + existing_manifests
2483
+ def _process_manifests (self , manifests : List [ManifestFile ]) -> List [ManifestFile ]:
2484
+ """To perform any post-processing on the manifests before writing them to the new snapshot."""
2485
+ return manifests
2487
2486
2488
2487
def _manifests (self ) -> List [ManifestFile ]:
2489
2488
def _write_added_manifest () -> List [ManifestFile ]:
@@ -2535,7 +2534,7 @@ def _write_delete_manifest() -> List[ManifestFile]:
2535
2534
delete_manifests = executor .submit (_write_delete_manifest )
2536
2535
existing_manifests = executor .submit (self ._existing_manifests )
2537
2536
2538
- return self ._combine_manifests (added_manifests .result (), delete_manifests .result (), existing_manifests .result ())
2537
+ return self ._process_manifests (added_manifests .result () + delete_manifests .result () + existing_manifests .result ())
2539
2538
2540
2539
def _summary (self ) -> Summary :
2541
2540
ssc = SnapshotSummaryCollector ()
@@ -2619,7 +2618,30 @@ def fetch_manifest_entry(self, manifest: ManifestFile, discard_deleted: bool = T
2619
2618
return manifest .fetch_manifest_entry (io = self ._table .io , discard_deleted = discard_deleted )
2620
2619
2621
2620
2622
- class MergeAppendFiles (_SnapshotProducer ):
2621
class AppendFiles(_SnapshotProducer, ABC):
    """Shared base for the append-style snapshot producers.

    It supplies the ``_existing_manifests`` implementation that
    ``MergeAppendFiles`` and ``FastAppendFiles`` both inherit, so the two
    strategies differ only in how they post-process the manifest list.
    """

    def _existing_manifests(self) -> List[ManifestFile]:
        """To determine if there are any existing manifest files.

        All the existing manifest files are considered existing.
        A fast append will add another ManifestFile to the ManifestList.
        A merge append will merge ManifestFiles if needed later.

        Returns:
            The manifests of the parent snapshot that are carried over, or an
            empty list when there is no parent snapshot.

        Raises:
            ValueError: If the parent snapshot id is set but the table has no
                snapshot with that id.
        """
        # Without a parent snapshot there is nothing to carry over.
        if self._parent_snapshot_id is None:
            return []

        previous_snapshot = self._table.snapshot_by_id(self._parent_snapshot_id)
        if previous_snapshot is None:
            raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")

        # Keep a manifest when it still references added or existing files, or
        # when it was written by the snapshot that is currently being produced.
        return [
            manifest
            for manifest in previous_snapshot.manifests(io=self._table.io)
            if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id
        ]
2642
+
2643
+
2644
+ class MergeAppendFiles (AppendFiles ):
2623
2645
_target_size_bytes : int
2624
2646
_min_count_to_merge : int
2625
2647
_merge_enabled : bool
@@ -2642,39 +2664,21 @@ def __init__(
2642
2664
self ._table .properties , TableProperties .MANIFEST_MERGE_ENABLED , TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT
2643
2665
)
2644
2666
2645
- def _existing_manifests (self ) -> List [ManifestFile ]:
2646
- """To determine if there are any existing manifest files."""
2647
- existing_manifests = []
2648
-
2649
- if self ._parent_snapshot_id is not None :
2650
- previous_snapshot = self ._table .snapshot_by_id (self ._parent_snapshot_id )
2651
-
2652
- if previous_snapshot is None :
2653
- raise ValueError (f"Snapshot could not be found: { self ._parent_snapshot_id } " )
2654
-
2655
- for manifest in previous_snapshot .manifests (io = self ._table .io ):
2656
- if manifest .has_added_files () or manifest .has_existing_files () or manifest .added_snapshot_id == self ._snapshot_id :
2657
- existing_manifests .append (manifest )
2658
-
2659
- return existing_manifests
2660
-
2661
2667
def _deleted_entries (self ) -> List [ManifestEntry ]:
2662
2668
"""To determine if we need to record any deleted manifest entries.
2663
2669
2664
2670
In case of an append, nothing is deleted.
2665
2671
"""
2666
2672
return []
2667
2673
2668
- def _combine_manifests (
2669
- self , added_manifests : List [ManifestFile ], delete_manifests : List [ManifestFile ], existing_manifests : List [ManifestFile ]
2670
- ) -> List [ManifestFile ]:
2671
- unmerged_data_manifests = (
2672
- added_manifests
2673
- + delete_manifests
2674
- + [manifest for manifest in existing_manifests if manifest .content == ManifestContent .DATA ]
2675
- )
2676
- # TODO: need to re-consider the name here: manifest containing positional deletes and manifest containing deleted entries
2677
- unmerged_deletes_manifests = [manifest for manifest in existing_manifests if manifest .content == ManifestContent .DELETES ]
2674
+ def _process_manifests (self , manifests : List [ManifestFile ]) -> List [ManifestFile ]:
2675
+ """To perform any post-processing on the manifests before writing them to the new snapshot.
2676
+
2677
+ In MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
2678
+ if automatic merge is enabled.
2679
+ """
2680
+ unmerged_data_manifests = [manifest for manifest in manifests if manifest .content == ManifestContent .DATA ]
2681
+ unmerged_deletes_manifests = [manifest for manifest in manifests if manifest .content == ManifestContent .DELETES ]
2678
2682
2679
2683
data_manifest_merge_manager = _ManifestMergeManager (
2680
2684
target_size_bytes = self ._target_size_bytes ,
@@ -2686,27 +2690,7 @@ def _combine_manifests(
2686
2690
return data_manifest_merge_manager .merge_manifests (unmerged_data_manifests ) + unmerged_deletes_manifests
2687
2691
2688
2692
2689
- class FastAppendFiles (_SnapshotProducer ):
2690
- def _existing_manifests (self ) -> List [ManifestFile ]:
2691
- """To determine if there are any existing manifest files.
2692
-
2693
- A fast append will add another ManifestFile to the ManifestList.
2694
- All the existing manifest files are considered existing.
2695
- """
2696
- existing_manifests = []
2697
-
2698
- if self ._parent_snapshot_id is not None :
2699
- previous_snapshot = self ._table .snapshot_by_id (self ._parent_snapshot_id )
2700
-
2701
- if previous_snapshot is None :
2702
- raise ValueError (f"Snapshot could not be found: { self ._parent_snapshot_id } " )
2703
-
2704
- for manifest in previous_snapshot .manifests (io = self ._table .io ):
2705
- if manifest .has_added_files () or manifest .has_existing_files () or manifest .added_snapshot_id == self ._snapshot_id :
2706
- existing_manifests .append (manifest )
2707
-
2708
- return existing_manifests
2709
-
2693
+ class FastAppendFiles (AppendFiles ):
2710
2694
def _deleted_entries (self ) -> List [ManifestEntry ]:
2711
2695
"""To determine if we need to record any deleted manifest entries.
2712
2696
@@ -2804,9 +2788,6 @@ def _group_by_spec(
2804
2788
return groups
2805
2789
2806
2790
def _create_manifest (self , spec_id : int , manifest_bin : List [ManifestFile ]) -> ManifestFile :
2807
- # TODO: later check if we need to implement cache:
2808
- # https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/ManifestMergeManager.java#L165
2809
-
2810
2791
with self ._snapshot_producer .new_manifest_writer (spec = self ._snapshot_producer .spec (spec_id )) as writer :
2811
2792
for manifest in manifest_bin :
2812
2793
for entry in self ._snapshot_producer .fetch_manifest_entry (manifest ):
0 commit comments