diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index 3e056b12b4..f7c0aef68c 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -26,6 +26,7 @@
 from __future__ import annotations
 
 import concurrent.futures
+import fnmatch
 import itertools
 import logging
 import os
@@ -1720,13 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
     except StopIteration:
         pass
 
+    parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties)
+
     file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}'
     file_schema = schema_to_pyarrow(table.schema())
 
-    collected_metrics: List[pq.FileMetaData] = []
     fo = table.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
-        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer:
+        with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer:
             writer.write_table(task.df)
 
     data_file = DataFile(
@@ -1745,14 +1747,42 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
         key_metadata=None,
     )
 
-    if len(collected_metrics) != 1:
-        # One file has been written
-        raise ValueError(f"Expected 1 entry, got: {collected_metrics}")
-
     fill_parquet_file_metadata(
         data_file=data_file,
-        parquet_metadata=collected_metrics[0],
+        parquet_metadata=writer.writer.metadata,
         stats_columns=compute_statistics_plan(table.schema(), table.properties),
         parquet_column_mapping=parquet_path_to_id_mapping(table.schema()),
     )
     return iter([data_file])
+
+
+def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]:
+    def _get_int(key: str) -> Optional[int]:
+        if value := table_properties.get(key):
+            try:
+                return int(value)
+            except ValueError as e:
+                raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
+        else:
+            return None
+
+    for key_pattern in [
+        "write.parquet.row-group-size-bytes",
+        "write.parquet.page-row-limit",
+        "write.parquet.bloom-filter-max-bytes",
+        "write.parquet.bloom-filter-enabled.column.*",
+    ]:
+        if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
+            raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")
+
+    compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
+    compression_level = _get_int("write.parquet.compression-level")
+    if compression_codec == "uncompressed":
+        compression_codec = "none"
+
+    return {
+        "compression": compression_codec,
+        "compression_level": compression_level,
+        "data_page_size": _get_int("write.parquet.page-size-bytes"),
+        "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
+    }
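The new _get_parquet_writer_kwargs helper only forwards options that pyarrow.parquet.ParquetWriter accepts directly. A minimal local sketch of that hand-off, with made-up property values and an illustrative /tmp path that are not part of this change:

import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative table properties; "zstd" mirrors the default codec used above.
table_properties = {
    "write.parquet.compression-codec": "gzip",
    "write.parquet.compression-level": "5",
    "write.parquet.page-size-bytes": "1048576",
}

# Same shape of kwargs that _get_parquet_writer_kwargs returns.
parquet_writer_kwargs = {
    "compression": table_properties["write.parquet.compression-codec"],
    "compression_level": int(table_properties["write.parquet.compression-level"]),
    "data_page_size": int(table_properties["write.parquet.page-size-bytes"]),
    "dictionary_pagesize_limit": None,
}

arrow_table = pa.table({"idx": [1, 2, 3], "value": ["a", "b", "c"]})

# ParquetWriter takes these options as keyword arguments, exactly as write_file now does.
with pq.ParquetWriter("/tmp/example.parquet", schema=arrow_table.schema, **parquet_writer_kwargs) as writer:
    writer.write_table(arrow_table)

# After close, the low-level writer exposes the FileMetaData that the removed
# metadata_collector used to capture; write_file now reads writer.writer.metadata.
print(writer.writer.metadata.num_rows)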
diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py
index 3fc06fbddc..b35b348638 100644
--- a/tests/integration/test_reads.py
+++ b/tests/integration/test_reads.py
@@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None:
 @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
 def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None:
     table_test_all_types = catalog.load_table("default.test_all_types")
-    fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password")
+    fs = S3FileSystem(
+        endpoint_override=catalog.properties["s3.endpoint"],
+        access_key=catalog.properties["s3.access-key-id"],
+        secret_key=catalog.properties["s3.secret-access-key"],
+    )
     data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()]
     for data_file_path in data_file_paths:
         uri = urlparse(data_file_path)
diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index c8552a2e96..a65d98fc9e 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -17,12 +17,17 @@
 # pylint:disable=redefined-outer-name
 import uuid
 from datetime import date, datetime
+from typing import Any, Dict, List
+from urllib.parse import urlparse
 
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
+from pyarrow.fs import S3FileSystem
 from pyspark.sql import SparkSession
+from pytest_mock.plugin import MockerFixture
 
-from pyiceberg.catalog import Catalog, load_catalog
+from pyiceberg.catalog import Catalog, Properties, Table, load_catalog
 from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchTableError
 from pyiceberg.schema import Schema
 from pyiceberg.types import (
@@ -158,142 +163,79 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table:
     return pa.Table.from_pylist([{}, {}], schema=pa_schema)
 
 
-@pytest.fixture(scope="session", autouse=True)
-def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.arrow_table_v1_with_null"
-
+def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass
 
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
+    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties)
+    for d in data:
+        tbl.append(d)
+
+    return tbl
+
 
+@pytest.fixture(scope="session", autouse=True)
+def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.arrow_table_v1_with_null"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
     identifier = "default.arrow_table_v1_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v1_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls])
    assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None:
     identifier = "default.arrow_table_v2_without_data"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_without_data)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None:
     identifier = "default.arrow_table_v2_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-    tbl.append(arrow_table_with_only_nulls)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'})
-
-    for _ in range(2):
-        tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null])
     assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}"
 
 
 @pytest.fixture(scope="session", autouse=True)
 def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_v1_v2_appended_with_null"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-    tbl.append(arrow_table_with_null)
-
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null])
     assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}"
 
     with tbl.transaction() as tx:
@@ -397,15 +339,7 @@ def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None:
 
 @pytest.mark.integration
 def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_table_summaries"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_with_null)
-    tbl.append(arrow_table_with_null)
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null])
 
     tbl.overwrite(arrow_table_with_null)
 
     rows = spark.sql(
@@ -423,39 +357,39 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
 
     assert summaries[0] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'total-data-files': '2',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '10566',
+        'total-files-size': '10874',
         'total-position-deletes': '0',
         'total-records': '6',
     }
 
     assert summaries[2] == {
         'added-data-files': '1',
-        'added-files-size': '5283',
+        'added-files-size': '5437',
         'added-records': '3',
         'deleted-data-files': '2',
         'deleted-records': '6',
-        'removed-files-size': '10566',
+        'removed-files-size': '10874',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '5283',
+        'total-files-size': '5437',
         'total-position-deletes': '0',
         'total-records': '3',
     }
@@ -464,12 +398,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi
 
 @pytest.mark.integration
 def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
     identifier = "default.arrow_data_files"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
 
     tbl.overwrite(arrow_table_with_null)  # should produce a DELETE entry
 
@@ -490,15 +419,100 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w
 
 
 @pytest.mark.integration
-def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
-    identifier = "default.arrow_data_files"
+@pytest.mark.parametrize("format_version", ["1", "2"])
+@pytest.mark.parametrize(
+    "properties, expected_compression_name",
+    [
+        # REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593
+        ({}, "ZSTD"),
+        ({"write.parquet.compression-codec": "uncompressed"}, "UNCOMPRESSED"),
+        ({"write.parquet.compression-codec": "gzip", "write.parquet.compression-level": "1"}, "GZIP"),
"write.parquet.compression-level": "1"}, "ZSTD"), + ({"write.parquet.compression-codec": "snappy"}, "SNAPPY"), + ], +) +def test_write_parquet_compression_properties( + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + format_version: str, + properties: Dict[str, Any], + expected_compression_name: str, +) -> None: + identifier = "default.write_parquet_compression_properties" - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass + tbl = _create_table(session_catalog, identifier, {"format-version": format_version, **properties}, [arrow_table_with_null]) - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) + data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()] + + fs = S3FileSystem( + endpoint_override=session_catalog.properties["s3.endpoint"], + access_key=session_catalog.properties["s3.access-key-id"], + secret_key=session_catalog.properties["s3.secret-access-key"], + ) + uri = urlparse(data_file_paths[0]) + with fs.open_input_file(f"{uri.netloc}{uri.path}") as f: + parquet_metadata = pq.read_metadata(f) + compression = parquet_metadata.row_group(0).column(0).compression + + assert compression == expected_compression_name + + +@pytest.mark.integration +@pytest.mark.parametrize( + "properties, expected_kwargs", + [ + ({"write.parquet.page-size-bytes": "42"}, {"data_page_size": 42}), + ({"write.parquet.dict-size-bytes": "42"}, {"dictionary_pagesize_limit": 42}), + ], +) +def test_write_parquet_other_properties( + mocker: MockerFixture, + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + properties: Dict[str, Any], + expected_kwargs: Dict[str, Any], +) -> None: + print(type(mocker)) + identifier = "default.test_write_parquet_other_properties" + + # The properties we test cannot be checked on the resulting Parquet file, so we spy on the ParquetWriter call instead + ParquetWriter = mocker.spy(pq, "ParquetWriter") + _create_table(session_catalog, identifier, properties, [arrow_table_with_null]) + + call_kwargs = ParquetWriter.call_args[1] + for key, value in expected_kwargs.items(): + assert call_kwargs.get(key) == value + + +@pytest.mark.integration +@pytest.mark.parametrize( + "properties", + [ + {"write.parquet.row-group-size-bytes": "42"}, + {"write.parquet.page-row-limit": "42"}, + {"write.parquet.bloom-filter-enabled.column.bool": "42"}, + {"write.parquet.bloom-filter-max-bytes": "42"}, + ], +) +def test_write_parquet_unsupported_properties( + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + properties: Dict[str, str], +) -> None: + identifier = "default.write_parquet_unsupported_properties" + + tbl = _create_table(session_catalog, identifier, properties, []) + with pytest.raises(NotImplementedError): + tbl.append(arrow_table_with_null) + + +@pytest.mark.integration +def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_data_files" + tbl = _create_table(session_catalog, identifier, {'format-version': '1'}, []) with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"): tbl.overwrite("not a df") @@ -512,15 +526,9 @@ def test_summaries_with_only_nulls( spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table ) -> None: identifier = "default.arrow_table_summaries_with_only_nulls" - - try: - 
@@ -512,15 +526,9 @@ def test_summaries_with_only_nulls(
     spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table
 ) -> None:
     identifier = "default.arrow_table_summaries_with_only_nulls"
-
-    try:
-        session_catalog.drop_table(identifier=identifier)
-    except NoSuchTableError:
-        pass
-    tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'})
-
-    tbl.append(arrow_table_without_data)
-    tbl.append(arrow_table_with_only_nulls)
+    tbl = _create_table(
+        session_catalog, identifier, {'format-version': '1'}, [arrow_table_without_data, arrow_table_with_only_nulls]
+    )
 
     tbl.overwrite(arrow_table_without_data)
 
     rows = spark.sql(
@@ -547,12 +555,12 @@ def test_summaries_with_only_nulls(
 
     assert summaries[1] == {
         'added-data-files': '1',
-        'added-files-size': '4045',
+        'added-files-size': '4217',
         'added-records': '2',
         'total-data-files': '1',
         'total-delete-files': '0',
         'total-equality-deletes': '0',
-        'total-files-size': '4045',
+        'total-files-size': '4217',
         'total-position-deletes': '0',
         'total-records': '2',
     }
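For reference, the table properties exercised above are supplied at table creation time and picked up automatically on append. A hedged end-to-end sketch; the catalog name, namespace, and schema below are placeholders and assume a catalog named "local" is already configured:

import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

catalog = load_catalog("local")  # placeholder; requires a configured catalog

schema = Schema(
    NestedField(field_id=1, name="idx", field_type=LongType(), required=False),
    NestedField(field_id=2, name="value", field_type=StringType(), required=False),
)

tbl = catalog.create_table(
    "default.compressed_example",
    schema=schema,
    properties={
        # Read by _get_parquet_writer_kwargs when data files are written.
        "write.parquet.compression-codec": "gzip",
        "write.parquet.compression-level": "5",
    },
)
tbl.append(pa.table({"idx": [1, 2, 3], "value": ["a", "b", "c"]}))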