@@ -568,7 +568,7 @@ def __init__(
568
568
TableProperties .MANIFEST_TARGET_SIZE_BYTES_DEFAULT ,
569
569
) # type: ignore
570
570
self ._table = table
571
- self ._spec_id = spec_id
571
+ self ._spec_id = spec_id or table . spec (). spec_id
572
572
573
573
def rewrite_manifests (self ) -> RewriteManifestsResult :
574
574
data_result = self ._find_matching_manifests (ManifestContent .DATA )
@@ -608,9 +608,33 @@ def _find_matching_manifests(self, content: ManifestContent) -> RewriteManifests
608
608
609
609
return RewriteManifestsResult (rewritten_manifests = manifests , added_manifests = new_manifests )
610
610
611
def _copy_manifest_file(self, manifest_file: ManifestFile, snapshot_id: int) -> ManifestFile:
    """Build a new ManifestFile mirroring ``manifest_file`` under another snapshot id.

    Every field is read from ``manifest_file`` unchanged, except
    ``added_snapshot_id``, which is set to ``snapshot_id``.

    Args:
        manifest_file: The manifest whose field values are carried over.
        snapshot_id: Value stored as ``added_snapshot_id`` on the copy.

    Returns:
        A newly constructed ManifestFile.
    """
    # Keyword order is kept identical to an explicit field-by-field constructor call.
    field_order = (
        "manifest_path",
        "manifest_length",
        "partition_spec_id",
        "content",
        "sequence_number",
        "min_sequence_number",
        "added_snapshot_id",
        "added_files_count",
        "existing_files_count",
        "deleted_files_count",
        "added_rows_count",
        "existing_rows_count",
        "deleted_rows_count",
        "partitions",
        "key_metadata",
    )
    # The source manifest's own added_snapshot_id is never read; it is replaced outright.
    field_values = {
        field_name: snapshot_id if field_name == "added_snapshot_id" else getattr(manifest_file, field_name)
        for field_name in field_order
    }
    return ManifestFile(**field_values)
629
+
611
630
def _existing_manifests(self) -> List[ManifestFile]:
    """Return copies of the added and rewritten manifests for this snapshot.

    The manifests in ``self.added_manifests`` followed by those in
    ``self.rewritten_manifests`` are each passed through
    ``_copy_manifest_file`` together with ``self.snapshot_id``.

    Returns:
        The copied manifests, in the same order as the concatenated inputs.
    """
    carried_over: List[ManifestFile] = []
    for source_manifest in self.added_manifests + self.rewritten_manifests:
        carried_over.append(self._copy_manifest_file(source_manifest, self.snapshot_id))
    return carried_over
614
638
615
639
def _deleted_entries (self ) -> List [ManifestEntry ]:
616
640
"""To determine if we need to record any deleted manifest entries.
@@ -620,6 +644,277 @@ def _deleted_entries(self) -> List[ManifestEntry]:
620
644
return []
621
645
622
646
647
+ # class _RewriteManifests(_SnapshotProducer["_RewriteManifests"]):
648
+ # KEPT_MANIFESTS_COUNT = "manifests-kept"
649
+ # CREATED_MANIFESTS_COUNT = "manifests-created"
650
+ # REPLACED_MANIFESTS_COUNT = "manifests-replaced"
651
+ # PROCESSED_ENTRY_COUNT = "entries-processed"
652
+ # _target_size_bytes: int
653
+ # _min_count_to_merge: int
654
+ #
655
+ # def __init__(
656
+ # self,
657
+ # table: Table,
658
+ # transaction: Transaction,
659
+ # io: FileIO,
660
+ # commit_uuid: Optional[uuid.UUID] = None,
661
+ # snapshot_properties: Dict[str, str] = EMPTY_DICT,
662
+ # ) -> None:
663
+ # from pyiceberg.table import TableProperties
664
+ #
665
+ # super().__init__(Operation.REPLACE, transaction, io, commit_uuid, snapshot_properties)
666
+ # self._table = table
667
+ # self.specs_by_id = self._table.spec() # ops.current().specs_by_id()
668
+ # self.manifest_target_size_bytes = 8388608 # Default value
669
+ # self.deleted_manifests: Set[ManifestFile] = set()
670
+ # self.added_manifests: List[ManifestFile] = []
671
+ # self.rewritten_added_manifests: List[ManifestFile] = []
672
+ # self.kept_manifests: List[ManifestFile] = []
673
+ # self.new_manifests: List[ManifestFile] = []
674
+ # self.rewritten_manifests: Set[ManifestFile] = set()
675
+ # self.snapshot_Id_Inheritance_enabled = property_as_bool(
676
+ # self._transaction.table_metadata.properties,
677
+ # TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
678
+ # TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT,
679
+ # ) # type: ignore
680
+ # self.can_inherit_snapshot_id = table.metadata.format_version > 1 or self.snapshot_Id_Inheritance_enabled
681
+ # # self.writers: Dict[Tuple[Any, int], 'WriterWrapper'] = {}
682
+ # self.entry_count = 0
683
+ # self.cluster_by_func: Optional[Callable[[DataFile], Any]] = None
684
+ # self.predicate: Optional[Callable[[ManifestFile], bool]] = None
685
+ # # self.summary_builder = SnapshotSummary.Builder()
686
+ # # self.lock = threading.Lock()
687
+ # self._target_size_bytes = property_as_int(
688
+ # self._transaction.table_metadata.properties,
689
+ # TableProperties.MANIFEST_TARGET_SIZE_BYTES,
690
+ # TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT,
691
+ # ) # type: ignore
692
+ # self._min_count_to_merge = property_as_int(
693
+ # self._transaction.table_metadata.properties,
694
+ # TableProperties.MANIFEST_MIN_MERGE_COUNT,
695
+ # TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT,
696
+ # ) # type: ignore
697
+ #
698
+ # # def cluster_by(self, func: Callable[[DataFile], Any]]) -> RewriteManifests:
699
+ # # self.cluster_by_func = func
700
+ # # return self
701
+ #
702
+ # # def rewrite_if(self, predicate: Callable[[ManifestFile], bool]]) -> RewriteManifests:
703
+ # # self.predicate = predicate
704
+ # # return self
705
+ #
706
+ # # def delete_manifest(self, manifest: ManifestFile) -> "_RewriteManifests":
707
+ # # self.deleted_manifests.add(manifest)
708
+ # # return self
709
+ # #
710
+ # # def add_manifest(self, manifest: ManifestFile) -> "_RewriteManifests":
711
+ # # if self.can_inherit_snapshot_id and manifest.added_snapshot_id is None:
712
+ # # self.added_manifests.append(manifest)
713
+ # # else:
714
+ # # copied_manifest = self.copy_manifest(manifest)
715
+ # # self.rewritten_added_manifests.append(copied_manifest)
716
+ # # return self
717
+ #
718
+ # # def copy_manifest(self, manifest: ManifestFile) -> ManifestFile:
719
+ # # """
720
+ # # Copies a manifest file to a new location, updating its metadata (e.g., snapshot ID).
721
+ # #
722
+ # # Args:
723
+ # # manifest: The manifest file to copy.
724
+ # #
725
+ # # Returns:
726
+ # # A new ManifestFile object representing the copied manifest.
727
+ # # """
728
+ # # # Get the current table metadata
729
+ # # current_metadata = self._table.metadata
730
+ # #
731
+ # # # Create an input file for the manifest to be copied
732
+ # # input_file = self.ops.io().new_input_file(manifest.path)
733
+ # #
734
+ # # # Create an output file for the new manifest
735
+ # # output_file = self.new_manifest_output_file()
736
+ # #
737
+ # # # Copy the manifest file, updating its metadata
738
+ # # new_manifest = ManifestFile(
739
+ # # format_version=current_metadata.format_version,
740
+ # # spec_id=manifest.partition_spec_id,
741
+ # # input_file=input_file,
742
+ # # specs_by_id=self.specs_by_id,
743
+ # # output_file=output_file,
744
+ # # snapshot_id=self.snapshot_id(),
745
+ # # summary_builder=self.summary_builder,
746
+ # # )
747
+ # #
748
+ # # return new_manifest
749
+ #
750
+ # def _existing_manifests(self) -> List[ManifestFile]:
751
+ # """To determine if there are any existing manifest files.
752
+ #
753
+ # A fast append will add another ManifestFile to the ManifestList.
754
+ # All the existing manifest files are considered existing.
755
+ # """
756
+ # existing_manifests = []
757
+ #
758
+ # if self._parent_snapshot_id is not None:
759
+ # previous_snapshot = self._transaction.table_metadata.snapshot_by_id(self._parent_snapshot_id)
760
+ #
761
+ # if previous_snapshot is None:
762
+ # raise ValueError(f"Snapshot could not be found: {self._parent_snapshot_id}")
763
+ #
764
+ # for manifest in previous_snapshot.manifests(io=self._io):
765
+ # if manifest.has_added_files() or manifest.has_existing_files() or manifest.added_snapshot_id == self._snapshot_id:
766
+ # existing_manifests.append(manifest)
767
+ #
768
+ # return existing_manifests
769
+ # def _deleted_entries(self) -> List[ManifestEntry]:
770
+ # """To determine if we need to record any deleted manifest entries.
771
+ #
772
+ # In case of an append, nothing is deleted.
773
+ # """
774
+ # return []
775
+ #
776
+ #
777
+ # def rewrite_manifests(self) -> List[ManifestFile]:
778
+ # snapshot = self._table.current_snapshot()
779
+ # if not snapshot:
780
+ # raise ValueError("Cannot rewrite manifests without a current snapshot")
781
+ # current_manifests = snapshot.manifests(self._io)
782
+ # current_manifest_set = set(current_manifests)
783
+ # self.validate_deleted_manifests(current_manifest_set, snapshot.snapshot_id)
784
+ #
785
+ # if self.requires_rewrite(current_manifest_set):
786
+ # self.perform_rewrite(current_manifests)
787
+ # else:
788
+ # self.keep_active_manifests(current_manifests)
789
+ #
790
+ # self.validate_files_counts()
791
+ #
792
+ # new_manifests = [
793
+ # self.with_snapshot_id(manifest)
794
+ # for manifest in list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests
795
+ # ]
796
+ # return new_manifests + list(self.kept_manifests)
797
+ #
798
+ # def perform_rewrite(self, current_manifests: List[ManifestFile]) -> None:
799
+ # # self.reset()
800
+ # remaining_manifests = [m for m in current_manifests if m not in self.deleted_manifests]
801
+ # data_manifest_merge_manager = _ManifestMergeManager(
802
+ # target_size_bytes=self._target_size_bytes,
803
+ # min_count_to_merge=self._min_count_to_merge,
804
+ # merge_enabled=True,
805
+ # snapshot_producer=self,
806
+ # )
807
+ #
808
+ # new_manifests = data_manifest_merge_manager.merge_manifests(remaining_manifests)
809
+ # self.new_manifests.extend(new_manifests)
810
+ #
811
+ # # def process_manifest(self, manifest: ManifestFile):
812
+ # # if not self.contains_deletes(manifest) and self.matches_predicate(manifest):
813
+ # # self.rewritten_manifests.add(manifest)
814
+ # # try:
815
+ # # reader = ManifestReader(manifest, self.ops.io(), self.ops.current().specs_by_id())
816
+ # # for entry in reader.live_entries():
817
+ # # self.append_entry(entry, self.cluster_by_func(entry.file), manifest.partition_spec_id)
818
+ # # reader.close()
819
+ # # except IOError as e:
820
+ # # raise RuntimeIOException from e
821
+ # # else:
822
+ # # self.kept_manifests.put(manifest)
823
+ #
824
+ # # def append_entry(self, entry: ManifestEntry, key: Any, partition_spec_id: int):
825
+ # # with self.lock:
826
+ # # writer = self.get_writer(key, partition_spec_id)
827
+ # # writer.add_entry(entry)
828
+ # # self.entry_count += 1
829
+ #
830
+ # # def get_writer(self, key: Any, partition_spec_id: int) -> 'WriterWrapper':
831
+ # # return self.writers.setdefault((key, partition_spec_id), WriterWrapper(self, self.specs_by_id[partition_spec_id]))
832
+ #
833
+ # def validate_deleted_manifests(self, current_manifests: Set[ManifestFile], current_snapshot_id: int) -> None:
834
+ # for manifest in self.deleted_manifests:
835
+ # if manifest not in current_manifests:
836
+ # raise ValueError(
837
+ # f"Deleted manifest {manifest.manifest_path} could not be found in the latest snapshot {current_snapshot_id}"
838
+ # )
839
+ #
840
+ # def requires_rewrite(self, current_manifests: Set[ManifestFile]) -> bool:
841
+ # # if self.cluster_by_func is None:
842
+ # # return False
843
+ # return len(self.rewritten_manifests) == 0 or any(
844
+ # manifest not in current_manifests for manifest in self.rewritten_manifests
845
+ # )
846
+ #
847
+ # def keep_active_manifests(self, current_manifests: List[ManifestFile]) -> None:
848
+ # for manifest in current_manifests:
849
+ # if manifest not in self.rewritten_manifests and manifest not in self.deleted_manifests:
850
+ # self.kept_manifests.append(manifest)
851
+ #
852
+ # def validate_files_counts(self) -> None:
853
+ # created_manifests = list(self.new_manifests) + self.added_manifests + self.rewritten_added_manifests
854
+ # created_files_count = self.active_files_count(created_manifests)
855
+ # replaced_manifests = list(self.rewritten_manifests) + list(self.deleted_manifests)
856
+ # replaced_files_count = self.active_files_count(replaced_manifests)
857
+ #
858
+ # if created_files_count != replaced_files_count:
859
+ # raise ValueError(
860
+ # f"Replaced and created manifests must have the same number of active files: {created_files_count} (new), {replaced_files_count} (old)"
861
+ # )
862
+ #
863
+ # def active_files_count(self, manifests: List[ManifestFile]) -> int:
864
+ # count = 0
865
+ # for manifest in manifests:
866
+ # count += manifest.existing_files_count + manifest.added_files_count
867
+ # return count
868
+ #
869
+ # # def reset(self) -> None:
870
+ # # self.clean_uncommitted(self.new_manifests, set())
871
+ # # self.entry_count = 0
872
+ # # self.kept_manifests.clear()
873
+ # # self.rewritten_manifests.clear()
874
+ # # self.new_manifests.clear()
875
+ # #
876
+ # # def clean_uncommitted(self, manifests: List[ManifestFile], committed: Set[ManifestFile]) -> None:
877
+ # # for manifest in manifests:
878
+ # # if manifest not in committed:
879
+ # # self.delete_file(manifest.manifest_path)
880
+ # #
881
+ # # def delete_file(self, path: str):
882
+ # # # Mock implementation
883
+ # # if os.path.exists(path):
884
+ # # os.remove(path)
885
+ #
886
+ # def with_snapshot_id(self, manifest: ManifestFile) -> ManifestFile:
887
+ # # Mock implementation
888
+ # return ManifestFile(manifest.manifest_path, snapshot_id=0)
889
+ #
890
+ # def new_manifest_writer(self, spec: PartitionSpec) -> ManifestWriter:
891
+ # # Mock implementation
892
+ # return ManifestWriter(spec)
893
+ #
894
+ # # class WriterWrapper:
895
+ # # def __init__(self, outer: 'BaseRewriteManifests', spec: PartitionSpec):
896
+ # # self.outer = outer
897
+ # # self.spec = spec
898
+ # # self.writer: Optional[ManifestWriter] = None
899
+ # # self.lock = threading.Lock()
900
+ # #
901
+ # # def add_entry(self, entry: ManifestEntry):
902
+ # # with self.lock:
903
+ # # if self.writer is None or self.writer.length() >= self.outer.manifest_target_size_bytes:
904
+ # # self._close_writer()
905
+ # # self.writer = self.outer.new_manifest_writer(self.spec)
906
+ # # self.writer.existing(entry)
907
+ # #
908
+ # # def _close_writer(self):
909
+ # # if self.writer:
910
+ # # self.writer.close()
911
+ # # self.outer.new_manifests.put(self.writer.to_manifest_file())
912
+ # #
913
+ # # def close(self):
914
+ # # with self.lock:
915
+ # # self._close_writer()
916
+
917
+
623
918
class _OverwriteFiles (_SnapshotProducer ["_OverwriteFiles" ]):
624
919
"""Overwrites data from the table. This will produce an OVERWRITE snapshot.
625
920
0 commit comments