from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Set, Tuple

from pyiceberg.conversions import from_bytes
-from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, PartitionFieldSummary
+from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table.snapshots import Snapshot, ancestors_of
from pyiceberg.types import PrimitiveType
@@ -523,7 +523,62 @@ def history(self) -> "pa.Table":
        return pa.Table.from_pylist(history, schema=history_schema)

-    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+    def _files_by_manifest(
+        self, manifest_list: ManifestFile, data_file_filter: Optional[Set[DataFileContent]] = None
+    ) -> List[Dict[str, Any]]:
+        files: list[dict[str, Any]] = []
+        schema = self.tbl.metadata.schema()
+        io = self.tbl.io
+
+        for manifest_entry in manifest_list.fetch_manifest_entry(io):
+            data_file = manifest_entry.data_file
+            if data_file_filter and data_file.content not in data_file_filter:
+                continue
+            column_sizes = data_file.column_sizes or {}
+            value_counts = data_file.value_counts or {}
+            null_value_counts = data_file.null_value_counts or {}
+            nan_value_counts = data_file.nan_value_counts or {}
+            lower_bounds = data_file.lower_bounds or {}
+            upper_bounds = data_file.upper_bounds or {}
+            readable_metrics = {
+                schema.find_column_name(field.field_id): {
+                    "column_size": column_sizes.get(field.field_id),
+                    "value_count": value_counts.get(field.field_id),
+                    "null_value_count": null_value_counts.get(field.field_id),
+                    "nan_value_count": nan_value_counts.get(field.field_id),
+                    "lower_bound": from_bytes(field.field_type, lower_bound)
+                    if (lower_bound := lower_bounds.get(field.field_id))
+                    else None,
+                    "upper_bound": from_bytes(field.field_type, upper_bound)
+                    if (upper_bound := upper_bounds.get(field.field_id))
+                    else None,
+                }
+                for field in self.tbl.metadata.schema().fields
+            }
+            files.append(
+                {
+                    "content": data_file.content,
+                    "file_path": data_file.file_path,
+                    "file_format": data_file.file_format,
+                    "spec_id": data_file.spec_id,
+                    "record_count": data_file.record_count,
+                    "file_size_in_bytes": data_file.file_size_in_bytes,
+                    "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
+                    "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
+                    "null_value_counts": dict(data_file.null_value_counts) if data_file.null_value_counts is not None else None,
+                    "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
+                    "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
+                    "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
+                    "key_metadata": data_file.key_metadata,
+                    "split_offsets": data_file.split_offsets,
+                    "equality_ids": data_file.equality_ids,
+                    "sort_order_id": data_file.sort_order_id,
+                    "readable_metrics": readable_metrics,
+                }
+            )
+        return files
+
+    def _get_files_schema(self) -> "pa.Schema":
        import pyarrow as pa

        from pyiceberg.io.pyarrow import schema_to_pyarrow
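
Extracting `_files_by_manifest` lets a single manifest be turned into rows independently of any snapshot, which is what makes the parallel `_all_files` further down possible. A minimal sketch of the walrus-based bound decoding it relies on, using a hypothetical field id 1 of `IntegerType` (`from_bytes` is pyiceberg's real single-value decoder):

    from pyiceberg.conversions import from_bytes
    from pyiceberg.types import IntegerType

    # Iceberg serializes an int bound as 4 little-endian bytes.
    lower_bounds = {1: (42).to_bytes(4, "little")}  # field_id -> raw bound

    # Decode only when a bound exists; the walrus keeps it to one dict lookup.
    lower = from_bytes(IntegerType(), lb) if (lb := lower_bounds.get(1)) else None
    print(lower)  # 42
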
@@ -570,70 +625,27 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
                pa.field("readable_metrics", pa.struct(readable_metrics_struct), nullable=True),
            ]
        )
+        return files_schema
+
+    def _files(self, snapshot_id: Optional[int] = None, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa

        files: list[dict[str, Any]] = []

        if not snapshot_id and not self.tbl.metadata.current_snapshot():
            return pa.Table.from_pylist(
                files,
-                schema=files_schema,
+                schema=self._get_files_schema(),
            )
        snapshot = self._get_snapshot(snapshot_id)

        io = self.tbl.io
        for manifest_list in snapshot.manifests(io):
-            for manifest_entry in manifest_list.fetch_manifest_entry(io):
-                data_file = manifest_entry.data_file
-                if data_file_filter and data_file.content not in data_file_filter:
-                    continue
-                column_sizes = data_file.column_sizes or {}
-                value_counts = data_file.value_counts or {}
-                null_value_counts = data_file.null_value_counts or {}
-                nan_value_counts = data_file.nan_value_counts or {}
-                lower_bounds = data_file.lower_bounds or {}
-                upper_bounds = data_file.upper_bounds or {}
-                readable_metrics = {
-                    schema.find_column_name(field.field_id): {
-                        "column_size": column_sizes.get(field.field_id),
-                        "value_count": value_counts.get(field.field_id),
-                        "null_value_count": null_value_counts.get(field.field_id),
-                        "nan_value_count": nan_value_counts.get(field.field_id),
-                        "lower_bound": from_bytes(field.field_type, lower_bound)
-                        if (lower_bound := lower_bounds.get(field.field_id))
-                        else None,
-                        "upper_bound": from_bytes(field.field_type, upper_bound)
-                        if (upper_bound := upper_bounds.get(field.field_id))
-                        else None,
-                    }
-                    for field in self.tbl.metadata.schema().fields
-                }
-                files.append(
-                    {
-                        "content": data_file.content,
-                        "file_path": data_file.file_path,
-                        "file_format": data_file.file_format,
-                        "spec_id": data_file.spec_id,
-                        "record_count": data_file.record_count,
-                        "file_size_in_bytes": data_file.file_size_in_bytes,
-                        "column_sizes": dict(data_file.column_sizes) if data_file.column_sizes is not None else None,
-                        "value_counts": dict(data_file.value_counts) if data_file.value_counts is not None else None,
-                        "null_value_counts": dict(data_file.null_value_counts)
-                        if data_file.null_value_counts is not None
-                        else None,
-                        "nan_value_counts": dict(data_file.nan_value_counts) if data_file.nan_value_counts is not None else None,
-                        "lower_bounds": dict(data_file.lower_bounds) if data_file.lower_bounds is not None else None,
-                        "upper_bounds": dict(data_file.upper_bounds) if data_file.upper_bounds is not None else None,
-                        "key_metadata": data_file.key_metadata,
-                        "split_offsets": data_file.split_offsets,
-                        "equality_ids": data_file.equality_ids,
-                        "sort_order_id": data_file.sort_order_id,
-                        "readable_metrics": readable_metrics,
-                    }
-                )
+            files.extend(self._files_by_manifest(manifest_list, data_file_filter))

        return pa.Table.from_pylist(
            files,
-            schema=files_schema,
+            schema=self._get_files_schema(),
        )

    def files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
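
The refactor leaves `_files` behavior-neutral: the per-manifest loop delegates to the shared helper and the arrow schema is built on demand. A quick check one could run, assuming a loaded `Table` bound to `tbl` (hypothetical, not part of the change):

    # The public files() output keeps the same columns as before the refactor.
    files_table = tbl.inspect.files()
    assert {"content", "file_path", "record_count"} <= set(files_table.schema.names)
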
@@ -657,3 +669,35 @@ def all_manifests(self) -> "pa.Table":
            lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
        )
        return pa.concat_tables(manifests_by_snapshots)
+
+    def _all_files(self, data_file_filter: Optional[Set[DataFileContent]] = None) -> "pa.Table":
+        import pyarrow as pa
+
+        snapshots = self.tbl.snapshots()
+        if not snapshots:
+            return pa.Table.from_pylist([], schema=self._get_files_schema())
+
+        executor = ExecutorFactory.get_or_create()
+        all_manifest_files_by_snapshot: Iterator[List[ManifestFile]] = executor.map(
+            lambda args: args[0].manifests(self.tbl.io), [(snapshot,) for snapshot in snapshots]
+        )
+        all_manifest_files = list(
+            {(manifest.manifest_path, manifest) for manifest_list in all_manifest_files_by_snapshot for manifest in manifest_list}
+        )
+        all_files_by_manifest: Iterator[List[Dict[str, Any]]] = executor.map(
+            lambda args: self._files_by_manifest(*args), [(manifest, data_file_filter) for _, manifest in all_manifest_files]
+        )
+        all_files_list = [file for files in all_files_by_manifest for file in files]
+        return pa.Table.from_pylist(
+            all_files_list,
+            schema=self._get_files_schema(),
+        )
+
+    def all_files(self) -> "pa.Table":
+        return self._all_files()
+
+    def all_data_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.DATA})
+
+    def all_delete_files(self) -> "pa.Table":
+        return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
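
These three public methods give pyiceberg counterparts to Spark's `all_files`, `all_data_files`, and `all_delete_files` metadata tables: manifests are collected from every snapshot, deduplicated by manifest path, and scanned in parallel via the executor. A usage sketch, assuming a hypothetical catalog "default" and table "default.events":

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog("default")           # hypothetical catalog name
    tbl = catalog.load_table("default.events")  # hypothetical table name

    all_files = tbl.inspect.all_files()            # data + delete files, all snapshots
    data_files = tbl.inspect.all_data_files()      # DataFileContent.DATA only
    delete_files = tbl.inspect.all_delete_files()  # position + equality deletes
    print(all_files.num_rows, data_files.num_rows, delete_files.num_rows)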