Commit

Test
Fokko committed Jan 27, 2025
1 parent 2fc9e78 commit fed83e8
Showing 1 changed file with 52 additions and 52 deletions.
104 changes: 52 additions & 52 deletions tests/integration/test_deletes.py
@@ -309,58 +309,58 @@ def test_delete_partitioned_table_positional_deletes_empty_batch(spark: SparkSes
     assert len(reader.read_all()) == 0


-@pytest.mark.integration
-@pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
-def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
-    identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
-    multiplier = 10
-
-    run_spark_commands(
-        spark,
-        [
-            f"DROP TABLE IF EXISTS {identifier}",
-            f"""
-            CREATE TABLE {identifier} (
-                number int
-            )
-            USING iceberg
-            TBLPROPERTIES(
-                'format-version' = 2,
-                'write.delete.mode'='merge-on-read',
-                'write.update.mode'='merge-on-read',
-                'write.merge.mode'='merge-on-read'
-            )
-        """,
-        ],
-    )
-
-    tbl = session_catalog.load_table(identifier)
-
-    arrow_table = pa.Table.from_arrays(
-        [
-            pa.array(list(range(1, 1001)) * multiplier),
-        ],
-        schema=pa.schema([pa.field("number", pa.int32())]),
-    )
-
-    tbl.append(arrow_table)
-
-    run_spark_commands(
-        spark,
-        [
-            f"""
-            DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
-        """,
-        ],
-    )
-
-    tbl.refresh()
-
-    reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
-    assert isinstance(reader, pa.RecordBatchReader)
-    pyiceberg_count = len(reader.read_all())
-    expected_count = 46 * multiplier
-    assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"
+# @pytest.mark.integration
+# @pytest.mark.filterwarnings("ignore:Merge on read is not yet supported, falling back to copy-on-write")
+# def test_read_multiple_batches_in_task_with_position_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
+#     identifier = "default.test_read_multiple_batches_in_task_with_position_deletes"
+#     multiplier = 10
+#
+#     run_spark_commands(
+#         spark,
+#         [
+#             f"DROP TABLE IF EXISTS {identifier}",
+#             f"""
+#             CREATE TABLE {identifier} (
+#                 number int
+#             )
+#             USING iceberg
+#             TBLPROPERTIES(
+#                 'format-version' = 2,
+#                 'write.delete.mode'='merge-on-read',
+#                 'write.update.mode'='merge-on-read',
+#                 'write.merge.mode'='merge-on-read'
+#             )
+#         """,
+#         ],
+#     )
+#
+#     tbl = session_catalog.load_table(identifier)
+#
+#     arrow_table = pa.Table.from_arrays(
+#         [
+#             pa.array(list(range(1, 1001)) * multiplier),
+#         ],
+#         schema=pa.schema([pa.field("number", pa.int32())]),
+#     )
+#
+#     tbl.append(arrow_table)
+#
+#     run_spark_commands(
+#         spark,
+#         [
+#             f"""
+#             DELETE FROM {identifier} WHERE number in (1, 2, 3, 4)
+#         """,
+#         ],
+#     )
+#
+#     tbl.refresh()
+#
+#     reader = tbl.scan(row_filter="number <= 50").to_arrow_batch_reader()
+#     assert isinstance(reader, pa.RecordBatchReader)
+#     pyiceberg_count = len(reader.read_all())
+#     expected_count = 46 * multiplier
+#     assert pyiceberg_count == expected_count, f"Failing check. {pyiceberg_count} != {expected_count}"


 @pytest.mark.integration

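For reference, the arithmetic behind the (now commented-out) assertion: the test appends the values 1..1000 `multiplier` times, the DELETE removes 1-4, and the scan filter `number <= 50` leaves the 46 values 5..50 from each repetition, hence `expected_count = 46 * multiplier`. The sketch below reproduces that count with PyArrow alone; it is not part of the commit, and it simulates the delete predicate and the row filter with `pyarrow.compute` instead of Spark and Iceberg.

# Minimal PyArrow-only sketch (not part of this commit) verifying the
# expected-count arithmetic asserted by the test above.
import pyarrow as pa
import pyarrow.compute as pc

multiplier = 10

# Same data the test appends: values 1..1000, repeated `multiplier` times.
arrow_table = pa.Table.from_arrays(
    [pa.array(list(range(1, 1001)) * multiplier)],
    schema=pa.schema([pa.field("number", pa.int32())]),
)

# Simulate `DELETE FROM ... WHERE number in (1, 2, 3, 4)`.
deleted = pa.array([1, 2, 3, 4], pa.int32())
after_delete = arrow_table.filter(
    pc.invert(pc.is_in(arrow_table["number"], value_set=deleted))
)

# Simulate the scan's row filter `number <= 50`.
result = after_delete.filter(pc.less_equal(after_delete["number"], 50))

# Values 5..50 survive: 46 distinct values per repetition.
assert len(result) == 46 * multiplier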