21
21
from copy import copy
22
22
from enum import Enum
23
23
from types import TracebackType
24
- from typing import Any , Generator
25
- from typing import Callable
26
- from typing import Dict
27
- from typing import Iterator
28
- from typing import List
29
- from typing import Literal
30
- from typing import Optional
31
- from typing import Type
24
+ from typing import (
25
+ Any ,
26
+ Dict ,
27
+ Iterator ,
28
+ List ,
29
+ Literal ,
30
+ Optional ,
31
+ Type ,
32
+ )
32
33
33
34
from pydantic_core import to_json
34
35
35
36
from pyiceberg .avro .file import AvroFile , AvroOutputFile
36
37
from pyiceberg .conversions import to_bytes
37
38
from pyiceberg .exceptions import ValidationError
38
- from pyiceberg .io import FileIO
39
- from pyiceberg .io import InputFile
40
- from pyiceberg .io import OutputFile
39
+ from pyiceberg .io import FileIO , InputFile , OutputFile
41
40
from pyiceberg .partitioning import PartitionSpec
42
41
from pyiceberg .schema import Schema
43
42
from pyiceberg .typedef import Record , TableVersion
53
52
StringType ,
54
53
StructType ,
55
54
)
56
- from pyiceberg .typedef import EMPTY_DICT
57
55
58
56
UNASSIGNED_SEQ = - 1
59
57
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
@@ -103,9 +101,7 @@ def __repr__(self) -> str:
103
101
104
102
DATA_FILE_TYPE : Dict [int , StructType ] = {
105
103
1 : StructType (
106
- NestedField (
107
- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
108
- ),
104
+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
109
105
NestedField (
110
106
field_id = 101 ,
111
107
name = "file_format" ,
@@ -120,15 +116,9 @@ def __repr__(self) -> str:
120
116
required = True ,
121
117
doc = "Partition data tuple, schema based on the partition spec" ,
122
118
),
119
+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
123
120
NestedField (
124
- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
125
- ),
126
- NestedField (
127
- field_id = 104 ,
128
- name = "file_size_in_bytes" ,
129
- field_type = LongType (),
130
- required = True ,
131
- doc = "Total file size in bytes" ,
121
+ field_id = 104 , name = "file_size_in_bytes" , field_type = LongType (), required = True , doc = "Total file size in bytes"
132
122
),
133
123
NestedField (
134
124
field_id = 105 ,
@@ -181,11 +171,7 @@ def __repr__(self) -> str:
181
171
doc = "Map of column id to upper bound" ,
182
172
),
183
173
NestedField (
184
- field_id = 131 ,
185
- name = "key_metadata" ,
186
- field_type = BinaryType (),
187
- required = False ,
188
- doc = "Encryption key metadata blob" ,
174
+ field_id = 131 , name = "key_metadata" , field_type = BinaryType (), required = False , doc = "Encryption key metadata blob"
189
175
),
190
176
NestedField (
191
177
field_id = 132 ,
@@ -205,9 +191,7 @@ def __repr__(self) -> str:
205
191
doc = "File format name: avro, orc, or parquet" ,
206
192
initial_default = DataFileContent .DATA ,
207
193
),
208
- NestedField (
209
- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
210
- ),
194
+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
211
195
NestedField (
212
196
field_id = 101 ,
213
197
name = "file_format" ,
@@ -222,15 +206,9 @@ def __repr__(self) -> str:
222
206
required = True ,
223
207
doc = "Partition data tuple, schema based on the partition spec" ,
224
208
),
209
+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
225
210
NestedField (
226
- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
227
- ),
228
- NestedField (
229
- field_id = 104 ,
230
- name = "file_size_in_bytes" ,
231
- field_type = LongType (),
232
- required = True ,
233
- doc = "Total file size in bytes" ,
211
+ field_id = 104 , name = "file_size_in_bytes" , field_type = LongType (), required = True , doc = "Total file size in bytes"
234
212
),
235
213
NestedField (
236
214
field_id = 108 ,
@@ -275,11 +253,7 @@ def __repr__(self) -> str:
275
253
doc = "Map of column id to upper bound" ,
276
254
),
277
255
NestedField (
278
- field_id = 131 ,
279
- name = "key_metadata" ,
280
- field_type = BinaryType (),
281
- required = False ,
282
- doc = "Encryption key metadata blob" ,
256
+ field_id = 131 , name = "key_metadata" , field_type = BinaryType (), required = False , doc = "Encryption key metadata blob"
283
257
),
284
258
NestedField (
285
259
field_id = 132 ,
@@ -307,34 +281,28 @@ def __repr__(self) -> str:
307
281
308
282
309
283
def data_file_with_partition (partition_type : StructType , format_version : TableVersion ) -> StructType :
310
- data_file_partition_type = StructType (
311
- * [
312
- NestedField (
313
- field_id = field .field_id ,
314
- name = field .name ,
315
- field_type = field .field_type ,
316
- required = field .required ,
317
- )
318
- for field in partition_type .fields
319
- ]
320
- )
284
+ data_file_partition_type = StructType (* [
285
+ NestedField (
286
+ field_id = field .field_id ,
287
+ name = field .name ,
288
+ field_type = field .field_type ,
289
+ required = field .required ,
290
+ )
291
+ for field in partition_type .fields
292
+ ])
321
293
322
- return StructType (
323
- * [
324
- (
325
- NestedField (
326
- field_id = 102 ,
327
- name = "partition" ,
328
- field_type = data_file_partition_type ,
329
- required = True ,
330
- doc = "Partition data tuple, schema based on the partition spec" ,
331
- )
332
- if field .field_id == 102
333
- else field
334
- )
335
- for field in DATA_FILE_TYPE [format_version ].fields
336
- ]
337
- )
294
+ return StructType (* [
295
+ NestedField (
296
+ field_id = 102 ,
297
+ name = "partition" ,
298
+ field_type = data_file_partition_type ,
299
+ required = True ,
300
+ doc = "Partition data tuple, schema based on the partition spec" ,
301
+ )
302
+ if field .field_id == 102
303
+ else field
304
+ for field in DATA_FILE_TYPE [format_version ].fields
305
+ ])
338
306
339
307
340
308
class DataFile (Record ):
@@ -415,18 +383,14 @@ def __eq__(self, other: Any) -> bool:
415
383
),
416
384
}
417
385
418
- MANIFEST_ENTRY_SCHEMAS_STRUCT = {
419
- format_version : schema .as_struct () for format_version , schema in MANIFEST_ENTRY_SCHEMAS .items ()
420
- }
386
+ MANIFEST_ENTRY_SCHEMAS_STRUCT = {format_version : schema .as_struct () for format_version , schema in MANIFEST_ENTRY_SCHEMAS .items ()}
421
387
422
388
423
389
def manifest_entry_schema_with_data_file (format_version : TableVersion , data_file : StructType ) -> Schema :
424
- return Schema (
425
- * [
426
- NestedField (2 , "data_file" , data_file , required = True ) if field .field_id == 2 else field
427
- for field in MANIFEST_ENTRY_SCHEMAS [format_version ].fields
428
- ]
429
- )
390
+ return Schema (* [
391
+ NestedField (2 , "data_file" , data_file , required = True ) if field .field_id == 2 else field
392
+ for field in MANIFEST_ENTRY_SCHEMAS [format_version ].fields
393
+ ])
430
394
431
395
432
396
class ManifestEntry (Record ):
@@ -534,9 +498,7 @@ def update(self, value: Any) -> None:
534
498
self ._min = min (self ._min , value )
535
499
536
500
537
- def construct_partition_summaries (
538
- spec : PartitionSpec , schema : Schema , partitions : List [Record ]
539
- ) -> List [PartitionFieldSummary ]:
501
+ def construct_partition_summaries (spec : PartitionSpec , schema : Schema , partitions : List [Record ]) -> List [PartitionFieldSummary ]:
540
502
types = [field .field_type for field in spec .partition_type (schema ).fields ]
541
503
field_stats = [PartitionFieldStats (field_type ) for field_type in types ]
542
504
for partition_keys in partitions :
@@ -560,9 +522,7 @@ def construct_partition_summaries(
560
522
NestedField (512 , "added_rows_count" , LongType (), required = False ),
561
523
NestedField (513 , "existing_rows_count" , LongType (), required = False ),
562
524
NestedField (514 , "deleted_rows_count" , LongType (), required = False ),
563
- NestedField (
564
- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
565
- ),
525
+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
566
526
NestedField (519 , "key_metadata" , BinaryType (), required = False ),
567
527
),
568
528
2 : Schema (
@@ -579,16 +539,12 @@ def construct_partition_summaries(
579
539
NestedField (512 , "added_rows_count" , LongType (), required = True ),
580
540
NestedField (513 , "existing_rows_count" , LongType (), required = True ),
581
541
NestedField (514 , "deleted_rows_count" , LongType (), required = True ),
582
- NestedField (
583
- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
584
- ),
542
+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
585
543
NestedField (519 , "key_metadata" , BinaryType (), required = False ),
586
544
),
587
545
}
588
546
589
- MANIFEST_LIST_FILE_STRUCTS = {
590
- format_version : schema .as_struct () for format_version , schema in MANIFEST_LIST_FILE_SCHEMAS .items ()
591
- }
547
+ MANIFEST_LIST_FILE_STRUCTS = {format_version : schema .as_struct () for format_version , schema in MANIFEST_LIST_FILE_SCHEMAS .items ()}
592
548
593
549
594
550
POSITIONAL_DELETE_SCHEMA = Schema (
@@ -712,9 +668,7 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
712
668
713
669
# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
714
670
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
715
- if entry .file_sequence_number is None and (
716
- manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED
717
- ):
671
+ if entry .file_sequence_number is None and (manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED ):
718
672
# Only available in V2, always 0 in V1
719
673
entry .file_sequence_number = manifest .sequence_number
720
674
@@ -1069,11 +1023,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
1069
1023
super ().__init__ (
1070
1024
format_version = 1 ,
1071
1025
output_file = output_file ,
1072
- meta = {
1073
- "snapshot-id" : str (snapshot_id ),
1074
- "parent-snapshot-id" : str (parent_snapshot_id ),
1075
- "format-version" : "1" ,
1076
- },
1026
+ meta = {"snapshot-id" : str (snapshot_id ), "parent-snapshot-id" : str (parent_snapshot_id ), "format-version" : "1" },
1077
1027
)
1078
1028
1079
1029
def prepare_manifest (self , manifest_file : ManifestFile ) -> ManifestFile :
@@ -1086,9 +1036,7 @@ class ManifestListWriterV2(ManifestListWriter):
1086
1036
_commit_snapshot_id : int
1087
1037
_sequence_number : int
1088
1038
1089
- def __init__ (
1090
- self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int
1091
- ):
1039
+ def __init__ (self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int ):
1092
1040
super ().__init__ (
1093
1041
format_version = 2 ,
1094
1042
output_file = output_file ,
@@ -1140,5 +1088,3 @@ def write_manifest_list(
1140
1088
return ManifestListWriterV2 (output_file , snapshot_id , parent_snapshot_id , sequence_number )
1141
1089
else :
1142
1090
raise ValueError (f"Cannot write manifest list for table version: { format_version } " )
1143
-
1144
-
0 commit comments