diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 724a45c52f..8121b1d252 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res
 
+### Add Files
+
+Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
+
+```python
+# Given that these parquet files have schemas consistent with the Iceberg table
+
+file_paths = [
+    "s3a://warehouse/default/existing-1.parquet",
+    "s3a://warehouse/default/existing-2.parquet",
+]
+
+# They can be added to the table without rewriting them
+
+tbl.add_files(file_paths=file_paths)
+
+# A new snapshot is committed to the table with manifests pointing to the existing parquet files
+```
+
+!!! note "Name Mapping"
+    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg table's schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names in the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet files' metadata, and it creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+
+!!! warning "Maintenance Operations"
+    Because `add_files` commits the existing parquet files to the Iceberg table like any other data file, destructive maintenance operations such as expiring snapshots will remove them.
+
 ## Schema evolution
 
 PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overriden).
diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py
index f1e4d302ec..31d846f6f0 100644
--- a/pyiceberg/io/pyarrow.py
+++ b/pyiceberg/io/pyarrow.py
@@ -124,7 +124,7 @@
     visit,
     visit_with_partner,
 )
-from pyiceberg.table import PropertyUtil, TableProperties, WriteTask
+from pyiceberg.table import AddFileTask, PropertyUtil, TableProperties, WriteTask
 from pyiceberg.table.metadata import TableMetadata
 from pyiceberg.table.name_mapping import NameMapping
 from pyiceberg.transforms import TruncateTransform
@@ -1772,6 +1772,39 @@ def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteT
     return iter([data_file])
 
 
+def parquet_files_to_data_files(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[AddFileTask]) -> Iterator[DataFile]:
+    for task in tasks:
+        input_file = io.new_input(task.file_path)
+        with input_file.open() as input_stream:
+            parquet_metadata = pq.read_metadata(input_stream)
+
+        if visit_pyarrow(parquet_metadata.schema.to_arrow_schema(), _HasIds()):
+            raise NotImplementedError(
+                f"Cannot add file {task.file_path} because it has field IDs. `add_files` only supports addition of files without field_ids"
+            )
+
+        schema = table_metadata.schema()
+        data_file = DataFile(
+            content=DataFileContent.DATA,
+            file_path=task.file_path,
+            file_format=FileFormat.PARQUET,
+            partition=task.partition_field_value,
+            record_count=parquet_metadata.num_rows,
+            file_size_in_bytes=len(input_file),
+            sort_order_id=None,
+            spec_id=table_metadata.default_spec_id,
+            equality_ids=None,
+            key_metadata=None,
+        )
+        fill_parquet_file_metadata(
+            data_file=data_file,
+            parquet_metadata=parquet_metadata,
+            stats_columns=compute_statistics_plan(schema, table_metadata.properties),
+            parquet_column_mapping=parquet_path_to_id_mapping(schema),
+        )
+        yield data_file
+
+
 ICEBERG_UNCOMPRESSED_CODEC = "uncompressed"
 PYARROW_UNCOMPRESSED_CODEC = "none"
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index c5dd2e8e70..4fb14e7d05 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -33,6 +33,7 @@
     Dict,
     Generic,
     Iterable,
+    Iterator,
     List,
     Literal,
     Optional,
@@ -115,6 +116,7 @@
     Identifier,
     KeyDefaultDict,
     Properties,
+    Record,
 )
 from pyiceberg.types import (
     IcebergType,
@@ -1147,6 +1149,27 @@ def overwrite(self, df: pa.Table, overwrite_filter: BooleanExpression = ALWAYS_T
             for data_file in data_files:
                 update_snapshot.append_data_file(data_file)
 
+    def add_files(self, file_paths: List[str]) -> None:
+        """
+        Shorthand API for adding files as data files to the table.
+
+        Args:
+            file_paths: The list of full file paths to be added as data files to the table
+
+        Raises:
+            FileNotFoundError: If the file does not exist.
+        """
+        if len(self.spec().fields) > 0:
+            raise ValueError("Cannot add files to partitioned tables")
+
+        with self.transaction() as tx:
+            if self.name_mapping() is None:
+                tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
+            with tx.update_snapshot().fast_append() as update_snapshot:
+                data_files = _parquet_files_to_data_files(table_metadata=self.metadata, file_paths=file_paths, io=self.io)
+                for data_file in data_files:
+                    update_snapshot.append_data_file(data_file)
+
     def update_spec(self, case_sensitive: bool = True) -> UpdateSpec:
         return UpdateSpec(Transaction(self, autocommit=True), case_sensitive=case_sensitive)
 
@@ -2444,6 +2467,12 @@ def generate_data_file_filename(self, extension: str) -> str:
         return f"00000-{self.task_id}-{self.write_uuid}.{extension}"
 
 
+@dataclass(frozen=True)
+class AddFileTask:
+    file_path: str
+    partition_field_value: Record
+
+
 def _new_manifest_path(location: str, num: int, commit_uuid: uuid.UUID) -> str:
     return f'{location}/metadata/{commit_uuid}-m{num}.avro'
 
@@ -2475,6 +2504,29 @@ def _dataframe_to_data_files(
     yield from write_file(io=io, table_metadata=table_metadata, tasks=iter([WriteTask(write_uuid, next(counter), df)]))
 
 
+def add_file_tasks_from_file_paths(file_paths: List[str], table_metadata: TableMetadata) -> Iterator[AddFileTask]:
+    if len([spec for spec in table_metadata.partition_specs if spec.spec_id != 0]) > 0:
+        raise ValueError("Cannot add files to partitioned tables")
+
+    for file_path in file_paths:
+        yield AddFileTask(
+            file_path=file_path,
+            partition_field_value=Record(),
+        )
+
+
+def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]:
+    """Convert a list of files into DataFiles.
+
+    Returns:
+        An iterable that supplies DataFiles that describe the parquet files.
+    """
+    from pyiceberg.io.pyarrow import parquet_files_to_data_files
+
+    tasks = add_file_tasks_from_file_paths(file_paths, table_metadata)
+    yield from parquet_files_to_data_files(io=io, table_metadata=table_metadata, tasks=tasks)
+
+
 class _MergingSnapshotProducer(UpdateTableMetadata["_MergingSnapshotProducer"]):
     commit_uuid: uuid.UUID
     _operation: Operation
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
new file mode 100644
index 0000000000..2066e178cd
--- /dev/null
+++ b/tests/integration/test_add_files.py
@@ -0,0 +1,240 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+
+from datetime import date
+from typing import Optional
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog import Catalog
+from pyiceberg.exceptions import NoSuchTableError
+from pyiceberg.partitioning import PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from pyiceberg.types import (
+    BooleanType,
+    DateType,
+    IntegerType,
+    NestedField,
+    StringType,
+)
+
+TABLE_SCHEMA = Schema(
+    NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
+    NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
+    NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
+    NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
+)
+
+ARROW_SCHEMA = pa.schema([
+    ("foo", pa.bool_()),
+    ("bar", pa.string()),
+    ("baz", pa.int32()),
+    ("qux", pa.date32()),
+])
+
+ARROW_TABLE = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "bar": "bar_string",
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+        }
+    ],
+    schema=ARROW_SCHEMA,
+)
+
+ARROW_SCHEMA_WITH_IDS = pa.schema([
+    pa.field('foo', pa.bool_(), nullable=False, metadata={"PARQUET:field_id": "1"}),
+    pa.field('bar', pa.string(), nullable=False, metadata={"PARQUET:field_id": "2"}),
+    pa.field('baz', pa.int32(), nullable=False, metadata={"PARQUET:field_id": "3"}),
+    pa.field('qux', pa.date32(), nullable=False, metadata={"PARQUET:field_id": "4"}),
+])
+
+
+ARROW_TABLE_WITH_IDS = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "bar": "bar_string",
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+        }
+    ],
+    schema=ARROW_SCHEMA_WITH_IDS,
+)
+
+ARROW_SCHEMA_UPDATED = pa.schema([
+    ("foo", pa.bool_()),
+    ("baz", pa.int32()),
+    ("qux", pa.date32()),
+    ("quux", pa.int32()),
+])
+
+ARROW_TABLE_UPDATED = pa.Table.from_pylist(
+    [
+        {
+            "foo": True,
+            "baz": 123,
+            "qux": date(2024, 3, 7),
+            "quux": 234,
+        }
+    ],
+    schema=ARROW_SCHEMA_UPDATED,
+)
+
+
+def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table:
+    try:
+        session_catalog.drop_table(identifier=identifier)
+    except NoSuchTableError:
+        pass
+
+    tbl = session_catalog.create_table(
+        identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec()
+    )
+
+    return tbl
+
+
+@pytest.mark.integration
+def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_table"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5]
+    assert [row.existing_data_files_count for row in rows] == [0]
+    assert [row.deleted_data_files_count for row in rows] == [0]
+
+    df = spark.table(identifier)
+    assert df.count() == 5, "Expected 5 rows"
+    for col in df.columns:
+        assert df.filter(df[col].isNotNull()).count() == 5, "Expected all 5 rows to be non-null"
+
+
+@pytest.mark.integration
+def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_raises_not_found"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    with pytest.raises(FileNotFoundError):
+        tbl.add_files(file_paths=file_paths + ["s3://warehouse/default/unpartitioned_raises_not_found/unknown.parquet"])
+
+
+@pytest.mark.integration
+def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_raises_field_ids"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_WITH_IDS) as writer:
+                writer.write_table(ARROW_TABLE_WITH_IDS)
+
+    # add the parquet files as data files
+    with pytest.raises(NotImplementedError):
+        tbl.add_files(file_paths=file_paths)
+
+
+@pytest.mark.integration
+def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:
+    identifier = "default.unpartitioned_table_2"
+    tbl = _create_table(session_catalog, identifier)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)]
+    # write parquet files
+    for file_path in file_paths:
+        fo = tbl.io.new_output(file_path)
+        with fo.create(overwrite=True) as fos:
+            with pq.ParquetWriter(fos, schema=ARROW_SCHEMA) as writer:
+                writer.write_table(ARROW_TABLE)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=file_paths)
+
+    # NameMapping must have been set to enable reads
+    assert tbl.name_mapping() is not None
+
+    with tbl.update_schema() as update:
+        update.add_column("quux", IntegerType())
+        update.delete_column("bar")
+
+    file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
+    # write parquet files
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=ARROW_SCHEMA_UPDATED) as writer:
+            writer.write_table(ARROW_TABLE_UPDATED)
+
+    # add the parquet files as data files
+    tbl.add_files(file_paths=[file_path])
+
+    rows = spark.sql(
+        f"""
+        SELECT added_data_files_count, existing_data_files_count, deleted_data_files_count
+        FROM {identifier}.all_manifests
+    """
+    ).collect()
+
+    assert [row.added_data_files_count for row in rows] == [5, 1, 5]
+    assert [row.existing_data_files_count for row in rows] == [0, 0, 0]
+    assert [row.deleted_data_files_count for row in rows] == [0, 0, 0]
+
+    df = spark.table(identifier)
+    assert df.count() == 6, "Expected 6 rows"
+    assert len(df.columns) == 4, "Expected 4 columns"
+
+    for col in df.columns:
+        value_count = 1 if col == "quux" else 6
+        assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
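The docs, library changes, and tests above all exercise the same workflow. A minimal end-to-end sketch of that workflow, assuming a configured catalog named `default`, an existing unpartitioned table `default.existing_data` whose schema matches the parquet file, and a local file path (the catalog name, table name, and path are illustrative, not part of the change):

```python
import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.catalog import load_catalog

# Write a plain parquet file with pyarrow; no Iceberg field IDs are attached,
# which is what `add_files` expects.
arrow_table = pa.Table.from_pylist([{"foo": True, "bar": "bar_string"}])
pq.write_table(arrow_table, "/tmp/existing-1.parquet")

# Load the target table and register the existing file without rewriting it.
catalog = load_catalog("default")  # illustrative catalog name
tbl = catalog.load_table("default.existing_data")  # illustrative table name
tbl.add_files(file_paths=["/tmp/existing-1.parquet"])

# add_files creates a name mapping from the current schema if the table has none.
assert tbl.name_mapping() is not None
```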