Skip to content

Commit

Permalink
remove initial_change
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinjqliu committed Sep 10, 2024
1 parent 1971fcf commit 90bacbe
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 21 deletions.
3 changes: 0 additions & 3 deletions pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ def __len__(self) -> int:

def __eq__(self, other: Any) -> bool:
"""Return the equality of two instances of the Schema class."""
if not other:
return False

if not isinstance(other, Schema):
return False

Expand Down
10 changes: 5 additions & 5 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -684,22 +684,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:

schema: Schema = table_metadata.schema()
self._updates += (
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True),
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
SetCurrentSchemaUpdate(schema_id=-1),
)

spec: PartitionSpec = table_metadata.spec()
if spec.is_unpartitioned():
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
else:
self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),)
self._updates += (AddPartitionSpecUpdate(spec=spec),)
self._updates += (SetDefaultSpecUpdate(spec_id=-1),)

sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
if sort_order is None or sort_order.is_unsorted:
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
else:
self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),)
self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)

self._updates += (
Expand Down
35 changes: 22 additions & 13 deletions pyiceberg/table/update/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from typing_extensions import Annotated

from pyiceberg.exceptions import CommitFailedException
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.partitioning import INITIAL_PARTITION_SPEC_ID, PARTITION_FIELD_ID_START, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
Expand All @@ -36,7 +36,7 @@
Snapshot,
SnapshotLogEntry,
)
from pyiceberg.table.sorting import SortOrder
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
IcebergBaseModel,
Properties,
Expand Down Expand Up @@ -90,8 +90,6 @@ class AddSchemaUpdate(IcebergBaseModel):
# This field is required: https://github.com/apache/iceberg/pull/7445
last_column_id: int = Field(alias="last-column-id")

initial_change: bool = Field(default=False, exclude=True)


class SetCurrentSchemaUpdate(IcebergBaseModel):
action: Literal["set-current-schema"] = Field(default="set-current-schema")
Expand All @@ -104,8 +102,6 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
action: Literal["add-spec"] = Field(default="add-spec")
spec: PartitionSpec

initial_change: bool = Field(default=False, exclude=True)


class SetDefaultSpecUpdate(IcebergBaseModel):
action: Literal["set-default-spec"] = Field(default="set-default-spec")
Expand All @@ -118,8 +114,6 @@ class AddSortOrderUpdate(IcebergBaseModel):
action: Literal["add-sort-order"] = Field(default="add-sort-order")
sort_order: SortOrder = Field(alias="sort-order")

initial_change: bool = Field(default=False, exclude=True)


class SetDefaultSortOrderUpdate(IcebergBaseModel):
action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order")
Expand Down Expand Up @@ -304,9 +298,10 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta
if update.last_column_id < base_metadata.last_column_id:
raise ValueError(f"Invalid last column id {update.last_column_id}, must be >= {base_metadata.last_column_id}")

skip_empty_schema = base_metadata.schemas == [Schema()]
metadata_updates: Dict[str, Any] = {
"last_column_id": update.last_column_id,
"schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_],
"schemas": [update.schema_] if skip_empty_schema else base_metadata.schemas + [update.schema_],
}

context.add_update(update)
Expand Down Expand Up @@ -335,19 +330,23 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta

@_apply_table_update.register(AddPartitionSpecUpdate)
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
if update.spec.spec_id == INITIAL_PARTITION_SPEC_ID:
# no op
return base_metadata

for spec in base_metadata.partition_specs:
if spec.spec_id == update.spec.spec_id and not update.initial_change:
if spec.spec_id == update.spec.spec_id:
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")

metadata_updates: Dict[str, Any] = {
"partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec],
"partition_specs": base_metadata.partition_specs + [update.spec],
"last_partition_id": max(
max([field.field_id for field in update.spec.fields], default=0),
base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
),
}

context.add_update(update)
return base_metadata.model_copy(update=metadata_updates)


Expand Down Expand Up @@ -443,12 +442,22 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl
return base_metadata.model_copy(update=metadata_updates)


@_apply_table_update.register(RemoveSnapshotRefUpdate)
def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
# (TODO) actually implement this
context.add_update(update)
return base_metadata


@_apply_table_update.register(AddSortOrderUpdate)
def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
context.add_update(update)
if update.sort_order == UNSORTED_SORT_ORDER:
# no op
return base_metadata
return base_metadata.model_copy(
update={
"sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order],
"sort_orders": base_metadata.sort_orders + [update.sort_order],
}
)

Expand Down

0 comments on commit 90bacbe

Please sign in to comment.