From 02d46d52e114c46bfd3c37e42fadec40f9fd5b6b Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 10 Sep 2024 10:38:47 -0700 Subject: [PATCH] remove initial_change --- pyiceberg/schema.py | 3 --- pyiceberg/table/__init__.py | 10 ++++----- pyiceberg/table/update/__init__.py | 35 +++++++++++++++++++----------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/pyiceberg/schema.py b/pyiceberg/schema.py index cfe3fe3a7b..68ae052c31 100644 --- a/pyiceberg/schema.py +++ b/pyiceberg/schema.py @@ -111,9 +111,6 @@ def __len__(self) -> int: def __eq__(self, other: Any) -> bool: """Return the equality of two instances of the Schema class.""" - if not other: - return False - if not isinstance(other, Schema): return False diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 3eedff4581..1f6c7540bf 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -691,22 +691,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None: schema: Schema = table_metadata.schema() self._updates += ( - AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True), + AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id), SetCurrentSchemaUpdate(schema_id=-1), ) spec: PartitionSpec = table_metadata.spec() if spec.is_unpartitioned(): - self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),) + self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),) else: - self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),) + self._updates += (AddPartitionSpecUpdate(spec=spec),) self._updates += (SetDefaultSpecUpdate(spec_id=-1),) sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id) if sort_order is None or sort_order.is_unsorted: - self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),) + self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),) else: - self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),) + self._updates += (AddSortOrderUpdate(sort_order=sort_order),) self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),) self._updates += ( diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 6e14046f9a..6ef894c5bb 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -27,7 +27,7 @@ from typing_extensions import Annotated from pyiceberg.exceptions import CommitFailedException -from pyiceberg.partitioning import PARTITION_FIELD_ID_START, PartitionSpec +from pyiceberg.partitioning import INITIAL_PARTITION_SPEC_ID, PARTITION_FIELD_ID_START, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.metadata import SUPPORTED_TABLE_FORMAT_VERSION, TableMetadata, TableMetadataUtil from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef @@ -36,7 +36,7 @@ Snapshot, SnapshotLogEntry, ) -from pyiceberg.table.sorting import SortOrder +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import ( IcebergBaseModel, Properties, @@ -90,8 +90,6 @@ class AddSchemaUpdate(IcebergBaseModel): # This field is required: https://github.com/apache/iceberg/pull/7445 last_column_id: int = Field(alias="last-column-id") - initial_change: bool = Field(default=False, exclude=True) - class SetCurrentSchemaUpdate(IcebergBaseModel): action: Literal["set-current-schema"] = Field(default="set-current-schema") @@ -104,8 +102,6 @@ class AddPartitionSpecUpdate(IcebergBaseModel): action: Literal["add-spec"] = Field(default="add-spec") spec: PartitionSpec - initial_change: bool = Field(default=False, exclude=True) - class SetDefaultSpecUpdate(IcebergBaseModel): action: Literal["set-default-spec"] = Field(default="set-default-spec") @@ -118,8 +114,6 @@ class AddSortOrderUpdate(IcebergBaseModel): action: Literal["add-sort-order"] = Field(default="add-sort-order") sort_order: SortOrder = Field(alias="sort-order") - initial_change: bool = Field(default=False, exclude=True) - class SetDefaultSortOrderUpdate(IcebergBaseModel): action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order") @@ -304,9 +298,10 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta if update.last_column_id < base_metadata.last_column_id: raise ValueError(f"Invalid last column id {update.last_column_id}, must be >= {base_metadata.last_column_id}") + skip_empty_schema = base_metadata.schemas == [Schema()] metadata_updates: Dict[str, Any] = { "last_column_id": update.last_column_id, - "schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_], + "schemas": [update.schema_] if skip_empty_schema else base_metadata.schemas + [update.schema_], } context.add_update(update) @@ -335,19 +330,23 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta @_apply_table_update.register(AddPartitionSpecUpdate) def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + context.add_update(update) + if update.spec.spec_id == INITIAL_PARTITION_SPEC_ID: + # no op + return base_metadata + for spec in base_metadata.partition_specs: - if spec.spec_id == update.spec.spec_id and not update.initial_change: + if spec.spec_id == update.spec.spec_id: raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}") metadata_updates: Dict[str, Any] = { - "partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec], + "partition_specs": base_metadata.partition_specs + [update.spec], "last_partition_id": max( max([field.field_id for field in update.spec.fields], default=0), base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1, ), } - context.add_update(update) return base_metadata.model_copy(update=metadata_updates) @@ -443,12 +442,22 @@ def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: _Tabl return base_metadata.model_copy(update=metadata_updates) +@_apply_table_update.register(RemoveSnapshotRefUpdate) +def _(update: RemoveSnapshotRefUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + # (TODO) actually implement this + context.add_update(update) + return base_metadata + + @_apply_table_update.register(AddSortOrderUpdate) def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: context.add_update(update) + if update.sort_order == UNSORTED_SORT_ORDER: + # no op + return base_metadata return base_metadata.model_copy( update={ - "sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order], + "sort_orders": base_metadata.sort_orders + [update.sort_order], } )