21
21
from copy import copy
22
22
from enum import Enum
23
23
from types import TracebackType
24
- from typing import Any , Generator
25
- from typing import Callable
26
- from typing import Dict
27
- from typing import Iterator
28
- from typing import List
29
- from typing import Literal
30
- from typing import Optional
31
- from typing import Type
24
+ from typing import (
25
+ Any ,
26
+ Dict ,
27
+ Iterator ,
28
+ List ,
29
+ Literal ,
30
+ Optional ,
31
+ Type ,
32
+ )
32
33
33
34
from pydantic_core import to_json
34
35
35
36
from pyiceberg .avro .file import AvroFile , AvroOutputFile
36
37
from pyiceberg .conversions import to_bytes
37
38
from pyiceberg .exceptions import ValidationError
38
- from pyiceberg .io import FileIO
39
- from pyiceberg .io import InputFile
40
- from pyiceberg .io import OutputFile
39
+ from pyiceberg .io import FileIO , InputFile , OutputFile
41
40
from pyiceberg .partitioning import PartitionSpec
42
41
from pyiceberg .schema import Schema
43
42
from pyiceberg .typedef import Record , TableVersion
53
52
StringType ,
54
53
StructType ,
55
54
)
56
- from pyiceberg .typedef import EMPTY_DICT
57
55
58
56
UNASSIGNED_SEQ = - 1
59
57
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
@@ -103,9 +101,7 @@ def __repr__(self) -> str:
103
101
104
102
DATA_FILE_TYPE : Dict [int , StructType ] = {
105
103
1 : StructType (
106
- NestedField (
107
- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
108
- ),
104
+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
109
105
NestedField (
110
106
field_id = 101 ,
111
107
name = "file_format" ,
@@ -120,15 +116,9 @@ def __repr__(self) -> str:
120
116
required = True ,
121
117
doc = "Partition data tuple, schema based on the partition spec" ,
122
118
),
119
+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
123
120
NestedField (
124
- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
125
- ),
126
- NestedField (
127
- field_id = 104 ,
128
- name = "file_size_in_bytes" ,
129
- field_type = LongType (),
130
- required = True ,
131
- doc = "Total file size in bytes" ,
121
+ field_id = 104 , name = "file_size_in_bytes" , field_type = LongType (), required = True , doc = "Total file size in bytes"
132
122
),
133
123
NestedField (
134
124
field_id = 105 ,
@@ -181,11 +171,7 @@ def __repr__(self) -> str:
181
171
doc = "Map of column id to upper bound" ,
182
172
),
183
173
NestedField (
184
- field_id = 131 ,
185
- name = "key_metadata" ,
186
- field_type = BinaryType (),
187
- required = False ,
188
- doc = "Encryption key metadata blob" ,
174
+ field_id = 131 , name = "key_metadata" , field_type = BinaryType (), required = False , doc = "Encryption key metadata blob"
189
175
),
190
176
NestedField (
191
177
field_id = 132 ,
@@ -205,9 +191,7 @@ def __repr__(self) -> str:
205
191
doc = "File format name: avro, orc, or parquet" ,
206
192
initial_default = DataFileContent .DATA ,
207
193
),
208
- NestedField (
209
- field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme"
210
- ),
194
+ NestedField (field_id = 100 , name = "file_path" , field_type = StringType (), required = True , doc = "Location URI with FS scheme" ),
211
195
NestedField (
212
196
field_id = 101 ,
213
197
name = "file_format" ,
@@ -222,15 +206,9 @@ def __repr__(self) -> str:
222
206
required = True ,
223
207
doc = "Partition data tuple, schema based on the partition spec" ,
224
208
),
209
+ NestedField (field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file" ),
225
210
NestedField (
226
- field_id = 103 , name = "record_count" , field_type = LongType (), required = True , doc = "Number of records in the file"
227
- ),
228
- NestedField (
229
- field_id = 104 ,
230
- name = "file_size_in_bytes" ,
231
- field_type = LongType (),
232
- required = True ,
233
- doc = "Total file size in bytes" ,
211
+ field_id = 104 , name = "file_size_in_bytes" , field_type = LongType (), required = True , doc = "Total file size in bytes"
234
212
),
235
213
NestedField (
236
214
field_id = 108 ,
@@ -275,11 +253,7 @@ def __repr__(self) -> str:
275
253
doc = "Map of column id to upper bound" ,
276
254
),
277
255
NestedField (
278
- field_id = 131 ,
279
- name = "key_metadata" ,
280
- field_type = BinaryType (),
281
- required = False ,
282
- doc = "Encryption key metadata blob" ,
256
+ field_id = 131 , name = "key_metadata" , field_type = BinaryType (), required = False , doc = "Encryption key metadata blob"
283
257
),
284
258
NestedField (
285
259
field_id = 132 ,
@@ -307,34 +281,28 @@ def __repr__(self) -> str:
307
281
308
282
309
283
def data_file_with_partition(partition_type: StructType, format_version: TableVersion) -> StructType:
    """Build the data-file struct for a format version with a concrete partition field.

    Args:
        partition_type: Struct whose fields describe the partition tuple.
        format_version: Key used to look up the base struct in ``DATA_FILE_TYPE``.

    Returns:
        A new ``StructType`` holding the fields of ``DATA_FILE_TYPE[format_version]``
        in their original order, except that the field with id 102 is replaced by a
        required "partition" field typed with the struct rebuilt from
        ``partition_type``.
    """
    # Rebuild every partition field passing only id, name, type and required;
    # nothing else from the source field is forwarded to the new NestedField.
    data_file_partition_type = StructType(
        *[
            NestedField(
                field_id=field.field_id,
                name=field.name,
                field_type=field.field_type,
                required=field.required,
            )
            for field in partition_type.fields
        ]
    )

    # Swap in the specialised partition field (id 102); keep all other fields as-is.
    return StructType(
        *[
            (
                NestedField(
                    field_id=102,
                    name="partition",
                    field_type=data_file_partition_type,
                    required=True,
                    doc="Partition data tuple, schema based on the partition spec",
                )
                if field.field_id == 102
                else field
            )
            for field in DATA_FILE_TYPE[format_version].fields
        ]
    )
338
306
339
307
340
308
class DataFile (Record ):
@@ -415,18 +383,14 @@ def __eq__(self, other: Any) -> bool:
415
383
),
416
384
}
417
385
418
# Struct form (via ``Schema.as_struct()``) of each manifest-entry schema, keyed by format version.
MANIFEST_ENTRY_SCHEMAS_STRUCT = {
    format_version: schema.as_struct() for format_version, schema in MANIFEST_ENTRY_SCHEMAS.items()
}
421
387
422
388
423
389
def manifest_entry_schema_with_data_file(format_version: TableVersion, data_file: StructType) -> Schema:
    """Build the manifest-entry schema for a format version around a given data-file struct.

    Args:
        format_version: Key used to look up the base schema in ``MANIFEST_ENTRY_SCHEMAS``.
        data_file: Struct to use as the type of the "data_file" field.

    Returns:
        A new ``Schema`` with the fields of ``MANIFEST_ENTRY_SCHEMAS[format_version]``
        in their original order, except that the field with id 2 is replaced by a
        required "data_file" field of type ``data_file``.
    """
    return Schema(
        *[
            # Only field id 2 is substituted; every other field is reused unchanged.
            NestedField(2, "data_file", data_file, required=True) if field.field_id == 2 else field
            for field in MANIFEST_ENTRY_SCHEMAS[format_version].fields
        ]
    )
430
394
431
395
432
396
class ManifestEntry (Record ):
@@ -496,9 +460,7 @@ def update(self, value: Any) -> None:
496
460
self ._min = min (self ._min , value )
497
461
498
462
499
- def construct_partition_summaries (
500
- spec : PartitionSpec , schema : Schema , partitions : List [Record ]
501
- ) -> List [PartitionFieldSummary ]:
463
+ def construct_partition_summaries (spec : PartitionSpec , schema : Schema , partitions : List [Record ]) -> List [PartitionFieldSummary ]:
502
464
types = [field .field_type for field in spec .partition_type (schema ).fields ]
503
465
field_stats = [PartitionFieldStats (field_type ) for field_type in types ]
504
466
for partition_keys in partitions :
@@ -522,9 +484,7 @@ def construct_partition_summaries(
522
484
NestedField (512 , "added_rows_count" , LongType (), required = False ),
523
485
NestedField (513 , "existing_rows_count" , LongType (), required = False ),
524
486
NestedField (514 , "deleted_rows_count" , LongType (), required = False ),
525
- NestedField (
526
- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
527
- ),
487
+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
528
488
NestedField (519 , "key_metadata" , BinaryType (), required = False ),
529
489
),
530
490
2 : Schema (
@@ -541,16 +501,12 @@ def construct_partition_summaries(
541
501
NestedField (512 , "added_rows_count" , LongType (), required = True ),
542
502
NestedField (513 , "existing_rows_count" , LongType (), required = True ),
543
503
NestedField (514 , "deleted_rows_count" , LongType (), required = True ),
544
- NestedField (
545
- 507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False
546
- ),
504
+ NestedField (507 , "partitions" , ListType (508 , PARTITION_FIELD_SUMMARY_TYPE , element_required = True ), required = False ),
547
505
NestedField (519 , "key_metadata" , BinaryType (), required = False ),
548
506
),
549
507
}
550
508
551
# Struct form (via ``Schema.as_struct()``) of each manifest-list file schema, keyed by format version.
MANIFEST_LIST_FILE_STRUCTS = {
    format_version: schema.as_struct() for format_version, schema in MANIFEST_LIST_FILE_SCHEMAS.items()
}
554
510
555
511
556
512
POSITIONAL_DELETE_SCHEMA = Schema (
@@ -669,16 +625,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani
669
625
670
626
# in v1 tables, the data sequence number is not persisted and can be safely defaulted to 0
671
627
# in v2 tables, the data sequence number should be inherited iff the entry status is ADDED
672
- if entry .data_sequence_number is None and (
673
- manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED
674
- ):
628
+ if entry .data_sequence_number is None and (manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED ):
675
629
entry .data_sequence_number = manifest .sequence_number
676
630
677
631
# in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
678
632
# in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
679
- if entry .file_sequence_number is None and (
680
- manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED
681
- ):
633
+ if entry .file_sequence_number is None and (manifest .sequence_number == 0 or entry .status == ManifestEntryStatus .ADDED ):
682
634
# Only available in V2, always 0 in V1
683
635
entry .file_sequence_number = manifest .sequence_number
684
636
@@ -1001,11 +953,7 @@ def __init__(self, output_file: OutputFile, snapshot_id: int, parent_snapshot_id
1001
953
super ().__init__ (
1002
954
format_version = 1 ,
1003
955
output_file = output_file ,
1004
- meta = {
1005
- "snapshot-id" : str (snapshot_id ),
1006
- "parent-snapshot-id" : str (parent_snapshot_id ),
1007
- "format-version" : "1" ,
1008
- },
956
+ meta = {"snapshot-id" : str (snapshot_id ), "parent-snapshot-id" : str (parent_snapshot_id ), "format-version" : "1" },
1009
957
)
1010
958
1011
959
def prepare_manifest (self , manifest_file : ManifestFile ) -> ManifestFile :
@@ -1018,9 +966,7 @@ class ManifestListWriterV2(ManifestListWriter):
1018
966
_commit_snapshot_id : int
1019
967
_sequence_number : int
1020
968
1021
- def __init__ (
1022
- self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int
1023
- ):
969
+ def __init__ (self , output_file : OutputFile , snapshot_id : int , parent_snapshot_id : Optional [int ], sequence_number : int ):
1024
970
super ().__init__ (
1025
971
format_version = 2 ,
1026
972
output_file = output_file ,
@@ -1072,5 +1018,3 @@ def write_manifest_list(
1072
1018
return ManifestListWriterV2 (output_file , snapshot_id , parent_snapshot_id , sequence_number )
1073
1019
else :
1074
1020
raise ValueError (f"Cannot write manifest list for table version: { format_version } " )
1075
-
1076
-
0 commit comments