diff --git a/tests/integration/test_writes.py b/tests/integration/test_writes.py
index 6add21f8d0..51c60dcadc 100644
--- a/tests/integration/test_writes.py
+++ b/tests/integration/test_writes.py
@@ -36,8 +36,8 @@
 from pyiceberg.catalog.sql import SqlCatalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.schema import Schema
-from pyiceberg.table import Table, _dataframe_to_data_files
 from pyiceberg.typedef import Properties
+from pyiceberg.table import SetPropertiesUpdate, Table, TableProperties, _dataframe_to_data_files
 from pyiceberg.types import (
     BinaryType,
     BooleanType,
@@ -379,31 +379,50 @@ def get_current_snapshot_id(identifier: str) -> int:
 
 
 @pytest.mark.integration
-@pytest.mark.parametrize("format_version", [1, 2])
-def test_write_multiple_data_files(
-    spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int
-) -> None:
-    identifier = "default.write_multiple_arrow_data_files"
-    tbl = _create_table(session_catalog, identifier, {"format-version": format_version}, [])
+def test_write_bin_pack_data_files(spark: SparkSession, session_catalog: Catalog, arrow_table_with_null: pa.Table) -> None:
+    identifier = "default.write_bin_pack_data_files"
+    tbl = _create_table(session_catalog, identifier, {"format-version": "1"}, [])
 
     def get_data_files_count(identifier: str) -> int:
         return spark.sql(
             f"""
             SELECT *
-            FROM {identifier}.all_data_files
+            FROM {identifier}.files
         """
         ).count()
 
-    # writes to 1 data file since the table is small
+    def set_table_properties(tbl: Table, properties: Properties) -> Table:
+        with tbl.transaction() as transaction:
+            transaction._apply((SetPropertiesUpdate(updates=properties),))
+        return tbl
+
+    # writes 1 data file since the table is smaller than default target file size
+    assert arrow_table_with_null.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
     tbl.overwrite(arrow_table_with_null)
     assert get_data_files_count(identifier) == 1
 
-    # writes to 1 data file as long as table is smaller than default target file size
+    # writes 1 data file as long as table is smaller than default target file size
     bigger_arrow_tbl = pa.concat_tables([arrow_table_with_null] * 10)
-    tbl.overwrite(bigger_arrow_tbl)
     assert bigger_arrow_tbl.nbytes < TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
+    tbl.overwrite(bigger_arrow_tbl)
     assert get_data_files_count(identifier) == 1
 
+    # writes multiple data files once target file size is overridden
+    target_file_size = arrow_table_with_null.nbytes
+    tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)})
+    assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+    assert target_file_size < bigger_arrow_tbl.nbytes
+    tbl.overwrite(bigger_arrow_tbl)
+    assert get_data_files_count(identifier) == 10
+
+    # writes half the number of data files when target file size doubles
+    target_file_size = arrow_table_with_null.nbytes * 2
+    tbl = set_table_properties(tbl, {TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(target_file_size)})
+    assert str(target_file_size) == tbl.properties.get(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES)
+    assert target_file_size < bigger_arrow_tbl.nbytes
+    tbl.overwrite(bigger_arrow_tbl)
+    assert get_data_files_count(identifier) == 5
+
 
 @pytest.mark.integration
 @pytest.mark.parametrize("format_version", [1, 2])
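
For context on what the new test exercises, the sketch below reproduces the same property-driven bin-packing behavior through pyiceberg's public API. It is illustrative only and not part of the patch: the catalog name, table name, and expected file counts are assumptions, and it uses Transaction.set_properties instead of the private _apply call used inside the test helper.

# Illustrative sketch only -- not part of the patch above. Assumes a pyiceberg
# catalog named "default" is configured and the table already exists; both names
# are placeholders.
import pyarrow as pa

from pyiceberg.catalog import load_catalog
from pyiceberg.table import TableProperties

catalog = load_catalog("default")                     # hypothetical catalog name
tbl = catalog.load_table("default.bin_pack_example")  # hypothetical table name

df = pa.table({"n": pa.array(range(100_000), type=pa.int64())})

# With the default write.target-file-size-bytes (512 MB) this small table
# should land in a single data file.
tbl.overwrite(df)
print(len(list(tbl.scan().plan_files())))  # one scan task per data file here; expected: 1

# Lower the target file size to roughly a tenth of the table, so the writer
# bin-packs the same rows into multiple files.
with tbl.transaction() as tx:
    tx.set_properties(**{TableProperties.WRITE_TARGET_FILE_SIZE_BYTES: str(df.nbytes // 10)})

tbl.overwrite(df)
print(len(list(tbl.scan().plan_files())))  # expected: roughly 10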