adopt review feedback
sungwy committed Mar 19, 2024
1 parent ae3619a commit a8213a7
Showing 3 changed files with 65 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -42,7 +42,7 @@ test-integration:
 	docker-compose -f dev/docker-compose-integration.yml up -d
 	sleep 10
 	docker-compose -f dev/docker-compose-integration.yml exec -T spark-iceberg ipython ./provision.py
-	poetry run pytest tests/integration/test_add_files.py -v -m integration ${PYTEST_ARGS}
+	poetry run pytest tests/ -v -m integration ${PYTEST_ARGS}

 test-integration-rebuild:
 	docker-compose -f dev/docker-compose-integration.yml kill
2 changes: 1 addition & 1 deletion mkdocs/docs/api.md
@@ -327,7 +327,7 @@ tbl.add_files(file_paths=file_paths)
 Because `add_files` uses existing files without writing new parquet files that are aware of the Iceberg's schema, it requires the Iceberg's table to have a [Name Mapping](https://iceberg.apache.org/spec/?h=name+mapping#name-mapping-serialization) (The Name mapping maps the field names within the parquet files to the Iceberg field IDs). Hence, `add_files` requires that there are no field IDs in the parquet file's metadata, and creates a new Name Mapping based on the table's current schema if the table doesn't already have one.

 !!! note "Partitions"
-    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported).
+    `add_files` only requires the client to read the existing parquet files' metadata footer to infer the partition value of each file. This implementation also supports adding files to Iceberg tables with partition transforms like `MonthTransform`, and `TruncateTransform` which preserve the order of the values after the transformation (Any Transform that has the `preserves_order` property set to True is supported). Please note that if the column statistics of the `PartitionField`'s source column are not present in the parquet metadata, the partition value is inferred as `None`.

 !!! warning "Maintenance Operations"
     Because `add_files` commits the existing parquet files to the Iceberg Table as any other data file, destructive maintenance operations like expiring snapshots will remove them.
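As a side note on the documentation change above, a minimal sketch of the behaviour it describes: registering an ordinary parquet file (one without Iceberg field IDs in its metadata) with `add_files`. The catalog name, table name, columns, and file path below are illustrative assumptions and are not part of this commit.

```python
from datetime import date

import pyarrow as pa
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table; assume the table is partitioned by month(order_date).
catalog = load_catalog("default")
tbl = catalog.load_table("default.orders")

# Write an ordinary parquet file. It carries no Iceberg field IDs, so the
# table's Name Mapping is used to resolve columns by name.
batch = pa.table({"order_id": [1, 2], "order_date": [date(2024, 3, 7), date(2024, 3, 8)]})
pq.write_table(batch, "/tmp/orders-0.parquet")

# Register the file as-is. The partition value is inferred from the parquet
# column statistics of the partition source column; if those statistics are
# missing, the partition value is recorded as None.
tbl.add_files(file_paths=["/tmp/orders-0.parquet"])
```

Transforms that do not preserve value order (for example `BucketTransform`) cannot be inferred this way, which is exactly what the bucket-partition test in the diff below asserts.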
100 changes: 63 additions & 37 deletions tests/integration/test_add_files.py
@@ -104,25 +104,31 @@
 )


-def _create_table(session_catalog: Catalog, identifier: str, partition_spec: Optional[PartitionSpec] = None) -> Table:
+def _create_table(
+    session_catalog: Catalog, identifier: str, format_version: int, partition_spec: Optional[PartitionSpec] = None
+) -> Table:
     try:
         session_catalog.drop_table(identifier=identifier)
     except NoSuchTableError:
         pass

     tbl = session_catalog.create_table(
-        identifier=identifier, schema=TABLE_SCHEMA, partition_spec=partition_spec if partition_spec else PartitionSpec()
+        identifier=identifier,
+        schema=TABLE_SCHEMA,
+        properties={"format-version": str(format_version)},
+        partition_spec=partition_spec if partition_spec else PartitionSpec(),
     )

     return tbl


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_table"
-    tbl = _create_table(session_catalog, identifier)
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.unpartitioned_table_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)

-    file_paths = [f"s3://warehouse/default/unpartitioned/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/unpartitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -154,11 +160,14 @@ def test_add_files_to_unpartitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_raises_not_found"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_raises_file_not_found(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_raises_not_found_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_not_found/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -172,11 +181,14 @@ def test_add_files_to_unpartitioned_table_raises_file_not_found(spark: SparkSession, session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_raises_field_ids"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_raises_has_field_ids(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_raises_field_ids_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_raises_field_ids/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -190,11 +202,14 @@ def test_add_files_to_unpartitioned_table_raises_has_field_ids(spark: SparkSession, session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.unpartitioned_table_2"
-    tbl = _create_table(session_catalog, identifier)
-
-    file_paths = [f"s3://warehouse/default/unpartitioned_2/test-{i}.parquet" for i in range(5)]
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_unpartitioned_table_with_schema_updates(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.unpartitioned_table_schema_updates_v{format_version}"
+    tbl = _create_table(session_catalog, identifier, format_version)
+
+    file_paths = [f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -212,7 +227,7 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:
         update.add_column("quux", IntegerType())
         update.delete_column("bar")

-    file_path = "s3://warehouse/default/unpartitioned_2/test-6.parquet"
+    file_path = f"s3://warehouse/default/unpartitioned_schema_updates/v{format_version}/test-6.parquet"
     # write parquet files
     fo = tbl.io.new_output(file_path)
     with fo.create(overwrite=True) as fos:
@@ -242,20 +257,21 @@ def test_add_files_to_unpartitioned_table_with_schema_updates(spark: SparkSession, session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.partitioned_table_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
         PartitionField(source_id=10, field_id=1001, transform=MonthTransform(), name="qux_month"),
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

     date_iter = iter([date(2024, 3, 7), date(2024, 3, 8), date(2024, 3, 16), date(2024, 3, 18), date(2024, 3, 19)])

-    file_paths = [f"s3://warehouse/default/partitioned/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -310,19 +326,20 @@ def test_add_files_to_partitioned_table(spark: SparkSession, session_catalog: Catalog) -> None:


 @pytest.mark.integration
-def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table_2"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None:
+    identifier = f"default.partitioned_table_bucket_fails_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=BucketTransform(num_buckets=3), name="baz_bucket_3"),
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

     int_iter = iter(range(5))

-    file_paths = [f"s3://warehouse/default/partitioned_2/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned_table_bucket_fails/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -345,21 +362,27 @@ def test_add_files_to_bucket_partitioned_table_fails(spark: SparkSession, session_catalog: Catalog) -> None:
     # add the parquet files as data files
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=file_paths)
-    assert "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]" in str(exc_info.value)
+    assert (
+        "Cannot infer partition value from parquet metadata for a non-linear Partition Field: baz_bucket_3 with transform bucket[3]"
+        in str(exc_info.value)
+    )


 @pytest.mark.integration
-def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None:
-    identifier = "default.partitioned_table_3"
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(
+    spark: SparkSession, session_catalog: Catalog, format_version: int
+) -> None:
+    identifier = f"default.partitioned_table_mismatch_fails_v{format_version}"

     partition_spec = PartitionSpec(
         PartitionField(source_id=4, field_id=1000, transform=IdentityTransform(), name="baz"),
         spec_id=0,
     )

-    tbl = _create_table(session_catalog, identifier, partition_spec)
+    tbl = _create_table(session_catalog, identifier, format_version, partition_spec)

-    file_paths = [f"s3://warehouse/default/partitioned_3/test-{i}.parquet" for i in range(5)]
+    file_paths = [f"s3://warehouse/default/partitioned_table_mismatch_fails/v{format_version}/test-{i}.parquet" for i in range(5)]
     # write parquet files
     for file_path in file_paths:
         fo = tbl.io.new_output(file_path)
@@ -379,7 +402,7 @@ def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None:
                 "bar": "bar_string",
                 "baz": 124,
                 "qux": date(2024, 3, 7),
-            }
+            },
         ],
         schema=ARROW_SCHEMA,
     )
@@ -388,4 +411,7 @@ def test_add_files_to_partitioned_table_fails_with_lower_and_upper_mismatch(spark: SparkSession, session_catalog: Catalog) -> None:
     # add the parquet files as data files
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=file_paths)
-    assert "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124" in str(exc_info.value)
+    assert (
+        "Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: baz. lower_value=123, upper_value=124"
+        in str(exc_info.value)
+    )
