22
22
from abc import abstractmethod
23
23
from collections import defaultdict
24
24
from concurrent .futures import Future
25
- from dataclasses import dataclass
25
+ from dataclasses import dataclass , field
26
26
from functools import cached_property
27
27
from typing import TYPE_CHECKING , Any , Callable , Dict , Generic , List , Optional , Set , Tuple
28
28
@@ -480,8 +480,8 @@ def _deleted_entries(self) -> List[ManifestEntry]:
480
480
481
481
@dataclass (init = False )
482
482
class RewriteManifestsResult :
483
- rewritten_manifests : List [ManifestFile ]
484
- added_manifests : List [ManifestFile ]
483
+ rewritten_manifests : List [ManifestFile ] = field ( default_factory = list )
484
+ added_manifests : List [ManifestFile ] = field ( default_factory = list )
485
485
486
486
def __init__ (
487
487
self ,
@@ -544,7 +544,11 @@ def _process_manifests(self, manifests: List[ManifestFile]) -> List[ManifestFile
544
544
545
545
546
546
class _RewriteManifests (_SnapshotProducer ["_RewriteManifests" ]):
547
+ _table : Table
548
+ _spec_id : int
547
549
_target_size_bytes : int
550
+ _min_count_to_merge : int
551
+ _merge_enabled : bool
548
552
rewritten_manifests : List [ManifestFile ] = []
549
553
added_manifests : List [ManifestFile ] = []
550
554
kept_manifests : List [ManifestFile ] = []
@@ -559,10 +563,15 @@ def __init__(
559
563
):
560
564
from pyiceberg .table import TableProperties
561
565
562
- _table : Table
563
- _spec : PartitionSpec
564
-
565
566
super ().__init__ (Operation .REPLACE , transaction , io , snapshot_properties = snapshot_properties )
567
+
568
+ snapshot = self ._table .current_snapshot ()
569
+ if self ._spec_id and self ._spec_id not in self ._table .specs ():
570
+ raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
571
+
572
+ if not snapshot :
573
+ raise ValueError ("Cannot rewrite manifests without a current snapshot" )
574
+
566
575
self ._target_size_bytes = property_as_int (
567
576
self ._transaction .table_metadata .properties ,
568
577
TableProperties .MANIFEST_TARGET_SIZE_BYTES ,
@@ -571,6 +580,17 @@ def __init__(
571
580
self ._table = table
572
581
self ._spec_id = spec_id or table .spec ().spec_id
573
582
583
+ self ._min_count_to_merge = property_as_int (
584
+ self ._transaction .table_metadata .properties ,
585
+ TableProperties .MANIFEST_MIN_MERGE_COUNT ,
586
+ TableProperties .MANIFEST_MIN_MERGE_COUNT_DEFAULT ,
587
+ ) # type: ignore
588
+ self ._merge_enabled = property_as_bool (
589
+ self ._transaction .table_metadata .properties ,
590
+ TableProperties .MANIFEST_MERGE_ENABLED ,
591
+ TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT ,
592
+ )
593
+
574
594
def _summary (self , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> Summary :
575
595
from pyiceberg .table import TableProperties
576
596
@@ -583,10 +603,10 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
583
603
ssc .set_partition_summary_limit (partition_summary_limit )
584
604
585
605
props = {
586
- "manifests-kept" : str ( len ([])) ,
606
+ "manifests-kept" : "0" ,
587
607
"manifests-created" : str (len (self .added_manifests )),
588
608
"manifests-replaced" : str (len (self .rewritten_manifests )),
589
- "entries-processed" : str ( len ([])) ,
609
+ "entries-processed" : "0" ,
590
610
}
591
611
previous_snapshot = (
592
612
self ._transaction .table_metadata .snapshot_by_id (self ._parent_snapshot_id )
@@ -601,28 +621,25 @@ def _summary(self, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> Summary:
601
621
)
602
622
603
623
def rewrite_manifests(self) -> RewriteManifestsResult:
    """Rewrite the current snapshot's data and delete manifests and report the outcome.

    Calls ``_find_matching_manifests`` once for ``ManifestContent.DATA`` and
    once for ``ManifestContent.DELETES`` against the table's current snapshot,
    appending each pass's rewritten and added manifests to the lists held on
    this producer.

    Returns:
        A ``RewriteManifestsResult`` carrying the accumulated rewritten and
        added manifests, or one holding two empty lists when no manifest was
        rewritten.

    Raises:
        ValueError: If the table has no current snapshot.
    """
    snapshot = self._table.current_snapshot()
    if not snapshot:
        raise ValueError("Cannot rewrite manifests without a current snapshot")

    # Data manifests are processed first, then delete manifests; both passes
    # read the same snapshot.
    for content in (ManifestContent.DATA, ManifestContent.DELETES):
        result = self._find_matching_manifests(snapshot, content)
        self.rewritten_manifests.extend(result.rewritten_manifests)
        self.added_manifests.extend(result.added_manifests)

    # When nothing was rewritten, both lists are reported empty regardless of
    # what added_manifests currently holds.
    if not self.rewritten_manifests:
        return RewriteManifestsResult(rewritten_manifests=[], added_manifests=[])

    return RewriteManifestsResult(rewritten_manifests=self.rewritten_manifests, added_manifests=self.added_manifests)
617
641
618
- def _find_matching_manifests (self , content : ManifestContent ) -> RewriteManifestsResult :
619
- snapshot = self ._table .current_snapshot ()
620
- if self ._spec_id and self ._spec_id not in self ._table .specs ():
621
- raise ValueError (f"Cannot find spec with id: { self ._spec_id } " )
622
-
623
- if not snapshot :
624
- raise ValueError ("Cannot rewrite manifests without a current snapshot" )
625
-
642
+ def _find_matching_manifests (self , snapshot : Snapshot , content : ManifestContent ) -> RewriteManifestsResult :
626
643
manifests = [
627
644
manifest
628
645
for manifest in snapshot .manifests (io = self ._io )
@@ -631,8 +648,8 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests
631
648
632
649
data_manifest_merge_manager = _ManifestMergeManager (
633
650
target_size_bytes = self ._target_size_bytes ,
634
- min_count_to_merge = 2 ,
635
- merge_enabled = True ,
651
+ min_count_to_merge = self . _min_count_to_merge ,
652
+ merge_enabled = self . _merge_enabled ,
636
653
snapshot_producer = self ,
637
654
)
638
655
new_manifests = data_manifest_merge_manager .merge_manifests (manifests = manifests )
@@ -668,10 +685,7 @@ def _existing_manifests(self) -> List[ManifestFile]:
668
685
return [self ._copy_manifest_file (manifest , self .snapshot_id ) for manifest in self .added_manifests ]
669
686
670
687
def _deleted_entries (self ) -> List [ManifestEntry ]:
671
- """To determine if we need to record any deleted manifest entries.
672
-
673
- In case of an append, nothing is deleted.
674
- """
688
+ """To determine if we need to record any deleted manifest entries."""
675
689
return []
676
690
677
691
0 commit comments