Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove initial_change when dealing with table updates #950

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ def __len__(self) -> int:

def __eq__(self, other: Any) -> bool:
"""Return the equality of two instances of the Schema class."""
if not other:
return False

if not isinstance(other, Schema):
return False

Expand Down
10 changes: 5 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,22 +691,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:

schema: Schema = table_metadata.schema()
self._updates += (
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True),
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
)

spec: PartitionSpec = table_metadata.spec()
if spec.is_unpartitioned():
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
else:
self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=spec),)
self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
if sort_order is None or sort_order.is_unsorted:
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
else:
self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

self._updates += (
Expand Down
35 changes: 22 additions & 13 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from typing_extensions import Annotated

from pyiceberg.exceptions import CommitFailedException
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.partitioning import INITIAL_PARTITION_SPEC_ID, PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
Expand All @@ -36,7 +36,7 @@
Snapshot,
SnapshotLogEntry,
)
from pyiceberg.table.sorting import SortOrder
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
IcebergBaseModel,
Properties,
Expand Down Expand Up @@ -90,8 +90,6 @@ class AddSchemaUpdate(IcebergBaseModel):
# This field is required: https://github.com/apache/iceberg/pull/7445
last_column_id: int = Field(alias="last-column-id")

initial_change: bool = Field(default=False, exclude=True)


class SetCurrentSchemaUpdate(IcebergBaseModel):
action: Literal["set-current-schema"] = Field(default="set-current-schema")
Expand All @@ -104,8 +102,6 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
action: Literal["add-spec"] = Field(default="add-spec")
spec: PartitionSpec

initial_change: bool = Field(default=False, exclude=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of removing it directly, shall we go through a deprecation process given this is a public class? We could add a deprecation message (via field validator?) when this field is set explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 makes sense!



class SetDefaultSpecUpdate(IcebergBaseModel):
action: Literal["set-default-spec"] = Field(default="set-default-spec")
Expand All @@ -118,8 +114,6 @@ class AddSortOrderUpdate(IcebergBaseModel):
action: Literal["add-sort-order"] = Field(default="add-sort-order")
sort_order: SortOrder = Field(alias="sort-order")

initial_change: bool = Field(default=False, exclude=True)


class SetDefaultSortOrderUpdate(IcebergBaseModel):
action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order")
Expand Down Expand Up @@ -304,9 +298,10 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta
if update.last_column_id < base_metadata.last_column_id:
raise ValueError(f"Invalid last column id {update.last_column_id}, must be >= {base_metadata.last_column_id}")

skip_empty_schema = base_metadata.schemas == [Schema()]
metadata_updates: Dict[str, Any] = {
"last_column_id": update.last_column_id,
"schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_],
"schemas": [update.schema_] if skip_empty_schema else base_metadata.schemas + [update.schema_],
}

context.add_update(update)
Expand Down Expand Up @@ -335,19 +330,23 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta

@_apply_table_update.register(AddPartitionSpecUpdate)
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
if update.spec.spec_id == INITIAL_PARTITION_SPEC_ID:
# no op
return base_metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to cause problem if I want to create a partitioned table from beginning. For example,

    iceberg_schema = Schema(*[NestedField(field_id=1, name="a", field_type=StringType())])
    iceberg_spec = PartitionSpec(*[PartitionField(source_id=1, field_id=1001, transform=IdentityTransform(), name='test1')])
    sort_order = SortOrder(*[SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC)])

    txn = catalog.create_table_transaction(identifier=identifier, schema=iceberg_schema, partition_spec=iceberg_spec, sort_order=sort_order)
    txn.commit_transaction()

    tbl = catalog.load_table(identifier)

    print("=====Schemas====")
    print(tbl.schemas())
    print("=====Specs====")
    print(tbl.specs())
    print("=====SortOrders====")
    print(tbl.sort_orders())


=====Schemas====
{0: Schema(NestedField(field_id=1, name='a', field_type=StringType(), required=False), schema_id=0, identifier_field_ids=[])}
=====Specs====
{0: PartitionSpec(spec_id=0)}
=====SortOrders====
{0: SortOrder(order_id=0), 1: SortOrder(SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), order_id=1)}

Although iceberg_spec is given, the table is still created with UNPARTITIONED_PARTITION_SPEC

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the example.

on a meta level, this is the type of bug I'm afraid of when refactoring... how can we ensure other cases like this are captured

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe adding more tests for this specific case would be helpful. While we have extensive coverage for the logic of updating existing metadata, there are very few tests for create_table_transaction, where updates are applied to an empty metadata set to generate the entire metadata. Since this logic is unique to create_table_transaction, errors related to it are not detected by the current tests.


for spec in base_metadata.partition_specs:
if spec.spec_id == update.spec.spec_id and not update.initial_change:
if spec.spec_id == update.spec.spec_id:
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")

metadata_updates: Dict[str, Any] = {
"partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec],
"partition_specs": base_metadata.partition_specs + [update.spec],
"last_partition_id": max(
max([field.field_id for field in update.spec.fields], default=0),
base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
),
}

context.add_update(update)
return base_metadata.model_copy(update=metadata_updates)


Expand Down Expand Up @@ -443,12 +442,22 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(RemoveSnapshotRefUpdate)
def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
# (TODO) actually implement this
context.add_update(update)
return base_metadata


@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
if update.sort_order == UNSORTED_SORT_ORDER:
# no op
return base_metadata
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As shown in the example above, if I specify a SortOrder in the beginning, I end up getting a table with an additional empty SortOrder (UNSORTED)

=====SortOrders====
{0: SortOrder(order_id=0), 1: SortOrder(SortField(source_id=1, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST), order_id=1)}

return base_metadata.model_copy(
update={
"sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order],
"sort_orders": base_metadata.sort_orders + [update.sort_order],
}
)

Expand Down