from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -523,7 +523,73 @@ def history(self) -> "pa.Table":

        return pa.Table.from_pylist(history, schema=history_schema)

-    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+    def _get_files_from_manifest(
+        self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
+    ) -> "pa.Table":
+        import pyarrow as pa
+
+        files: list[dict[str, Any]] = []
+        schema = self.tbl.metadata.schema()
+        io = self.tbl.io
+
+        for manifest_entry in manifest_list.fetch_manifest_entry(io):
+            data_file = manifest_entry.data_file
+            if data_file_filter and data_file.content not in data_file_filter:
+                continue
+            column_sizes = data_file.column_sizes or {}
+            value_counts = data_file.value_counts or {}
+            null_value_counts = data_file.null_value_counts or {}
+            nan_value_counts = data_file.nan_value_counts or {}
+            lower_bounds = data_file.lower_bounds or {}
+            upper_bounds = data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
+                }
+                for field in self.tbl.metadata.schema().fields
+            }
+            partition = data_file.partition
+            partition_record_dict = {
+                field.name: partition[pos]
+                for pos, field in enumerate(self.tbl.metadata.specs()[manifest_list.partition_spec_id].fields)
+            }
+            files.append(
+                {
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,
+                    "partition": partition_record_dict,
+                    "record_count": data_file.record_count,
+                    "file_size_in_bytes": data_file.file_size_in_bytes,
+                    "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                    "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                    "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                    "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                    "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                    "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
+                    "key_metadata": data_file.key_metadata,
+                    "split_offsets": data_file.split_offsets,
+                    "equality_ids": data_file.equality_ids,
+                    "sort_order_id": data_file.sort_order_id,
+                    "readable_metrics": readable_metrics,
+                }
+            )
+        return pa.Table.from_pylist(
+            files,
+            schema=self._get_files_schema(),
+        )
+
+    def _get_files_schema(self) -> "pa.Schema":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow
@@ -544,6 +610,9 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                ]
            )

+        partition_record = self.tbl.metadata.specs_struct()
+        pa_record_struct = schema_to_pyarrow(partition_record)
+
        for field in self.tbl.metadata.schema().fields:
            readable_metrics_struct.append(
                pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
@@ -555,6 +624,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                pa.field("file_path", pa.string(), nullable=False),
                pa.field("file_format", pa.dictionary(pa.int32(), pa.string()), nullable=False),
                pa.field("spec_id", pa.int32(), nullable=False),
+                pa.field("partition", pa_record_struct, nullable=False),
                pa.field("record_count", pa.int64(), nullable=False),
                pa.field("file_size_in_bytes", pa.int64(), nullable=False),
                pa.field("column_sizes", pa.map_(pa.int32(), pa.int64()), nullable=True),
@@ -570,71 +640,21 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
            ]
        )
+        return files_schema

-        files: list[dict[str, Any]] = []
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa

        if not snapshot_id and not self.tbl.metadata.current_snapshot():
-            return pa.Table.from_pylist(
-                files,
-                schema=files_schema,
-            )
-        snapshot = self._get_snapshot(snapshot_id)
+            return self._get_files_schema().empty_table()

+        snapshot = self._get_snapshot(snapshot_id)
        io = self.tbl.io
+        files_table: list[pa.Table] = []
        for manifest_list in snapshot.manifests(io):
-            for manifest_entry in manifest_list.fetch_manifest_entry(io):
-                data_file = manifest_entry.data_file
-                if data_file_filter and data_file.content not in data_file_filter:
-                    continue
-                column_sizes = data_file.column_sizes or {}
-                value_counts = data_file.value_counts or {}
-                null_value_counts = data_file.null_value_counts or {}
-                nan_value_counts = data_file.nan_value_counts or {}
-                lower_bounds = data_file.lower_bounds or {}
-                upper_bounds = data_file.upper_bounds or {}
-                readable_metrics = {
-                    schema.find_column_name(field.field_id): {
-                        "column_size": column_sizes.get(field.field_id),
-                        "value_count": value_counts.get(field.field_id),
-                        "null_value_count": null_value_counts.get(field.field_id),
-                        "nan_value_count": nan_value_counts.get(field.field_id),
-                        "lower_bound": from_bytes(field.field_type, lower_bound)
-                        if (lower_bound := lower_bounds.get(field.field_id))
-                        else None,
-                        "upper_bound": from_bytes(field.field_type, upper_bound)
-                        if (upper_bound := upper_bounds.get(field.field_id))
-                        else None,
-                    }
-                    for field in self.tbl.metadata.schema().fields
-                }
-                files.append(
-                    {
-                        "content": data_file.content,
-                        "file_path": data_file.file_path,
-                        "file_format": data_file.file_format,
-                        "spec_id": data_file.spec_id,
-                        "record_count": data_file.record_count,
-                        "file_size_in_bytes": data_file.file_size_in_bytes,
-                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
-                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
-                        "null_value_counts": dict(data_file.null_value_counts)
-                        if data_file.null_value_counts is not None
-                        else None,
-                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
-                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
-                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
-                        "key_metadata": data_file.key_metadata,
-                        "split_offsets": data_file.split_offsets,
-                        "equality_ids": data_file.equality_ids,
-                        "sort_order_id": data_file.sort_order_id,
-                        "readable_metrics": readable_metrics,
-                    }
-                )
+            files_table.append(self._get_files_from_manifest(manifest_list, data_file_filter))

-        return pa.Table.from_pylist(
-            files,
-            schema=files_schema,
-        )
+        return pa.concat_tables(files_table)

    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
        return self._files(snapshot_id)
@@ -657,3 +677,30 @@ def all_manifests(self) -> "pa.Table":
            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
        )
        return pa.concat_tables(manifests_by_snapshots)
+
+    def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_files_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        manifest_lists = executor.map(lambda snapshot: snapshot.manifests(self.tbl.io), snapshots)
+
+        unique_manifests = {(manifest.manifest_path, manifest) for manifest_list in manifest_lists for manifest in manifest_list}
+
+        file_lists = executor.map(
+            lambda args: self._get_files_from_manifest(*args), [(manifest, data_file_filter) for _, manifest in unique_manifests]
+        )
+
+        return pa.concat_tables(file_lists)
+
+    def all_files(self) -> "pa.Table":
+        return self._all_files()
+
+    def all_data_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.DATA})
+
+    def all_delete_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
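
For context, a minimal usage sketch of the inspect methods this diff adds. The catalog and table identifiers below are hypothetical placeholders; only `tbl.inspect.files()` pre-exists, while the `all_*` variants are introduced here:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, for illustration only.
catalog = load_catalog("default")
tbl = catalog.load_table("examples.events")

# Existing behavior: files reachable from the current snapshot only.
current_files = tbl.inspect.files()

# Added in this change: files across all snapshots, with manifests
# deduplicated by path and scanned in parallel via the executor.
every_file = tbl.inspect.all_files()
data_only = tbl.inspect.all_data_files()
deletes_only = tbl.inspect.all_delete_files()

# Each call returns a pyarrow.Table using the schema from _get_files_schema().
print(every_file.column("file_path"))
```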