From 9544e946956228aa4e2ef3a6c8aaa06d6fe44065 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sat, 3 Feb 2024 17:39:19 +0100 Subject: [PATCH 1/9] Use `write.parquet.compression-{codec,level}` --- pyiceberg/catalog/rest.py | 4 +++ pyiceberg/io/pyarrow.py | 19 ++++++++----- tests/integration/test_writes.py | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 7 deletions(-) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 765f04b128..eabc2395a8 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -450,6 +450,10 @@ def create_table( iceberg_schema = self._convert_schema_if_needed(schema) iceberg_schema = assign_fresh_schema_ids(iceberg_schema) + properties = properties.copy() + for copy_key in ["write.parquet.compression-codec", "write.parquet.compression-level"]: + if copy_key in self.properties: + properties[copy_key] = self.properties[copy_key] namespace_and_table = self._split_identifier_for_path(identifier) request = CreateTableRequest( name=namespace_and_table["table"], diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 3e056b12b4..a6022ebc1e 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1720,13 +1720,22 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: except StopIteration: pass + compression_codec = table.properties.get("write.parquet.compression-codec") + compression_level = table.properties.get("write.parquet.compression-level") + if compression_codec == "uncompressed": + compression_options = {"compression": "none"} + else: + compression_options = { + "compression": compression_codec, + "compression_level": None if compression_level is None else int(compression_level), + } + file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' file_schema = schema_to_pyarrow(table.schema()) - collected_metrics: List[pq.FileMetaData] = [] fo = table.io.new_output(file_path) with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, version="1.0", metadata_collector=collected_metrics) as writer: + with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **compression_options) as writer: writer.write_table(task.df) data_file = DataFile( @@ -1745,13 +1754,9 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: key_metadata=None, ) - if len(collected_metrics) != 1: - # One file has been written - raise ValueError(f"Expected 1 entry, got: {collected_metrics}") - fill_parquet_file_metadata( data_file=data_file, - parquet_metadata=collected_metrics[0], + parquet_metadata=writer.writer.metadata, stats_columns=compute_statistics_plan(table.schema(), table.properties), parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index c8552a2e96..acfe812be1 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -17,9 +17,12 @@ # pylint:disable=redefined-outer-name import uuid from datetime import date, datetime +from urllib.parse import urlparse import pyarrow as pa +import pyarrow.parquet as pq import pytest +from pyarrow.fs import S3FileSystem from pyspark.sql import SparkSession from pyiceberg.catalog import Catalog, load_catalog @@ -489,6 +492,50 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w assert [row.deleted_data_files_count for row in rows] == [0, 0, 1, 0, 0] +@pytest.mark.integration 
+@pytest.mark.parametrize("compression_codec", ["", "uncompressed", "gzip", "zstd", "snappy"]) +def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Table, compression_codec: str) -> None: + catalog_properties = { + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + } + if compression_codec != "": + catalog_properties["write.parquet.compression-codec"] = compression_codec + if compression_codec != "snappy": + catalog_properties["write.parquet.compression-level"] = "1" + catalog = load_catalog("local", **catalog_properties) + + identifier = "default.arrow_data_files" + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + tbl = catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) + + tbl.overwrite(arrow_table_with_null) + + data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()] + + fs = S3FileSystem( + endpoint_override=catalog.properties["s3.endpoint"], + access_key=catalog.properties["s3.access-key-id"], + secret_key=catalog.properties["s3.secret-access-key"], + ) + uri = urlparse(data_file_paths[0]) + with fs.open_input_file(f"{uri.netloc}{uri.path}") as f: + parquet_metadata = pq.read_metadata(f) + compression = parquet_metadata.row_group(0).column(0).compression + + if compression_codec == "": + assert compression == "ZSTD" + else: + assert compression == compression_codec.upper() + + @pytest.mark.integration def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_data_files" From e385e7757cac08380a7578cf5a230bd62e88bb36 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sat, 3 Feb 2024 17:40:22 +0100 Subject: [PATCH 2/9] Cleanup --- tests/integration/test_reads.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index 3fc06fbddc..b35b348638 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -224,7 +224,11 @@ def test_ray_all_types(catalog: Catalog) -> None: @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) def test_pyarrow_to_iceberg_all_types(catalog: Catalog) -> None: table_test_all_types = catalog.load_table("default.test_all_types") - fs = S3FileSystem(endpoint_override="http://localhost:9000", access_key="admin", secret_key="password") + fs = S3FileSystem( + endpoint_override=catalog.properties["s3.endpoint"], + access_key=catalog.properties["s3.access-key-id"], + secret_key=catalog.properties["s3.secret-access-key"], + ) data_file_paths = [task.file.file_path for task in table_test_all_types.scan().plan_files()] for data_file_path in data_file_paths: uri = urlparse(data_file_path) From 3e678d1762c2266490074b6a4b2542b233d0896c Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 10:48:05 +0100 Subject: [PATCH 3/9] Review feedback --- pyiceberg/io/pyarrow.py | 1 + tests/integration/test_writes.py | 44 +++++++++++++++++++------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a6022ebc1e..e4593630b1 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1722,6 +1722,7 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: compression_codec = 
table.properties.get("write.parquet.compression-codec") compression_level = table.properties.get("write.parquet.compression-level") + compression_options: Dict[str, Any] if compression_codec == "uncompressed": compression_options = {"compression": "none"} else: diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index acfe812be1..eb6f07ffd0 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -493,21 +493,32 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w @pytest.mark.integration -@pytest.mark.parametrize("compression_codec", ["", "uncompressed", "gzip", "zstd", "snappy"]) -def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Table, compression_codec: str) -> None: - catalog_properties = { - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - } - if compression_codec != "": - catalog_properties["write.parquet.compression-codec"] = compression_codec - if compression_codec != "snappy": - catalog_properties["write.parquet.compression-level"] = "1" - catalog = load_catalog("local", **catalog_properties) +@pytest.mark.parametrize( + "compression", + # List of (compression_properties, expected_compression_name) + [ + # REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593 + ({}, "ZSTD"), + ({"write.parquet.compression-codec": "uncompressed"}, "UNCOMPRESSED"), + ({"write.parquet.compression-codec": "gzip", "write.parquet.compression-level": "1"}, "GZIP"), + ({"write.parquet.compression-codec": "zstd", "write.parquet.compression-level": "1"}, "ZSTD"), + ({"write.parquet.compression-codec": "snappy"}, "SNAPPY"), + ], +) +def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Table, compression) -> None: + compression_properties, expected_compression_name = compression + catalog = load_catalog( + "local", + **{ + "type": "rest", + "uri": "http://localhost:8181", + "s3.endpoint": "http://localhost:9000", + "s3.access-key-id": "admin", + "s3.secret-access-key": "password", + **compression_properties, + }, + ) identifier = "default.arrow_data_files" try: @@ -530,10 +541,7 @@ def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Tabl parquet_metadata = pq.read_metadata(f) compression = parquet_metadata.row_group(0).column(0).compression - if compression_codec == "": - assert compression == "ZSTD" - else: - assert compression == compression_codec.upper() + assert compression == expected_compression_name @pytest.mark.integration From 029aabe32de46bf1dad8e5fbf41b6f1b1d5ef7b5 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 20:22:58 +0100 Subject: [PATCH 4/9] Review feedback --- pyiceberg/catalog/rest.py | 4 - pyiceberg/io/pyarrow.py | 50 +++++-- tests/integration/test_writes.py | 238 +++++++++++++------------------ 3 files changed, 136 insertions(+), 156 deletions(-) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index eabc2395a8..765f04b128 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -450,10 +450,6 @@ def create_table( iceberg_schema = self._convert_schema_if_needed(schema) iceberg_schema = assign_fresh_schema_ids(iceberg_schema) - properties = properties.copy() - for copy_key in ["write.parquet.compression-codec", "write.parquet.compression-level"]: - if copy_key in self.properties: - properties[copy_key] = self.properties[copy_key] 
namespace_and_table = self._split_identifier_for_path(identifier) request = CreateTableRequest( name=namespace_and_table["table"], diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e4593630b1..01f76b8afd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -26,6 +26,7 @@ from __future__ import annotations import concurrent.futures +import fnmatch import itertools import logging import os @@ -1720,23 +1721,14 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: except StopIteration: pass - compression_codec = table.properties.get("write.parquet.compression-codec") - compression_level = table.properties.get("write.parquet.compression-level") - compression_options: Dict[str, Any] - if compression_codec == "uncompressed": - compression_options = {"compression": "none"} - else: - compression_options = { - "compression": compression_codec, - "compression_level": None if compression_level is None else int(compression_level), - } + parquet_writer_kwargs = _get_parquet_writer_kwargs(table.properties) file_path = f'{table.location()}/data/{task.generate_data_file_filename("parquet")}' file_schema = schema_to_pyarrow(table.schema()) fo = table.io.new_output(file_path) with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **compression_options) as writer: + with pq.ParquetWriter(fos, schema=file_schema, version="1.0", **parquet_writer_kwargs) as writer: writer.write_table(task.df) data_file = DataFile( @@ -1762,3 +1754,39 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) return iter([data_file]) + + +def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: + def _get_int(key: str) -> Optional[int]: + value = table_properties.get(key) + if value is None: + return None + else: + return int(value) + + for key_pattern in [ + "write.parquet.row-group-size-bytes", + "write.parquet.page-row-limit", + "write.parquet.bloom-filter-max-bytes", + "write.parquet.bloom-filter-enabled.column.*", + ]: + unsupported_keys = fnmatch.filter(table_properties, key_pattern) + if unsupported_keys: + raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") + + kwargs: Dict[str, Any] = { + "data_page_size": _get_int("write.parquet.page-size-bytes"), + "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"), + } + + compression_codec = table_properties.get("write.parquet.compression-codec") + compression_level = _get_int("write.parquet.compression-level") + if compression_codec == "uncompressed": + kwargs.update({"compression": "none"}) + else: + kwargs.update({ + "compression": compression_codec, + "compression_level": compression_level, + }) + + return kwargs diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index eb6f07ffd0..643e3dd67e 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -17,6 +17,7 @@ # pylint:disable=redefined-outer-name import uuid from datetime import date, datetime +from typing import Any, Dict, List from urllib.parse import urlparse import pyarrow as pa @@ -24,8 +25,9 @@ import pytest from pyarrow.fs import S3FileSystem from pyspark.sql import SparkSession +from pytest_mock.plugin import MockerFixture -from pyiceberg.catalog import Catalog, load_catalog +from pyiceberg.catalog import Catalog, Properties, Table, load_catalog from pyiceberg.exceptions 
import NamespaceAlreadyExistsError, NoSuchTableError from pyiceberg.schema import Schema from pyiceberg.types import ( @@ -161,142 +163,79 @@ def arrow_table_with_only_nulls(pa_schema: pa.Schema) -> pa.Table: return pa.Table.from_pylist([{}, {}], schema=pa_schema) -@pytest.fixture(scope="session", autouse=True) -def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - identifier = "default.arrow_table_v1_with_null" - +def _create_table(session_catalog: Catalog, identifier: str, properties: Properties, data: List[pa.Table]) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - tbl.append(arrow_table_with_null) + tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties=properties) + for d in data: + tbl.append(d) + return tbl + + +@pytest.fixture(scope="session", autouse=True) +def table_v1_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_table_v1_with_null" + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v1_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: identifier = "default.arrow_table_v1_without_data" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - tbl.append(arrow_table_without_data) - + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_without_data]) assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v1_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: identifier = "default.arrow_table_v1_with_only_nulls" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - tbl.append(arrow_table_with_only_nulls) - + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_only_nulls]) assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_v1_appended_with_null" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - - for _ in range(2): - tbl.append(arrow_table_with_null) - + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, 2 * [arrow_table_with_null]) assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_v2_with_null" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl 
= session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'}) - tbl.append(arrow_table_with_null) - + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: identifier = "default.arrow_table_v2_without_data" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'}) - tbl.append(arrow_table_without_data) - + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_without_data]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v2_with_only_nulls(session_catalog: Catalog, arrow_table_with_only_nulls: pa.Table) -> None: identifier = "default.arrow_table_v2_with_only_nulls" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'}) - tbl.append(arrow_table_with_only_nulls) - + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_only_nulls]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_v2_appended_with_null" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '2'}) - - for _ in range(2): - tbl.append(arrow_table_with_null) - + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v1_v2_appended_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_v1_v2_appended_with_null" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - tbl.append(arrow_table_with_null) - + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [arrow_table_with_null]) assert tbl.format_version == 1, f"Expected v1, got: v{tbl.format_version}" with tbl.transaction() as tx: @@ -400,15 +339,7 @@ def test_query_filter_v1_v2_append_null(spark: SparkSession, col: str) -> None: @pytest.mark.integration def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_summaries" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - - tbl.append(arrow_table_with_null) - tbl.append(arrow_table_with_null) + tbl = _create_table(session_catalog, identifier, 
{"format-version": "1"}, 2 * [arrow_table_with_null]) tbl.overwrite(arrow_table_with_null) rows = spark.sql( @@ -467,12 +398,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi @pytest.mark.integration def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_data_files" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) + tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, []) tbl.overwrite(arrow_table_with_null) # should produce a DELETE entry @@ -493,9 +419,9 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w @pytest.mark.integration +@pytest.mark.parametrize("format_version", ["1", "2"]) @pytest.mark.parametrize( - "compression", - # List of (compression_properties, expected_compression_name) + "properties, expected_compression_name", [ # REST catalog uses Zstandard by default: https://github.com/apache/iceberg/pull/8593 ({}, "ZSTD"), @@ -505,36 +431,24 @@ def test_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_w ({"write.parquet.compression-codec": "snappy"}, "SNAPPY"), ], ) -def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Table, compression) -> None: - compression_properties, expected_compression_name = compression - - catalog = load_catalog( - "local", - **{ - "type": "rest", - "uri": "http://localhost:8181", - "s3.endpoint": "http://localhost:9000", - "s3.access-key-id": "admin", - "s3.secret-access-key": "password", - **compression_properties, - }, - ) - identifier = "default.arrow_data_files" - - try: - catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - tbl = catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) +def test_write_parquet_compression_properties( + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + format_version: str, + properties: Dict[str, Any], + expected_compression_name: str, +) -> None: + identifier = "default.write_parquet_compression_properties" - tbl.overwrite(arrow_table_with_null) + tbl = _create_table(session_catalog, identifier, {"format-version": format_version, **properties}, [arrow_table_with_null]) data_file_paths = [task.file.file_path for task in tbl.scan().plan_files()] fs = S3FileSystem( - endpoint_override=catalog.properties["s3.endpoint"], - access_key=catalog.properties["s3.access-key-id"], - secret_key=catalog.properties["s3.secret-access-key"], + endpoint_override=session_catalog.properties["s3.endpoint"], + access_key=session_catalog.properties["s3.access-key-id"], + secret_key=session_catalog.properties["s3.secret-access-key"], ) uri = urlparse(data_file_paths[0]) with fs.open_input_file(f"{uri.netloc}{uri.path}") as f: @@ -545,15 +459,63 @@ def test_parquet_compression(spark: SparkSession, arrow_table_with_null: pa.Tabl @pytest.mark.integration -def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: - identifier = "default.arrow_data_files" +@pytest.mark.integration +@pytest.mark.parametrize( + "properties, expected_kwargs", + [ + ({"write.parquet.page-size-bytes": "42"}, {"data_page_size": 42}), + ({"write.parquet.dict-size-bytes": "42"}, {"dictionary_pagesize_limit": 42}), + ], +) +def 
test_write_parquet_other_properties( + mocker: MockerFixture, + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + properties: Dict[str, Any], + expected_kwargs: Dict[str, Any], +) -> None: + print(type(mocker)) + identifier = "default.test_write_parquet_other_properties" - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass + # The properties we test cannot be checked on the resulting Parquet file, so we spy on the ParquetWriter call instead + ParquetWriter = mocker.spy(pq, "ParquetWriter") + _create_table(session_catalog, identifier, properties, [arrow_table_with_null]) + + call_kwargs = ParquetWriter.call_args[1] + for key, value in expected_kwargs.items(): + assert call_kwargs.get(key) == value + + +@pytest.mark.integration +@pytest.mark.integration +@pytest.mark.integration +@pytest.mark.parametrize( + "properties", + [ + {"write.parquet.row-group-size-bytes": "42"}, + {"write.parquet.page-row-limit": "42"}, + {"write.parquet.bloom-filter-enabled.column.bool": "true"}, + {"write.parquet.bloom-filter-max-bytes": "42"}, + ], +) +def test_write_parquet_unsupported_properties( + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + properties: Dict[str, Any], +) -> None: + identifier = "default.write_parquet_unsupported_properties" - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) + tbl = _create_table(session_catalog, identifier, properties, []) + with pytest.raises(NotImplementedError): + tbl.append(arrow_table_with_null) + + +@pytest.mark.integration +def test_invalid_arguments(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: + identifier = "default.arrow_data_files" + tbl = _create_table(session_catalog, identifier, {'format-version': '1'}, []) with pytest.raises(ValueError, match="Expected PyArrow table, got: not a df"): tbl.overwrite("not a df") @@ -567,15 +529,9 @@ def test_summaries_with_only_nulls( spark: SparkSession, session_catalog: Catalog, arrow_table_without_data: pa.Table, arrow_table_with_only_nulls: pa.Table ) -> None: identifier = "default.arrow_table_summaries_with_only_nulls" - - try: - session_catalog.drop_table(identifier=identifier) - except NoSuchTableError: - pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA, properties={'format-version': '1'}) - - tbl.append(arrow_table_without_data) - tbl.append(arrow_table_with_only_nulls) + tbl = _create_table( + session_catalog, identifier, {'format-version': '1'}, [arrow_table_without_data, arrow_table_with_only_nulls] + ) tbl.overwrite(arrow_table_without_data) rows = spark.sql( From 4ea76fbefd56a9f57c84888ebf7482200e475b93 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 20:32:12 +0100 Subject: [PATCH 5/9] Review feedback --- pyiceberg/io/pyarrow.py | 19 +++++++------------ tests/integration/test_writes.py | 22 +++++++++++----------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 01f76b8afd..7a7925bc3d 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1774,19 +1774,14 @@ def _get_int(key: str) -> Optional[int]: if unsupported_keys: raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") - kwargs: Dict[str, Any] = { - "data_page_size": _get_int("write.parquet.page-size-bytes"), - "dictionary_pagesize_limit": 
_get_int("write.parquet.dict-size-bytes"), - } - compression_codec = table_properties.get("write.parquet.compression-codec") compression_level = _get_int("write.parquet.compression-level") if compression_codec == "uncompressed": - kwargs.update({"compression": "none"}) - else: - kwargs.update({ - "compression": compression_codec, - "compression_level": compression_level, - }) + compression_codec = "none" - return kwargs + return { + "compression": compression_codec, + "compression_level": compression_level, + "data_page_size": _get_int("write.parquet.page-size-bytes"), + "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"), + } diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 643e3dd67e..f3f7822664 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -207,14 +207,14 @@ def table_v1_appended_with_null(session_catalog: Catalog, arrow_table_with_null: @pytest.fixture(scope="session", autouse=True) def table_v2_with_null(session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None: identifier = "default.arrow_table_v2_with_null" - tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_with_null]) + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_with_null]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @pytest.fixture(scope="session", autouse=True) def table_v2_without_data(session_catalog: Catalog, arrow_table_without_data: pa.Table) -> None: identifier = "default.arrow_table_v2_without_data" - tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, 2 * [arrow_table_without_data]) + tbl = _create_table(session_catalog, identifier, {"format-version": "2"}, [arrow_table_without_data]) assert tbl.format_version == 2, f"Expected v2, got: v{tbl.format_version}" @@ -357,39 +357,39 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi assert summaries[0] == { 'added-data-files': '1', - 'added-files-size': '5283', + 'added-files-size': '5437', 'added-records': '3', 'total-data-files': '1', 'total-delete-files': '0', 'total-equality-deletes': '0', - 'total-files-size': '5283', + 'total-files-size': '5437', 'total-position-deletes': '0', 'total-records': '3', } assert summaries[1] == { 'added-data-files': '1', - 'added-files-size': '5283', + 'added-files-size': '5437', 'added-records': '3', 'total-data-files': '2', 'total-delete-files': '0', 'total-equality-deletes': '0', - 'total-files-size': '10566', + 'total-files-size': '10874', 'total-position-deletes': '0', 'total-records': '6', } assert summaries[2] == { 'added-data-files': '1', - 'added-files-size': '5283', + 'added-files-size': '5437', 'added-records': '3', 'deleted-data-files': '2', 'deleted-records': '6', - 'removed-files-size': '10566', + 'removed-files-size': '10874', 'total-data-files': '1', 'total-delete-files': '0', 'total-equality-deletes': '0', - 'total-files-size': '5283', + 'total-files-size': '5437', 'total-position-deletes': '0', 'total-records': '3', } @@ -558,12 +558,12 @@ def test_summaries_with_only_nulls( assert summaries[1] == { 'added-data-files': '1', - 'added-files-size': '4045', + 'added-files-size': '4217', 'added-records': '2', 'total-data-files': '1', 'total-delete-files': '0', 'total-equality-deletes': '0', - 'total-files-size': '4045', + 'total-files-size': '4217', 'total-position-deletes': '0', 'total-records': '2', } From 6536f8e2272d9eb4cb4887d1d1b1db8e5434b8e5 Mon Sep 
17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 20:53:44 +0100 Subject: [PATCH 6/9] Update pyiceberg/io/pyarrow.py Co-authored-by: Fokko Driesprong --- pyiceberg/io/pyarrow.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7a7925bc3d..d73b033907 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1758,9 +1758,11 @@ def write_file(table: Table, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: def _get_parquet_writer_kwargs(table_properties: Properties) -> Dict[str, Any]: def _get_int(key: str) -> Optional[int]: - value = table_properties.get(key) - if value is None: - return None + if value := table_properties.get(key): + try: + return int(value) + except ValueError as e: + raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e else: return int(value) From 809ca2e2f847b52ced9176406279eff78ac67429 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 21:00:41 +0100 Subject: [PATCH 7/9] Fixup --- pyiceberg/io/pyarrow.py | 6 ++---- tests/integration/test_writes.py | 25 +++++++++++++------------ 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index d73b033907..f5c26b2f3c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1763,8 +1763,6 @@ def _get_int(key: str) -> Optional[int]: return int(value) except ValueError as e: raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e - else: - return int(value) for key_pattern in [ "write.parquet.row-group-size-bytes", @@ -1773,8 +1771,8 @@ def _get_int(key: str) -> Optional[int]: "write.parquet.bloom-filter-enabled.column.*", ]: unsupported_keys = fnmatch.filter(table_properties, key_pattern) - if unsupported_keys: - raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") + if set_unsupported_keys := [table_properties[key] for key in unsupported_keys]: + raise NotImplementedError(f"Parquet writer option(s) {set_unsupported_keys} not implemented") compression_codec = table_properties.get("write.parquet.compression-codec") compression_level = _get_int("write.parquet.compression-level") diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index f3f7822664..eae876fc31 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -17,7 +17,7 @@ # pylint:disable=redefined-outer-name import uuid from datetime import date, datetime -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional from urllib.parse import urlparse import pyarrow as pa @@ -490,25 +490,26 @@ def test_write_parquet_other_properties( @pytest.mark.integration @pytest.mark.integration @pytest.mark.integration +@pytest.mark.parametrize("value", [None, "42"]) @pytest.mark.parametrize( - "properties", + "property", [ - {"write.parquet.row-group-size-bytes": "42"}, - {"write.parquet.page-row-limit": "42"}, - {"write.parquet.bloom-filter-enabled.column.bool": "true"}, - {"write.parquet.bloom-filter-max-bytes": "42"}, + "write.parquet.row-group-size-bytes", + "write.parquet.page-row-limit", + "write.parquet.bloom-filter-enabled.column.bool", + "write.parquet.bloom-filter-max-bytes", ], ) def test_write_parquet_unsupported_properties( - spark: SparkSession, - session_catalog: Catalog, - arrow_table_with_null: pa.Table, - properties: Dict[str, Any], + spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: 
pa.Table, property: str, value: Optional[str] ) -> None: identifier = "default.write_parquet_unsupported_properties" - tbl = _create_table(session_catalog, identifier, properties, []) - with pytest.raises(NotImplementedError): + tbl = _create_table(session_catalog, identifier, {property: value}, []) + if value: + with pytest.raises(NotImplementedError): + tbl.append(arrow_table_with_null) + else: tbl.append(arrow_table_with_null) From 8ea5ad50d6ef813ac7664940ac81e588b4bc15d5 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Sun, 4 Feb 2024 21:39:17 +0100 Subject: [PATCH 8/9] Fixup --- pyiceberg/io/pyarrow.py | 7 ++++--- tests/integration/test_writes.py | 25 ++++++++++++------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f5c26b2f3c..acfa9c0729 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1763,6 +1763,8 @@ def _get_int(key: str) -> Optional[int]: return int(value) except ValueError as e: raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e + else: + return None for key_pattern in [ "write.parquet.row-group-size-bytes", @@ -1770,9 +1772,8 @@ def _get_int(key: str) -> Optional[int]: "write.parquet.bloom-filter-max-bytes", "write.parquet.bloom-filter-enabled.column.*", ]: - unsupported_keys = fnmatch.filter(table_properties, key_pattern) - if set_unsupported_keys := [table_properties[key] for key in unsupported_keys]: - raise NotImplementedError(f"Parquet writer option(s) {set_unsupported_keys} not implemented") + if unsupported_keys := fnmatch.filter(table_properties, key_pattern): + raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") compression_codec = table_properties.get("write.parquet.compression-codec") compression_level = _get_int("write.parquet.compression-level") diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index eae876fc31..327e539c82 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -17,7 +17,7 @@ # pylint:disable=redefined-outer-name import uuid from datetime import date, datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List from urllib.parse import urlparse import pyarrow as pa @@ -490,26 +490,25 @@ def test_write_parquet_other_properties( @pytest.mark.integration @pytest.mark.integration @pytest.mark.integration -@pytest.mark.parametrize("value", [None, "42"]) @pytest.mark.parametrize( - "property", + "properties", [ - "write.parquet.row-group-size-bytes", - "write.parquet.page-row-limit", - "write.parquet.bloom-filter-enabled.column.bool", - "write.parquet.bloom-filter-max-bytes", + {"write.parquet.row-group-size-bytes": "42"}, + {"write.parquet.page-row-limit": "42"}, + {"write.parquet.bloom-filter-enabled.column.bool": "42"}, + {"write.parquet.bloom-filter-max-bytes": "42"}, ], ) def test_write_parquet_unsupported_properties( - spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, property: str, value: Optional[str] + spark: SparkSession, + session_catalog: Catalog, + arrow_table_with_null: pa.Table, + properties: Dict[str, str], ) -> None: identifier = "default.write_parquet_unsupported_properties" - tbl = _create_table(session_catalog, identifier, {property: value}, []) - if value: - with pytest.raises(NotImplementedError): - tbl.append(arrow_table_with_null) - else: + tbl = _create_table(session_catalog, identifier, properties, []) + with 
pytest.raises(NotImplementedError): tbl.append(arrow_table_with_null) From 517f2e828bed7c4ad9f7b43f978f0a1cdbd2f485 Mon Sep 17 00:00:00 2001 From: Jonas Haag Date: Mon, 5 Feb 2024 08:49:53 +0100 Subject: [PATCH 9/9] Fixup --- pyiceberg/io/pyarrow.py | 2 +- tests/integration/test_writes.py | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index acfa9c0729..f7c0aef68c 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1775,7 +1775,7 @@ def _get_int(key: str) -> Optional[int]: if unsupported_keys := fnmatch.filter(table_properties, key_pattern): raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented") - compression_codec = table_properties.get("write.parquet.compression-codec") + compression_codec = table_properties.get("write.parquet.compression-codec", "zstd") compression_level = _get_int("write.parquet.compression-level") if compression_codec == "uncompressed": compression_codec = "none" diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py index 327e539c82..a65d98fc9e 100644 --- a/tests/integration/test_writes.py +++ b/tests/integration/test_writes.py @@ -458,7 +458,6 @@ def test_write_parquet_compression_properties( assert compression == expected_compression_name -@pytest.mark.integration @pytest.mark.integration @pytest.mark.parametrize( "properties, expected_kwargs", @@ -487,8 +486,6 @@ def test_write_parquet_other_properties( assert call_kwargs.get(key) == value -@pytest.mark.integration -@pytest.mark.integration @pytest.mark.integration @pytest.mark.parametrize( "properties",
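
Note: as a self-contained illustration of where the series ends up, the sketch below restates the property-to-ParquetWriter-kwargs translation that pyiceberg/io/pyarrow.py contains after PATCH 9/9. It assumes table properties arrive as a plain Dict[str, str] (the library uses its Properties alias) and uses the hypothetical name parquet_writer_kwargs in place of the private _get_parquet_writer_kwargs; it is a sketch for readers of the series, not the exact shipped code.

    # Sketch: map Iceberg `write.parquet.*` table properties to
    # pyarrow.parquet.ParquetWriter keyword arguments, mirroring patches 4-9.
    import fnmatch
    from typing import Any, Dict, Optional

    def parquet_writer_kwargs(table_properties: Dict[str, str]) -> Dict[str, Any]:
        def _get_int(key: str) -> Optional[int]:
            if value := table_properties.get(key):
                try:
                    return int(value)
                except ValueError as e:
                    raise ValueError(f"Could not parse table property {key} to an integer: {value}") from e
            return None

        # Properties that the PyArrow writer path does not support are rejected early.
        for key_pattern in [
            "write.parquet.row-group-size-bytes",
            "write.parquet.page-row-limit",
            "write.parquet.bloom-filter-max-bytes",
            "write.parquet.bloom-filter-enabled.column.*",
        ]:
            if unsupported_keys := fnmatch.filter(table_properties, key_pattern):
                raise NotImplementedError(f"Parquet writer option(s) {unsupported_keys} not implemented")

        # Default to Zstandard (matching the REST catalog default noted in the tests);
        # Iceberg's "uncompressed" maps to pyarrow's "none".
        compression_codec = table_properties.get("write.parquet.compression-codec", "zstd")
        compression_level = _get_int("write.parquet.compression-level")
        if compression_codec == "uncompressed":
            compression_codec = "none"

        return {
            "compression": compression_codec,
            "compression_level": compression_level,
            "data_page_size": _get_int("write.parquet.page-size-bytes"),
            "dictionary_pagesize_limit": _get_int("write.parquet.dict-size-bytes"),
        }

    # Example: gzip at level 1, as exercised by test_write_parquet_compression_properties.
    print(parquet_writer_kwargs({"write.parquet.compression-codec": "gzip", "write.parquet.compression-level": "1"}))
    # -> {'compression': 'gzip', 'compression_level': 1, 'data_page_size': None, 'dictionary_pagesize_limit': None}

Raising NotImplementedError for the row-group, page-row-limit, and bloom-filter properties, rather than silently ignoring them, is the behaviour the series locks in with test_write_parquet_unsupported_properties.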