From 9c16634055e4773a9bfc91800269fea68ed7f4b4 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 6 Mar 2024 23:24:22 +0000 Subject: [PATCH 1/9] append files --- pyiceberg/io/pyarrow.py | 27 ++++++++ pyiceberg/table/__init__.py | 44 +++++++++++++ tests/integration/test_add_files.py | 97 +++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 tests/integration/test_add_files.py diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index be944ffb36..fcfd5b4904 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1772,6 +1772,33 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT return iter([data_file]) +def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: + input_file = io.new_input(file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + data_file = DataFile( + content=DataFileContent.DATA, + file_path=file_path, + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=parquet_metadata.num_rows, + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + ) + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + return data_file + + ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" PYARROW_UNCOMPRESSED_CODEC = "none" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1a4183c914..6d63fc9105 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1135,6 +1135,23 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T for data_file in data_files: update_snapshot.append_data_file(data_file) + def add_files(self, file_paths: List[str]) -> None: + """ + Shorthand API for adding files as data files to the table. 
+ + Args: + files: The list of full file paths to be added as data files to the table + """ + if self.name_mapping() is None: + with self.transaction() as tx: + tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()}) + + with self.transaction() as txn: + with txn.update_snapshot().fast_append() as update_snapshot: + data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io) + for data_file in data_files: + update_snapshot.append_data_file(data_file) + def update_spec(self, case_sensitive: bool = True) -> UpdateSpec: return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive) @@ -2430,6 +2447,21 @@ def generate_data_file_filename(self, extension: str) -> str: return f"00000-{self.task_id}-{self.write_uuid}.{extension}" +@dataclass(frozen=True) +class AddFileTask: + write_uuid: uuid.UUID + task_id: int + df: pa.Table + sort_order_id: Optional[int] = None + + # Later to be extended with partition information + + def generate_data_file_filename(self, extension: str) -> str: + # Mimics the behavior in the Java API: + # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 + return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + + def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: return f'{location}/metadata/{commit_uuid}-m{num}.avro' @@ -2461,6 +2493,18 @@ def _dataframe_to_data_files( yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)])) +def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: + """Convert a list files into DataFiles. + + Returns: + An iterable that supplies DataFiles that describe the parquet files. + """ + from pyiceberg.io.pyarrow import parquet_file_to_data_file + + for file_path in file_paths: + yield parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path) + + class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): commit_uuid: uuid.UUID _operation: Operation diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py new file mode 100644 index 0000000000..d76a5bf2bf --- /dev/null +++ b/tests/integration/test_add_files.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# pylint:disable=redefined-outer-name +import pyarrow as pa +import pyarrow.parquet as pq +import pytest +from pathlib import Path + +from pyiceberg.catalog import Catalog, Properties, Table +from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.schema import Schema +from pyiceberg.types import ( + BooleanType, + IntegerType, + NestedField, + StringType, +) +from pyiceberg.exceptions import NoSuchTableError +from pyspark.sql import SparkSession + +TABLE_SCHEMA = Schema( + NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), + NestedField(field_id=2, name="bar", field_type=StringType(), required=False), + NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), +) + +ARROW_TABLE = pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + } + ], + schema=schema_to_pyarrow(TABLE_SCHEMA), + ) + +def _create_table(session_catalog: Catalog, identifier: str) -> Table: + try: + session_catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA) + + return tbl + +@pytest.mark.integration +def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, warehouse: Path) -> None: + identifier = "default.unpartitioned_table" + tbl = _create_table(session_catalog, identifier) + # rows = spark.sql( + # f""" + # SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + # FROM {identifier}.all_manifests + # """ + # ).collect() + + # assert [row.added_data_files_count for row in rows] == [] + # assert [row.existing_data_files_count for row in rows] == [] + # assert [row.deleted_data_files_count for row in rows] == [] + + file_paths = [f"/{warehouse}/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_TABLE.schema) as writer: + writer.write_table(ARROW_TABLE) + + # add the parquet files as data files + tbl.add_files(file_paths) + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5] + assert [row.existing_data_files_count for row in rows] == [0] + assert [row.deleted_data_files_count for row in rows] == [0] \ No newline at end of file From e250ffcde7255f186740bba2824c2d7b328cdcce Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 8 Mar 2024 03:01:00 +0000 Subject: [PATCH 2/9] add_files --- pyiceberg/io/pyarrow.py | 53 ++++++------ pyiceberg/table/__init__.py | 42 ++++++---- tests/integration/test_add_files.py | 124 ++++++++++++++++++++-------- 3 files changed, 145 insertions(+), 74 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index fcfd5b4904..ba336e0e61 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -124,7 +124,7 @@ visit, visit_with_partner, ) -from pyiceberg.table import PropertyUtil, TableProperties, WriteTask +from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.name_mapping import NameMapping from pyiceberg.transforms import TruncateTransform @@ -1772,31 +1772,32 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT 
return iter([data_file]) -def parquet_file_to_data_file(io: FileIO, table_metadata: TableMetadata, file_path: str) -> DataFile: - input_file = io.new_input(file_path) - with input_file.open() as input_stream: - parquet_metadata = pq.read_metadata(input_stream) - - schema = table_metadata.schema() - data_file = DataFile( - content=DataFileContent.DATA, - file_path=file_path, - file_format=FileFormat.PARQUET, - partition=Record(), - record_count=parquet_metadata.num_rows, - file_size_in_bytes=len(input_file), - sort_order_id=None, - spec_id=table_metadata.default_spec_id, - equality_ids=None, - key_metadata=None, - ) - fill_parquet_file_metadata( - data_file=data_file, - parquet_metadata=parquet_metadata, - stats_columns=compute_statistics_plan(schema, table_metadata.properties), - parquet_column_mapping=parquet_path_to_id_mapping(schema), - ) - return data_file +def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]: + for task in tasks: + input_file = io.new_input(task.file_path) + with input_file.open() as input_stream: + parquet_metadata = pq.read_metadata(input_stream) + + schema = table_metadata.schema() + data_file = DataFile( + content=DataFileContent.DATA, + file_path=task.file_path, + file_format=FileFormat.PARQUET, + partition=task.partition_field_value, + record_count=parquet_metadata.num_rows, + file_size_in_bytes=len(input_file), + sort_order_id=None, + spec_id=table_metadata.default_spec_id, + equality_ids=None, + key_metadata=None, + ) + fill_parquet_file_metadata( + data_file=data_file, + parquet_metadata=parquet_metadata, + stats_columns=compute_statistics_plan(schema, table_metadata.properties), + parquet_column_mapping=parquet_path_to_id_mapping(schema), + ) + yield data_file ICEBERG_UNCOMPRESSED_CODEC = "uncompressed" diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6d63fc9105..d8d05740ed 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -33,6 +33,7 @@ Dict, Generic, Iterable, + Iterator, List, Literal, Optional, @@ -56,6 +57,7 @@ parser, visitors, ) +from pyiceberg.expressions.literals import StringLiteral from pyiceberg.expressions.visitors import _InclusiveMetricsEvaluator, inclusive_projection from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ( @@ -117,6 +119,7 @@ Identifier, KeyDefaultDict, Properties, + Record, ) from pyiceberg.types import ( IcebergType, @@ -1140,7 +1143,7 @@ def add_files(self, file_paths: List[str]) -> None: Shorthand API for adding files as data files to the table. 
Args: - files: The list of full file paths to be added as data files to the table + file_paths: The list of full file paths to be added as data files to the table """ if self.name_mapping() is None: with self.transaction() as tx: @@ -2449,17 +2452,8 @@ def generate_data_file_filename(self, extension: str) -> str: @dataclass(frozen=True) class AddFileTask: - write_uuid: uuid.UUID - task_id: int - df: pa.Table - sort_order_id: Optional[int] = None - - # Later to be extended with partition information - - def generate_data_file_filename(self, extension: str) -> str: - # Mimics the behavior in the Java API: - # https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 - return f"00000-{self.task_id}-{self.write_uuid}.{extension}" + file_path: str + partition_field_value: Record def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str: @@ -2493,16 +2487,34 @@ def _dataframe_to_data_files( yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)])) +def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]: + partition_spec = table_metadata.spec() + partition_struct = partition_spec.partition_type(table_metadata.schema()) + + for file_path in file_paths: + # file_path = 's3://warehouse/default/part1=2024-03-04/part2=ABCD' + # ['part1=2024-03-04', 'part2=ABCD'] + parts = [part for part in file_path.split("/") if "=" in part] + + partition_field_values = {} + for part in parts: + partition_name, string_value = part.split("=") + if partition_field := partition_struct.field_by_name(partition_name): + partition_field_values[partition_name] = StringLiteral(string_value).to(partition_field.field_type).value + + yield AddFileTask(file_path=file_path, partition_field_value=Record(**partition_field_values)) + + def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: """Convert a list files into DataFiles. Returns: An iterable that supplies DataFiles that describe the parquet files. """ - from pyiceberg.io.pyarrow import parquet_file_to_data_file + from pyiceberg.io.pyarrow import parquet_files_to_data_files - for file_path in file_paths: - yield parquet_file_to_data_file(io=io, table_metadata=table_metadata, file_path=file_path) + tasks = add_file_tasks_from_file_paths(file_paths, table_metadata) + yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks) class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]): diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index d76a5bf2bf..2aa33342cd 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -15,75 +15,92 @@ # specific language governing permissions and limitations # under the License. 
# pylint:disable=redefined-outer-name + +from datetime import date +from typing import Optional + import pyarrow as pa import pyarrow.parquet as pq import pytest -from pathlib import Path +from pyspark.sql import SparkSession -from pyiceberg.catalog import Catalog, Properties, Table -from pyiceberg.io.pyarrow import schema_to_pyarrow +from pyiceberg.catalog import Catalog, Table +from pyiceberg.exceptions import NoSuchTableError +from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.transforms import IdentityTransform, MonthTransform from pyiceberg.types import ( BooleanType, + DateType, IntegerType, NestedField, StringType, ) -from pyiceberg.exceptions import NoSuchTableError -from pyspark.sql import SparkSession TABLE_SCHEMA = Schema( NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False), NestedField(field_id=2, name="bar", field_type=StringType(), required=False), NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False), + NestedField(field_id=10, name="qux", field_type=DateType(), required=False), ) +ARROW_SCHEMA = pa.schema([ + ("foo", pa.bool_()), + ("bar", pa.string()), + ("baz", pa.int32()), + ("qux", pa.date32()), +]) + ARROW_TABLE = pa.Table.from_pylist( - [ - { - "foo": True, - "bar": "bar_string", - "baz": 123, - } - ], - schema=schema_to_pyarrow(TABLE_SCHEMA), - ) - -def _create_table(session_catalog: Catalog, identifier: str) -> Table: + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA, +) + +PARTITION_SPEC = PartitionSpec( + PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), + PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="qux"), + spec_id=0, +) + + +def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass - tbl = session_catalog.create_table(identifier=identifier, schema=TABLE_SCHEMA) + tbl = session_catalog.create_table( + identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec() + ) return tbl + @pytest.mark.integration -def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, warehouse: Path) -> None: +def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None: identifier = "default.unpartitioned_table" tbl = _create_table(session_catalog, identifier) - # rows = spark.sql( - # f""" - # SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - # FROM {identifier}.all_manifests - # """ - # ).collect() - - # assert [row.added_data_files_count for row in rows] == [] - # assert [row.existing_data_files_count for row in rows] == [] - # assert [row.deleted_data_files_count for row in rows] == [] - - file_paths = [f"/{warehouse}/test-{i}.parquet" for i in range(5)] + + file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=ARROW_TABLE.schema) as writer: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: writer.write_table(ARROW_TABLE) # add the parquet files as data files - tbl.add_files(file_paths) + tbl.add_files(file_paths=file_paths) + + # 
NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None rows = spark.sql( f""" @@ -94,4 +111,45 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: assert [row.added_data_files_count for row in rows] == [5] assert [row.existing_data_files_count for row in rows] == [0] - assert [row.deleted_data_files_count for row in rows] == [0] \ No newline at end of file + assert [row.deleted_data_files_count for row in rows] == [0] + + df = spark.table(identifier) + assert df.count() == 5, "Expected 5 rows" + for col in df.columns: + assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" + + +@pytest.mark.integration +def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.partitioned_table" + tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) + + file_paths = [f"s3://warehouse/default/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table(ARROW_TABLE) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + # NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5] + assert [row.existing_data_files_count for row in rows] == [0] + assert [row.deleted_data_files_count for row in rows] == [0] + + df = spark.table(identifier) + assert df.count() == 5, "Expected 5 rows" + for col in df.columns: + assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" From 39436e69c342aa7a7e2f3dc5d71dbbb235d98823 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 8 Mar 2024 03:25:48 +0000 Subject: [PATCH 3/9] only support identity transforms --- pyiceberg/table/__init__.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index d8d05740ed..12c814912a 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -74,7 +74,6 @@ from pyiceberg.partitioning import ( INITIAL_PARTITION_SPEC_ID, PARTITION_FIELD_ID_START, - IdentityTransform, PartitionField, PartitionSpec, _PartitionNameGenerator, @@ -111,7 +110,7 @@ update_snapshot_summaries, ) from pyiceberg.table.sorting import SortOrder -from pyiceberg.transforms import TimeTransform, Transform, VoidTransform +from pyiceberg.transforms import IdentityTransform, TimeTransform, Transform, VoidTransform from pyiceberg.typedef import ( EMPTY_DICT, IcebergBaseModel, @@ -1145,6 +1144,9 @@ def add_files(self, file_paths: List[str]) -> None: Args: file_paths: The list of full file paths to be added as data files to the table """ + if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields): + raise NotImplementedError("Cannot add_files to a table with Transform Partitions") + if self.name_mapping() is None: with self.transaction() as tx: tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()}) @@ -2502,7 +2504,12 @@ def 
add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableM if partition_field := partition_struct.field_by_name(partition_name): partition_field_values[partition_name] = StringLiteral(string_value).to(partition_field.field_type).value - yield AddFileTask(file_path=file_path, partition_field_value=Record(**partition_field_values)) + yield AddFileTask( + file_path=file_path, + partition_field_value=Record(**{ + field.name: partition_field_values.get(field.name) for field in partition_struct.fields + }), + ) def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: From fe51f4cbbc364d222b5c6efabc9e8c29191b8ec6 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 8 Mar 2024 03:37:21 +0000 Subject: [PATCH 4/9] lint --- tests/integration/test_add_files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 2aa33342cd..5538004582 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -28,7 +28,7 @@ from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.transforms import IdentityTransform, MonthTransform +from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BooleanType, DateType, From 05ed0205af43f1d94cdf7cf17a21b026cd74efd9 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Sat, 9 Mar 2024 20:49:55 +0000 Subject: [PATCH 5/9] more tests --- pyiceberg/manifest.py | 1 + tests/integration/test_add_files.py | 111 +++++++++++++++++++++++++++- 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 0504626d07..146c3ea45c 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -308,6 +308,7 @@ def data_file_with_partition(partition_type: StructType, format_version: Literal field_id=field.field_id, name=field.name, field_type=partition_field_to_data_file_partition_field(field.field_type), + required=False, ) for field in partition_type.fields ]) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 5538004582..4334969dfd 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -63,6 +63,25 @@ schema=ARROW_SCHEMA, ) +ARROW_SCHEMA_UPDATED = pa.schema([ + ("foo", pa.bool_()), + ("baz", pa.int32()), + ("qux", pa.date32()), + ("quux", pa.int32()), +]) + +ARROW_TABLE_UPDATED = pa.Table.from_pylist( + [ + { + "foo": True, + "baz": 123, + "qux": date(2024, 3, 7), + "quux": 234, + } + ], + schema=ARROW_SCHEMA_UPDATED, +) + PARTITION_SPEC = PartitionSpec( PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="qux"), @@ -124,7 +143,7 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca identifier = "default.partitioned_table" tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) - file_paths = [f"s3://warehouse/default/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/partitioned/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -153,3 +172,93 @@ 
def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca assert df.count() == 5, "Expected 5 rows" for col in df.columns: assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" + + +@pytest.mark.integration +def test_add_files_to_partitioned_table_missing_partition(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.partitioned_table" + tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) + + file_paths = [f"s3://warehouse/default/partitioned_2/baz=123/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table(ARROW_TABLE) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + # NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5] + assert [row.existing_data_files_count for row in rows] == [0] + assert [row.deleted_data_files_count for row in rows] == [0] + + df = spark.table(identifier) + assert df.count() == 5, "Expected 5 rows" + + for col in df.columns: + value_count = 0 if col == "qux" else 5 + assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" + + +@pytest.mark.integration +def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.unpartitioned_table_2" + tbl = _create_table(session_catalog, identifier) + + file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: + writer.write_table(ARROW_TABLE) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + # NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None + + with tbl.update_schema() as update: + update.add_column("quux", IntegerType()) + update.delete_column("bar") + + file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet" + # write parquet files + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_UPDATED) as writer: + writer.write_table(ARROW_TABLE_UPDATED) + + # add the parquet files as data files + tbl.add_files(file_paths=[file_path]) + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5, 1, 5] + assert [row.existing_data_files_count for row in rows] == [0, 0, 0] + assert [row.deleted_data_files_count for row in rows] == [0, 0, 0] + + df = spark.table(identifier) + assert df.count() == 6, "Expected 6 rows" + assert len(df.columns) == 4, "Expected 4 columns" + df.show() + for col in df.columns: + value_count = 1 if col == "quux" else 6 + assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" From 
c8232f53d2eafebe88abff9df1605edf7205f80e Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:26:46 +0000 Subject: [PATCH 6/9] support hive partition files --- pyiceberg/io/pyarrow.py | 6 +++-- tests/integration/test_add_files.py | 41 ++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index f86711d517..08646c4a26 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1599,6 +1599,7 @@ def fill_parquet_file_metadata( parquet_metadata: pq.FileMetaData, stats_columns: Dict[int, StatisticsCollector], parquet_column_mapping: Dict[str, int], + check_schema_parity: bool = True, ) -> None: """ Compute and fill the following fields of the DataFile object. @@ -1618,12 +1619,12 @@ def fill_parquet_file_metadata( stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. It is required to set the mode for column metrics collection """ - if parquet_metadata.num_columns != len(stats_columns): + if check_schema_parity and parquet_metadata.num_columns != len(stats_columns): raise ValueError( f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})" ) - if parquet_metadata.num_columns != len(parquet_column_mapping): + if check_schema_parity and parquet_metadata.num_columns != len(parquet_column_mapping): raise ValueError( f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})" ) @@ -1796,6 +1797,7 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks parquet_metadata=parquet_metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), + check_schema_parity=False, ) yield data_file diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 4334969dfd..9e94c64564 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -24,10 +24,11 @@ import pytest from pyspark.sql import SparkSession -from pyiceberg.catalog import Catalog, Table +from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table import Table from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BooleanType, @@ -174,6 +175,44 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" +@pytest.mark.integration +def test_add_files_to_partitioned_table_hive_style(spark: SparkSession, session_catalog: Catalog) -> None: + # Typical Hive Style Partitioning does not have the partition date in the parquet file + # It is instead inferred from the partition path + identifier = "default.partitioned_hive_table" + tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) + + file_paths = [f"s3://warehouse/default/hive/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + # write parquet files + for file_path in file_paths: + fo = tbl.io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA.remove(3).remove(2)) as writer: + 
writer.write_table(ARROW_TABLE.drop_columns(["baz", "qux"])) + + # add the parquet files as data files + tbl.add_files(file_paths=file_paths) + + # NameMapping must have been set to enable reads + assert tbl.name_mapping() is not None + + rows = spark.sql( + f""" + SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count + FROM {identifier}.all_manifests + """ + ).collect() + + assert [row.added_data_files_count for row in rows] == [5] + assert [row.existing_data_files_count for row in rows] == [0] + assert [row.deleted_data_files_count for row in rows] == [0] + + df = spark.table(identifier) + assert df.count() == 5, "Expected 5 rows" + for col in df.columns: + assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" + + @pytest.mark.integration def test_add_files_to_partitioned_table_missing_partition(spark: SparkSession, session_catalog: Catalog) -> None: identifier = "default.partitioned_table" From d63c775f0828826ad41a064204ffbe2946999ee9 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Thu, 14 Mar 2024 17:52:14 +0000 Subject: [PATCH 7/9] adopt review comments --- Makefile | 2 +- pyiceberg/io/pyarrow.py | 11 ++- pyiceberg/table/__init__.py | 33 ++----- tests/integration/test_add_files.py | 133 ++++++++-------------------- 4 files changed, 52 insertions(+), 127 deletions(-) diff --git a/Makefile b/Makefile index c3e816ebd5..133a13983b 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ test-integration: docker-compose -f dev/docker-compose-integration.yml up -d sleep 10 docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py - poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} + poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} test-integration-rebuild: docker-compose -f dev/docker-compose-integration.yml kill diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 08646c4a26..31d846f6f0 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1599,7 +1599,6 @@ def fill_parquet_file_metadata( parquet_metadata: pq.FileMetaData, stats_columns: Dict[int, StatisticsCollector], parquet_column_mapping: Dict[str, int], - check_schema_parity: bool = True, ) -> None: """ Compute and fill the following fields of the DataFile object. @@ -1619,12 +1618,12 @@ def fill_parquet_file_metadata( stats_columns (Dict[int, StatisticsCollector]): The statistics gathering plan. 
It is required to set the mode for column metrics collection """ - if check_schema_parity and parquet_metadata.num_columns != len(stats_columns): + if parquet_metadata.num_columns != len(stats_columns): raise ValueError( f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})" ) - if check_schema_parity and parquet_metadata.num_columns != len(parquet_column_mapping): + if parquet_metadata.num_columns != len(parquet_column_mapping): raise ValueError( f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})" ) @@ -1779,6 +1778,11 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks with input_file.open() as input_stream: parquet_metadata = pq.read_metadata(input_stream) + if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()): + raise NotImplementedError( + f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids" + ) + schema = table_metadata.schema() data_file = DataFile( content=DataFileContent.DATA, @@ -1797,7 +1801,6 @@ def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks parquet_metadata=parquet_metadata, stats_columns=compute_statistics_plan(schema, table_metadata.properties), parquet_column_mapping=parquet_path_to_id_mapping(schema), - check_schema_parity=False, ) yield data_file diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 99e452d45a..a66967c52d 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -57,7 +57,6 @@ EqualTo, Reference, ) -from pyiceberg.expressions.literals import StringLiteral from pyiceberg.io import FileIO, load_file_io from pyiceberg.manifest import ( POSITIONAL_DELETE_SCHEMA, @@ -1156,16 +1155,14 @@ def add_files(self, file_paths: List[str]) -> None: Args: file_paths: The list of full file paths to be added as data files to the table - """ - if any(not isinstance(field.transform, IdentityTransform) for field in self.metadata.spec().fields): - raise NotImplementedError("Cannot add_files to a table with Transform Partitions") - if self.name_mapping() is None: - with self.transaction() as tx: + Raises: + FileNotFoundError: If the file does not exist. 
+ """ + with self.transaction() as tx: + if self.name_mapping() is None: tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()}) - - with self.transaction() as txn: - with txn.update_snapshot().fast_append() as update_snapshot: + with tx.update_snapshot().fast_append() as update_snapshot: data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io) for data_file in data_files: update_snapshot.append_data_file(data_file) @@ -2505,25 +2502,13 @@ def _dataframe_to_data_files( def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]: - partition_spec = table_metadata.spec() - partition_struct = partition_spec.partition_type(table_metadata.schema()) + if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0: + raise ValueError("Cannot add files to partitioned tables") for file_path in file_paths: - # file_path = 's3://warehouse/default/part1=2024-03-04/part2=ABCD' - # ['part1=2024-03-04', 'part2=ABCD'] - parts = [part for part in file_path.split("/") if "=" in part] - - partition_field_values = {} - for part in parts: - partition_name, string_value = part.split("=") - if partition_field := partition_struct.field_by_name(partition_name): - partition_field_values[partition_name] = StringLiteral(string_value).to(partition_field.field_type).value - yield AddFileTask( file_path=file_path, - partition_field_value=Record(**{ - field.name: partition_field_values.get(field.name) for field in partition_struct.fields - }), + partition_field_value=Record(), ) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 9e94c64564..7399a6d900 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -26,10 +26,9 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.partitioning import PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table -from pyiceberg.transforms import IdentityTransform from pyiceberg.types import ( BooleanType, DateType, @@ -64,6 +63,26 @@ schema=ARROW_SCHEMA, ) +ARROW_SCHEMA_WITH_IDS = pa.schema([ + pa.field('foo', pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}), + pa.field('bar', pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}), + pa.field('baz', pa.int32(), nullable=False, metadata={"PARQUET:field_id": "3"}), + pa.field('qux', pa.date32(), nullable=False, metadata={"PARQUET:field_id": "4"}), +]) + + +ARROW_TABLE_WITH_IDS = pa.Table.from_pylist( + [ + { + "foo": True, + "bar": "bar_string", + "baz": 123, + "qux": date(2024, 3, 7), + } + ], + schema=ARROW_SCHEMA_WITH_IDS, +) + ARROW_SCHEMA_UPDATED = pa.schema([ ("foo", pa.bool_()), ("baz", pa.int32()), @@ -83,12 +102,6 @@ schema=ARROW_SCHEMA_UPDATED, ) -PARTITION_SPEC = PartitionSpec( - PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"), - PartitionField(source_id=10, field_id=1001, transform=IdentityTransform(), name="qux"), - spec_id=0, -) - def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table: try: @@ -140,11 +153,11 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: @pytest.mark.integration -def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> 
None: - identifier = "default.partitioned_table" - tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) +def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = "default.unpartitioned_raises_not_found" + tbl = _create_table(session_catalog, identifier) - file_paths = [f"s3://warehouse/default/partitioned/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) @@ -153,102 +166,26 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Ca writer.write_table(ARROW_TABLE) # add the parquet files as data files - tbl.add_files(file_paths=file_paths) - - # NameMapping must have been set to enable reads - assert tbl.name_mapping() is not None - - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() - - assert [row.added_data_files_count for row in rows] == [5] - assert [row.existing_data_files_count for row in rows] == [0] - assert [row.deleted_data_files_count for row in rows] == [0] - - df = spark.table(identifier) - assert df.count() == 5, "Expected 5 rows" - for col in df.columns: - assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" - - -@pytest.mark.integration -def test_add_files_to_partitioned_table_hive_style(spark: SparkSession, session_catalog: Catalog) -> None: - # Typical Hive Style Partitioning does not have the partition date in the parquet file - # It is instead inferred from the partition path - identifier = "default.partitioned_hive_table" - tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) - - file_paths = [f"s3://warehouse/default/hive/baz=123/qux=2024-03-07/test-{i}.parquet" for i in range(5)] - # write parquet files - for file_path in file_paths: - fo = tbl.io.new_output(file_path) - with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=ARROW_SCHEMA.remove(3).remove(2)) as writer: - writer.write_table(ARROW_TABLE.drop_columns(["baz", "qux"])) - - # add the parquet files as data files - tbl.add_files(file_paths=file_paths) - - # NameMapping must have been set to enable reads - assert tbl.name_mapping() is not None - - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() - - assert [row.added_data_files_count for row in rows] == [5] - assert [row.existing_data_files_count for row in rows] == [0] - assert [row.deleted_data_files_count for row in rows] == [0] - - df = spark.table(identifier) - assert df.count() == 5, "Expected 5 rows" - for col in df.columns: - assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null" + with pytest.raises(FileNotFoundError): + tbl.add_files(file_paths=file_paths + ["s3://warehouse/default/unpartitioned_raises_not_found/unknown.parquet"]) @pytest.mark.integration -def test_add_files_to_partitioned_table_missing_partition(spark: SparkSession, session_catalog: Catalog) -> None: - identifier = "default.partitioned_table" - tbl = _create_table(session_catalog, identifier, PARTITION_SPEC) +def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None: + identifier = 
"default.unpartitioned_raises_field_ids" + tbl = _create_table(session_catalog, identifier) - file_paths = [f"s3://warehouse/default/partitioned_2/baz=123/test-{i}.parquet" for i in range(5)] + file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)] # write parquet files for file_path in file_paths: fo = tbl.io.new_output(file_path) with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer: - writer.write_table(ARROW_TABLE) + with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer: + writer.write_table(ARROW_TABLE_WITH_IDS) # add the parquet files as data files - tbl.add_files(file_paths=file_paths) - - # NameMapping must have been set to enable reads - assert tbl.name_mapping() is not None - - rows = spark.sql( - f""" - SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count - FROM {identifier}.all_manifests - """ - ).collect() - - assert [row.added_data_files_count for row in rows] == [5] - assert [row.existing_data_files_count for row in rows] == [0] - assert [row.deleted_data_files_count for row in rows] == [0] - - df = spark.table(identifier) - assert df.count() == 5, "Expected 5 rows" - - for col in df.columns: - value_count = 0 if col == "qux" else 5 - assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" + with pytest.raises(NotImplementedError): + tbl.add_files(file_paths=file_paths) @pytest.mark.integration From a355a2a286a9eac6bc2dde524ee5b80a73926846 Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 15 Mar 2024 02:08:50 +0000 Subject: [PATCH 8/9] review suggestions --- Makefile | 2 +- mkdocs/docs/api.md | 33 +++++++++++++++++++++++++++++ pyiceberg/table/__init__.py | 3 +++ tests/integration/test_add_files.py | 2 +- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 133a13983b..c3e816ebd5 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ test-integration: docker-compose -f dev/docker-compose-integration.yml up -d sleep 10 docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py - poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS} + poetry run pytest tests/ -v -m integration ${PYTEST_ARGS} test-integration-rebuild: docker-compose -f dev/docker-compose-integration.yml kill diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md index 724a45c52f..8121b1d252 100644 --- a/mkdocs/docs/api.md +++ b/mkdocs/docs/api.md @@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res +### Add Files + +Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them. + +``` +# Given that these parquet files have schema consistent with the Iceberg table + +file_paths = [ + "s3a://warehouse/default/existing-1.parquet", + "s3a://warehouse/default/existing-2.parquet", +] + +# They can be added to the table without rewriting them + +tbl.add_files(file_paths=file_paths) + +# A new snapshot is committed to the table with manifests pointing to the existing parquet files +``` + + + +!!! 
note "Name Mapping" + Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one. + + + + + +!!! warning "Maintenance Operations" + Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them. + + + ## Schema evolution PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overriden). diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index a66967c52d..8d57f11472 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1159,6 +1159,9 @@ def add_files(self, file_paths: List[str]) -> None: Raises: FileNotFoundError: If the file does not exist. """ + if len(self.spec().fields) > 0: + raise ValueError("Cannot write to partitioned tables") + with self.transaction() as tx: if self.name_mapping() is None: tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()}) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 7399a6d900..2066e178cd 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -234,7 +234,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio df = spark.table(identifier) assert df.count() == 6, "Expected 6 rows" assert len(df.columns) == 4, "Expected 4 columns" - df.show() + for col in df.columns: value_count = 1 if col == "quux" else 6 assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null" From fded34bc25cf630484f9ec2f09dd77d0222bc9db Mon Sep 17 00:00:00 2001 From: Sung Yun <107272191+syun64@users.noreply.github.com> Date: Fri, 15 Mar 2024 04:37:46 -0600 Subject: [PATCH 9/9] Update __init__.py Co-authored-by: Honah J. --- pyiceberg/table/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 8d57f11472..4fb14e7d05 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1160,7 +1160,7 @@ def add_files(self, file_paths: List[str]) -> None: FileNotFoundError: If the file does not exist. """ if len(self.spec().fields) > 0: - raise ValueError("Cannot write to partitioned tables") + raise ValueError("Cannot add files to partitioned tables") with self.transaction() as tx: if self.name_mapping() is None: