@@ -732,3 +732,39 @@ def test_transaction(catalog: Catalog) -> None:
732
732
df = table .scan ().to_arrow ()
733
733
734
734
assert df_before_transaction == df
735
+
736
+
737
def test_transaction_multiple_upserts(catalog: Catalog) -> None:
    """Apply the same upsert twice inside a single transaction.

    The table is seeded with one row (id=1, "Alice"). Both upserts join on
    "id" and carry the same two rows, and the test asserts that once the
    transaction block has exited the table holds exactly one row per id,
    with id=1 now named "Alicia".
    """
    table_name = "default.test_multi_upsert"
    _drop_table(catalog, table_name)

    id_field = NestedField(1, "id", IntegerType(), required=True)
    name_field = NestedField(2, "name", StringType(), required=True)
    table = catalog.create_table(
        table_name,
        schema=Schema(id_field, name_field, identifier_field_ids=[1]),
    )

    # Arrow schema spelled out explicitly: non-nullable int32 and non-nullable
    # string, mirroring the two required fields declared above.
    arrow_fields = [
        pa.field("id", pa.int32(), nullable=False),
        pa.field("name", pa.string(), nullable=False),
    ]
    arrow_schema = pa.schema(arrow_fields)

    seed_rows = [{"id": 1, "name": "Alice"}]
    table.append(pa.Table.from_pylist(seed_rows, schema=arrow_schema))

    upsert_rows = [{"id": 2, "name": "Bob"}, {"id": 1, "name": "Alicia"}]
    df = pa.Table.from_pylist(upsert_rows, schema=arrow_schema)

    with table.transaction() as txn:
        # NOTE(review): the repeated call is presumably meant to observe the
        # writes staged by the first call in this same transaction -- confirm.
        for _ in range(2):
            txn.upsert(df, join_cols=["id"])

    rows = table.scan().to_arrow().to_pylist()
    rows.sort(key=lambda row: row["id"])
    assert rows == [
        {"id": 1, "name": "Alicia"},
        {"id": 2, "name": "Bob"},
    ]
770
+
0 commit comments