@@ -2481,13 +2481,13 @@ def _dataframe_to_data_files(
2481
2481
yield from write_file (io = io , table_metadata = table_metadata , tasks = iter ([WriteTask (write_uuid , next (counter ), df )]))
2482
2482
2483
2483
2484
- class _SnapshotProducer (UpdateTableMetadata ["_MergingSnapshotProducer " ]):
2484
+ class _SnapshotProducer (UpdateTableMetadata ["_SnapshotProducer " ]):
2485
2485
commit_uuid : uuid .UUID
2486
+ _io : FileIO
2486
2487
_operation : Operation
2487
2488
_snapshot_id : int
2488
2489
_parent_snapshot_id : Optional [int ]
2489
2490
_added_data_files : List [DataFile ]
2490
- _transaction : Optional [Transaction ]
2491
2491
_manifest_num_counter : itertools .count [int ]
2492
2492
2493
2493
def __init__ (
@@ -2507,7 +2507,6 @@ def __init__(
2507
2507
snapshot .snapshot_id if (snapshot := self ._transaction .table_metadata .current_snapshot ()) else None
2508
2508
)
2509
2509
self ._added_data_files = []
2510
- self ._transaction = transaction
2511
2510
self ._manifest_num_counter = itertools .count (0 )
2512
2511
2513
2512
def append_data_file (self , data_file : DataFile ) -> _SnapshotProducer :
@@ -2646,26 +2645,28 @@ def snapshot_id(self) -> int:
2646
2645
return self ._snapshot_id
2647
2646
2648
2647
def spec (self , spec_id : int ) -> PartitionSpec :
2649
- return self ._table .specs ()[spec_id ]
2648
+ return self ._transaction . table_metadata .specs ()[spec_id ]
2650
2649
2651
2650
def new_manifest_writer (self , spec : PartitionSpec ) -> ManifestWriter :
2652
2651
return write_manifest (
2653
- format_version = self ._table .format_version ,
2652
+ format_version = self ._transaction . table_metadata .format_version ,
2654
2653
spec = spec ,
2655
- schema = self ._table .schema (),
2654
+ schema = self ._transaction . table_metadata .schema (),
2656
2655
output_file = self .new_manifest_output (),
2657
2656
snapshot_id = self ._snapshot_id ,
2658
2657
)
2659
2658
2660
2659
def new_manifest_output (self ) -> OutputFile :
2661
- return self ._table . io .new_output (
2660
+ return self ._io .new_output (
2662
2661
_new_manifest_path (
2663
- location = self ._table .location (), num = next (self ._manifest_num_counter ), commit_uuid = self .commit_uuid
2662
+ location = self ._transaction .table_metadata .location ,
2663
+ num = next (self ._manifest_num_counter ),
2664
+ commit_uuid = self .commit_uuid ,
2664
2665
)
2665
2666
)
2666
2667
2667
2668
def fetch_manifest_entry (self , manifest : ManifestFile , discard_deleted : bool = True ) -> List [ManifestEntry ]:
2668
- return manifest .fetch_manifest_entry (io = self ._table . io , discard_deleted = discard_deleted )
2669
+ return manifest .fetch_manifest_entry (io = self ._io , discard_deleted = discard_deleted )
2669
2670
2670
2671
2671
2672
class AppendFiles (_SnapshotProducer , ABC ):
@@ -2699,19 +2700,25 @@ class MergeAppendFiles(AppendFiles):
2699
2700
def __init__ (
2700
2701
self ,
2701
2702
operation : Operation ,
2702
- table : Table ,
2703
+ transaction : Transaction ,
2704
+ io : FileIO ,
2703
2705
commit_uuid : Optional [uuid .UUID ] = None ,
2704
- transaction : Optional [Transaction ] = None ,
2705
2706
) -> None :
2706
- super ().__init__ (operation , table , commit_uuid , transaction )
2707
+ super ().__init__ (operation , transaction , io , commit_uuid )
2707
2708
self ._target_size_bytes = PropertyUtil .property_as_int (
2708
- self ._table .properties , TableProperties .MANIFEST_TARGET_SIZE_BYTES , TableProperties .MANIFEST_TARGET_SIZE_BYTES_DEFAULT
2709
+ self ._transaction .table_metadata .properties ,
2710
+ TableProperties .MANIFEST_TARGET_SIZE_BYTES ,
2711
+ TableProperties .MANIFEST_TARGET_SIZE_BYTES_DEFAULT ,
2709
2712
) # type: ignore
2710
2713
self ._min_count_to_merge = PropertyUtil .property_as_int (
2711
- self ._table .properties , TableProperties .MANIFEST_MIN_MERGE_COUNT , TableProperties .MANIFEST_MIN_MERGE_COUNT_DEFAULT
2714
+ self ._transaction .table_metadata .properties ,
2715
+ TableProperties .MANIFEST_MIN_MERGE_COUNT ,
2716
+ TableProperties .MANIFEST_MIN_MERGE_COUNT_DEFAULT ,
2712
2717
) # type: ignore
2713
2718
self ._merge_enabled = PropertyUtil .property_as_bool (
2714
- self ._table .properties , TableProperties .MANIFEST_MERGE_ENABLED , TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT
2719
+ self ._transaction .table_metadata .properties ,
2720
+ TableProperties .MANIFEST_MERGE_ENABLED ,
2721
+ TableProperties .MANIFEST_MERGE_ENABLED_DEFAULT ,
2715
2722
)
2716
2723
2717
2724
def _deleted_entries (self ) -> List [ManifestEntry ]:
@@ -2804,7 +2811,7 @@ def fast_append(self) -> FastAppendFiles:
2804
2811
return FastAppendFiles (operation = Operation .APPEND , transaction = self ._transaction , io = self ._io )
2805
2812
2806
2813
def merge_append (self ) -> MergeAppendFiles :
2807
- return MergeAppendFiles (table = self . _table , operation = Operation .APPEND , transaction = self ._transaction )
2814
+ return MergeAppendFiles (operation = Operation .APPEND , transaction = self ._transaction , io = self . _io )
2808
2815
2809
2816
def overwrite (self ) -> OverwriteFiles :
2810
2817
return OverwriteFiles (
0 commit comments