Skip to content

Commit

Permalink
Fix downgrading of schema (#176)
Browse files Browse the repository at this point in the history
* Fix downgrading of schema

* Fix comparison

* Add suggestion

* Remove double assignment
  • Loading branch information
Fokko authored Dec 4, 2023
1 parent ba0c90f commit fc0a72f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 6 deletions.
18 changes: 12 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1412,13 +1412,19 @@ def commit(self) -> None:
"""Apply the pending changes and commit."""
new_schema = self._apply()

if new_schema != self._schema:
last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id)
updates = (
AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id),
SetCurrentSchemaUpdate(schema_id=-1),
)
existing_schema_id = next((schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None)

# Check whether it differs from the current schema ID
if existing_schema_id != self._table.schema().schema_id:
requirements = (AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),)
if existing_schema_id is None:
last_column_id = max(self._table.metadata.last_column_id, new_schema.highest_field_id)
updates = (
AddSchemaUpdate(schema=new_schema, last_column_id=last_column_id),
SetCurrentSchemaUpdate(schema_id=-1),
)
else:
updates = (SetCurrentSchemaUpdate(schema_id=existing_schema_id),) # type: ignore

if self._transaction is not None:
self._transaction._append_updates(*updates) # pylint: disable=W0212
Expand Down
28 changes: 28 additions & 0 deletions tests/test_integration_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,34 @@ def test_no_changes_empty_commit(simple_table: Table, table_schema_simple: Schem
assert simple_table.schema() == table_schema_simple


@pytest.mark.integration
def test_revert_changes(simple_table: Table, table_schema_simple: Schema) -> None:
    """Check that adding a column and then deleting it returns to schema 0.

    The table first gains an optional integer column ``data`` and then has
    that column deleted again. Afterwards the table is expected to hold
    exactly two schemas (IDs 0 and 1), with schema 0 as the current one.

    The ``table_schema_simple`` fixture is requested but not referenced in
    the body.
    """
    # Step 1: evolve the schema by adding an optional column.
    with simple_table.update_schema() as add_step:
        add_step.add_column(path="data", field_type=IntegerType(), required=False)

    # Step 2: undo the change by deleting the same column.
    with simple_table.update_schema(allow_incompatible_changes=True) as delete_step:
        delete_step.delete_column(path="data")

    # The three-column schema the table is expected to end up on.
    original_schema = Schema(
        NestedField(field_id=1, name='foo', field_type=StringType(), required=False),
        NestedField(field_id=2, name='bar', field_type=IntegerType(), required=True),
        NestedField(field_id=3, name='baz', field_type=BooleanType(), required=False),
        schema_id=0,
        identifier_field_ids=[2],
    )
    # The intermediate schema that includes the extra ``data`` column.
    widened_schema = Schema(
        NestedField(field_id=1, name='foo', field_type=StringType(), required=False),
        NestedField(field_id=2, name='bar', field_type=IntegerType(), required=True),
        NestedField(field_id=3, name='baz', field_type=BooleanType(), required=False),
        NestedField(field_id=4, name='data', field_type=IntegerType(), required=False),
        schema_id=1,
        identifier_field_ids=[2],
    )
    expected_schemas = {0: original_schema, 1: widened_schema}

    assert simple_table.schemas() == expected_schemas
    assert simple_table.schema().schema_id == 0


@pytest.mark.integration
def test_delete_field(simple_table: Table) -> None:
with simple_table.update_schema() as schema_update:
Expand Down

0 comments on commit fc0a72f

Please sign in to comment.