From b450c1c482a615cbb62cabe88ffaca04fb3f7376 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 22 Dec 2024 15:55:06 -0500 Subject: [PATCH] [infra] Update pyspark java iceberg library to 1.6.0 (#1462) * update pyspark java iceberb library to 1.6.0 * fix test * add reminder * make link --- dev/Dockerfile | 1 + tests/conftest.py | 3 ++- tests/integration/test_deletes.py | 10 ++-------- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/dev/Dockerfile b/dev/Dockerfile index d4346bf757..1cc70beda5 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -36,6 +36,7 @@ ENV PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/spark-events WORKDIR ${SPARK_HOME} +# Remember to also update `tests/conftest`'s spark setting ENV SPARK_VERSION=3.5.3 ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12 ENV ICEBERG_VERSION=1.6.0 diff --git a/tests/conftest.py b/tests/conftest.py index 89af22896f..22329b3882 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2240,9 +2240,10 @@ def spark() -> "SparkSession": from pyspark.sql import SparkSession + # Remember to also update `dev/Dockerfile` spark_version = ".".join(importlib.metadata.version("pyspark").split(".")[:2]) scala_version = "2.12" - iceberg_version = "1.4.3" + iceberg_version = "1.6.0" os.environ["PYSPARK_SUBMIT_ARGS"] = ( f"--packages org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}," diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index affc480f09..f2417bde2d 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -237,9 +237,7 @@ def test_delete_partitioned_table_positional_deletes(spark: SparkSession, sessio # Will rewrite a data file without the positional delete tbl.delete(EqualTo("number", 40)) - # One positional delete has been added, but an OVERWRITE status is set - # https://github.com/apache/iceberg/issues/10122 - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"] + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "overwrite"] assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]} @@ -410,8 +408,6 @@ def test_overwrite_partitioned_table(spark: SparkSession, session_catalog: RestC # Will rewrite a data file without the positional delete tbl.overwrite(arrow_tbl, "number_partitioned == 10") - # One positional delete has been added, but an OVERWRITE status is set - # https://github.com/apache/iceberg/issues/10122 assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "delete", "append"] assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10, 20], "number": [4, 5, 3]} @@ -461,13 +457,11 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio # Will rewrite a data file without a positional delete tbl.delete(EqualTo("number", 201)) - # One positional delete has been added, but an OVERWRITE status is set - # https://github.com/apache/iceberg/issues/10122 snapshots = tbl.snapshots() assert len(snapshots) == 3 # Snapshots produced by Spark - assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"] + assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "delete"] # Will rewrite one parquet file assert snapshots[2].summary == Summary(