27
27
from typing_extensions import Annotated
28
28
29
29
from pyiceberg .exceptions import CommitFailedException
30
- from pyiceberg .partitioning import PARTITION_FIELD_ID_START , PartitionSpec
30
+ from pyiceberg .partitioning import INITIAL_PARTITION_SPEC_ID , PARTITION_FIELD_ID_START , PartitionSpec
31
31
from pyiceberg .schema import Schema
32
32
from pyiceberg .table .metadata import SUPPORTED_TABLE_FORMAT_VERSION , TableMetadata , TableMetadataUtil
33
33
from pyiceberg .table .refs import MAIN_BRANCH , SnapshotRef
36
36
Snapshot ,
37
37
SnapshotLogEntry ,
38
38
)
39
- from pyiceberg .table .sorting import SortOrder
39
+ from pyiceberg .table .sorting import UNSORTED_SORT_ORDER , SortOrder
40
40
from pyiceberg .typedef import (
41
41
IcebergBaseModel ,
42
42
Properties ,
@@ -90,8 +90,6 @@ class AddSchemaUpdate(IcebergBaseModel):
90
90
# This field is required: https://github.com/apache/iceberg/pull/7445
91
91
last_column_id : int = Field (alias = "last-column-id" )
92
92
93
- initial_change : bool = Field (default = False , exclude = True )
94
-
95
93
96
94
class SetCurrentSchemaUpdate (IcebergBaseModel ):
97
95
action : Literal ["set-current-schema" ] = Field (default = "set-current-schema" )
@@ -104,8 +102,6 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
104
102
action : Literal ["add-spec" ] = Field (default = "add-spec" )
105
103
spec : PartitionSpec
106
104
107
- initial_change : bool = Field (default = False , exclude = True )
108
-
109
105
110
106
class SetDefaultSpecUpdate (IcebergBaseModel ):
111
107
action : Literal ["set-default-spec" ] = Field (default = "set-default-spec" )
@@ -118,8 +114,6 @@ class AddSortOrderUpdate(IcebergBaseModel):
118
114
action : Literal ["add-sort-order" ] = Field (default = "add-sort-order" )
119
115
sort_order : SortOrder = Field (alias = "sort-order" )
120
116
121
- initial_change : bool = Field (default = False , exclude = True )
122
-
123
117
124
118
class SetDefaultSortOrderUpdate (IcebergBaseModel ):
125
119
action : Literal ["set-default-sort-order" ] = Field (default = "set-default-sort-order" )
@@ -304,9 +298,10 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta
304
298
if update .last_column_id < base_metadata .last_column_id :
305
299
raise ValueError (f"Invalid last column id { update .last_column_id } , must be >= { base_metadata .last_column_id } " )
306
300
301
+ skip_empty_schema = base_metadata .schemas == [Schema ()]
307
302
metadata_updates : Dict [str , Any ] = {
308
303
"last_column_id" : update .last_column_id ,
309
- "schemas" : [update .schema_ ] if update . initial_change else base_metadata .schemas + [update .schema_ ],
304
+ "schemas" : [update .schema_ ] if skip_empty_schema else base_metadata .schemas + [update .schema_ ],
310
305
}
311
306
312
307
context .add_update (update )
@@ -335,19 +330,23 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta
335
330
336
331
@_apply_table_update.register(AddPartitionSpecUpdate)
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply an ``add-spec`` update to the table metadata.

    The update is always recorded on ``context`` first. A spec whose id equals
    ``INITIAL_PARTITION_SPEC_ID`` is treated as a no-op and ``base_metadata``
    is returned unchanged; otherwise the spec is appended to the existing
    partition specs and ``last_partition_id`` is recomputed.

    Args:
        update: The update carrying the partition spec to add.
        base_metadata: The metadata the update is applied on top of.
        context: Collects the updates that have been applied.

    Returns:
        ``base_metadata`` itself for the no-op case, otherwise a copy with the
        new spec appended.

    Raises:
        ValueError: If a spec with the same spec id is already present.
    """
    context.add_update(update)

    new_spec = update.spec
    if new_spec.spec_id == INITIAL_PARTITION_SPEC_ID:
        # Nothing to add for the initial spec id.
        return base_metadata

    # Reject the first existing spec (if any) that already uses this id.
    clash = next(
        (existing for existing in base_metadata.partition_specs if existing.spec_id == new_spec.spec_id),
        None,
    )
    if clash is not None:
        raise ValueError(f"Partition spec with id {clash.spec_id} already exists: {clash}")

    # Largest field id in the new spec; 0 when the spec has no fields.
    highest_new_field_id = max([field.field_id for field in new_spec.fields], default=0)
    # A falsy last_partition_id falls back to one below the first assignable id.
    previous_last_id = base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1

    metadata_updates: Dict[str, Any] = {
        "partition_specs": base_metadata.partition_specs + [new_spec],
        "last_partition_id": max(highest_new_field_id, previous_last_id),
    }
    return base_metadata.model_copy(update=metadata_updates)
352
351
353
352
@@ -443,12 +442,22 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
443
442
return base_metadata .model_copy (update = metadata_updates )
444
443
445
444
445
@_apply_table_update.register(RemoveSnapshotRefUpdate)
def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Record a remove-snapshot-ref update without changing the metadata.

    Placeholder handler: the update is added to ``context`` and
    ``base_metadata`` is returned as-is.

    Args:
        update: The update to record.
        base_metadata: The metadata the update would apply to.
        context: Collects the updates that have been applied.

    Returns:
        ``base_metadata``, unmodified.
    """
    # TODO: actually implement the ref removal; for now the update is only recorded.
    context.add_update(update)
    return base_metadata
450
+
451
+
446
452
@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
    """Apply an ``add-sort-order`` update to the table metadata.

    The update is always recorded on ``context``. A sort order equal to
    ``UNSORTED_SORT_ORDER`` is a no-op; any other sort order is appended to
    the existing sort orders.

    Args:
        update: The update carrying the sort order to add.
        base_metadata: The metadata the update is applied on top of.
        context: Collects the updates that have been applied.

    Returns:
        ``base_metadata`` itself for the no-op case, otherwise a copy with the
        sort order appended.
    """
    context.add_update(update)

    new_order = update.sort_order
    if new_order == UNSORTED_SORT_ORDER:
        # Nothing to add for the unsorted order.
        return base_metadata

    extended_orders = base_metadata.sort_orders + [new_order]
    return base_metadata.model_copy(update={"sort_orders": extended_orders})
454
463
0 commit comments