From f814ee162de27eed2c3bc9ef6d5d3624cf73afe8 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 18 Dec 2024 12:01:10 -0500 Subject: [PATCH 1/7] Initial commit for fix --- pyiceberg/io/pyarrow.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 7956a83242..98f3faa6fd 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -138,7 +138,7 @@ ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping -from pyiceberg.transforms import TruncateTransform +from pyiceberg.transforms import IdentityTransform, TruncateTransform from pyiceberg.typedef import EMPTY_DICT, Properties, Record from pyiceberg.types import ( BinaryType, @@ -1226,6 +1226,7 @@ def _task_to_record_batches( case_sensitive: bool, name_mapping: Optional[NameMapping] = None, use_large_types: bool = True, + partition_spec: PartitionSpec = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1248,6 +1249,17 @@ def _task_to_record_batches( if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + # Apply column projection rules for missing partitions and default values (ref: https://iceberg.apache.org/spec/#column-projection) + projected_missing_fields = None + for field_id in projected_field_ids.difference(file_project_schema.field_ids): + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: + projected_missing_fields = (partition_field.name, task.file.partition[0]) + + if nested_field := projected_schema.find_field(field_id) and projected_missing_fields is None and task.file.partition is None: + if nested_field.initial_default is not None: + projected_missing_fields(nested_field.name, nested_field) + fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, # With PyArrow 16.0.0 there is an issue with casting record-batches: @@ -1286,7 +1298,7 @@ def _task_to_record_batches( continue output_batches = arrow_table.to_batches() for output_batch in output_batches: - yield _to_requested_schema( + result_batch = _to_requested_schema( projected_schema, file_project_schema, output_batch, @@ -1294,6 +1306,12 @@ def _task_to_record_batches( use_large_types=use_large_types, ) + if projected_missing_fields is not None: + name, value = projected_missing_fields + result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) + + yield result_batch + def _task_to_table( fs: FileSystem, @@ -1517,6 +1535,7 @@ def _record_batches_from_scan_tasks_and_deletes( self._case_sensitive, self._table_metadata.name_mapping(), self._use_large_types, + self._table_metadata.spec(), ) for batch in batches: if self._limit is not None: From cf36660a0edd388b1aa924eb567858b8d3d6805a Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Wed, 18 Dec 2024 15:19:59 -0500 Subject: [PATCH 2/7] Add test and commit lint changes --- pyiceberg/io/pyarrow.py | 6 ++++- tests/io/test_pyarrow.py | 53 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 98f3faa6fd..dcdfee18b3 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1256,7 +1256,11 @@ def 
_task_to_record_batches( if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: projected_missing_fields = (partition_field.name, task.file.partition[0]) - if nested_field := projected_schema.find_field(field_id) and projected_missing_fields is None and task.file.partition is None: + if ( + nested_field := projected_schema.find_field(field_id) + and projected_missing_fields is None + and task.file.partition is None + ): if nested_field.initial_default is not None: projected_missing_fields(nested_field.name, nested_field) diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index e4017e1df5..533141628c 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -77,6 +77,7 @@ from pyiceberg.schema import Schema, make_compatible_name, visit from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 +from pyiceberg.table.name_mapping import create_mapping_from_schema from pyiceberg.transforms import IdentityTransform from pyiceberg.typedef import UTF8, Properties, Record from pyiceberg.types import ( @@ -1122,6 +1123,58 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" +def test_projection_partition_inference(tmp_path: str, example_task: FileScanTask): + schema = Schema( + NestedField(1, "partition_field", IntegerType(), required=False), + NestedField(2, "other_field", StringType(), required=False), + ) + + partition_spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "partition_field")) + + table = TableMetadataV2( + location="file://a/b/c.json", + last_column_id=2, + format_version=2, + current_schema_id=0, + schemas=[schema], + partition_specs=[partition_spec], + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + pa_schema = pa.schema([pa.field("other_field", pa.string())]) + pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) + pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=f"{tmp_path}/datafile.parquet", + file_format=FileFormat.PARQUET, + partition=Record(partition_id=123456), + file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + sort_order_id=None, + spec_id=0, + equality_ids=None, + key_metadata=None, + ) + + table_result_scan = ArrowScan( + table_metadata=table, + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), + ).to_table(tasks=[FileScanTask(data_file=data_file)]) + + assert ( + str(table_result_scan) + == """pyarrow.Table +partition_field: int64 +other_field: large_string +---- +partition_field: [[123456]] +other_field: [["x"]]""" + ) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From 79824651a126801ef5bef420a66d5fc0331b39ac Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 00:18:46 -0500 Subject: [PATCH 3/7] default-value bug fixes and adding more tests --- pyiceberg/io/pyarrow.py | 19 ++++++-------- tests/io/test_pyarrow.py | 54 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index dcdfee18b3..acff6e4624 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1249,20 +1249,17 @@ def 
_task_to_record_batches( if file_schema is None: raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - # Apply column projection rules for missing partitions and default values (ref: https://iceberg.apache.org/spec/#column-projection) - projected_missing_fields = None + # Apply column projection rules for missing partitions and default values + # https://iceberg.apache.org/spec/#column-projection + projected_missing_fields = {} for field_id in projected_field_ids.difference(file_project_schema.field_ids): for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields = (partition_field.name, task.file.partition[0]) + projected_missing_fields[partition_field.name] = task.file.partition[0] - if ( - nested_field := projected_schema.find_field(field_id) - and projected_missing_fields is None - and task.file.partition is None - ): + if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: - projected_missing_fields(nested_field.name, nested_field) + projected_missing_fields[nested_field.name] = nested_field.initial_default fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, @@ -1310,8 +1307,8 @@ def _task_to_record_batches( use_large_types=use_large_types, ) - if projected_missing_fields is not None: - name, value = projected_missing_fields + # Inject projected column values if available + for name, value in projected_missing_fields.items(): result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) yield result_batch diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 533141628c..822e188f4a 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1123,7 +1123,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str, example_task: FileScanTask): +def test_projection_partition_inference(tmp_path: str): schema = Schema( NestedField(1, "partition_field", IntegerType(), required=False), NestedField(2, "other_field", StringType(), required=False), @@ -1175,6 +1175,58 @@ def test_projection_partition_inference(tmp_path: str, example_task: FileScanTas ) +def test_projection_initial_default_inference(tmp_path: str) -> None: + schema = Schema( + NestedField(1, "other_field", StringType(), required=False), + NestedField(2, "other_field_1", StringType(), required=False, initial_default="foo"), + ) + + table = TableMetadataV2( + location="file://a/b/c.json", + last_column_id=2, + format_version=2, + current_schema_id=0, + schemas=[schema], + partition_specs=[PartitionSpec()], + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + pa_schema = pa.schema([pa.field("other_field", pa.string())]) + pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) + pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + + data_file = DataFile( + content=DataFileContent.DATA, + file_path=f"{tmp_path}/datafile.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + sort_order_id=None, + spec_id=0, + equality_ids=None, + key_metadata=None, + ) + + table_result_scan = ArrowScan( + table_metadata=table, + io=load_file_io(), + projected_schema=schema, + row_filter=AlwaysTrue(), 
+ ).to_table(tasks=[FileScanTask(data_file=data_file)]) + + print(str(table_result_scan)) + + assert ( + str(table_result_scan) + == """pyarrow.Table +other_field: large_string +other_field_1: string +---- +other_field: [["x"]] +other_field_1: [["foo"]]""" + ) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From e4d58826f4c42a82c0dba380d1c3e771c14ad900 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 00:41:21 -0500 Subject: [PATCH 4/7] Add continue, check file_schema before using it, group steps of projection together --- pyiceberg/io/pyarrow.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index acff6e4624..c052403d55 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1238,24 +1238,27 @@ def _task_to_record_batches( # When V3 support is introduced, we will update `downcast_ns_timestamp_to_us` flag based on # the table format version. file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) + + if file_schema is None: + raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") + pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) bound_file_filter = bind(file_schema, translated_row_filter, case_sensitive=case_sensitive) pyarrow_filter = expression_to_pyarrow(bound_file_filter) - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - - if file_schema is None: - raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - # Apply column projection rules for missing partitions and default values # https://iceberg.apache.org/spec/#column-projection + + file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) projected_missing_fields = {} + for field_id in projected_field_ids.difference(file_project_schema.field_ids): for partition_field in partition_spec.fields_by_source_id(field_id): if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: projected_missing_fields[partition_field.name] = task.file.partition[0] + continue if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: From 694a52d6f3b7cd5353c50a8cc0ed5fb6ab25ffb5 Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 19 Dec 2024 10:10:10 -0500 Subject: [PATCH 5/7] Fix lint issues, reorder partition spec to be of higher importance than initial-default --- pyiceberg/io/pyarrow.py | 12 +++++------- tests/io/test_pyarrow.py | 2 +- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index c052403d55..0bd90ec216 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1226,7 +1226,7 @@ def _task_to_record_batches( case_sensitive: bool, name_mapping: Optional[NameMapping] = None, use_large_types: bool = True, - partition_spec: PartitionSpec = None, + partition_spec: Optional[PartitionSpec] = None, ) -> Iterator[pa.RecordBatch]: _, _, path = _parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) @@ -1250,19 +1250,17 @@ def _task_to_record_batches( # Apply column projection rules for missing 
partitions and default values # https://iceberg.apache.org/spec/#column-projection - file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) projected_missing_fields = {} for field_id in projected_field_ids.difference(file_project_schema.field_ids): - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields[partition_field.name] = task.file.partition[0] - continue - if nested_field := projected_schema.find_field(field_id): if nested_field.initial_default is not None: projected_missing_fields[nested_field.name] = nested_field.initial_default + if partition_spec is not None: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: + projected_missing_fields[partition_field.name] = task.file.partition[0] fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 822e188f4a..b0455280c2 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1123,7 +1123,7 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str): +def test_projection_partition_inference(tmp_path: str) -> None: schema = Schema( NestedField(1, "partition_field", IntegerType(), required=False), NestedField(2, "other_field", StringType(), required=False), From fee24ab4be7de1671bfead19a5fe9cd4202f496c Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Thu, 26 Dec 2024 12:31:37 -0500 Subject: [PATCH 6/7] Removed file_schema check and initial-default logic, separated projection logic to helper method, changed test to use high-level table scan --- pyiceberg/io/pyarrow.py | 36 +++++++----- tests/io/test_pyarrow.py | 115 ++++++++++++--------------------------- 2 files changed, 59 insertions(+), 92 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 0bd90ec216..a3a821d197 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1216,6 +1216,25 @@ def _field_id(self, field: pa.Field) -> int: return -1 +def _get_column_projection_values( + file: DataFile, + projected_schema: Schema, + projected_field_ids: Set[int], + file_project_schema: Schema, + partition_spec: Optional[PartitionSpec] = None, +) -> Dict[str, object]: + """Apply Column Projection rules to File Schema.""" + projected_missing_fields = {} + + for field_id in projected_field_ids.difference(file_project_schema.field_ids): + if partition_spec is not None: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform) and partition_field.name in file.partition.__dict__: + projected_missing_fields[partition_field.name] = file.partition.__dict__[partition_field.name] + + return projected_missing_fields + + def _task_to_record_batches( fs: FileSystem, task: FileScanTask, @@ -1239,9 +1258,6 @@ def _task_to_record_batches( # the table format version. 
file_schema = pyarrow_to_schema(physical_schema, name_mapping, downcast_ns_timestamp_to_us=True) - if file_schema is None: - raise ValueError(f"Missing Iceberg schema in Metadata for file: {path}") - pyarrow_filter = None if bound_row_filter is not AlwaysTrue(): translated_row_filter = translate_column_names(bound_row_filter, file_schema, case_sensitive=case_sensitive) @@ -1251,16 +1267,10 @@ def _task_to_record_batches( # Apply column projection rules for missing partitions and default values # https://iceberg.apache.org/spec/#column-projection file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - projected_missing_fields = {} - - for field_id in projected_field_ids.difference(file_project_schema.field_ids): - if nested_field := projected_schema.find_field(field_id): - if nested_field.initial_default is not None: - projected_missing_fields[nested_field.name] = nested_field.initial_default - if partition_spec is not None: - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and task.file.partition is not None: - projected_missing_fields[partition_field.name] = task.file.partition[0] + + projected_missing_fields = _get_column_projection_values( + task.file, projected_schema, projected_field_ids, file_project_schema, partition_spec + ) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index b0455280c2..277fa050cb 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -69,7 +69,10 @@ _read_deletes, _to_requested_schema, bin_pack_arrow_table, + compute_statistics_plan, + data_file_statistics_from_parquet_metadata, expression_to_pyarrow, + parquet_path_to_id_mapping, schema_to_pyarrow, ) from pyiceberg.manifest import DataFile, DataFileContent, FileFormat @@ -100,6 +103,7 @@ TimestamptzType, TimeType, ) +from tests.catalog.test_base import InMemoryCatalog from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES @@ -1123,110 +1127,63 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str) -> None: +def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None: schema = Schema( - NestedField(1, "partition_field", IntegerType(), required=False), - NestedField(2, "other_field", StringType(), required=False), + NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) ) - partition_spec = PartitionSpec(PartitionField(1, 1000, IdentityTransform(), "partition_field")) + partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id")) - table = TableMetadataV2( - location="file://a/b/c.json", - last_column_id=2, - format_version=2, - current_schema_id=0, - schemas=[schema], - partition_specs=[partition_spec], + table = catalog.create_table( + "default.test_projection_partition_inference", + schema=schema, + partition_spec=partition_spec, properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, ) - pa_schema = pa.schema([pa.field("other_field", pa.string())]) - pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) - pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") + file_data = pa.array(["foo"], type=pa.string()) + file_loc = f"{tmp_path}/test.parquet" + 
pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) - data_file = DataFile( - content=DataFileContent.DATA, - file_path=f"{tmp_path}/datafile.parquet", - file_format=FileFormat.PARQUET, - partition=Record(partition_id=123456), - file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), - sort_order_id=None, - spec_id=0, - equality_ids=None, - key_metadata=None, - ) - - table_result_scan = ArrowScan( - table_metadata=table, - io=load_file_io(), - projected_schema=schema, - row_filter=AlwaysTrue(), - ).to_table(tasks=[FileScanTask(data_file=data_file)]) - - assert ( - str(table_result_scan) - == """pyarrow.Table -partition_field: int64 -other_field: large_string ----- -partition_field: [[123456]] -other_field: [["x"]]""" + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=pq.read_metadata(file_loc), + stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), ) - -def test_projection_initial_default_inference(tmp_path: str) -> None: - schema = Schema( - NestedField(1, "other_field", StringType(), required=False), - NestedField(2, "other_field_1", StringType(), required=False, initial_default="foo"), - ) - - table = TableMetadataV2( - location="file://a/b/c.json", - last_column_id=2, - format_version=2, - current_schema_id=0, - schemas=[schema], - partition_specs=[PartitionSpec()], - properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, - ) - - pa_schema = pa.schema([pa.field("other_field", pa.string())]) - pa_table = pa.table({"other_field": ["x"]}, schema=pa_schema) - pq.write_table(pa_table, f"{tmp_path}/datafile.parquet") - - data_file = DataFile( + unpartitioned_file = DataFile( content=DataFileContent.DATA, - file_path=f"{tmp_path}/datafile.parquet", + file_path=file_loc, file_format=FileFormat.PARQUET, - partition=Record(), - file_size_in_bytes=os.path.getsize(f"{tmp_path}/datafile.parquet"), + partition=Record(partition_id=1), + file_size_in_bytes=os.path.getsize(file_loc), sort_order_id=None, - spec_id=0, + spec_id=table.metadata.default_spec_id, equality_ids=None, key_metadata=None, + **statistics.to_serialized_dict(), ) - table_result_scan = ArrowScan( - table_metadata=table, - io=load_file_io(), - projected_schema=schema, - row_filter=AlwaysTrue(), - ).to_table(tasks=[FileScanTask(data_file=data_file)]) - - print(str(table_result_scan)) + with table.transaction() as transaction: + with transaction.update_snapshot().overwrite() as update: + update.append_data_file(unpartitioned_file) assert ( - str(table_result_scan) + str(table.scan().to_arrow()) == """pyarrow.Table other_field: large_string -other_field_1: string +partition_id: int64 ---- -other_field: [["x"]] -other_field_1: [["foo"]]""" +other_field: [["foo"]] +partition_id: [[1]]""" ) +@pytest.fixture +def catalog() -> InMemoryCatalog: + return InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) + + def test_projection_filter(schema_int: Schema, file_int: str) -> None: result_table = project(schema_int, [file_int], GreaterThan("id", 4)) assert len(result_table.columns[0]) == 0 From 8362803322a031605a019fee83ef15b5d619c67b Mon Sep 17 00:00:00 2001 From: Gabriel Igliozzi Date: Fri, 17 Jan 2025 14:58:25 -0800 Subject: [PATCH 7/7] Add should_project check, add lookup by accessor, multiple-partition test --- pyiceberg/io/pyarrow.py | 43 ++++++++++++++++-------- tests/io/test_pyarrow.py | 72 
+++++++++++++++++++++++++++++++++++++--- 2 files changed, 97 insertions(+), 18 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a3a821d197..117b40cfcc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -129,6 +129,7 @@ SchemaVisitorPerPrimitiveType, SchemaWithPartnerVisitor, _check_schema_compatible, + build_position_accessors, pre_order_visit, promote, prune_columns, @@ -1219,18 +1220,25 @@ def _field_id(self, field: pa.Field) -> int: def _get_column_projection_values( file: DataFile, projected_schema: Schema, - projected_field_ids: Set[int], - file_project_schema: Schema, - partition_spec: Optional[PartitionSpec] = None, + project_schema_diff: Set[int], + partition_spec: PartitionSpec, ) -> Dict[str, object]: """Apply Column Projection rules to File Schema.""" - projected_missing_fields = {} + projected_missing_fields: Dict[str, Any] = {} + + partition_schema = partition_spec.partition_type(projected_schema) + accessors = build_position_accessors(partition_schema) + + for field_id in project_schema_diff: + for partition_field in partition_spec.fields_by_source_id(field_id): + if isinstance(partition_field.transform, IdentityTransform): + accesor = accessors.get(partition_field.field_id) - for field_id in projected_field_ids.difference(file_project_schema.field_ids): - if partition_spec is not None: - for partition_field in partition_spec.fields_by_source_id(field_id): - if isinstance(partition_field.transform, IdentityTransform) and partition_field.name in file.partition.__dict__: - projected_missing_fields[partition_field.name] = file.partition.__dict__[partition_field.name] + if accesor is None: + continue + + if partition_value := accesor.get(file.partition): + projected_missing_fields[partition_field.name] = partition_value return projected_missing_fields @@ -1268,9 +1276,15 @@ def _task_to_record_batches( # https://iceberg.apache.org/spec/#column-projection file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False) - projected_missing_fields = _get_column_projection_values( - task.file, projected_schema, projected_field_ids, file_project_schema, partition_spec - ) + project_schema_diff = projected_field_ids.difference(file_project_schema.field_ids) + should_project_columns = len(project_schema_diff) > 0 + + projected_missing_fields = {} + + if should_project_columns and partition_spec is not None: + projected_missing_fields = _get_column_projection_values( + task.file, projected_schema, project_schema_diff, partition_spec + ) fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, @@ -1319,8 +1333,9 @@ def _task_to_record_batches( ) # Inject projected column values if available - for name, value in projected_missing_fields.items(): - result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) + if should_project_columns: + for name, value in projected_missing_fields.items(): + result_batch = result_batch.set_column(result_batch.schema.get_field_index(name), name, [value]) yield result_batch diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index 277fa050cb..00c5607c67 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -81,7 +81,7 @@ from pyiceberg.table import FileScanTask, TableProperties from pyiceberg.table.metadata import TableMetadataV2 from pyiceberg.table.name_mapping import create_mapping_from_schema -from pyiceberg.transforms import IdentityTransform +from pyiceberg.transforms import IdentityTransform, 
VoidTransform from pyiceberg.typedef import UTF8, Properties, Record from pyiceberg.types import ( BinaryType, @@ -1127,15 +1127,21 @@ def test_projection_concat_files(schema_int: Schema, file_int: str) -> None: assert repr(result_table.schema) == "id: int32" -def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) -> None: +def test_projection_single_partition_value(tmp_path: str, catalog: InMemoryCatalog) -> None: + # Test by adding a non-partitioned data file to a partitioned table, verifying partition value projection from manifest metadata. + # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. + # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) + schema = Schema( NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) ) - partition_spec = PartitionSpec(PartitionField(2, 1000, IdentityTransform(), "partition_id")) + partition_spec = PartitionSpec( + PartitionField(2, 1000, IdentityTransform(), "partition_id"), + ) table = catalog.create_table( - "default.test_projection_partition_inference", + "default.test_projection_partition", schema=schema, partition_spec=partition_spec, properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, @@ -1179,6 +1185,64 @@ def test_projection_partition_inference(tmp_path: str, catalog: InMemoryCatalog) ) +def test_projection_multiple_partition_values(tmp_path: str, catalog: InMemoryCatalog) -> None: + # Test by adding a non-partitioned data file to a multi-partitioned table, verifying partition value projection from manifest metadata. + # TODO: Update to use a data file created by writing data to an unpartitioned table once add_files supports field IDs. 
+ # (context: https://github.com/apache/iceberg-python/pull/1443#discussion_r1901374875) + schema = Schema( + NestedField(1, "other_field", StringType(), required=False), NestedField(2, "partition_id", IntegerType(), required=False) + ) + + partition_spec = PartitionSpec( + PartitionField(2, 1000, VoidTransform(), "void_partition_id"), + PartitionField(2, 1001, IdentityTransform(), "partition_id"), + ) + + table = catalog.create_table( + "default.test_projection_partitions", + schema=schema, + partition_spec=partition_spec, + properties={TableProperties.DEFAULT_NAME_MAPPING: create_mapping_from_schema(schema).model_dump_json()}, + ) + + file_data = pa.array(["foo"], type=pa.string()) + file_loc = f"{tmp_path}/test.parquet" + pq.write_table(pa.table([file_data], names=["other_field"]), file_loc) + + statistics = data_file_statistics_from_parquet_metadata( + parquet_metadata=pq.read_metadata(file_loc), + stats_columns=compute_statistics_plan(table.schema(), table.metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(table.schema()), + ) + + unpartitioned_file = DataFile( + content=DataFileContent.DATA, + file_path=file_loc, + file_format=FileFormat.PARQUET, + partition=Record(void_partition_id=None, partition_id=1), + file_size_in_bytes=os.path.getsize(file_loc), + sort_order_id=None, + spec_id=table.metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + **statistics.to_serialized_dict(), + ) + + with table.transaction() as transaction: + with transaction.update_snapshot().overwrite() as update: + update.append_data_file(unpartitioned_file) + + assert ( + str(table.scan().to_arrow()) + == """pyarrow.Table +other_field: large_string +partition_id: int64 +---- +other_field: [["foo"]] +partition_id: [[1]]""" + ) + + @pytest.fixture def catalog() -> InMemoryCatalog: return InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"})
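
Illustrative note (not part of the patches): the helper `_get_column_projection_values` introduced in the final commit only projects values for identity-transform partition fields, since bucket, truncate, and void transforms do not preserve the source column value. Below is a minimal standalone sketch of that rule, assuming plain dicts and a small dataclass in place of the manifest partition Record and PartitionSpec; all names here are hypothetical and chosen for illustration only.

from dataclasses import dataclass
from typing import Any, Dict, List, Set


@dataclass
class SpecField:
    source_id: int   # field id of the source column in the table schema
    name: str        # partition field name
    transform: str   # "identity", "void", "bucket[16]", ...


def column_projection_values(
    missing_field_ids: Set[int],
    spec_fields: List[SpecField],
    partition_record: Dict[str, Any],
) -> Dict[str, Any]:
    # Only identity transforms preserve the source value, so only those
    # partition fields can stand in for a column missing from the data file.
    projected: Dict[str, Any] = {}
    for field_id in missing_field_ids:
        for pf in spec_fields:
            if pf.source_id == field_id and pf.transform == "identity":
                value = partition_record.get(pf.name)
                if value is not None:
                    projected[pf.name] = value
    return projected


# Mirrors the multiple-partition-fields test above: a void field and an
# identity field share source id 2; only the identity value is projected.
spec = [SpecField(2, "void_partition_id", "void"), SpecField(2, "partition_id", "identity")]
print(column_projection_values({2}, spec, {"void_partition_id": None, "partition_id": 1}))
# -> {'partition_id': 1}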
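
A second sketch of the injection step: once a projected value is known, it is added to each batch as a constant column. The patches do this with RecordBatch.set_column at a computed index; the sketch below shows the same idea with plain PyArrow Table.append_column, and the table contents and column names are illustrative rather than taken from the PR.

import pyarrow as pa


def inject_constant_column(tbl: pa.Table, name: str, value: object, dtype: pa.DataType) -> pa.Table:
    # Build a constant array as long as the table and append it under the
    # projected partition field name.
    constant = pa.array([value] * len(tbl), type=dtype)
    return tbl.append_column(pa.field(name, dtype), constant)


# The data file only carries "other_field"; "partition_id" comes from the
# manifest entry's partition tuple.
file_contents = pa.table({"other_field": ["foo", "bar"]})
print(inject_constant_column(file_contents, "partition_id", 1, pa.int64()))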