From a8213a7f23b84ccb4942e06b0bf9d02633c57118 Mon Sep 17 00:00:00 2001
From: Sung Yun <107272191+syun64@users.noreply.github.com>
Date: Tue, 19 Mar 2024 15:00:25 +0000
Subject: [PATCH] adopt review feedback

---
 Makefile                            |   2 +-
 mkdocs/docs/api.md                  |   2 +-
 tests/integration/test_add_files.py | 100 ++++++++++++++++++----------
 3 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/Makefile b/Makefile
index 133a13983b..c3e816ebd5 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,7 @@ test-integration:
 	docker-compose -f dev/docker-compose-integration.yml up -d
 	sleep 10
 	docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
-	poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
+	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

 test-integration-rebuild:
 	docker-compose -f dev/docker-compose-integration.yml kill

diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 66c1a5b635..5eec487c67 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -327,7 +327,7 @@ tbl.add_files(file_paths=file_paths)
 Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

 !!! note "Partitions"
-    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported).
+    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform` and `TruncateTransform`, which preserve the order of the values after the transformation (any `Transform` that has the `preserves_order` property set to `True` is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.

 !!! warning "Maintenance Operations"
     Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
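
The doc note above hinges on the parquet footer statistics of the partition source column: when the lower and upper bounds agree, the file carries a single partition value; when the statistics are missing, the value is inferred as `None`. A minimal sketch of that inference using pyarrow (hypothetical local path and a single row group for brevity; the real implementation aggregates bounds across all row groups):

    import pyarrow.parquet as pq

    # Read only the footer; no row-group data is fetched.
    metadata = pq.read_metadata("test-0.parquet")  # hypothetical path
    stats = metadata.row_group(0).column(0).statistics

    if stats is None or not stats.has_min_max:
        partition_value = None       # no statistics: inferred as None
    elif stats.min == stats.max:
        partition_value = stats.min  # one value covers the whole file
    else:
        # Mirrors the lower/upper mismatch test below.
        raise ValueError("more than one partition value in file")
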
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index f725e50b9e..7c17618280 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -104,25 +104,31 @@
 )


-def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table:
+def _create_table(
+    session_catalog: Catalog, identifier: str, format_version: int, partition_spec: Optional[PartitionSpec] = None
+) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass

     tbl = session_catalog.create_table(
-        identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec()
+        identifier=identifier,
+        schema=TABLE_SCHEMA,
+        properties={"format-version": str(format_version)},
+        partition_spec=partition_spec if partition_spec else PartitionSpec(),
     )

     return tbl


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_table"
-    tbl = _create_table(session_catalog, identifier)
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.unpartitioned_table_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)

-    file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
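
The reworked helper threads `format_version` through as the `format-version` table property; property values are strings, and the catalog parses the version back into the table metadata. A quick sketch of what the new parametrization exercises (illustrative table name; `TABLE_SCHEMA` is the module's fixture schema):

    tbl = session_catalog.create_table(
        identifier="default.example_v2",  # illustrative name
        schema=TABLE_SCHEMA,
        properties={"format-version": "2"},
    )
    assert tbl.metadata.format_version == 2  # a v1 table reports 1 here
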
@@ -154,11 +160,14 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog:


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_raises_not_found"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_raises_file_not_found(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_raises_not_found_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -172,11 +181,14 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSess


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_raises_field_ids"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_raises_has_field_ids(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -190,11 +202,14 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSessi


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_table_2"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_with_schema_updates(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_table_schema_updates_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -212,7 +227,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
         update.add_column("quux", IntegerType())
         update.delete_column("bar")

-    file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
+    file_path = f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-6.parquet"
     # write parquet files
     fo = tbl.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
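
The schema-update hunk above uses PyIceberg's transactional `update_schema` API, which commits the new schema when the context manager exits; `add_files` then resolves subsequent files against the evolved schema through the name mapping. In isolation, the step looks like this (table handle `tbl` assumed):

    from pyiceberg.types import IntegerType

    with tbl.update_schema() as update:
        update.add_column("quux", IntegerType())  # added as an optional column
        update.delete_column("bar")
    # The schema change is committed on context exit.
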
@@ -242,8 +257,9 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio


 @pytest.mark.integration
-def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.partitioned_table_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
@@ -251,11 +267,11 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

     date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])

-    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -310,19 +326,20 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca


 @pytest.mark.integration
-def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table_2"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.partitioned_table_bucket_fails_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"),
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

     int_iter = iter(range(5))

-    file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned_table_bucket_fails/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -345,21 +362,27 @@ def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, sessio
     # add the parquet files as data files
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=file_paths)
-    assert "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]" in str(exc_info.value)
+    assert (
+        "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]"
+        in str(exc_info.value)
+    )


 @pytest.mark.integration
-def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table_3"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.partitioned_table_mismatch_fails_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

-    file_paths = [f"s3://warehouse/default/partitioned_3/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned_table_mismatch_fails/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -379,7 +402,7 @@ def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spar
             "bar": "bar_string",
             "baz": 124,
             "qux": date(2024, 3, 7),
-        }
+        },
     ],
     schema=ARROW_SCHEMA,
 )
@@ -388,4 +411,7 @@ def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spar
     # add the parquet files as data files
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=file_paths)
-    assert "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124" in str(exc_info.value)
\ No newline at end of file
+    assert (
+        "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124"
+        in str(exc_info.value)
+    )
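
Both failure cases exercised above follow from how the inference works: footer bounds can only pin down a partition value when the transform is order-preserving and the bounds coincide. `BucketTransform` hashes its input, so its `preserves_order` property is `False` and it is rejected outright, as the asserted error message shows. A small sketch of the distinction:

    from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform

    # Order-preserving transforms let lower/upper bounds determine the value.
    assert IdentityTransform().preserves_order
    assert MonthTransform().preserves_order
    assert not BucketTransform(num_buckets=3).preserves_order
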