From 8d86f7143e07320af9dd9f5fb2949a421edc76a6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 16 Jan 2025 01:34:40 -0500 Subject: [PATCH 1/5] Use Apache archive (#1523) --- dev/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index 1cc70beda5..b55be39e9d 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -42,7 +42,7 @@ ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12 ENV ICEBERG_VERSION=1.6.0 ENV PYICEBERG_VERSION=0.8.1 -RUN curl --retry 5 -s -C - https://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ +RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \ && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \ && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz From f4caa3ac927c626eeba5d0408f80ddd9b95214e0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 16 Jan 2025 07:54:26 +0100 Subject: [PATCH 2/5] Build: Bump mypy-boto3-glue from 1.35.93 to 1.36.0 (#1522) Bumps [mypy-boto3-glue](https://github.com/youtype/mypy_boto3_builder) from 1.35.93 to 1.36.0. - [Release notes](https://github.com/youtype/mypy_boto3_builder/releases) - [Commits](https://github.com/youtype/mypy_boto3_builder/commits) --- updated-dependencies: - dependency-name: mypy-boto3-glue dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- poetry.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index b67371ecbd..1d17ba6b52 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2886,13 +2886,13 @@ typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""} [[package]] name = "mypy-boto3-glue" -version = "1.35.93" -description = "Type annotations for boto3 Glue 1.35.93 service generated with mypy-boto3-builder 8.8.0" +version = "1.36.0" +description = "Type annotations for boto3 Glue 1.36.0 service generated with mypy-boto3-builder 8.8.0" optional = true python-versions = ">=3.8" files = [ - {file = "mypy_boto3_glue-1.35.93-py3-none-any.whl", hash = "sha256:cf46553f68048124bad65345b593ec5ba3806bd9bd15a1d7516d0cb3d79a0652"}, - {file = "mypy_boto3_glue-1.35.93.tar.gz", hash = "sha256:27759a83ffa5414b2589da83625816a3c7cb97600fec68578bd3012a9ae20ee8"}, + {file = "mypy_boto3_glue-1.36.0-py3-none-any.whl", hash = "sha256:5f0a134508496dc4f061d13dd38f91887d8182f9cdfda5f9310eb32c617359a8"}, + {file = "mypy_boto3_glue-1.36.0.tar.gz", hash = "sha256:a9a06ae29d445873a35b92f8b3f373deda6b0b2967f71bafa7bfcd8fe9f8a5c5"}, ] [package.dependencies] From 0a3a8863bbd31ecaca66a1dfa3a668f5740fc228 Mon Sep 17 00:00:00 2001 From: Andre Luis Anastacio Date: Thu, 16 Jan 2025 11:09:30 -0300 Subject: [PATCH 3/5] Add table statistics (#1285) * Add table statistics update * Update pyiceberg/table/statistics.py Co-authored-by: Fokko Driesprong * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong * Update mkdocs/docs/api.md Co-authored-by: Fokko Driesprong * Add Literal import * Rewrite tests --------- Co-authored-by: Fokko Driesprong --- mkdocs/docs/api.md | 23 +++++ pyiceberg/table/__init__.py | 18 ++++ pyiceberg/table/metadata.py | 9 ++ pyiceberg/table/statistics.py | 45 +++++++++ 
pyiceberg/table/update/__init__.py | 36 +++++++ pyiceberg/table/update/statistics.py | 75 ++++++++++++++ tests/conftest.py | 98 +++++++++++++++++++ .../integration/test_statistics_operations.py | 84 ++++++++++++++++ tests/table/test_init.py | 98 +++++++++++++++++++ tests/table/test_metadata.py | 4 +- 10 files changed, 488 insertions(+), 2 deletions(-) create mode 100644 pyiceberg/table/statistics.py create mode 100644 pyiceberg/table/update/statistics.py create mode 100644 tests/integration/test_statistics_operations.py diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index f1ef69b9cb..b5a3cfa8e3 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -1258,6 +1258,29 @@ with table.manage_snapshots() as ms: ms.create_branch(snapshot_id1, "Branch_A").create_tag(snapshot_id2, "tag789") ``` +## Table Statistics Management + +Manage table statistics with operations through the `Table` API: + +```python +# To run a specific operation +table.update_statistics().set_statistics(snapshot_id=1, statistics_file=statistics_file).commit() +# To run multiple operations +table.update_statistics() + .set_statistics(snapshot_id1, statistics_file1) + .remove_statistics(snapshot_id2) + .commit() +# Operations are applied on commit. +``` + +You can also use context managers to make more changes: + +```python +with table.update_statistics() as update: + update.set_statistics(snaphsot_id1, statistics_file) + update.remove_statistics(snapshot_id2) +``` + ## Query the data To query a table, a table scan is needed. A table scan accepts a filter, columns, optionally a limit and a snapshot ID: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index f2df84d7ee..057c02f260 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -118,6 +118,7 @@ _FastAppendFiles, ) from pyiceberg.table.update.spec import UpdateSpec +from pyiceberg.table.update.statistics import UpdateStatistics from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import ( EMPTY_DICT, @@ -1043,6 +1044,23 @@ def manage_snapshots(self) -> ManageSnapshots: """ return ManageSnapshots(transaction=Transaction(self, autocommit=True)) + def update_statistics(self) -> UpdateStatistics: + """ + Shorthand to run statistics management operations like add statistics and remove statistics. + + Use table.update_statistics().().commit() to run a specific operation. + Use table.update_statistics().().().commit() to run multiple operations. + + Pending changes are applied on commit. + + We can also use context managers to make more changes. For example: + + with table.update_statistics() as update: + update.set_statistics(snapshot_id=1, statistics_file=statistics_file) + update.remove_statistics(snapshot_id=2) + """ + return UpdateStatistics(transaction=Transaction(self, autocommit=True)) + def update_schema(self, allow_incompatible_changes: bool = False, case_sensitive: bool = True) -> UpdateSchema: """Create a new UpdateSchema to alter the columns of this table. 
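Taken together, the `update_statistics()` entry point added above and the `StatisticsFile`/`BlobMetadata` models introduced below in `pyiceberg/table/statistics.py` can be combined as in the following sketch. It is illustrative only: it assumes `tbl` is a catalog-loaded table with at least one snapshot, and that a Puffin statistics file already exists at the (made-up) path and sizes shown.

```python
from pyiceberg.table.statistics import BlobMetadata, StatisticsFile

# Snapshot to attach statistics to; the models accept Python field names
# (snapshot_id, statistics_path, ...) in place of the hyphenated aliases.
snapshot_id = tbl.current_snapshot().snapshot_id

blob_metadata = BlobMetadata(
    type="apache-datasketches-theta-v1",
    snapshot_id=snapshot_id,
    sequence_number=2,
    fields=[1],
    properties={"prop-key": "prop-value"},
)

# Path and byte sizes are placeholders; the Puffin file itself must already be written.
statistics_file = StatisticsFile(
    snapshot_id=snapshot_id,
    statistics_path="s3://bucket/warehouse/stats.puffin",
    file_size_in_bytes=124,
    file_footer_size_in_bytes=27,
    blob_metadata=[blob_metadata],
)

# Pending updates are committed when the context manager exits.
with tbl.update_statistics() as update:
    update.set_statistics(snapshot_id=snapshot_id, statistics_file=statistics_file)
```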
diff --git a/pyiceberg/table/metadata.py b/pyiceberg/table/metadata.py index 8173bb2c03..ef1a324c45 100644 --- a/pyiceberg/table/metadata.py +++ b/pyiceberg/table/metadata.py @@ -44,6 +44,7 @@ SortOrder, assign_fresh_sort_order_ids, ) +from pyiceberg.table.statistics import StatisticsFile from pyiceberg.typedef import ( EMPTY_DICT, IcebergBaseModel, @@ -221,6 +222,14 @@ class TableMetadataCommonFields(IcebergBaseModel): There is always a main branch reference pointing to the current-snapshot-id even if the refs map is null.""" + statistics: List[StatisticsFile] = Field(default_factory=list) + """A optional list of table statistics files. + Table statistics files are valid Puffin files. Statistics are + informational. A reader can choose to ignore statistics + information. Statistics support is not required to read the + table correctly. A table can contain many statistics files + associated with different table snapshots.""" + # validators @field_validator("properties", mode="before") def transform_properties_dict_value_to_str(cls, properties: Properties) -> Dict[str, str]: diff --git a/pyiceberg/table/statistics.py b/pyiceberg/table/statistics.py new file mode 100644 index 0000000000..151f5e961c --- /dev/null +++ b/pyiceberg/table/statistics.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from typing import Dict, List, Literal, Optional + +from pydantic import Field + +from pyiceberg.typedef import IcebergBaseModel + + +class BlobMetadata(IcebergBaseModel): + type: Literal["apache-datasketches-theta-v1", "deletion-vector-v1"] + snapshot_id: int = Field(alias="snapshot-id") + sequence_number: int = Field(alias="sequence-number") + fields: List[int] + properties: Optional[Dict[str, str]] = None + + +class StatisticsFile(IcebergBaseModel): + snapshot_id: int = Field(alias="snapshot-id") + statistics_path: str = Field(alias="statistics-path") + file_size_in_bytes: int = Field(alias="file-size-in-bytes") + file_footer_size_in_bytes: int = Field(alias="file-footer-size-in-bytes") + key_metadata: Optional[str] = Field(alias="key-metadata", default=None) + blob_metadata: List[BlobMetadata] = Field(alias="blob-metadata") + + +def filter_statistics_by_snapshot_id( + statistics: List[StatisticsFile], + reject_snapshot_id: int, +) -> List[StatisticsFile]: + return [stat for stat in statistics if stat.snapshot_id != reject_snapshot_id] diff --git a/pyiceberg/table/update/__init__.py b/pyiceberg/table/update/__init__.py index d5e8c1aba1..3cf2db630d 100644 --- a/pyiceberg/table/update/__init__.py +++ b/pyiceberg/table/update/__init__.py @@ -36,6 +36,7 @@ SnapshotLogEntry, ) from pyiceberg.table.sorting import SortOrder +from pyiceberg.table.statistics import StatisticsFile, filter_statistics_by_snapshot_id from pyiceberg.typedef import ( IcebergBaseModel, Properties, @@ -174,6 +175,17 @@ class RemovePropertiesUpdate(IcebergBaseModel): removals: List[str] +class SetStatisticsUpdate(IcebergBaseModel): + action: Literal["set-statistics"] = Field(default="set-statistics") + snapshot_id: int = Field(alias="snapshot-id") + statistics: StatisticsFile + + +class RemoveStatisticsUpdate(IcebergBaseModel): + action: Literal["remove-statistics"] = Field(default="remove-statistics") + snapshot_id: int = Field(alias="snapshot-id") + + TableUpdate = Annotated[ Union[ AssignUUIDUpdate, @@ -191,6 +203,8 @@ class RemovePropertiesUpdate(IcebergBaseModel): SetLocationUpdate, SetPropertiesUpdate, RemovePropertiesUpdate, + SetStatisticsUpdate, + RemoveStatisticsUpdate, ], Field(discriminator="action"), ] @@ -475,6 +489,28 @@ def _( return base_metadata.model_copy(update={"default_sort_order_id": new_sort_order_id}) +@_apply_table_update.register(SetStatisticsUpdate) +def _(update: SetStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if update.snapshot_id != update.statistics.snapshot_id: + raise ValueError("Snapshot id in statistics does not match the snapshot id in the update") + + statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id) + context.add_update(update) + + return base_metadata.model_copy(update={"statistics": statistics + [update.statistics]}) + + +@_apply_table_update.register(RemoveStatisticsUpdate) +def _(update: RemoveStatisticsUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + if not any(stat.snapshot_id == update.snapshot_id for stat in base_metadata.statistics): + raise ValueError(f"Statistics with snapshot id {update.snapshot_id} does not exist") + + statistics = filter_statistics_by_snapshot_id(base_metadata.statistics, update.snapshot_id) + context.add_update(update) + + return base_metadata.model_copy(update={"statistics": statistics}) + + def update_table_metadata( base_metadata: TableMetadata, updates: Tuple[TableUpdate, ...], diff --git 
a/pyiceberg/table/update/statistics.py b/pyiceberg/table/update/statistics.py new file mode 100644 index 0000000000..e31025453b --- /dev/null +++ b/pyiceberg/table/update/statistics.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import TYPE_CHECKING, Tuple + +from pyiceberg.table.statistics import StatisticsFile +from pyiceberg.table.update import ( + RemoveStatisticsUpdate, + SetStatisticsUpdate, + TableUpdate, + UpdatesAndRequirements, + UpdateTableMetadata, +) + +if TYPE_CHECKING: + from pyiceberg.table import Transaction + + +class UpdateStatistics(UpdateTableMetadata["UpdateStatistics"]): + """ + Run statistics management operations using APIs. + + APIs include set_statistics and remove statistics operations. + + Use table.update_statistics().().commit() to run a specific operation. + Use table.update_statistics().().().commit() to run multiple operations. + + Pending changes are applied on commit. + + We can also use context managers to make more changes. For example: + + with table.update_statistics() as update: + update.set_statistics(snapshot_id=1, statistics_file=statistics_file) + update.remove_statistics(snapshot_id=2) + """ + + _updates: Tuple[TableUpdate, ...] 
= () + + def __init__(self, transaction: "Transaction") -> None: + super().__init__(transaction) + + def set_statistics(self, snapshot_id: int, statistics_file: StatisticsFile) -> "UpdateStatistics": + self._updates += ( + SetStatisticsUpdate( + snapshot_id=snapshot_id, + statistics=statistics_file, + ), + ) + + return self + + def remove_statistics(self, snapshot_id: int) -> "UpdateStatistics": + self._updates = ( + RemoveStatisticsUpdate( + snapshot_id=snapshot_id, + ), + ) + + return self + + def _commit(self) -> UpdatesAndRequirements: + return self._updates, () diff --git a/tests/conftest.py b/tests/conftest.py index ef980f3818..c8dde01563 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -955,6 +955,87 @@ def generate_snapshot( "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}}, } +TABLE_METADATA_V2_WITH_STATISTICS = { + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": True, + "type": "long", + } + ], + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/1.avro", + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1, + }, + ], + "statistics": [ + { + "snapshot-id": 3051729675574597004, + "statistics-path": "s3://a/b/stats.puffin", + "file-size-in-bytes": 413, + "file-footer-size-in-bytes": 42, + "blob-metadata": [ + { + "type": "apache-datasketches-theta-v1", + "snapshot-id": 3051729675574597004, + "sequence-number": 1, + "fields": [1], + } + ], + }, + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://a/b/stats.puffin", + "file-size-in-bytes": 413, + "file-footer-size-in-bytes": 42, + "blob-metadata": [ + { + "type": "deletion-vector-v1", + "snapshot-id": 3055729675574597004, + "sequence-number": 1, + "fields": [1], + } + ], + }, + ], + "snapshot-log": [], + "metadata-log": [], +} + @pytest.fixture def example_table_metadata_v2() -> Dict[str, Any]: @@ -966,6 +1047,11 @@ def table_metadata_v2_with_fixed_and_decimal_types() -> Dict[str, Any]: return TABLE_METADATA_V2_WITH_FIXED_AND_DECIMAL_TYPES +@pytest.fixture +def table_metadata_v2_with_statistics() -> Dict[str, Any]: + return TABLE_METADATA_V2_WITH_STATISTICS + + @pytest.fixture(scope="session") def metadata_location(tmp_path_factory: pytest.TempPathFactory) -> str: from pyiceberg.io.pyarrow import PyArrowFileIO @@ -2199,6 +2285,18 @@ def table_v2_with_extensive_snapshots(example_table_metadata_v2_with_extensive_s ) +@pytest.fixture +def table_v2_with_statistics(table_metadata_v2_with_statistics: Dict[str, Any]) -> Table: + table_metadata = TableMetadataV2(**table_metadata_v2_with_statistics) + return Table( + identifier=("database", "table"), + metadata=table_metadata, + 
metadata_location=f"{table_metadata.location}/uuid.metadata.json", + io=load_file_io(), + catalog=NoopCatalog("NoopCatalog"), + ) + + @pytest.fixture def bound_reference_str() -> BoundReference[str]: return BoundReference(field=NestedField(1, "field", StringType(), required=False), accessor=Accessor(position=0, inner=None)) diff --git a/tests/integration/test_statistics_operations.py b/tests/integration/test_statistics_operations.py new file mode 100644 index 0000000000..361bfebb63 --- /dev/null +++ b/tests/integration/test_statistics_operations.py @@ -0,0 +1,84 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import TYPE_CHECKING + +import pytest + +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.table.statistics import BlobMetadata, StatisticsFile + +if TYPE_CHECKING: + import pyarrow as pa + + from pyiceberg.catalog import Catalog + from pyiceberg.schema import Schema + from pyiceberg.table import Table + + +def _create_table_with_schema(catalog: "Catalog", schema: "Schema") -> "Table": + tbl_name = "default.test_table_statistics_operations" + + try: + catalog.drop_table(tbl_name) + except NoSuchTableError: + pass + return catalog.create_table(identifier=tbl_name, schema=schema) + + +@pytest.mark.integration +@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")]) +def test_manage_statistics(catalog: "Catalog", arrow_table_with_null: "pa.Table") -> None: + tbl = _create_table_with_schema(catalog, arrow_table_with_null.schema) + + tbl.append(arrow_table_with_null) + tbl.append(arrow_table_with_null) + + add_snapshot_id_1 = tbl.history()[0].snapshot_id + add_snapshot_id_2 = tbl.history()[1].snapshot_id + + def create_statistics_file(snapshot_id: int, type_name: str) -> StatisticsFile: + blob_metadata = BlobMetadata( + type=type_name, + snapshot_id=snapshot_id, + sequence_number=2, + fields=[1], + properties={"prop-key": "prop-value"}, + ) + + statistics_file = StatisticsFile( + snapshot_id=snapshot_id, + statistics_path="s3://bucket/warehouse/stats.puffin", + file_size_in_bytes=124, + file_footer_size_in_bytes=27, + blob_metadata=[blob_metadata], + ) + + return statistics_file + + statistics_file_snap_1 = create_statistics_file(add_snapshot_id_1, "apache-datasketches-theta-v1") + statistics_file_snap_2 = create_statistics_file(add_snapshot_id_2, "deletion-vector-v1") + + with tbl.update_statistics() as update: + update.set_statistics(add_snapshot_id_1, statistics_file_snap_1) + update.set_statistics(add_snapshot_id_2, statistics_file_snap_2) + + assert len(tbl.metadata.statistics) == 2 + + with tbl.update_statistics() as update: + update.remove_statistics(add_snapshot_id_1) + + assert len(tbl.metadata.statistics) == 1 diff --git a/tests/table/test_init.py 
b/tests/table/test_init.py index bcb2d643dc..e1f2ccc876 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. # pylint:disable=redefined-outer-name +import json import uuid from copy import copy from typing import Any, Dict @@ -64,6 +65,7 @@ SortField, SortOrder, ) +from pyiceberg.table.statistics import BlobMetadata, StatisticsFile from pyiceberg.table.update import ( AddSnapshotUpdate, AddSortOrderUpdate, @@ -76,9 +78,11 @@ AssertRefSnapshotId, AssertTableUUID, RemovePropertiesUpdate, + RemoveStatisticsUpdate, SetDefaultSortOrderUpdate, SetPropertiesUpdate, SetSnapshotRefUpdate, + SetStatisticsUpdate, _apply_table_update, _TableMetadataUpdateContext, update_table_metadata, @@ -1247,3 +1251,97 @@ def test_update_metadata_log_overflow(table_v2: Table) -> None: table_v2.metadata_location, ) assert len(new_metadata.metadata_log) == 1 + + +def test_set_statistics_update(table_v2_with_statistics: Table) -> None: + snapshot_id = table_v2_with_statistics.metadata.current_snapshot_id + + blob_metadata = BlobMetadata( + type="apache-datasketches-theta-v1", + snapshot_id=snapshot_id, + sequence_number=2, + fields=[1], + properties={"prop-key": "prop-value"}, + ) + + statistics_file = StatisticsFile( + snapshot_id=snapshot_id, + statistics_path="s3://bucket/warehouse/stats.puffin", + file_size_in_bytes=124, + file_footer_size_in_bytes=27, + blob_metadata=[blob_metadata], + ) + + update = SetStatisticsUpdate( + snapshot_id=snapshot_id, + statistics=statistics_file, + ) + + new_metadata = update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + expected = """ + { + "snapshot-id": 3055729675574597004, + "statistics-path": "s3://bucket/warehouse/stats.puffin", + "file-size-in-bytes": 124, + "file-footer-size-in-bytes": 27, + "blob-metadata": [ + { + "type": "apache-datasketches-theta-v1", + "snapshot-id": 3055729675574597004, + "sequence-number": 2, + "fields": [ + 1 + ], + "properties": { + "prop-key": "prop-value" + } + } + ] + }""" + + assert len(new_metadata.statistics) == 2 + + updated_statistics = [stat for stat in new_metadata.statistics if stat.snapshot_id == snapshot_id] + + assert len(updated_statistics) == 1 + assert json.loads(updated_statistics[0].model_dump_json()) == json.loads(expected) + + update = SetStatisticsUpdate( + snapshot_id=123456789, + statistics=statistics_file, + ) + + with pytest.raises( + ValueError, + match="Snapshot id in statistics does not match the snapshot id in the update", + ): + update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + +def test_remove_statistics_update(table_v2_with_statistics: Table) -> None: + update = RemoveStatisticsUpdate( + snapshot_id=3055729675574597004, + ) + + remove_metadata = update_table_metadata( + table_v2_with_statistics.metadata, + (update,), + ) + + assert len(remove_metadata.statistics) == 1 + + with pytest.raises( + ValueError, + match="Statistics with snapshot id 123456789 does not exist", + ): + update_table_metadata( + table_v2_with_statistics.metadata, + (RemoveStatisticsUpdate(snapshot_id=123456789),), + ) diff --git a/tests/table/test_metadata.py b/tests/table/test_metadata.py index 3b7ccf7c10..6423531304 100644 --- a/tests/table/test_metadata.py +++ b/tests/table/test_metadata.py @@ -168,13 +168,13 @@ def test_updating_metadata(example_table_metadata_v2: Dict[str, Any]) -> None: def test_serialize_v1(example_table_metadata_v1: Dict[str, Any]) -> None: 
table_metadata = TableMetadataV1(**example_table_metadata_v1) table_metadata_json = table_metadata.model_dump_json() - expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822,"manifest-list":"s3://bucket/test/manifest-list"}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" + expected = """{"location":"s3://bucket/test/location","table-uuid":"d20125c8-7284-442c-9aea-15fee620737c","last-updated-ms":1602638573874,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]}],"current-schema-id":0,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{},"snapshots":[{"snapshot-id":1925,"timestamp-ms":1602638573822,"manifest-list":"s3://bucket/test/manifest-list"}],"snapshot-log":[],"metadata-log":[],"sort-orders":[{"order-id":0,"fields":[]}],"default-sort-order-id":0,"refs":{},"statistics":[],"format-version":1,"schema":{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},"partition-spec":[{"name":"x","transform":"identity","source-id":1,"field-id":1000}]}""" assert table_metadata_json == expected def test_serialize_v2(example_table_metadata_v2: Dict[str, Any]) -> None: table_metadata = TableMetadataV2(**example_table_metadata_v2).model_dump_json() - expected = 
"""{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"format-version":2,"last-sequence-number":34}""" + expected = 
"""{"location":"s3://bucket/test/location","table-uuid":"9c12d441-03fe-4693-9a96-a0705ddf69c1","last-updated-ms":1602638573590,"last-column-id":3,"schemas":[{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true}],"schema-id":0,"identifier-field-ids":[]},{"type":"struct","fields":[{"id":1,"name":"x","type":"long","required":true},{"id":2,"name":"y","type":"long","required":true,"doc":"comment"},{"id":3,"name":"z","type":"long","required":true}],"schema-id":1,"identifier-field-ids":[1,2]}],"current-schema-id":1,"partition-specs":[{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"transform":"identity","name":"x"}]}],"default-spec-id":0,"last-partition-id":1000,"properties":{"read.split.target.size":"134217728"},"current-snapshot-id":3055729675574597004,"snapshots":[{"snapshot-id":3051729675574597004,"sequence-number":0,"timestamp-ms":1515100955770,"manifest-list":"s3://a/b/1.avro","summary":{"operation":"append"}},{"snapshot-id":3055729675574597004,"parent-snapshot-id":3051729675574597004,"sequence-number":1,"timestamp-ms":1555100955770,"manifest-list":"s3://a/b/2.avro","summary":{"operation":"append"},"schema-id":1}],"snapshot-log":[{"snapshot-id":3051729675574597004,"timestamp-ms":1515100955770},{"snapshot-id":3055729675574597004,"timestamp-ms":1555100955770}],"metadata-log":[{"metadata-file":"s3://bucket/.../v1.json","timestamp-ms":1515100}],"sort-orders":[{"order-id":3,"fields":[{"source-id":2,"transform":"identity","direction":"asc","null-order":"nulls-first"},{"source-id":3,"transform":"bucket[4]","direction":"desc","null-order":"nulls-last"}]}],"default-sort-order-id":3,"refs":{"test":{"snapshot-id":3051729675574597004,"type":"tag","max-ref-age-ms":10000000},"main":{"snapshot-id":3055729675574597004,"type":"branch"}},"statistics":[],"format-version":2,"last-sequence-number":34}""" assert table_metadata == expected From 50c33aa0119d9e2478b3865d864ec23a7c45b1d7 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+sungwy@users.noreply.github.com> Date: Thu, 16 Jan 2025 10:54:37 -0500 Subject: [PATCH 4/5] feat: Support Bucket and Truncate transforms on write (#1345) * introduce bucket transform * include pyiceberg-core * introduce bucket transform * include pyiceberg-core * resolve poetry conflict * support truncate transforms * Remove stale comment * fix poetry hash * avoid codespell error for truncate transform * adopt nits --- poetry.lock | 18 +- pyiceberg/transforms.py | 39 +++- pyproject.toml | 6 + .../test_writes/test_partitioned_writes.py | 170 ++++++++++++++++-- tests/test_transforms.py | 46 ++++- 5 files changed, 259 insertions(+), 20 deletions(-) diff --git a/poetry.lock b/poetry.lock index 1d17ba6b52..1c94a5f29a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3717,6 +3717,21 @@ files = [ [package.extras] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pyiceberg-core" +version = "0.4.0" +description = "" +optional = true +python-versions = "*" +files = [ + {file = "pyiceberg_core-0.4.0-cp39-abi3-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:5aec569271c96e18428d542f9b7007117a7232c06017f95cb239d42e952ad3b4"}, + {file = "pyiceberg_core-0.4.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5e74773e58efa4df83aba6f6265cdd41e446fa66fa4e343ca86395fed9f209ae"}, + {file = "pyiceberg_core-0.4.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7675d21a54bf3753c740d8df78ad7efe33f438096844e479d4f3493f84830925"}, + {file = 
"pyiceberg_core-0.4.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7058ad935a40b1838e4cdc5febd768878c1a51f83dca005d5a52a7fa280a2489"}, + {file = "pyiceberg_core-0.4.0-cp39-abi3-win_amd64.whl", hash = "sha256:a83eb4c2307ae3dd321a9360828fb043a4add2cc9797bef0bafa20894488fb07"}, + {file = "pyiceberg_core-0.4.0.tar.gz", hash = "sha256:d2e6138707868477b806ed354aee9c476e437913a331cb9ad9ad46b4054cd11f"}, +] + [[package]] name = "pyjwt" version = "2.10.1" @@ -5346,6 +5361,7 @@ glue = ["boto3", "mypy-boto3-glue"] hive = ["thrift"] pandas = ["pandas", "pyarrow"] pyarrow = ["pyarrow"] +pyiceberg-core = ["pyiceberg-core"] ray = ["pandas", "pyarrow", "ray", "ray"] rest-sigv4 = ["boto3"] s3fs = ["s3fs"] @@ -5357,4 +5373,4 @@ zstandard = ["zstandard"] [metadata] lock-version = "2.0" python-versions = "^3.9, !=3.9.7" -content-hash = "306213628bcc69346e14742843c8e6bccf19c2615886943c2e1482a954a388ec" +content-hash = "cc789ef423714710f51e5452de7071642f4512511b1d205f77b952bb1df63a64" diff --git a/pyiceberg/transforms.py b/pyiceberg/transforms.py index 84e1c942d3..22dcdfe88a 100644 --- a/pyiceberg/transforms.py +++ b/pyiceberg/transforms.py @@ -85,6 +85,8 @@ if TYPE_CHECKING: import pyarrow as pa + ArrayLike = TypeVar("ArrayLike", pa.Array, pa.ChunkedArray) + S = TypeVar("S") T = TypeVar("T") @@ -193,6 +195,27 @@ def supports_pyarrow_transform(self) -> bool: @abstractmethod def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]": ... + def _pyiceberg_transform_wrapper( + self, transform_func: Callable[["ArrayLike", Any], "ArrayLike"], *args: Any + ) -> Callable[["ArrayLike"], "ArrayLike"]: + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For bucket/truncate transforms, PyArrow needs to be installed") from e + + def _transform(array: "ArrayLike") -> "ArrayLike": + if isinstance(array, pa.Array): + return transform_func(array, *args) + elif isinstance(array, pa.ChunkedArray): + result_chunks = [] + for arr in array.iterchunks(): + result_chunks.append(transform_func(arr, *args)) + return pa.chunked_array(result_chunks) + else: + raise ValueError(f"PyArrow array can only be of type pa.Array or pa.ChunkedArray, but found {type(array)}") + + return _transform + class BucketTransform(Transform[S, int]): """Base Transform class to transform a value into a bucket partition value. 
@@ -309,7 +332,13 @@ def __repr__(self) -> str: return f"BucketTransform(num_buckets={self._num_buckets})" def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]": - raise NotImplementedError() + from pyiceberg_core import transform as pyiceberg_core_transform + + return self._pyiceberg_transform_wrapper(pyiceberg_core_transform.bucket, self._num_buckets) + + @property + def supports_pyarrow_transform(self) -> bool: + return True class TimeResolution(IntEnum): @@ -827,7 +856,13 @@ def __repr__(self) -> str: return f"TruncateTransform(width={self._width})" def pyarrow_transform(self, source: IcebergType) -> "Callable[[pa.Array], pa.Array]": - raise NotImplementedError() + from pyiceberg_core import transform as pyiceberg_core_transform + + return self._pyiceberg_transform_wrapper(pyiceberg_core_transform.truncate, self._width) + + @property + def supports_pyarrow_transform(self) -> bool: + return True @singledispatch diff --git a/pyproject.toml b/pyproject.toml index 4b425141b5..5d2808db94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -79,6 +79,7 @@ psycopg2-binary = { version = ">=2.9.6", optional = true } sqlalchemy = { version = "^2.0.18", optional = true } getdaft = { version = ">=0.2.12", optional = true } cachetools = "^5.5.0" +pyiceberg-core = { version = "^0.4.0", optional = true } [tool.poetry.group.dev.dependencies] pytest = "7.4.4" @@ -842,6 +843,10 @@ ignore_missing_imports = true module = "daft.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pyiceberg_core.*" +ignore_missing_imports = true + [[tool.mypy.overrides]] module = "pyparsing.*" ignore_missing_imports = true @@ -1206,6 +1211,7 @@ sql-postgres = ["sqlalchemy", "psycopg2-binary"] sql-sqlite = ["sqlalchemy"] gcsfs = ["gcsfs"] rest-sigv4 = ["boto3"] +pyiceberg-core = ["pyiceberg-core"] [tool.pytest.ini_options] markers = [ diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 9e7632852c..1e6ea1b797 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -412,6 +412,12 @@ def test_dynamic_partition_overwrite_unpartitioned_evolve_to_identity_transform( spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, part_col: str, format_version: int ) -> None: identifier = f"default.unpartitioned_table_v{format_version}_evolve_into_identity_transformed_partition_field_{part_col}" + + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + tbl = session_catalog.create_table( identifier=identifier, schema=TABLE_SCHEMA, @@ -756,6 +762,55 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog) -> Non tbl.append("not a df") +@pytest.mark.integration +@pytest.mark.parametrize( + "spec", + [ + (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(2), name="int_trunc"))), + (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="long_trunc"))), + (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(2), name="string_trunc"))), + ], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_truncate_transform( + spec: PartitionSpec, + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + format_version: int, +) -> None: + identifier = "default.truncate_transform" + + try: + 
session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = _create_table( + session_catalog=session_catalog, + identifier=identifier, + properties={"format-version": str(format_version)}, + data=[arrow_table_with_null], + partition_spec=spec, + ) + + assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" + df = spark.table(identifier) + assert df.count() == 3, f"Expected 3 total rows for {identifier}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}" + assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null" + + assert tbl.inspect.partitions().num_rows == 3 + files_df = spark.sql( + f""" + SELECT * + FROM {identifier}.files + """ + ) + assert files_df.count() == 3 + + @pytest.mark.integration @pytest.mark.parametrize( "spec", @@ -767,18 +822,52 @@ def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog) -> Non PartitionField(source_id=1, field_id=1002, transform=IdentityTransform(), name="bool"), ) ), - # none of non-identity is supported - (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_bucket"))), - (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=BucketTransform(2), name="long_bucket"))), - (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=BucketTransform(2), name="date_bucket"))), - (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=BucketTransform(2), name="timestamp_bucket"))), - (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=BucketTransform(2), name="timestamptz_bucket"))), - (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=BucketTransform(2), name="string_bucket"))), - (PartitionSpec(PartitionField(source_id=12, field_id=1001, transform=BucketTransform(2), name="fixed_bucket"))), - (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=BucketTransform(2), name="binary_bucket"))), - (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=TruncateTransform(2), name="int_trunc"))), - (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=TruncateTransform(2), name="long_trunc"))), - (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=TruncateTransform(2), name="string_trunc"))), + ], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_identity_and_bucket_transform_spec( + spec: PartitionSpec, + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + format_version: int, +) -> None: + identifier = "default.identity_and_bucket_transform" + + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = _create_table( + session_catalog=session_catalog, + identifier=identifier, + properties={"format-version": str(format_version)}, + data=[arrow_table_with_null], + partition_spec=spec, + ) + + assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" + df = spark.table(identifier) + assert df.count() == 3, f"Expected 3 total rows for {identifier}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}" + assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null" + + assert tbl.inspect.partitions().num_rows == 3 + files_df = spark.sql( + 
f""" + SELECT * + FROM {identifier}.files + """ + ) + assert files_df.count() == 3 + + +@pytest.mark.integration +@pytest.mark.parametrize( + "spec", + [ (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=TruncateTransform(2), name="binary_trunc"))), ], ) @@ -801,11 +890,66 @@ def test_unsupported_transform( with pytest.raises( ValueError, - match="Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: *", + match="FeatureUnsupported => Unsupported data type for truncate transform: LargeBinary", ): tbl.append(arrow_table_with_null) +@pytest.mark.integration +@pytest.mark.parametrize( + "spec, expected_rows", + [ + (PartitionSpec(PartitionField(source_id=4, field_id=1001, transform=BucketTransform(2), name="int_bucket")), 3), + (PartitionSpec(PartitionField(source_id=5, field_id=1001, transform=BucketTransform(2), name="long_bucket")), 3), + (PartitionSpec(PartitionField(source_id=10, field_id=1001, transform=BucketTransform(2), name="date_bucket")), 3), + (PartitionSpec(PartitionField(source_id=8, field_id=1001, transform=BucketTransform(2), name="timestamp_bucket")), 3), + (PartitionSpec(PartitionField(source_id=9, field_id=1001, transform=BucketTransform(2), name="timestamptz_bucket")), 3), + (PartitionSpec(PartitionField(source_id=2, field_id=1001, transform=BucketTransform(2), name="string_bucket")), 3), + (PartitionSpec(PartitionField(source_id=12, field_id=1001, transform=BucketTransform(2), name="fixed_bucket")), 2), + (PartitionSpec(PartitionField(source_id=11, field_id=1001, transform=BucketTransform(2), name="binary_bucket")), 2), + ], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_bucket_transform( + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + spec: PartitionSpec, + expected_rows: int, + format_version: int, +) -> None: + identifier = "default.bucket_transform" + + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = _create_table( + session_catalog=session_catalog, + identifier=identifier, + properties={"format-version": str(format_version)}, + data=[arrow_table_with_null], + partition_spec=spec, + ) + + assert tbl.format_version == format_version, f"Expected v{format_version}, got: v{tbl.format_version}" + df = spark.table(identifier) + assert df.count() == 3, f"Expected 3 total rows for {identifier}" + for col in arrow_table_with_null.column_names: + assert df.where(f"{col} is not null").count() == 2, f"Expected 2 non-null rows for {col}" + assert df.where(f"{col} is null").count() == 1, f"Expected 1 null row for {col} is null" + + assert tbl.inspect.partitions().num_rows == expected_rows + files_df = spark.sql( + f""" + SELECT * + FROM {identifier}.files + """ + ) + assert files_df.count() == expected_rows + + @pytest.mark.integration @pytest.mark.parametrize( "transform,expected_rows", diff --git a/tests/test_transforms.py b/tests/test_transforms.py index 6d04a1e4ce..3088719a06 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -18,10 +18,11 @@ # pylint: disable=eval-used,protected-access,redefined-outer-name from datetime import date from decimal import Decimal -from typing import TYPE_CHECKING, Any, Callable, Optional +from typing import Any, Callable, Optional, Union from uuid import UUID import mmh3 as mmh3 +import pyarrow as pa import pytest from pydantic import ( BeforeValidator, @@ -116,9 +117,6 @@ timestamptz_to_micros, ) -if TYPE_CHECKING: - import pyarrow as pa - 
@pytest.mark.parametrize( "test_input,test_type,expected", @@ -1563,3 +1561,43 @@ def test_ymd_pyarrow_transforms( else: with pytest.raises(ValueError): transform.pyarrow_transform(DateType())(arrow_table_date_timestamps[source_col]) + + +@pytest.mark.parametrize( + "source_type, input_arr, expected, num_buckets", + [ + (IntegerType(), pa.array([1, 2]), pa.array([6, 2], type=pa.int32()), 10), + ( + IntegerType(), + pa.chunked_array([pa.array([1, 2]), pa.array([3, 4])]), + pa.chunked_array([pa.array([6, 2], type=pa.int32()), pa.array([5, 0], type=pa.int32())]), + 10, + ), + (IntegerType(), pa.array([1, 2]), pa.array([6, 2], type=pa.int32()), 10), + ], +) +def test_bucket_pyarrow_transforms( + source_type: PrimitiveType, + input_arr: Union[pa.Array, pa.ChunkedArray], + expected: Union[pa.Array, pa.ChunkedArray], + num_buckets: int, +) -> None: + transform: Transform[Any, Any] = BucketTransform(num_buckets=num_buckets) + assert expected == transform.pyarrow_transform(source_type)(input_arr) + + +@pytest.mark.parametrize( + "source_type, input_arr, expected, width", + [ + (StringType(), pa.array(["developer", "iceberg"]), pa.array(["dev", "ice"]), 3), + (IntegerType(), pa.array([1, -1]), pa.array([0, -10]), 10), + ], +) +def test_truncate_pyarrow_transforms( + source_type: PrimitiveType, + input_arr: Union[pa.Array, pa.ChunkedArray], + expected: Union[pa.Array, pa.ChunkedArray], + width: int, +) -> None: + transform: Transform[Any, Any] = TruncateTransform(width=width) + assert expected == transform.pyarrow_transform(source_type)(input_arr) From f948f564e3f4060cdabcdc7aac91d7c966a7b91c Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Thu, 16 Jan 2025 21:46:06 +0100 Subject: [PATCH 5/5] Bump Poetry to 2.0.1 (#1525) I'm feeling lucky --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b53a98da61..7a8c8ee945 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ help: ## Display this help install-poetry: ## Install poetry if the user has not done that yet. @if ! command -v poetry &> /dev/null; then \ echo "Poetry could not be found. Installing..."; \ - pip install --user poetry==1.8.5; \ + pip install --user poetry==2.0.1; \ else \ echo "Poetry is already installed."; \ fi
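As a usage illustration for the write-side bucket and truncate partition transforms added in PATCH 4/5: a minimal sketch, assuming `catalog` is an already-loaded catalog with a `default` namespace, that the `pyiceberg-core` and PyArrow extras are installed, and that the table identifier and column name are placeholders.

```python
import pyarrow as pa

from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.transforms import BucketTransform
from pyiceberg.types import LongType, NestedField

# Single long column, partitioned by bucket(2) on that column.
schema = Schema(NestedField(field_id=1, name="n", field_type=LongType(), required=False))
spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=BucketTransform(num_buckets=2), name="n_bucket")
)

tbl = catalog.create_table(identifier="default.bucket_demo", schema=schema, partition_spec=spec)

# With pyiceberg-core available, the bucket transform is evaluated on write
# and rows are routed into the matching partitions instead of raising.
tbl.append(pa.table({"n": pa.array([1, 2, 3, 4], type=pa.int64())}))
```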