|
23 | 23 |
|
24 | 24 | from pyiceberg.catalog import Catalog
|
25 | 25 | from pyiceberg.exceptions import NoSuchTableError
|
26 |
| -from pyiceberg.expressions import And, EqualTo, Reference |
| 26 | +from pyiceberg.expressions import AlwaysTrue, And, EqualTo, Reference |
27 | 27 | from pyiceberg.expressions.literals import LongLiteral
|
28 | 28 | from pyiceberg.io.pyarrow import schema_to_pyarrow
|
29 | 29 | from pyiceberg.schema import Schema
|
@@ -709,3 +709,26 @@ def test_upsert_with_nulls(catalog: Catalog) -> None:
|
709 | 709 | ],
|
710 | 710 | schema=schema,
|
711 | 711 | )
|
| 712 | + |
| 713 | + |
| 714 | +def test_transaction(catalog: Catalog) -> None: |
| 715 | + """Test the upsert within a Transaction. Make sure that if something fails the entire Transaction is |
| 716 | + rolled back.""" |
| 717 | + identifier = "default.test_merge_source_dups" |
| 718 | + _drop_table(catalog, identifier) |
| 719 | + |
| 720 | + ctx = SessionContext() |
| 721 | + |
| 722 | + table = gen_target_iceberg_table(1, 10, False, ctx, catalog, identifier) |
| 723 | + df_before_transaction = table.scan().to_arrow() |
| 724 | + |
| 725 | + source_df = gen_source_dataset(5, 15, False, True, ctx) |
| 726 | + |
| 727 | + with pytest.raises(Exception, match="Duplicate rows found in source dataset based on the key columns. No upsert executed"): |
| 728 | + with table.transaction() as tx: |
| 729 | + tx.delete(delete_filter=AlwaysTrue()) |
| 730 | + tx.upsert(df=source_df, join_cols=["order_id"]) |
| 731 | + |
| 732 | + df = table.scan().to_arrow() |
| 733 | + |
| 734 | + assert df_before_transaction == df |
0 commit comments