
Commit cfb4f25

kevinjqliu authored and Fokko committed
fix upsert with null values (#1861)
# Rationale for this change

Closes #1835

The original implementation used `!=` ([not_equal](https://arrow.apache.org/docs/python/generated/pyarrow.compute.not_equal.html#pyarrow.compute.not_equal)), which does not account for `null` values: it emits `null` whenever either side is `null`, so rows whose only change involves a `null` are never flagged as different.

The new implementation first checks `not_equal`, and additionally treats a row as changed when that comparison yields `null` (i.e. when at least one side is `null`), using `pc.or_kleene` and `pc.is_null`. Similar to apache/iceberg-rust#1045.

# Are these changes tested?

Yes

# Are there any user-facing changes?

No
1 parent 903fb91 commit cfb4f25
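
As a minimal, self-contained illustration of the behaviour described above (not taken from the PyIceberg code; the array values below are made up), `pc.not_equal` propagates nulls, so the old filter mask is `null` for any row involving a `null`, while the Kleene-OR variant resolves those rows to `true`:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Target-side (lhs) vs. source-side (rhs) values of one non-key column.
lhs = pa.array([1, None, None, 3], type=pa.int32())
rhs = pa.array([1, 7, None, None], type=pa.int32())

# Old logic: `!=` propagates nulls, so the mask is null wherever a null is involved,
# and Table.filter drops null-mask rows by default -> changes to/from null are missed.
neq = pc.not_equal(lhs, rhs)
print(neq)   # values: false, null, null, null

# New logic: also treat a null comparison as "changed".
diff = pc.or_kleene(neq, pc.is_null(neq))
print(diff)  # values: false, true, true, true
```

With the new expression, rows where the comparison is `null` (either side `null`) evaluate to `true` and are kept by the filter instead of being silently dropped.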

File tree

2 files changed: +53 −1 lines changed

pyiceberg/table/upsert_util.py

+10 −1

@@ -71,7 +71,16 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
         # When the target table is empty, there is nothing to update :)
         return source_table.schema.empty_table()

-    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
+    diff_expr = functools.reduce(
+        operator.or_,
+        [
+            pc.or_kleene(
+                pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
+                pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
+            )
+            for col in non_key_cols
+        ],
+    )

     return (
         source_table
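
The same expression also works at the PyArrow `Expression` level. A short sketch, assuming a hypothetical joined table whose columns carry the `-lhs`/`-rhs` suffixes used in `get_rows_to_update` (the table contents and the single non-key column are invented for illustration, and a reasonably recent PyArrow that accepts expressions in `Table.filter` is assumed):

```python
import functools
import operator

import pyarrow as pa
import pyarrow.compute as pc

# Hypothetical join result: target values as "-lhs", incoming source values as "-rhs".
joined = pa.table(
    {
        "foo": ["apple", "banana"],
        "bar-lhs": pa.array([None, 2], type=pa.int32()),
        "bar-rhs": pa.array([7, 2], type=pa.int32()),
    }
)

non_key_cols = ["bar"]
diff_expr = functools.reduce(
    operator.or_,
    [
        pc.or_kleene(
            pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
            pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
        )
        for col in non_key_cols
    ],
)

# "apple" changed from null to 7; with the old `!=` expression its mask would have
# been null and the row would have been filtered out instead of updated.
print(joined.filter(diff_expr)["foo"].to_pylist())  # ['apple']
```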

tests/table/test_upsert.py

+43

@@ -496,3 +496,46 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
         ValueError, match="Join columns could not be found, please set identifier-field-ids or pass in explicitly."
     ):
         tbl.upsert(df)
+
+
+def test_upsert_with_nulls(catalog: Catalog) -> None:
+    identifier = "default.test_upsert_with_nulls"
+    _drop_table(catalog, identifier)
+
+    schema = pa.schema(
+        [
+            ("foo", pa.string()),
+            ("bar", pa.int32()),
+            ("baz", pa.bool_()),
+        ]
+    )
+
+    # create table with null value
+    table = catalog.create_table(identifier, schema)
+    data_with_null = pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": None, "baz": False},
+            {"foo": "banana", "bar": None, "baz": False},
+        ],
+        schema=schema,
+    )
+    table.append(data_with_null)
+    assert table.scan().to_arrow()["bar"].is_null()
+
+    # upsert table with non-null value
+    data_without_null = pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": 7, "baz": False},
+        ],
+        schema=schema,
+    )
+    upd = table.upsert(data_without_null, join_cols=["foo"])
+    assert upd.rows_updated == 1
+    assert upd.rows_inserted == 0
+    assert table.scan().to_arrow() == pa.Table.from_pylist(
+        [
+            {"foo": "apple", "bar": 7, "baz": False},
+            {"foo": "banana", "bar": None, "baz": False},
+        ],
+        schema=schema,
+    )
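
The `catalog` fixture and the `_drop_table` helper used above are defined elsewhere in the test suite and are not part of this diff. To run a test like this locally, they could look roughly like the sketch below; `SqlCatalog` and `NoSuchTableError` are real PyIceberg APIs, but the fixture shape is an assumption for illustration, not the repository's actual conftest:

```python
import pytest
from pyiceberg.catalog import Catalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError


@pytest.fixture
def catalog(tmp_path) -> Catalog:
    # In-memory SQL catalog backed by a temporary local warehouse directory.
    cat = SqlCatalog(
        "default",
        uri="sqlite:///:memory:",
        warehouse=f"file://{tmp_path}",
    )
    cat.create_namespace("default")
    return cat


def _drop_table(catalog: Catalog, identifier: str) -> None:
    # Ignore missing tables so each test starts from a clean slate.
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass
```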

0 commit comments
