WIP: Deletion vectors #1516

Draft · wants to merge 9 commits into main
11 changes: 5 additions & 6 deletions dev/Dockerfile
@@ -37,21 +37,20 @@ RUN mkdir -p ${HADOOP_HOME} && mkdir -p ${SPARK_HOME} && mkdir -p /home/iceberg/
 WORKDIR ${SPARK_HOME}
 
 # Remember to also update `tests/conftest`'s spark setting
-ENV SPARK_VERSION=3.5.3
-ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
-ENV ICEBERG_VERSION=1.6.0
+ENV SPARK_VERSION=3.5.4
+ENV ICEBERG_SPARK_RUNTIME_VERSION=3.5_2.12
+ENV ICEBERG_VERSION=1.8.0
 ENV PYICEBERG_VERSION=0.8.1
 
 RUN curl --retry 5 -s -C - https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz -o spark-${SPARK_VERSION}-bin-hadoop3.tgz \
     && tar xzf spark-${SPARK_VERSION}-bin-hadoop3.tgz --directory /opt/spark --strip-components 1 \
     && rm -rf spark-${SPARK_VERSION}-bin-hadoop3.tgz
 
 # Download iceberg spark runtime
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}/${ICEBERG_VERSION}/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar -Lo iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar \
-    && mv iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar /opt/spark/jars
+RUN curl --retry 5 -s https://repository.apache.org/content/groups/snapshots/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.8.0-SNAPSHOT/iceberg-spark-runtime-3.5_2.12-1.8.0-20250129.001714-82.jar -Lo /opt/spark/jars/iceberg-spark-runtime-${ICEBERG_SPARK_RUNTIME_VERSION}-${ICEBERG_VERSION}.jar
 
 # Download AWS bundle
-RUN curl --retry 5 -s https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
+RUN curl --retry 5 -s https://repository.apache.org/content/groups/snapshots/org/apache/iceberg/iceberg-aws-bundle/1.8.0-SNAPSHOT/iceberg-aws-bundle-1.8.0-20250129.002612-165.jar -Lo /opt/spark/jars/iceberg-aws-bundle-${ICEBERG_VERSION}.jar
 
 COPY spark-defaults.conf /opt/spark/conf
 ENV PATH="/opt/spark/sbin:/opt/spark/bin:${PATH}"
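Since the image now pulls 1.8.0-SNAPSHOT jars, a quick smoke test can confirm that the runtime inside the container actually supports format version 3, the version that introduces deletion vectors. This is a minimal sketch, not part of the PR: it assumes the dev image's spark-defaults.conf defines a catalog named `rest`, and the table name is made up for illustration.

from pyspark.sql import SparkSession

# Reuse the session configured by spark-defaults.conf in the image.
spark = SparkSession.builder.getOrCreate()

# Older runtimes should reject 'format-version'='3', so a successful CREATE
# doubles as a check that the snapshot runtime jar was picked up.
spark.sql(
    """
    CREATE TABLE rest.default.dv_smoke_test (id int)
    USING iceberg
    TBLPROPERTIES (
        'format-version'='3',
        'write.delete.mode'='merge-on-read'
    )
    """
)
spark.sql("INSERT INTO rest.default.dv_smoke_test VALUES (1), (2)")
# Under merge-on-read on a v3 table, this delete should be written as a deletion vector.
spark.sql("DELETE FROM rest.default.dv_smoke_test WHERE id = 1")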
159 changes: 85 additions & 74 deletions dev/provision.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import math
 
 from pyspark.sql import SparkSession
 from pyspark.sql.functions import current_date, date_add, expr
@@ -113,89 +114,99 @@
"""
)

spark.sql(
f"""
CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_deletes (
dt date,
number integer,
letter string
)
USING iceberg
TBLPROPERTIES (
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'format-version'='2'
);
"""
)
# Merge on read has been implemented in version ≥2:
# v2: Using positional deletes
# v3: Using deletion vectors

spark.sql(
f"""
INSERT INTO {catalog_name}.default.test_positional_mor_deletes
VALUES
(CAST('2023-03-01' AS date), 1, 'a'),
(CAST('2023-03-02' AS date), 2, 'b'),
(CAST('2023-03-03' AS date), 3, 'c'),
(CAST('2023-03-04' AS date), 4, 'd'),
(CAST('2023-03-05' AS date), 5, 'e'),
(CAST('2023-03-06' AS date), 6, 'f'),
(CAST('2023-03-07' AS date), 7, 'g'),
(CAST('2023-03-08' AS date), 8, 'h'),
(CAST('2023-03-09' AS date), 9, 'i'),
(CAST('2023-03-10' AS date), 10, 'j'),
(CAST('2023-03-11' AS date), 11, 'k'),
(CAST('2023-03-12' AS date), 12, 'l');
"""
)
for format_version in [2, 3]:
identifier = f'{catalog_name}.default.test_positional_mor_deletes_v{format_version}'
spark.sql(
f"""
CREATE OR REPLACE TABLE {identifier} (
dt date,
number integer,
letter string
)
USING iceberg
TBLPROPERTIES (
'write.delete.mode'='merge-on-read',
'write.update.mode'='merge-on-read',
'write.merge.mode'='merge-on-read',
'format-version'='{format_version}'
);
"""
)

spark.sql(
f"""
INSERT INTO {identifier}
VALUES
(CAST('2023-03-01' AS date), 1, 'a'),
(CAST('2023-03-02' AS date), 2, 'b'),
(CAST('2023-03-03' AS date), 3, 'c'),
(CAST('2023-03-04' AS date), 4, 'd'),
(CAST('2023-03-05' AS date), 5, 'e'),
(CAST('2023-03-06' AS date), 6, 'f'),
(CAST('2023-03-07' AS date), 7, 'g'),
(CAST('2023-03-08' AS date), 8, 'h'),
(CAST('2023-03-09' AS date), 9, 'i'),
(CAST('2023-03-10' AS date), 10, 'j'),
(CAST('2023-03-11' AS date), 11, 'k'),
(CAST('2023-03-12' AS date), 12, 'l');
"""
)

spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE TAG tag_12")
spark.sql(f"ALTER TABLE {identifier} CREATE TAG tag_12")

spark.sql(f"ALTER TABLE {catalog_name}.default.test_positional_mor_deletes CREATE BRANCH without_5")
spark.sql(f"ALTER TABLE {identifier} CREATE BRANCH without_5")

spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes.branch_without_5 WHERE number = 5")
spark.sql(f"DELETE FROM {identifier}.branch_without_5 WHERE number = 5")

spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_deletes WHERE number = 9")
spark.sql(f"DELETE FROM {identifier} WHERE number = 9")

-    spark.sql(
-        f"""
-        CREATE OR REPLACE TABLE {catalog_name}.default.test_positional_mor_double_deletes (
-            dt date,
-            number integer,
-            letter string
-        )
-        USING iceberg
-        TBLPROPERTIES (
-            'write.delete.mode'='merge-on-read',
-            'write.update.mode'='merge-on-read',
-            'write.merge.mode'='merge-on-read',
-            'format-version'='2'
-        );
-        """
-    )
-
-    spark.sql(
-        f"""
-        INSERT INTO {catalog_name}.default.test_positional_mor_double_deletes
-        VALUES
-            (CAST('2023-03-01' AS date), 1, 'a'),
-            (CAST('2023-03-02' AS date), 2, 'b'),
-            (CAST('2023-03-03' AS date), 3, 'c'),
-            (CAST('2023-03-04' AS date), 4, 'd'),
-            (CAST('2023-03-05' AS date), 5, 'e'),
-            (CAST('2023-03-06' AS date), 6, 'f'),
-            (CAST('2023-03-07' AS date), 7, 'g'),
-            (CAST('2023-03-08' AS date), 8, 'h'),
-            (CAST('2023-03-09' AS date), 9, 'i'),
-            (CAST('2023-03-10' AS date), 10, 'j'),
-            (CAST('2023-03-11' AS date), 11, 'k'),
-            (CAST('2023-03-12' AS date), 12, 'l');
-        """
-    )
-
-    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE number = 9")
-
-    spark.sql(f"DELETE FROM {catalog_name}.default.test_positional_mor_double_deletes WHERE letter == 'f'")
+        identifier = f'{catalog_name}.default.test_positional_mor_double_deletes_v{format_version}'
+
+        spark.sql(
+            f"""
+            CREATE OR REPLACE TABLE {identifier} (
+                dt date,
+                number integer,
+                letter string
+            )
+            USING iceberg
+            TBLPROPERTIES (
+                'write.delete.mode'='merge-on-read',
+                'write.update.mode'='merge-on-read',
+                'write.merge.mode'='merge-on-read',
+                'format-version'='{format_version}'
+            );
+            """
+        )
+
+        spark.sql(
+            f"""
+            INSERT INTO {identifier}
+            VALUES
+                (CAST('2023-03-01' AS date), 1, 'a'),
+                (CAST('2023-03-02' AS date), 2, 'b'),
+                (CAST('2023-03-03' AS date), 3, 'c'),
+                (CAST('2023-03-04' AS date), 4, 'd'),
+                (CAST('2023-03-05' AS date), 5, 'e'),
+                (CAST('2023-03-06' AS date), 6, 'f'),
+                (CAST('2023-03-07' AS date), 7, 'g'),
+                (CAST('2023-03-08' AS date), 8, 'h'),
+                (CAST('2023-03-09' AS date), 9, 'i'),
+                (CAST('2023-03-10' AS date), 10, 'j'),
+                (CAST('2023-03-11' AS date), 11, 'k'),
+                (CAST('2023-03-12' AS date), 12, 'l');
+            """
+        )
+
+        # Perform two deletes, which should produce:
+        # v2: two positional delete files
+        # v3: a single deletion vector, since deletion vectors are merged
+        spark.sql(f"DELETE FROM {identifier} WHERE number = 9")
+        spark.sql(f"DELETE FROM {identifier} WHERE letter == 'f'")

     all_types_dataframe = (
         spark.range(0, 5, 1, 5)
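After provisioning, the layout promised by the comment above can be checked from the PyIceberg side. This is a minimal sketch under stated assumptions, not part of the PR: the catalog name `rest` and its connection details come from local PyIceberg configuration (e.g. ~/.pyiceberg.yaml), and with this PR's changes the v3 table's deletion vector should surface as a delete file on the scan tasks.

from pyiceberg.catalog import load_catalog

# `rest` is an assumed catalog name resolved from local configuration.
catalog = load_catalog("rest")

for version in (2, 3):
    table = catalog.load_table(f"default.test_positional_mor_double_deletes_v{version}")
    for task in table.scan().plan_files():
        # Each FileScanTask carries the delete files that apply to its data file:
        # v2 should show two positional delete files, v3 a single merged deletion vector.
        print(f"v{version}", task.file.file_path, len(task.delete_files))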
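The tag and branch created in the loop can be read back the same way; a sketch of what that might look like using PyIceberg's named refs, with the same assumed catalog as above:

# The v3 variant exercises deletion vectors when read through this PR's changes.
table = catalog.load_table("default.test_positional_mor_deletes_v3")

# tag_12 was created before any deletes, so it should still contain all twelve rows.
tagged = table.scan().use_ref("tag_12").to_arrow()
assert tagged.num_rows == 12

# On the without_5 branch, only the row with number == 5 was deleted.
without_5 = table.scan().use_ref("without_5").to_arrow()
assert 5 not in without_5["number"].to_pylist()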