schema_id not incremented during schema evolution #290

Closed
kevinjqliu opened this issue Jan 21, 2024 · 9 comments

Comments

@kevinjqliu
Contributor

kevinjqliu commented Jan 21, 2024

Apache Iceberg version

0.5.0 (latest release)

Please describe the bug 🐞

When updating the schema of an Iceberg table (for example, by adding a column), the schema_id should be incremented.
The Java library increments schema_id during schema evolution, but the Python library does not.

From the Iceberg spec

Evolution applies changes to the table’s current schema to produce a new schema that is identified by a unique schema ID, is added to the table’s list of schemas, and is set as the table’s current schema.

From the Java unit test TestTableMetadata.java
In particular, the newly created table schema has an id of 0, i.e. TableMetadata.INITIAL_SCHEMA_ID (L1503).
After calling updateSchema, the table's schema id is 1 (L1520).

In comparison, from the Python unit test test_base.py
The original table schema id is 0, but even after calling update_schema()...commit(), the schema id remains 0 (L602 & L616)

Stacktrace:
In Java, the schema_id is incremented during schema evolution. (example1, example2)

In Python, this is done using the assign_fresh_schema_ids function (example1, example2)
However, this function does not increment the schema id. (source)
Note, the _get_and_increment function is used to increment the field id.

@HonahX
Contributor

HonahX commented Jan 22, 2024

Hi @kevinjqliu. In PyIceberg, update_schema()...commit() does increment the schema id:

return Schema(*struct.fields, schema_id=1 + max(self._table.schemas().keys()), identifier_field_ids=field_ids)

I think what you observed in test_base.py happens because _commit_table in the in-memory catalog uses new_table_metadata instead of update_table_metadata to commit changes.

new_table_metadata is normally used when we want to create a new table, so it re-creates the schema from the given one, assigning new field ids and resetting the schema-id to the default (0). update_table_metadata, in contrast, updates the metadata using the schema generated by update_schema()...commit(), which has its schema-id incremented.
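
To make the difference concrete, here is a minimal sketch using toy stand-ins rather than PyIceberg's real classes. ToySchema, evolve, commit_by_recreating and commit_by_updating are hypothetical names invented for illustration; only the `1 + max(...)` rule is taken from the line quoted above.

```python
from dataclasses import dataclass, replace
from typing import Dict, Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical stand-in for a schema: column names plus a schema id."""

    columns: Tuple[str, ...]
    schema_id: int = 0


def evolve(schemas: Dict[int, ToySchema], current_id: int, new_column: str) -> ToySchema:
    """Mimic update_schema(): apply the change and assign 1 + max(existing ids)."""
    current = schemas[current_id]
    return ToySchema(
        columns=current.columns + (new_column,),
        schema_id=1 + max(schemas.keys()),
    )


def commit_by_recreating(evolved: ToySchema) -> Dict[int, ToySchema]:
    """Mimic a create-table style commit: metadata is rebuilt, so the id resets to 0."""
    fresh = replace(evolved, schema_id=0)
    return {fresh.schema_id: fresh}


def commit_by_updating(schemas: Dict[int, ToySchema], evolved: ToySchema) -> Dict[int, ToySchema]:
    """Mimic an update style commit: old schemas are kept, the evolved one is added."""
    return {**schemas, evolved.schema_id: evolved}


schemas = {0: ToySchema(columns=("x", "y", "z"))}
evolved = evolve(schemas, current_id=0, new_column="b")

recreated = commit_by_recreating(evolved)       # only schema id 0 survives
updated = commit_by_updating(schemas, evolved)  # schema ids 0 and 1 are both present
```

In this toy model the evolved schema always carries id 1; whether that id survives depends only on which commit path is taken, which is the symptom described for the in-memory catalog.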

For example, this test verifies that the schema-id is incremented if we use GlueCatalog, which has the formal implementation of _commit_table:

    def test_commit_table_update_schema(
        _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
    ) -> None:
        catalog_name = "glue"
        identifier = (database_name, table_name)
        test_catalog = GlueCatalog(catalog_name, **{"s3.endpoint": moto_endpoint_url, "warehouse": f"s3://{BUCKET_NAME}"})
        test_catalog.create_namespace(namespace=database_name)
        table = test_catalog.create_table(identifier, table_schema_nested)
        original_table_metadata = table.metadata

        assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
        assert test_catalog._parse_metadata_version(table.metadata_location) == 0
        assert original_table_metadata.current_schema_id == 0

        transaction = table.transaction()
        update = transaction.update_schema()
        update.add_column(path="b", field_type=IntegerType())
        update.commit()
        transaction.commit_transaction()

        updated_table_metadata = table.metadata

        assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location)
        assert test_catalog._parse_metadata_version(table.metadata_location) == 1
        assert updated_table_metadata.current_schema_id == 1
        assert len(updated_table_metadata.schemas) == 2
        new_schema = next(schema for schema in updated_table_metadata.schemas if schema.schema_id == 1)
        assert new_schema
        assert new_schema == update._apply()
        assert new_schema.find_field("b").field_type == IntegerType()

The _commit_table in the InMemoryCatalog was implemented when we did not have a way to update the table metadata. It was a workaround to write basic tests for the transaction API and schema evolution. We should update the implementation to use the update_table_metadata.

I notice that you've already opened a PR for InMemoryCatalog and another for HiveCatalog. Thank you so much for the contribution!

@kevinjqliu
Contributor Author

kevinjqliu commented Jan 22, 2024

Thank you @HonahX

In #289 I reimplemented _commit_table with update_table_metadata and still got the problem above.
It looks like the real culprit was the difference between the update_schema()...commit() and CommitTableRequest (... AddSchemaUpdate(schema=...))
Just like you mentioned, update_schema()...commit() actually increments the schema_id, but CommitTableRequest (... AddSchemaUpdate(schema=...)) assumes that the passed-in schema's schema_id has already been incremented.

Therefore the fix for test_commit_table is just to instantiate the schema with schema-id=1!
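
A minimal sketch of that assumption, using toy classes only. ToySchema, ToyMetadata and apply_add_schema are hypothetical names and do not reproduce PyIceberg's AddSchemaUpdate handling; the sketch only models the idea that the update step stores the schema as given and never assigns an id itself.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema: column names plus the id chosen by the caller."""

    columns: Tuple[str, ...]
    schema_id: int = 0


@dataclass(frozen=True)
class ToyMetadata:
    """Hypothetical table metadata: schema history plus the current schema id."""

    schemas: Tuple[ToySchema, ...]
    current_schema_id: int = 0


def apply_add_schema(metadata: ToyMetadata, schema: ToySchema) -> ToyMetadata:
    """Append `schema` unchanged and make it current; no id is assigned here."""
    return ToyMetadata(
        schemas=metadata.schemas + (schema,),
        current_schema_id=schema.schema_id,
    )


base = ToyMetadata(schemas=(ToySchema(columns=("x", "y")),))
new_columns = ("x", "y", "b")

# Caller forgot to bump the id: the table still reports schema id 0.
stale = apply_add_schema(base, ToySchema(columns=new_columns, schema_id=0))

# Caller passes an already-incremented id, as update_schema()...commit() would.
fresh = apply_add_schema(base, ToySchema(columns=new_columns, schema_id=1))
```

Under this model the responsibility for incrementing sits entirely with whoever builds the schema, which is why instantiating the test schema with schema-id=1 resolves the test.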

Pulling your comment out of the InMemory Catalog PR:

> The AddSchemaUpdate should contain a schema with changes applied and schema-id incremented. In pyiceberg, we trust update_schema API to give us the correct one, as I mentioned in https://github.com/apache/iceberg-python/issues/290#issuecomment-1903304501.
>
> Since this PR already updated the _commit_table for InMemory Catalog, I think we do not need to increment the schema-id here

@kevinjqliu
Contributor Author

kevinjqliu commented Jan 22, 2024

Somewhat related: I noticed that the Schema class's __eq__ function does not check whether the schema_ids are equal.
See:

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the Schema class."""
        if not other:
            return False

        if not isinstance(other, Schema):
            return False

        if len(self.columns) != len(other.columns):
            return False

        identifier_field_ids_is_equal = self.identifier_field_ids == other.identifier_field_ids
        schema_is_equal = all(lhs == rhs for lhs, rhs in zip(self.columns, other.columns))
        return identifier_field_ids_is_equal and schema_is_equal

In test_add_column, the two schemas are compared with schema_id=0, which is wrong. It should be schema_id=1 in L602, since schema evolution just happened.
https://github.com/apache/iceberg-python/blob/main/tests/catalog/test_base.py#L592-L618

Should I file this as a separate issue?

@HonahX
Contributor

HonahX commented Jan 23, 2024

> I noticed that Schema class __eq__ function does not check if the schema_ids are equal.

I think this is the intended behavior. We consider two schemas equal if they share the same set of fields and identifier fields, as these define the table structure. In contrast, the schema-id relates more to when the schema was added to the table. Say we want to check if two tables have the same structure: we expect tbl1.schema() == tbl2.schema() to return True if both tables have the same column definitions, even if the two schemas have different schema-ids.
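
As a rough illustration of this equality rule, the sketch below uses a hypothetical ToySchema dataclass, not PyIceberg's Schema. It leaves schema_id out of the generated __eq__ through `field(compare=False)`, so two schemas with the same columns compare equal regardless of their ids.

```python
from dataclasses import dataclass, field
from typing import Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema whose equality covers structure only."""

    columns: Tuple[str, ...]
    identifier_field_ids: Tuple[int, ...] = ()
    # compare=False keeps schema_id out of the generated __eq__
    schema_id: int = field(default=0, compare=False)


first = ToySchema(columns=("x", "y"), schema_id=0)
second = ToySchema(columns=("x", "y"), schema_id=3)
different = ToySchema(columns=("x", "y", "z"), schema_id=0)

same_structure = first == second        # ids differ, structure matches
other_structure = first == different    # ids match, structure differs
```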

Another example: in update_schema() we rely on this equality check to determine if we need to add the new schema, or we can re-use some previously added one:

    def commit(self) -> None:
        """Apply the pending changes and commit."""
        new_schema = self._apply()
        existing_schema_id = next((schema.schema_id for schema in self._table.metadata.schemas if schema == new_schema), None)

        # Check if it is different current schema ID
        if existing_schema_id != self._table.schema().schema_id:
            requirements = (AssertCurrentSchemaId(current_schema_id=self._schema.schema_id),)
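
The lookup in the excerpt above can be tried out on toy data. This is a sketch under assumptions: ToySchema and find_existing_schema_id are hypothetical names, and only the `next(..., None)` pattern is taken from the excerpt.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema; schema_id is ignored by == on purpose."""

    columns: Tuple[str, ...]
    schema_id: int = field(default=0, compare=False)


def find_existing_schema_id(schemas: List[ToySchema], candidate: ToySchema) -> Optional[int]:
    """Return the id of the first stored schema with the same structure, else None."""
    return next((s.schema_id for s in schemas if s == candidate), None)


history = [
    ToySchema(columns=("x",), schema_id=0),
    ToySchema(columns=("x", "y"), schema_id=1),
]

# Same structure as the schema stored under id 0, so that id can be reused,
# whatever id the candidate itself carries.
reused = find_existing_schema_id(history, ToySchema(columns=("x",), schema_id=99))

# A structure that was never stored: nothing to reuse.
missing = find_existing_schema_id(history, ToySchema(columns=("x", "y", "z")))
```

If == also compared schema_id, the first lookup would find nothing, which is one reason changing __eq__ would ripple into this code path.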

> In test_add_column, it checks if the two schemas are equal using schema_id=0 which is wrong, it should be schema_id=1 in L602 since schema evolution just happened

You're right. For this test, we can remove the schema_id=0 and identifier_field_ids=[] from the "expected" schema because they are initialized to these values by default. We can add another assertion to check the schema-id. Does this fix sound reasonable to you?

@kevinjqliu
Contributor Author

Thanks for the explanation @HonahX

The equality check (==) for Schema here is overloaded. Sometimes it's used to check whether two tables have the same structure, i.e. tbl1.schema() == tbl2.schema(). Other times it's used to check whether the two schemas are objects with the same fields, including schema_id, i.e. in test_base.py#L592-L618.

I think using == together with schema_id in the constructor could be a common foot gun. For example:

    assert given_table.schema() == Schema(
        NestedField(field_id=1, name="x", field_type=LongType(), required=True),
        NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"),
        NestedField(field_id=3, name="z", field_type=LongType(), required=True),
        NestedField(field_id=4, name="new_column1", field_type=IntegerType(), required=False),
        schema_id=0,
        identifier_field_ids=[],
    )

Looking at this code, I'd assume it's asserting that the schema_id must be 0.

A couple of options:

  1. Change the definition of __eq__ for Schema.
  2. Create a helper function that compares all fields of Schema, including schema_id.
  3. Rely on the author not to use schema_id in the constructor and to check schema_id separately.

Option (1) is a big refactor and changes an assumption made in a lot of places, e.g. in update_schema().
I'm leaning toward options (2) and/or (3), with no strong preference between them.
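
Options (2) and (3) could look roughly like the sketch below. It uses a hypothetical ToySchema stand-in whose == ignores schema_id, and schemas_match_exactly is an invented helper name, not an existing PyIceberg function.

```python
from dataclasses import dataclass, field
from typing import Tuple


@dataclass(frozen=True)
class ToySchema:
    """Hypothetical schema; == compares structure only, not schema_id."""

    columns: Tuple[str, ...]
    schema_id: int = field(default=0, compare=False)


def schemas_match_exactly(actual: ToySchema, expected: ToySchema) -> bool:
    """Option (2): structural equality plus an explicit schema_id comparison."""
    return actual == expected and actual.schema_id == expected.schema_id


columns = ("x", "y", "z", "new_column1")
evolved = ToySchema(columns=columns, schema_id=1)

# The foot gun: this comparison passes even though the ids differ.
looks_equal = evolved == ToySchema(columns=columns, schema_id=0)

# Option (2): the helper catches the mismatch.
strict_equal = schemas_match_exactly(evolved, ToySchema(columns=columns, schema_id=0))

# Option (3): leave schema_id out of the expected schema and check it on its own.
structure_ok = evolved == ToySchema(columns=columns)
id_ok = evolved.schema_id == 1
```

Option (3) needs no new code, only two separate assertions in each test; option (2) centralizes the check at the cost of one more helper to maintain.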

@HonahX
Contributor

HonahX commented Jan 24, 2024

@kevinjqliu Thanks for sharing these options. I think (3) is enough here since this is only in tests. There are only a few places that require checking equality of both fields and schema_id. If you think (2) will be more helpful in the future, you can add one in conftest.py.

Do you want to include these in #289 ?

@kevinjqliu
Contributor Author

Option (3) makes sense, I'll look for places where the Schema constructor sets the schema_id field.

I'll include the changes in a separate PR since #289 is already doing multiple things.

@Fokko
Contributor

Fokko commented Feb 29, 2024

Oof, I think this has been fixed already in #470

@kevinjqliu
Contributor Author

Awesome. Thank you @anupam-saini

I have another follow-up PR that adds schema_id checks in places where it was previously implicitly checked #487. We should be good to close this issue afterward.
