@@ -20,7 +20,7 @@ use std::collections::HashMap;
20
20
use itertools:: Itertools ;
21
21
22
22
use super :: { DataContentType , DataFile , PartitionSpecRef } ;
23
- use crate :: spec:: { Operation , SchemaRef , Summary } ;
23
+ use crate :: spec:: { ManifestContentType , ManifestFile , Operation , SchemaRef , Summary } ;
24
24
use crate :: { Error , ErrorKind , Result } ;
25
25
26
26
const ADDED_DATA_FILES : & str = "added-data-files" ;
@@ -55,6 +55,7 @@ pub struct SnapshotSummaryCollector {
55
55
partition_metrics : HashMap < String , UpdateMetrics > ,
56
56
max_changed_partitions_for_summaries : u64 ,
57
57
properties : HashMap < String , String > ,
58
+ trust_partition_metrics : bool ,
58
59
}
59
60
60
61
#[ allow( dead_code) ]
@@ -92,6 +93,12 @@ impl SnapshotSummaryCollector {
92
93
}
93
94
}
94
95
96
+ pub fn add_manifest ( & mut self , manifest : & ManifestFile ) {
97
+ self . trust_partition_metrics = false ;
98
+ self . partition_metrics . clear ( ) ;
99
+ self . metrics . add_manifest ( manifest) ;
100
+ }
101
+
95
102
pub fn update_partition_metrics (
96
103
& mut self ,
97
104
schema : SchemaRef ,
@@ -109,6 +116,23 @@ impl SnapshotSummaryCollector {
109
116
}
110
117
}
111
118
119
+ pub fn merge ( & mut self , summary : SnapshotSummaryCollector ) {
120
+ self . metrics . merge ( & summary. metrics ) ;
121
+ self . properties . extend ( summary. properties ) ;
122
+
123
+ if self . trust_partition_metrics && summary. trust_partition_metrics {
124
+ for ( partition, partition_metric) in summary. partition_metrics . iter ( ) {
125
+ self . partition_metrics
126
+ . entry ( partition. to_string ( ) )
127
+ . or_default ( )
128
+ . merge ( partition_metric) ;
129
+ }
130
+ } else {
131
+ self . partition_metrics . clear ( ) ;
132
+ self . trust_partition_metrics = false ;
133
+ }
134
+ }
135
+
112
136
pub fn build ( & self ) -> HashMap < String , String > {
113
137
let mut properties = self . metrics . to_map ( ) ;
114
138
let changed_partitions_count = self . partition_metrics . len ( ) as u64 ;
@@ -140,14 +164,14 @@ impl SnapshotSummaryCollector {
140
164
struct UpdateMetrics {
141
165
added_file_size : u64 ,
142
166
removed_file_size : u64 ,
143
- added_data_files : u64 ,
144
- removed_data_files : u64 ,
167
+ added_data_files : u32 ,
168
+ removed_data_files : u32 ,
145
169
added_eq_delete_files : u64 ,
146
170
removed_eq_delete_files : u64 ,
147
171
added_pos_delete_files : u64 ,
148
172
removed_pos_delete_files : u64 ,
149
- added_delete_files : u64 ,
150
- removed_delete_files : u64 ,
173
+ added_delete_files : u32 ,
174
+ removed_delete_files : u32 ,
151
175
added_records : u64 ,
152
176
deleted_records : u64 ,
153
177
added_pos_deletes : u64 ,
@@ -197,6 +221,21 @@ impl UpdateMetrics {
197
221
}
198
222
}
199
223
224
+ fn add_manifest ( & mut self , manifest : & ManifestFile ) {
225
+ match manifest. content {
226
+ ManifestContentType :: Data => {
227
+ self . added_data_files += manifest. added_files_count . unwrap_or ( 0 ) ;
228
+ self . added_records += manifest. added_rows_count . unwrap_or ( 0 ) ;
229
+ self . removed_data_files += manifest. deleted_files_count . unwrap_or ( 0 ) ;
230
+ self . deleted_records += manifest. deleted_rows_count . unwrap_or ( 0 ) ;
231
+ }
232
+ ManifestContentType :: Deletes => {
233
+ self . added_delete_files += manifest. added_files_count . unwrap_or ( 0 ) ;
234
+ self . removed_delete_files += manifest. deleted_files_count . unwrap_or ( 0 ) ;
235
+ }
236
+ }
237
+ }
238
+
200
239
fn to_map ( & self ) -> HashMap < String , String > {
201
240
let mut properties = HashMap :: new ( ) ;
202
241
set_if_positive ( & mut properties, self . added_file_size , ADDED_FILE_SIZE ) ;
@@ -253,10 +292,30 @@ impl UpdateMetrics {
253
292
) ;
254
293
properties
255
294
}
295
+
296
+ fn merge ( & mut self , other : & UpdateMetrics ) {
297
+ self . added_file_size += other. added_file_size ;
298
+ self . removed_file_size += other. removed_file_size ;
299
+ self . added_data_files += other. added_data_files ;
300
+ self . removed_data_files += other. removed_data_files ;
301
+ self . added_eq_delete_files += other. added_eq_delete_files ;
302
+ self . removed_eq_delete_files += other. removed_eq_delete_files ;
303
+ self . added_pos_delete_files += other. added_pos_delete_files ;
304
+ self . removed_pos_delete_files += other. removed_pos_delete_files ;
305
+ self . added_delete_files += other. added_delete_files ;
306
+ self . removed_delete_files += other. removed_delete_files ;
307
+ self . added_records += other. added_records ;
308
+ self . deleted_records += other. deleted_records ;
309
+ self . added_pos_deletes += other. added_pos_deletes ;
310
+ self . removed_pos_deletes += other. removed_pos_deletes ;
311
+ self . added_eq_deletes += other. added_eq_deletes ;
312
+ self . removed_eq_deletes += other. removed_eq_deletes ;
313
+ }
256
314
}
257
315
258
/// Stores `value` under `property_name`, rendered with `ToString`, but only when it
/// is strictly greater than the default value of its type (zero for the integer
/// types). Otherwise `properties` is left unchanged.
fn set_if_positive<T>(properties: &mut HashMap<String, String>, value: T, property_name: &str)
where
    T: PartialOrd + Default + ToString,
{
    let is_positive = value > T::default();
    if is_positive {
        let rendered = value.to_string();
        properties.insert(property_name.to_owned(), rendered);
    }
}
@@ -752,4 +811,175 @@ mod tests {
752
811
assert ! ( partition_summary. contains( & format!( "{}=1" , ADDED_DATA_FILES ) ) ) ;
753
812
assert ! ( partition_summary. contains( & format!( "{}=20" , ADDED_RECORDS ) ) ) ;
754
813
}
814
+
815
/// `add_manifest` must take its totals from the manifest's counts and must discard
/// any per-partition metrics collected so far.
#[test]
fn test_snapshot_summary_collector_add_manifest() {
    let mut collector = SnapshotSummaryCollector::default();
    collector.set_partition_summary_limit(10);

    let manifest = ManifestFile {
        manifest_path: "file://dummy.manifest".to_string(),
        manifest_length: 0,
        partition_spec_id: 0,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: 0,
        added_files_count: Some(3),
        existing_files_count: Some(0),
        deleted_files_count: Some(1),
        added_rows_count: Some(100),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(50),
        partitions: Vec::new(),
        key_metadata: Vec::new(),
    };

    // Seed a partition entry so the test can observe that it gets removed.
    collector
        .partition_metrics
        .insert("dummy".to_string(), UpdateMetrics::default());
    collector.add_manifest(&manifest);

    // The seeded entry is gone and the partition metrics are no longer trusted.
    assert!(collector.partition_metrics.is_empty());
    assert!(!collector.trust_partition_metrics);

    let props = collector.build();
    assert_eq!(props.get(ADDED_DATA_FILES).unwrap(), "3");
    assert_eq!(props.get(DELETED_DATA_FILES).unwrap(), "1");
    assert_eq!(props.get(ADDED_RECORDS).unwrap(), "100");
    assert_eq!(props.get(DELETED_RECORDS).unwrap(), "50");
}
849
+
850
/// Merging two collectors sums their totals; merging a manifest-based collector
/// with a file-based one additionally leaves no per-partition entries in the
/// built summary.
#[test]
fn test_snapshot_summary_collector_merge() {
    let schema = Arc::new(
        Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
            ])
            .build()
            .unwrap(),
    );

    let year_field = UnboundPartitionField::builder()
        .source_id(2)
        .name("year".to_string())
        .transform(Transform::Identity)
        .build();
    let partition_spec = Arc::new(
        PartitionSpec::builder(schema.clone())
            .add_unbound_fields(vec![year_field])
            .unwrap()
            .with_spec_id(1)
            .build()
            .unwrap(),
    );

    // Every data file in this test differs only in its record count and byte size.
    let data_file = |record_count, file_size_in_bytes| DataFile {
        content: DataContentType::Data,
        file_path: "test.parquet".into(),
        file_format: DataFileFormat::Parquet,
        partition: Struct::from_iter(vec![]),
        record_count,
        file_size_in_bytes,
        column_sizes: HashMap::new(),
        value_counts: HashMap::new(),
        null_value_counts: HashMap::new(),
        nan_value_counts: HashMap::new(),
        lower_bounds: HashMap::new(),
        upper_bounds: HashMap::new(),
        key_metadata: None,
        split_offsets: vec![],
        equality_ids: vec![],
        sort_order_id: None,
        partition_spec_id: 0,
    };

    // Case 1: both collectors were fed individual data files.
    let mut summary_one = SnapshotSummaryCollector::default();
    let mut summary_two = SnapshotSummaryCollector::default();

    summary_one.add_file(
        &data_file(10, 100),
        schema.clone(),
        partition_spec.clone(),
    );
    summary_two.add_file(
        &data_file(20, 200),
        schema.clone(),
        partition_spec.clone(),
    );

    summary_one.merge(summary_two);
    let file_props = summary_one.build();
    assert_eq!(
        file_props.get(ADDED_DATA_FILES).map(String::as_str),
        Some("2")
    );
    assert_eq!(
        file_props.get(ADDED_RECORDS).map(String::as_str),
        Some("30")
    );

    // Case 2: one collector was fed a manifest, the other a data file.
    let mut summary_three = SnapshotSummaryCollector::default();
    let mut summary_four = SnapshotSummaryCollector::default();

    let manifest = ManifestFile {
        manifest_path: "test.manifest".to_string(),
        manifest_length: 0,
        partition_spec_id: 0,
        content: ManifestContentType::Data,
        sequence_number: 0,
        min_sequence_number: 0,
        added_snapshot_id: 0,
        added_files_count: Some(1),
        existing_files_count: Some(0),
        deleted_files_count: Some(0),
        added_rows_count: Some(5),
        existing_rows_count: Some(0),
        deleted_rows_count: Some(0),
        partitions: Vec::new(),
        key_metadata: Vec::new(),
    };
    summary_three.add_manifest(&manifest);

    summary_four.add_file(
        &data_file(1, 10),
        schema.clone(),
        partition_spec.clone(),
    );

    summary_three.merge(summary_four);
    let mixed_props = summary_three.build();

    assert_eq!(
        mixed_props.get(ADDED_DATA_FILES).map(String::as_str),
        Some("2")
    );
    assert_eq!(
        mixed_props.get(ADDED_RECORDS).map(String::as_str),
        Some("6")
    );
    // No per-partition summary keys may be present after this merge.
    assert!(!mixed_props
        .keys()
        .any(|key| key.starts_with(CHANGED_PARTITION_PREFIX)));
}
755
985
}
0 commit comments