2222from abc import abstractmethod
2323from collections import defaultdict
2424from concurrent .futures import Future
25- from dataclasses import dataclass
25+ from dataclasses import dataclass , field
2626from functools import cached_property
2727from typing import TYPE_CHECKING , Any , Callable , Dict , Generic , List , Optional , Set , Tuple
2828
@@ -476,8 +476,8 @@ def _deleted_entries(self) -> List[ManifestEntry]:
476476
477477@dataclass (init = False )
478478class RewriteManifestsResult :
479- rewritten_manifests : List [ManifestFile ]
480- added_manifests : List [ManifestFile ]
479+ rewritten_manifests : List [ManifestFile ] = field ( default_factory = list )
480+ added_manifests : List [ManifestFile ] = field ( default_factory = list )
481481
482482 def __init__ (
483483 self ,
@@ -540,7 +540,11 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
540540
541541
542542class _RewriteManifests (_SnapshotProducer ["_RewriteManifests" ]):
543+ _table : Table
544+ _spec_id : int
543545 _target_size_bytes : int
546+ _min_count_to_merge : int
547+ _merge_enabled : bool
544548 rewritten_manifests : List [ManifestFile ] = []
545549 added_manifests : List [ManifestFile ] = []
546550 kept_manifests : List [ManifestFile ] = []
@@ -555,10 +559,15 @@ def __init__(
555559 ):
556560 from pyiceberg .table import TableProperties
557561
558- _table : Table
559- _spec : PartitionSpec
560-
561562 super ().__init__ (Operation .REPLACE , transaction , io , snapshot_properties = snapshot_properties )
563+
564+ snapshot = self ._table .current_snapshot ()
565+ if self ._spec_id and self ._spec_id not in self ._table .specs ():
566+ raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
567+
568+ if not snapshot :
569+ raise ValueError ("Cannot rewrite manifests without a current snapshot" )
570+
562571 self ._target_size_bytes = property_as_int (
563572 self ._transaction .table_metadata .properties ,
564573 TableProperties .MANIFEST_TARGET_SIZE_BYTES ,
@@ -567,6 +576,17 @@ def __init__(
567576 self ._table = table
568577 self ._spec_id = spec_id or table .spec ().spec_id
569578
579+ self ._min_count_to_merge = property_as_int (
580+ self ._transaction .table_metadata .properties ,
581+ TableProperties .MANIFEST_MIN_MERGE_COUNT ,
582+ TableProperties .MANIFEST_MIN_MERGE_COUNT_DEFAULT ,
583+ ) # type: ignore
584+ self ._merge_enabled = property_as_bool (
585+ self ._transaction .table_metadata .properties ,
586+ TableProperties .MANIFEST_MERGE_ENABLED ,
587+ TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT ,
588+ )
589+
570590 def _summary (self , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> Summary :
571591 from pyiceberg .table import TableProperties
572592
@@ -579,10 +599,10 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
579599 ssc .set_partition_summary_limit (partition_summary_limit )
580600
581601 props = {
582- "manifests-kept" : str ( len ([])) ,
602+ "manifests-kept" : "0" ,
583603 "manifests-created" : str (len (self .added_manifests )),
584604 "manifests-replaced" : str (len (self .rewritten_manifests )),
585- "entries-processed" : str ( len ([])) ,
605+ "entries-processed" : "0" ,
586606 }
587607 previous_snapshot = (
588608 self ._transaction .table_metadata .snapshot_by_id (self ._parent_snapshot_id )
@@ -597,28 +617,25 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
597617 )
598618
599619 def rewrite_manifests (self ) -> RewriteManifestsResult :
600- data_result = self ._find_matching_manifests (ManifestContent .DATA )
620+ snapshot = self ._table .current_snapshot ()
621+ if not snapshot :
622+ raise ValueError ("Cannot rewrite manifests without a current snapshot" )
623+
624+ data_result = self ._find_matching_manifests (snapshot , ManifestContent .DATA )
601625
602626 self .rewritten_manifests .extend (data_result .rewritten_manifests )
603627 self .added_manifests .extend (data_result .added_manifests )
604628
605- deletes_result = self ._find_matching_manifests (ManifestContent .DELETES )
629+ deletes_result = self ._find_matching_manifests (snapshot , ManifestContent .DELETES )
606630 self .rewritten_manifests .extend (deletes_result .rewritten_manifests )
607631 self .added_manifests .extend (deletes_result .added_manifests )
608632
609- if not self .rewritten_manifests :
633+ if len ( self .rewritten_manifests ) == 0 :
610634 return RewriteManifestsResult (rewritten_manifests = [], added_manifests = [])
611635
612636 return RewriteManifestsResult (rewritten_manifests = self .rewritten_manifests , added_manifests = self .added_manifests )
613637
614- def _find_matching_manifests (self , content : ManifestContent ) -> RewriteManifestsResult :
615- snapshot = self ._table .current_snapshot ()
616- if self ._spec_id and self ._spec_id not in self ._table .specs ():
617- raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
618-
619- if not snapshot :
620- raise ValueError ("Cannot rewrite manifests without a current snapshot" )
621-
638+ def _find_matching_manifests (self , snapshot : Snapshot , content : ManifestContent ) -> RewriteManifestsResult :
622639 manifests = [
623640 manifest
624641 for manifest in snapshot .manifests (io = self ._io )
@@ -627,8 +644,8 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests
627644
628645 data_manifest_merge_manager = _ManifestMergeManager (
629646 target_size_bytes = self ._target_size_bytes ,
630- min_count_to_merge = 2 ,
631- merge_enabled = True ,
647+ min_count_to_merge = self . _min_count_to_merge ,
648+ merge_enabled = self . _merge_enabled ,
632649 snapshot_producer = self ,
633650 )
634651 new_manifests = data_manifest_merge_manager .merge_manifests (manifests = manifests )
@@ -664,10 +681,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
664681 return [self ._copy_manifest_file (manifest , self .snapshot_id ) for manifest in self .added_manifests ]
665682
666683 def _deleted_entries (self ) -> List [ManifestEntry ]:
667- """To determine if we need to record any deleted manifest entries.
668-
669- In case of an append, nothing is deleted.
670- """
684+ """To determine if we need to record any deleted manifest entries."""
671685 return []
672686
673687
0 commit comments