@@ -747,8 +747,8 @@ def upsert(
747
747
748
748
if join_cols is None :
749
749
join_cols = []
750
- for field_id in df . schema .identifier_field_ids :
751
- col = df . schema .find_column_name (field_id )
750
+ for field_id in self . table_metadata . schema () .identifier_field_ids :
751
+ col = self . table_metadata . schema () .find_column_name (field_id )
752
752
if col is not None :
753
753
join_cols .append (col )
754
754
else :
@@ -767,12 +767,12 @@ def upsert(
767
767
768
768
downcast_ns_timestamp_to_us = Config ().get_bool (DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE ) or False
769
769
_check_pyarrow_schema_compatible (
770
- df . schema , provided_schema = df .schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
770
+ self . table_metadata . schema () , provided_schema = df .schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
771
771
)
772
772
773
773
# get list of rows that exist so we don't have to load the entire target table
774
774
matched_predicate = upsert_util .create_match_filter (df , join_cols )
775
- matched_iceberg_table = df .scan (row_filter = matched_predicate , case_sensitive = case_sensitive ).to_arrow ()
775
+ matched_iceberg_table = self . _table .scan (row_filter = matched_predicate , case_sensitive = case_sensitive ).to_arrow ()
776
776
777
777
update_row_cnt = 0
778
778
insert_row_cnt = 0
@@ -793,7 +793,7 @@ def upsert(
793
793
794
794
if when_not_matched_insert_all :
795
795
expr_match = upsert_util .create_match_filter (matched_iceberg_table , join_cols )
796
- expr_match_bound = bind (df . schema , expr_match , case_sensitive = case_sensitive )
796
+ expr_match_bound = bind (self . table_metadata . schema () , expr_match , case_sensitive = case_sensitive )
797
797
expr_match_arrow = expression_to_pyarrow (expr_match_bound )
798
798
rows_to_insert = df .filter (~ expr_match_arrow )
799
799
@@ -1270,8 +1270,11 @@ def upsert(
1270
1270
"""
1271
1271
with self .transaction () as tx :
1272
1272
return tx .upsert (
1273
- df = df , join_cols = join_cols , when_matched_update_all = when_matched_update_all , when_not_matched_insert_all = when_not_matched_insert_all ,
1274
- case_sensitive = case_sensitive
1273
+ df = df ,
1274
+ join_cols = join_cols ,
1275
+ when_matched_update_all = when_matched_update_all ,
1276
+ when_not_matched_insert_all = when_not_matched_insert_all ,
1277
+ case_sensitive = case_sensitive ,
1275
1278
)
1276
1279
1277
1280
def append (self , df : pa .Table , snapshot_properties : Dict [str , str ] = EMPTY_DICT ) -> None :
0 commit comments