Commit

Conflicts
Fokko committed May 30, 2024
1 parent 5b10f25 commit 4cd67ac
Showing 4 changed files with 135 additions and 135 deletions.
28 changes: 14 additions & 14 deletions tests/conftest.py
@@ -2143,31 +2143,31 @@ def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":

return pa.Table.from_pydict(
{
'bool': [False, None, True],
'string': ['a', None, 'z'],
"bool": [False, None, True],
"string": ["a", None, "z"],
# Go over the 16 bytes to kick in truncation
'string_long': ['a' * 22, None, 'z' * 22],
'int': [1, None, 9],
'long': [1, None, 9],
'float': [0.0, None, 0.9],
'double': [0.0, None, 0.9],
"string_long": ["a" * 22, None, "z" * 22],
"int": [1, None, 9],
"long": [1, None, 9],
"float": [0.0, None, 0.9],
"double": [0.0, None, 0.9],
# 'time': [1_000_000, None, 3_000_000], # Example times: 1s, none, and 3s past midnight #Spark does not support time fields
'timestamp': [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
'timestamptz': [
"timestamp": [datetime(2023, 1, 1, 19, 25, 00), None, datetime(2023, 3, 1, 19, 25, 00)],
"timestamptz": [
datetime(2023, 1, 1, 19, 25, 00, tzinfo=timezone.utc),
None,
datetime(2023, 3, 1, 19, 25, 00, tzinfo=timezone.utc),
],
'date': [date(2023, 1, 1), None, date(2023, 3, 1)],
"date": [date(2023, 1, 1), None, date(2023, 3, 1)],
# Not supported by Spark
# 'time': [time(1, 22, 0), None, time(19, 25, 0)],
# Not natively supported by Arrow
# 'uuid': [uuid.UUID('00000000-0000-0000-0000-000000000000').bytes, None, uuid.UUID('11111111-1111-1111-1111-111111111111').bytes],
'binary': [b'\01', None, b'\22'],
'fixed': [
uuid.UUID('00000000-0000-0000-0000-000000000000').bytes,
"binary": [b"\01", None, b"\22"],
"fixed": [
uuid.UUID("00000000-0000-0000-0000-000000000000").bytes,
None,
uuid.UUID('11111111-1111-1111-1111-111111111111').bytes,
uuid.UUID("11111111-1111-1111-1111-111111111111").bytes,
],
},
schema=pa_schema,
80 changes: 40 additions & 40 deletions tests/integration/test_deletes.py
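These integration tests cover pyiceberg's row-level delete and overwrite support, which the hunks below drive through tbl.delete(...) and tbl.overwrite(...). For context, a minimal usage sketch of those calls follows; the catalog configuration and REST endpoint are hypothetical assumptions, not part of this commit.

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

# Assumed catalog configuration and table identifier, for illustration only.
catalog = load_catalog("default", uri="http://localhost:8181")
tbl = catalog.load_table("default.table_partitioned_delete")

# Delete all rows matching a predicate, passed either as an expression
# or as a string, exactly as the tests below do.
tbl.delete(EqualTo("number_partitioned", 10))
tbl.delete("number == 20")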
@@ -38,7 +38,7 @@ def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = 'default.table_partitioned_delete'
identifier = "default.table_partitioned_delete"

run_spark_commands(
spark,
@@ -66,14 +66,14 @@ def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog
tbl.delete(EqualTo("number_partitioned", 10))

# No overwrite operation
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete']
assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "delete"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 11], "number": [20, 30]}


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = 'default.table_partitioned_delete'
identifier = "default.table_partitioned_delete"

run_spark_commands(
spark,
@@ -101,14 +101,14 @@ def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCat
tbl.delete(EqualTo("number", 20))

# We don't delete a whole partition, so there is only an overwrite
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'overwrite']
assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 10], 'number': [30, 30]}
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "append", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [11, 10], "number": [30, 30]}


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
identifier = 'default.table_partitioned_delete'
identifier = "default.table_partitioned_delete"

run_spark_commands(
spark,
@@ -132,13 +132,13 @@ def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCa
tbl = session_catalog.load_table(identifier)
tbl.delete(EqualTo("number_partitioned", 22)) # Does not affect any data

assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append']
assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10, 10], 'number': [20, 30]}
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10, 10], "number": [20, 30]}


@pytest.mark.integration
def test_partitioned_table_positional_deletes(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = 'default.table_partitioned_delete'
identifier = "default.table_partitioned_delete"

run_spark_commands(
spark,
@@ -180,13 +180,13 @@ def test_partitioned_table_positional_deletes(spark: SparkSession, session_catal

# One positional delete has been added, but an OVERWRITE status is set
# https://github.com/apache/iceberg/issues/10122
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'overwrite', 'overwrite']
assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [10], 'number': [20]}
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ["append", "overwrite", "overwrite"]
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [10], "number": [20]}


@pytest.mark.integration
def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSession, session_catalog: RestCatalog) -> None:
identifier = 'default.table_partitioned_delete_sequence_number'
identifier = "default.table_partitioned_delete_sequence_number"

# This test case is a bit more complex. Here we run a MoR delete on a file and make sure that
# the manifest gets rewritten (but not the data file, since this is a MoR delete), and check that the delete is still there
@@ -234,40 +234,40 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio
assert len(snapshots) == 3

# Snapshots produced by Spark
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ['append', 'overwrite']
assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()[0:2]] == ["append", "overwrite"]

# Will rewrite one parquet file
assert snapshots[2].summary == Summary(
Operation.OVERWRITE,
**{
'added-files-size': '1145',
'added-data-files': '1',
'added-records': '2',
'changed-partition-count': '1',
'total-files-size': snapshots[2].summary['total-files-size'],
'total-delete-files': '0',
'total-data-files': '1',
'total-position-deletes': '0',
'total-records': '2',
'total-equality-deletes': '0',
'deleted-data-files': '2',
'removed-delete-files': '1',
'deleted-records': '5',
'removed-files-size': snapshots[2].summary['removed-files-size'],
'removed-position-deletes': '1',
"added-files-size": "1145",
"added-data-files": "1",
"added-records": "2",
"changed-partition-count": "1",
"total-files-size": snapshots[2].summary["total-files-size"],
"total-delete-files": "0",
"total-data-files": "1",
"total-position-deletes": "0",
"total-records": "2",
"total-equality-deletes": "0",
"deleted-data-files": "2",
"removed-delete-files": "1",
"deleted-records": "5",
"removed-files-size": snapshots[2].summary["removed-files-size"],
"removed-position-deletes": "1",
},
)

assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [20, 20, 10], 'number': [200, 202, 100]}
assert tbl.scan().to_arrow().to_pydict() == {"number_partitioned": [20, 20, 10], "number": [200, 202, 100]}


@pytest.mark.integration
def test_delete_no_match(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{'ints': 1},
{'ints': 3},
{"ints": 1},
{"ints": 3},
],
schema=arrow_schema,
)
@@ -286,7 +286,7 @@ def test_delete_no_match(session_catalog: RestCatalog) -> None:

assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]

tbl.delete('ints == 2') # Only 1 and 3 in the file, but is between the lower and upper bound
tbl.delete("ints == 2") # Only 1 and 3 in the file, but is between the lower and upper bound

assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [Operation.APPEND]

@@ -296,8 +296,8 @@ def test_delete_overwrite(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{'ints': 1},
{'ints': 2},
{"ints": 1},
{"ints": 2},
],
schema=arrow_schema,
)
@@ -318,28 +318,28 @@ def test_delete_overwrite(session_catalog: RestCatalog) -> None:

arrow_tbl_overwrite = pa.Table.from_pylist(
[
{'ints': 3},
{'ints': 4},
{"ints": 3},
{"ints": 4},
],
schema=arrow_schema,
)
tbl.overwrite(arrow_tbl_overwrite, 'ints == 2') # Should rewrite one file
tbl.overwrite(arrow_tbl_overwrite, "ints == 2") # Should rewrite one file

assert [snapshot.summary.operation for snapshot in tbl.snapshots()] == [
Operation.APPEND,
Operation.OVERWRITE,
Operation.APPEND,
]

assert tbl.scan().to_arrow()['ints'].to_pylist() == [3, 4, 1]
assert tbl.scan().to_arrow()["ints"].to_pylist() == [3, 4, 1]


@pytest.mark.integration
def test_delete_truncate(session_catalog: RestCatalog) -> None:
arrow_schema = pa.schema([pa.field("ints", pa.int32())])
arrow_tbl = pa.Table.from_pylist(
[
{'ints': 1},
{"ints": 1},
],
schema=arrow_schema,
)
46 changes: 23 additions & 23 deletions tests/integration/test_inspect_table.py
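These assertions check the snapshots metadata table returned as a PyArrow table (the df in the hunk below). As a hedged sketch of how such a table is typically obtained, assuming the inspect API and a hypothetical catalog and table identifier:

from pyiceberg.catalog import load_catalog

# Hypothetical catalog/table names, for illustration only.
catalog = load_catalog("default", uri="http://localhost:8181")
tbl = catalog.load_table("default.table_metadata_snapshots")

# Snapshot history as a pyarrow.Table; columns include snapshot_id,
# parent_id, operation, manifest_list, and summary.
df = tbl.inspect.snapshots()
print(df["operation"].to_pylist())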
@@ -102,38 +102,38 @@ def test_inspect_snapshots(
for snapshot_id in df["snapshot_id"]:
assert isinstance(snapshot_id.as_py(), int)

assert df['parent_id'][0].as_py() is None
assert df['parent_id'][1:].to_pylist() == df['snapshot_id'][:-1].to_pylist()
assert df["parent_id"][0].as_py() is None
assert df["parent_id"][1:].to_pylist() == df["snapshot_id"][:-1].to_pylist()

assert [operation.as_py() for operation in df['operation']] == ['append', 'delete', 'append', 'append']
assert [operation.as_py() for operation in df["operation"]] == ["append", "delete", "append", "append"]

for manifest_list in df["manifest_list"]:
assert manifest_list.as_py().startswith("s3://")

# Append
assert df['summary'][0].as_py() == [
('added-files-size', '5459'),
('added-data-files', '1'),
('added-records', '3'),
('total-data-files', '1'),
('total-delete-files', '0'),
('total-records', '3'),
('total-files-size', '5459'),
('total-position-deletes', '0'),
('total-equality-deletes', '0'),
assert df["summary"][0].as_py() == [
("added-files-size", "5459"),
("added-data-files", "1"),
("added-records", "3"),
("total-data-files", "1"),
("total-delete-files", "0"),
("total-records", "3"),
("total-files-size", "5459"),
("total-position-deletes", "0"),
("total-equality-deletes", "0"),
]

# Delete
assert df['summary'][1].as_py() == [
('removed-files-size', '5459'),
('deleted-data-files', '1'),
('deleted-records', '3'),
('total-data-files', '0'),
('total-delete-files', '0'),
('total-records', '0'),
('total-files-size', '0'),
('total-position-deletes', '0'),
('total-equality-deletes', '0'),
assert df["summary"][1].as_py() == [
("removed-files-size", "5459"),
("deleted-data-files", "1"),
("deleted-records", "3"),
("total-data-files", "0"),
("total-delete-files", "0"),
("total-records", "0"),
("total-files-size", "0"),
("total-position-deletes", "0"),
("total-equality-deletes", "0"),
]

lhs = spark.table(f"{identifier}.snapshots").toPandas()
(Diff for the fourth changed file did not load.)
