diff --git a/Makefile b/Makefile
index 133a13983b..c3e816ebd5 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,7 @@ test-integration:
 	docker-compose -f dev/docker-compose-integration.yml up -d
 	sleep 10
 	docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
-	poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
+	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}
 
 test-integration-rebuild:
 	docker-compose -f dev/docker-compose-integration.yml kill
diff --git a/mkdocs/docs/api.md b/mkdocs/docs/api.md
index 724a45c52f..8121b1d252 100644
--- a/mkdocs/docs/api.md
+++ b/mkdocs/docs/api.md
@@ -292,6 +292,39 @@ The nested lists indicate the different Arrow buffers, where the first write res
 
 <!-- prettier-ignore-end -->
 
+### Add Files
+
+Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
+
+```python
+# Given that these parquet files have a schema consistent with the Iceberg table
+
+file_paths = [
+    "s3a://warehouse/default/existing-1.parquet",
+    "s3a://warehouse/default/existing-2.parquet",
+]
+
+# They can be added to the table without rewriting them
+
+tbl.add_files(file_paths=file_paths)
+
+# A new snapshot is committed to the table with manifests pointing to the existing parquet files
+```
+
+<!-- prettier-ignore-start -->
+
+!!! note "Name Mapping"
+    Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg table's schema, it requires the Iceberg table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (the Name Mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet files' metadata, and it creates a new Name Mapping based on the table's current schema if the table doesn't already have one.
+
+<!-- prettier-ignore-end -->
+
+<!-- prettier-ignore-start -->
+
+!!! warning "Maintenance Operations"
+    Because `add_files` commits the existing parquet files to the Iceberg table like any other data file, destructive maintenance operations such as expiring snapshots will remove them.
+
+<!-- prettier-ignore-end -->
+
 ## Schema evolution
 
 PyIceberg supports full schema evolution through the Python API. It takes care of setting the field-IDs and makes sure that only non-breaking changes are done (can be overriden).
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py
index a66967c52d..8d57f11472 100644
--- a/pyiceberg/table/__init__.py
+++ b/pyiceberg/table/__init__.py
@@ -1159,6 +1159,9 @@ def add_files(self, file_paths: List[str]) -> None:
         Raises:
             FileNotFoundError: If the file does not exist.
         """
+        if len(self.spec().fields) > 0:
+            raise ValueError("Cannot add files to partitioned tables")
+
         with self.transaction() as tx:
             if self.name_mapping() is None:
                 tx.set_properties(**{TableProperties.DEFAULT_NAME_MAPPING: self.schema().name_mapping.model_dump_json()})
diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py
index 7399a6d900..2066e178cd 100644
--- a/tests/integration/test_add_files.py
+++ b/tests/integration/test_add_files.py
@@ -234,7 +234,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSessio
     df = spark.table(identifier)
     assert df.count() == 6, "Expected 6 rows"
     assert len(df.columns) == 4, "Expected 4 columns"
-    df.show()
+
     for col in df.columns:
         value_count = 1 if col == "quux" else 6
        assert df.filter(df[col].isNotNull()).count() == value_count, f"Expected {value_count} rows to be non-null"
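
To make the documented workflow concrete, here is a minimal end-to-end sketch of the `add_files` path this diff adds. It is a hypothetical illustration under stated assumptions: the catalog name, table identifier, local file paths, and sample columns are not part of the change; only `Table.add_files(file_paths=...)`, the Name Mapping behaviour, and the unpartitioned-table requirement come from the diff itself.

```python
# Hypothetical sketch of the `add_files` flow documented above.
# Assumptions: a catalog named "default" is configured, and "default.cities"
# is an existing, unpartitioned Iceberg table whose schema matches the files.
import pyarrow as pa
import pyarrow.parquet as pq

from pyiceberg.catalog import load_catalog

# Parquet files written by plain PyArrow carry no Iceberg field IDs in their
# metadata, which is exactly the case `add_files` supports via Name Mapping.
arrow_table = pa.table({"city": ["Amsterdam", "Drachten"], "inhabitants": [921402, 45019]})
pq.write_table(arrow_table, "/tmp/existing-1.parquet")
pq.write_table(arrow_table, "/tmp/existing-2.parquet")

catalog = load_catalog("default")
tbl = catalog.load_table("default.cities")

# Commit the existing files as data files; their contents are not rewritten.
tbl.add_files(file_paths=["/tmp/existing-1.parquet", "/tmp/existing-2.parquet"])

# A new snapshot now references the two files, and a Name Mapping derived from
# the current schema has been set if the table did not already have one.
print(tbl.name_mapping())
```

Per the guard added in `pyiceberg/table/__init__.py`, this only works for unpartitioned tables, and the added files become ordinary data files that snapshot expiration can later delete.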
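
The other behavioural change is the new guard in `Table.add_files` that rejects partitioned tables. A pytest-style sketch of how that could be exercised alongside the existing integration tests follows; the `session_catalog` fixture name and the table identifier are illustrative assumptions, not part of this diff.

```python
# Hypothetical integration-style test for the partitioned-table guard added in
# this diff; the fixture name and table identifier are illustrative assumptions.
import pytest

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_add_files_to_partitioned_table_raises(session_catalog: Catalog) -> None:
    # Assumes a pre-provisioned table with at least one partition field.
    tbl = session_catalog.load_table("default.partitioned_table")

    with pytest.raises(ValueError, match="Cannot add files to partitioned tables"):
        tbl.add_files(file_paths=["s3a://warehouse/default/existing-1.parquet"])
```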