From c936c41bd580ba8d54d14799eef9a1922d254df3 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sat, 5 Oct 2024 20:55:16 -0700 Subject: [PATCH 1/9] make table metadata without validaiton --- dev/Dockerfile | 4 +- pyiceberg/catalog/__init__.py | 2 +- pyiceberg/table/metadata.py | 22 +++++++++-- pyiceberg/table/update/__init__.py | 20 +++------- tests/catalog/test_sql.py | 61 +++++++++++++++++++++++++++++- tests/conftest.py | 2 +- 6 files changed, 89 insertions(+), 22 deletions(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index 02affa78e2..a868c74fca 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -36,9 +36,9 @@ ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events WORKDIR ${SPARK_HOME} -ENV SPARK_VERSION=3.5.0 +ENV SPARK_VERSION=3.5.3 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12 -ENV ICEBERG_VERSION=1.6.0 +ENV ICEBERG_VERSION=1.6.1 ENV PYICEBERG_VERSION=0.7.1 RUN curl --retry 3 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 7eb8b02d40..cc953a8849 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -1011,4 +1011,4 @@ def _empty_table_metadata() -> TableMetadata: Returns: TableMetadata: An empty TableMetadata instance. """ - return TableMetadataV1(location="", last_column_id=-1, schema=Schema()) + return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema()) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 1fea33010c..9aabb4b787 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -28,7 +28,7 @@ Union, ) -from pydantic import Field, field_serializer, field_validator, model_validator +from pydantic import Field, ValidationInfo, field_serializer, field_validator, model_validator from pydantic import ValidationError as PydanticValidationError from typing_extensions import Annotated @@ -478,7 +478,7 @@ class TableMetadataV2(TableMetadataCommonFields, IcebergBaseModel): """ @model_validator(mode="before") - def cleanup_snapshot_id(cls, data: Dict[str, Any]) -> Dict[str, Any]: + def cleanup_snapshot_id(cls, data: Dict[str, Any], info: ValidationInfo) -> Dict[str, Any]: return cleanup_snapshot_id(data) @model_validator(mode="after") @@ -490,7 +490,7 @@ def check_partition_specs(cls, table_metadata: TableMetadata) -> TableMetadata: return check_partition_specs(table_metadata) @model_validator(mode="after") - def check_sort_orders(cls, table_metadata: TableMetadata) -> TableMetadata: + def check_sort_orders(cls, table_metadata: TableMetadata, info: ValidationInfo) -> TableMetadata: return check_sort_orders(table_metadata) @model_validator(mode="after") @@ -587,5 +587,21 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata: else: raise ValidationError(f"Unknown format version: {format_version}") + @staticmethod + def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadata: + """Construct table metadata from an existing table without performing validation. + + This method is useful during a sequence of table updates when the model needs to be re-constructed but is not yet ready for validation. 
+ """ + if table_metadata.format_version is None: + raise ValidationError(f"Missing format-version in TableMetadata: {table_metadata}") + + if table_metadata.format_version == 1: + return TableMetadataV1.model_construct(**dict(table_metadata)) + elif table_metadata.format_version == 2: + return TableMetadataV2.model_construct(**dict(table_metadata)) + else: + raise ValidationError(f"Unknown format version: {table_metadata.format_version}") + TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")] # type: ignore diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 6e14046f9a..7862de7b7c 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -18,7 +18,6 @@ import uuid from abc import ABC, abstractmethod -from copy import copy from datetime import datetime from functools import singledispatch from typing import TYPE_CHECKING, Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, Union @@ -90,8 +89,6 @@ class AddSchemaUpdate(IcebergBaseModel): # This field is required: https://github.com/apache/iceberg/pull/7445 last_column_id: int = Field(alias="last-column-id") - initial_change: bool = Field(default=False, exclude=True) - class SetCurrentSchemaUpdate(IcebergBaseModel): action: Literal["set-current-schema"] = Field(default="set-current-schema") @@ -104,8 +101,6 @@ class AddPartitionSpecUpdate(IcebergBaseModel): action: Literal["add-spec"] = Field(default="add-spec") spec: PartitionSpec - initial_change: bool = Field(default=False, exclude=True) - class SetDefaultSpecUpdate(IcebergBaseModel): action: Literal["set-default-spec"] = Field(default="set-default-spec") @@ -118,8 +113,6 @@ class AddSortOrderUpdate(IcebergBaseModel): action: Literal["add-sort-order"] = Field(default="add-sort-order") sort_order: SortOrder = Field(alias="sort-order") - initial_change: bool = Field(default=False, exclude=True) - class SetDefaultSortOrderUpdate(IcebergBaseModel): action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order") @@ -267,11 +260,10 @@ def _( elif update.format_version == base_metadata.format_version: return base_metadata - updated_metadata_data = copy(base_metadata.model_dump()) - updated_metadata_data["format-version"] = update.format_version + updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version}) context.add_update(update) - return TableMetadataUtil.parse_obj(updated_metadata_data) + return TableMetadataUtil._construct_without_validation(updated_metadata) @_apply_table_update.register(SetPropertiesUpdate) @@ -306,7 +298,7 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta metadata_updates: Dict[str, Any] = { "last_column_id": update.last_column_id, - "schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_], + "schemas": base_metadata.schemas + [update.schema_], } context.add_update(update) @@ -336,11 +328,11 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta @_apply_table_update.register(AddPartitionSpecUpdate) def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: for spec in base_metadata.partition_specs: - if spec.spec_id == update.spec.spec_id and not update.initial_change: + if spec.spec_id == update.spec.spec_id: raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}") metadata_updates: Dict[str, 
Any] = { - "partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec], + "partition_specs": base_metadata.partition_specs + [update.spec], "last_partition_id": max( max([field.field_id for field in update.spec.fields], default=0), base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1, @@ -448,7 +440,7 @@ def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableM context.add_update(update) return base_metadata.model_copy( update={ - "sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order], + "sort_orders": base_metadata.sort_orders + [update.sort_order], } ) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index e6c9a5b01b..f3b4bcbb76 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -41,7 +41,7 @@ ) from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( @@ -1492,6 +1492,65 @@ def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> N assert len(tbl.scan().to_arrow()) == 6 +@pytest.mark.parametrize( + "catalog", + [lazy_fixture("catalog_memory"), lazy_fixture("catalog_sqlite")], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_create_table_transaction_with_non_default_values( + catalog: SqlCatalog, table_schema_with_all_types: Schema, format_version: int +) -> None: + identifier = f"default.create_table_transaction_with_non_default_values_{format_version}" + identifier_ref = f"default.create_table_with_non_default_values_ref_{format_version}" + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + try: + catalog.drop_table(identifier=identifier_ref) + except NoSuchTableError: + pass + + iceberg_spec = PartitionSpec(*[ + PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition") + ]) + + sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)]) + + txn = catalog.create_table_transaction( + identifier=identifier, + schema=table_schema_with_all_types, + partition_spec=iceberg_spec, + sort_order=sort_order, + properties={"format-version": format_version}, + ) + txn.commit_transaction() + + tbl = catalog.load_table(identifier) + + tbl_ref = catalog.create_table( + identifier=identifier_ref, + schema=table_schema_with_all_types, + partition_spec=iceberg_spec, + sort_order=sort_order, + properties={"format-version": format_version}, + ) + + assert tbl.format_version == tbl_ref.format_version + assert tbl.schema() == tbl.schema() + assert tbl.schemas() == tbl_ref.schemas() + assert tbl.spec() == tbl_ref.spec() + assert tbl.specs() == tbl_ref.specs() + assert tbl.sort_order() == tbl_ref.sort_order() + assert tbl.sort_orders() == tbl_ref.sort_orders() + + @pytest.mark.parametrize( "catalog", [ diff --git a/tests/conftest.py b/tests/conftest.py index b05947ebe6..3eb9365e4e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2221,7 +2221,7 @@ def spark() -> "SparkSession": spark_version = 
".".join(importlib.metadata.version("pyspark").split(".")[:2]) scala_version = "2.12" - iceberg_version = "1.4.3" + iceberg_version = "1.6.1" os.environ["PYSPARK_SUBMIT_ARGS"] = ( f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," From 07ac53cf2edcd8fb18144b6916865220263f681e Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 6 Oct 2024 05:45:07 +0000 Subject: [PATCH 2/9] update deletes test --- tests/integration/test_deletes.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 2cdf9916ee..2912d39281 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -218,9 +218,8 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio # Will rewrite a data file without the positional delete tbl.delete(EqualTo("number", 40)) - # One positional delete has been added, but an OVERWRITE status is set - # https://github.com/apache/iceberg/issues/10122 - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"] + # One positional delete has been added + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "overwrite"] assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]} @@ -448,7 +447,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio assert len(snapshots) == 3 # Snapshots produced by Spark - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"] + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "delete"] # Will rewrite one parquet file assert snapshots[2].summary == Summary( From b8adf4d880a869745f7b3809186ffb667ef4b21a Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 6 Oct 2024 05:56:40 +0000 Subject: [PATCH 3/9] remove info --- pyiceberg/table/metadata.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 9aabb4b787..2a641d228c 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -478,7 +478,7 @@ class TableMetadataV2(TableMetadataCommonFields, IcebergBaseModel): """ @model_validator(mode="before") - def cleanup_snapshot_id(cls, data: Dict[str, Any], info: ValidationInfo) -> Dict[str, Any]: + def cleanup_snapshot_id(cls, data: Dict[str, Any]) -> Dict[str, Any]: return cleanup_snapshot_id(data) @model_validator(mode="after") @@ -490,7 +490,7 @@ def check_partition_specs(cls, table_metadata: TableMetadata) -> TableMetadata: return check_partition_specs(table_metadata) @model_validator(mode="after") - def check_sort_orders(cls, table_metadata: TableMetadata, info: ValidationInfo) -> TableMetadata: + def check_sort_orders(cls, table_metadata: TableMetadata) -> TableMetadata: return check_sort_orders(table_metadata) @model_validator(mode="after") From 6c98bb09fa7f9d445a658c369d5076940e5a4667 Mon Sep 17 00:00:00 2001 From: HonahX Date: Sun, 6 Oct 2024 06:37:50 +0000 Subject: [PATCH 4/9] add deprecation message --- pyiceberg/table/metadata.py | 2 +- pyiceberg/table/update/__init__.py | 25 +++++++++++++++++++++++++ pyiceberg/utils/deprecated.py | 9 ++++++--- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 2a641d228c..8173bb2c03 100644 --- 
a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -28,7 +28,7 @@ Union, ) -from pydantic import Field, ValidationInfo, field_serializer, field_validator, model_validator +from pydantic import Field, field_serializer, field_validator, model_validator from pydantic import ValidationError as PydanticValidationError from typing_extensions import Annotated diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index 7862de7b7c..b81a2bf7f4 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -44,6 +44,7 @@ transform_dict_value_to_str, ) from pyiceberg.utils.datetime import datetime_to_millis +from pyiceberg.utils.deprecated import deprecation_notice from pyiceberg.utils.properties import property_as_int if TYPE_CHECKING: @@ -89,6 +90,14 @@ class AddSchemaUpdate(IcebergBaseModel): # This field is required: https://github.com/apache/iceberg/pull/7445 last_column_id: int = Field(alias="last-column-id") + initial_change: bool = Field( + default=False, + exclude=True, + deprecated=deprecation_notice( + deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field" + ), + ) + class SetCurrentSchemaUpdate(IcebergBaseModel): action: Literal["set-current-schema"] = Field(default="set-current-schema") @@ -101,6 +110,14 @@ class AddPartitionSpecUpdate(IcebergBaseModel): action: Literal["add-spec"] = Field(default="add-spec") spec: PartitionSpec + initial_change: bool = Field( + default=False, + exclude=True, + deprecated=deprecation_notice( + deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field" + ), + ) + class SetDefaultSpecUpdate(IcebergBaseModel): action: Literal["set-default-spec"] = Field(default="set-default-spec") @@ -113,6 +130,14 @@ class AddSortOrderUpdate(IcebergBaseModel): action: Literal["add-sort-order"] = Field(default="add-sort-order") sort_order: SortOrder = Field(alias="sort-order") + initial_change: bool = Field( + default=False, + exclude=True, + deprecated=deprecation_notice( + deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field" + ), + ) + class SetDefaultSortOrderUpdate(IcebergBaseModel): action: Literal["set-default-sort-order"] = Field(default="set-default-sort-order") diff --git a/pyiceberg/utils/deprecated.py b/pyiceberg/utils/deprecated.py index 92051b4fe6..188d0ce68b 100644 --- a/pyiceberg/utils/deprecated.py +++ b/pyiceberg/utils/deprecated.py @@ -41,14 +41,17 @@ def new_func(*args: Any, **kwargs: Any) -> Any: return decorator +def deprecation_notice(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> str: + """Return a deprecation notice.""" + return f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}" + + def deprecation_message(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> None: """Mark properties or behaviors as deprecated. Adding this will result in a warning being emitted. """ - message = f"Deprecated in {deprecated_in}, will be removed in {removed_in}. 
{help_message}" - - _deprecation_warning(message) + _deprecation_warning(deprecation_notice(deprecated_in, removed_in, help_message)) def _deprecation_warning(message: str) -> None: From 20f6a94ac748cbfea546f65df1c1f821d77d330c Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 29 Oct 2024 20:33:12 -0700 Subject: [PATCH 5/9] revert lib version updates --- dev/Dockerfile | 4 ++-- tests/conftest.py | 2 +- tests/integration/test_deletes.py | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index a868c74fca..02affa78e2 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -36,9 +36,9 @@ ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events WORKDIR ${SPARK_HOME} -ENV SPARK_VERSION=3.5.3 +ENV SPARK_VERSION=3.5.0 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12 -ENV ICEBERG_VERSION=1.6.1 +ENV ICEBERG_VERSION=1.6.0 ENV PYICEBERG_VERSION=0.7.1 RUN curl --retry 3 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ diff --git a/tests/conftest.py b/tests/conftest.py index 3eb9365e4e..b05947ebe6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2221,7 +2221,7 @@ def spark() -> "SparkSession": spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) scala_version = "2.12" - iceberg_version = "1.6.1" + iceberg_version = "1.4.3" os.environ["PYSPARK_SUBMIT_ARGS"] = ( f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 2912d39281..2cdf9916ee 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -218,8 +218,9 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio # Will rewrite a data file without the positional delete tbl.delete(EqualTo("number", 40)) - # One positional delete has been added - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "overwrite"] + # One positional delete has been added, but an OVERWRITE status is set + # https://github.com/apache/iceberg/issues/10122 + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"] assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]} @@ -447,7 +448,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio assert len(snapshots) == 3 # Snapshots produced by Spark - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "delete"] + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"] # Will rewrite one parquet file assert snapshots[2].summary == Summary( From 37c5ae7bf3813011b4d07bd76dc41280dc62fa30 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 29 Oct 2024 20:48:29 -0700 Subject: [PATCH 6/9] remove initial_changes usage in code --- pyiceberg/table/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 66b22a7a79..b0ad412bc7 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -692,22 +692,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None: schema: Schema = 
table_metadata.schema() self._updates += ( - AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True), + AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id), SetCurrentSchemaUpdate(schema_id=-1), ) spec: PartitionSpec = table_metadata.spec() if spec.is_unpartitioned(): - self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),) + self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),) else: - self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),) + self._updates += (AddPartitionSpecUpdate(spec=spec),) self._updates += (SetDefaultSpecUpdate(spec_id=-1),) sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id) if sort_order is None or sort_order.is_unsorted: - self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),) + self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),) else: - self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),) + self._updates += (AddSortOrderUpdate(sort_order=sort_order),) self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),) self._updates += ( From dce2b49b9337617b14861819928d6cac9661bba7 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 29 Oct 2024 21:14:12 -0700 Subject: [PATCH 7/9] move test to integration --- tests/catalog/test_sql.py | 61 +------------------- tests/integration/test_writes/test_writes.py | 59 ++++++++++++++++++- 2 files changed, 59 insertions(+), 61 deletions(-) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index f3b4bcbb76..e6c9a5b01b 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -41,7 +41,7 @@ ) from pyiceberg.io import FSSPEC_FILE_IO, PY_IO_IMPL from pyiceberg.io.pyarrow import _dataframe_to_data_files, schema_to_pyarrow -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC from pyiceberg.schema import Schema from pyiceberg.table.snapshots import Operation from pyiceberg.table.sorting import ( @@ -1492,65 +1492,6 @@ def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> N assert len(tbl.scan().to_arrow()) == 6 -@pytest.mark.parametrize( - "catalog", - [lazy_fixture("catalog_memory"), lazy_fixture("catalog_sqlite")], -) -@pytest.mark.parametrize("format_version", [1, 2]) -def test_create_table_transaction_with_non_default_values( - catalog: SqlCatalog, table_schema_with_all_types: Schema, format_version: int -) -> None: - identifier = f"default.create_table_transaction_with_non_default_values_{format_version}" - identifier_ref = f"default.create_table_with_non_default_values_ref_{format_version}" - try: - catalog.create_namespace("default") - except NamespaceAlreadyExistsError: - pass - - try: - catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - try: - catalog.drop_table(identifier=identifier_ref) - except NoSuchTableError: - pass - - iceberg_spec = PartitionSpec(*[ - PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition") - ]) - - sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)]) - - txn = catalog.create_table_transaction( - identifier=identifier, - schema=table_schema_with_all_types, - partition_spec=iceberg_spec, - sort_order=sort_order, - 
properties={"format-version": format_version}, - ) - txn.commit_transaction() - - tbl = catalog.load_table(identifier) - - tbl_ref = catalog.create_table( - identifier=identifier_ref, - schema=table_schema_with_all_types, - partition_spec=iceberg_spec, - sort_order=sort_order, - properties={"format-version": format_version}, - ) - - assert tbl.format_version == tbl_ref.format_version - assert tbl.schema() == tbl.schema() - assert tbl.schemas() == tbl_ref.schemas() - assert tbl.spec() == tbl_ref.spec() - assert tbl.specs() == tbl_ref.specs() - assert tbl.sort_order() == tbl_ref.sort_order() - assert tbl.sort_orders() == tbl_ref.sort_orders() - - @pytest.mark.parametrize( "catalog", [ diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index fc2746c614..176e288734 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -45,6 +45,7 @@ from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import TableProperties +from pyiceberg.table.sorting import SortDirection, SortField, SortOrder from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform from pyiceberg.types import ( DateType, @@ -738,7 +739,7 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: if format_version == 1 and isinstance(catalog, RestCatalog): pytest.skip( - "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" + "There is a bug in the REST catalog image (https://github.com/apache/iceberg/pull/10369) that prevents create and commit a staged version 1 table" ) identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" @@ -787,6 +788,62 @@ def test_create_table_transaction(catalog: Catalog, format_version: int) -> None assert len(tbl.scan().to_arrow()) == 6 +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [1, 2]) +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_create_table_with_non_default_values(catalog: Catalog, table_schema_with_all_types: Schema, format_version: int) -> None: + if format_version == 1 and isinstance(catalog, RestCatalog): + pytest.skip( + "There is a bug in the REST catalog image (https://github.com/apache/iceberg/pull/10369) that prevents create and commit a staged version 1 table" + ) + + identifier = f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}" + identifier_ref = f"default.arrow_create_table_transaction_with_non_default_values_ref_{catalog.name}_{format_version}" + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + try: + catalog.drop_table(identifier=identifier_ref) + except NoSuchTableError: + pass + + iceberg_spec = PartitionSpec(*[ + PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition") + ]) + + sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)]) + + txn = catalog.create_table_transaction( + identifier=identifier, + schema=table_schema_with_all_types, + partition_spec=iceberg_spec, + sort_order=sort_order, + properties={"format-version": format_version}, + ) + txn.commit_transaction() + + tbl = 
catalog.load_table(identifier) + + tbl_ref = catalog.create_table( + identifier=identifier_ref, + schema=table_schema_with_all_types, + partition_spec=iceberg_spec, + sort_order=sort_order, + properties={"format-version": format_version}, + ) + + assert tbl.format_version == tbl_ref.format_version + assert tbl.schema() == tbl.schema() + assert tbl.schemas() == tbl_ref.schemas() + assert tbl.spec() == tbl_ref.spec() + assert tbl.specs() == tbl_ref.specs() + assert tbl.sort_order() == tbl_ref.sort_order() + assert tbl.sort_orders() == tbl_ref.sort_orders() + + @pytest.mark.integration @pytest.mark.parametrize("format_version", [1, 2]) def test_table_properties_int_value( From f512cb6145e9b485ebb243e8d32b8ba3fa5a23f9 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 29 Oct 2024 21:15:15 -0700 Subject: [PATCH 8/9] fix typo --- tests/integration/test_writes/test_writes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 176e288734..a67bf3f5b7 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -836,7 +836,7 @@ def test_create_table_with_non_default_values(catalog: Catalog, table_schema_wit ) assert tbl.format_version == tbl_ref.format_version - assert tbl.schema() == tbl.schema() + assert tbl.schema() == tbl_ref.schema() assert tbl.schemas() == tbl_ref.schemas() assert tbl.spec() == tbl_ref.spec() assert tbl.specs() == tbl_ref.specs() From cdeb0e6833eb3bae895bb9c925ea28af0a2db09d Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 29 Oct 2024 21:26:23 -0700 Subject: [PATCH 9/9] update error string --- tests/integration/test_writes/test_writes.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index a67bf3f5b7..61653ab88c 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -739,7 +739,7 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: if format_version == 1 and isinstance(catalog, RestCatalog): pytest.skip( - "There is a bug in the REST catalog image (https://github.com/apache/iceberg/pull/10369) that prevents create and commit a staged version 1 table" + "There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table" ) identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" @@ -794,7 +794,7 @@ def test_create_table_transaction(catalog: Catalog, format_version: int) -> None def test_create_table_with_non_default_values(catalog: Catalog, table_schema_with_all_types: Schema, format_version: int) -> None: if format_version == 1 and isinstance(catalog, RestCatalog): pytest.skip( - "There is a bug in the REST catalog image (https://github.com/apache/iceberg/pull/10369) that prevents create and commit a staged version 1 table" + "There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table" ) identifier = f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}"
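
For reviewers who want to exercise the change locally, here is a minimal sketch (not taken from the patches themselves) of the flow this series targets: staging a create_table_transaction with a non-default partition spec and sort order and committing it without the now-deprecated initial_change flag. It assumes a catalog named "default" is configured (e.g. in ~/.pyiceberg.yaml) and that the "default" namespace exists; the table identifier, schema, and field ids are placeholders.

# Sketch only: catalog name, namespace, identifier, and schema are illustrative.
from pyiceberg.catalog import load_catalog
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("default")

schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="category", field_type=StringType(), required=False),
)

# Non-default partition spec and sort order, both keyed on the "category" column,
# mirroring the shape of the integration test added in this series.
spec = PartitionSpec(
    PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="category_partition")
)
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC))

# Stage the table, then commit. With these patches the staged metadata is built
# via model_construct, so the partially-built metadata is not validated mid-flight
# and the per-update initial_change flag is no longer needed.
txn = catalog.create_table_transaction(
    identifier="default.create_table_txn_example",  # hypothetical identifier
    schema=schema,
    partition_spec=spec,
    sort_order=sort_order,
    properties={"format-version": 2},
)
txn.commit_transaction()

tbl = catalog.load_table("default.create_table_txn_example")
print(tbl.spec())        # expect the identity partition on "category"
print(tbl.sort_order())  # expect the ascending identity sort on "category"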