Skip to content

Commit d69a191

Browse files
authored
fix upsert with null values (#1861)
<!-- Thanks for opening a pull request! --> <!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual Github issue id. --> <!-- Closes #${GITHUB_ISSUE_ID} --> # Rationale for this change Closes #1835 Original implementation, `!=` ([not_equal](https://arrow.apache.org/docs/python/generated/pyarrow.compute.not_equal.html#pyarrow.compute.not_equal)) does not account for `null` values. It emits `null` when either sides are `null` The new implementation, first checks for `not_equal`. And on null values, returns `true` only if both sides are `null` Similar to apache/iceberg-rust#1045 # Are these changes tested? Yes # Are there any user-facing changes? No <!-- In the case of user-facing changes, please add the changelog label. -->
1 parent 1a5e32a commit d69a191

File tree

2 files changed

+53
-1
lines changed

2 files changed

+53
-1
lines changed

pyiceberg/table/upsert_util.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,16 @@ def get_rows_to_update(source_table: pa.Table, target_table: pa.Table, join_cols
7171
# When the target table is empty, there is nothing to update :)
7272
return source_table.schema.empty_table()
7373

74-
diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != pc.field(f"{col}-rhs") for col in non_key_cols])
74+
diff_expr = functools.reduce(
75+
operator.or_,
76+
[
77+
pc.or_kleene(
78+
pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs")),
79+
pc.is_null(pc.not_equal(pc.field(f"{col}-lhs"), pc.field(f"{col}-rhs"))),
80+
)
81+
for col in non_key_cols
82+
],
83+
)
7584

7685
return (
7786
source_table

tests/table/test_upsert.py

+43
Original file line numberDiff line numberDiff line change
@@ -509,3 +509,46 @@ def test_upsert_without_identifier_fields(catalog: Catalog) -> None:
509509
ValueError, match="Join columns could not be found, please set identifier-field-ids or pass in explicitly."
510510
):
511511
tbl.upsert(df)
512+
513+
514+
def test_upsert_with_nulls(catalog: Catalog) -> None:
515+
identifier = "default.test_upsert_with_nulls"
516+
_drop_table(catalog, identifier)
517+
518+
schema = pa.schema(
519+
[
520+
("foo", pa.string()),
521+
("bar", pa.int32()),
522+
("baz", pa.bool_()),
523+
]
524+
)
525+
526+
# create table with null value
527+
table = catalog.create_table(identifier, schema)
528+
data_with_null = pa.Table.from_pylist(
529+
[
530+
{"foo": "apple", "bar": None, "baz": False},
531+
{"foo": "banana", "bar": None, "baz": False},
532+
],
533+
schema=schema,
534+
)
535+
table.append(data_with_null)
536+
assert table.scan().to_arrow()["bar"].is_null()
537+
538+
# upsert table with non-null value
539+
data_without_null = pa.Table.from_pylist(
540+
[
541+
{"foo": "apple", "bar": 7, "baz": False},
542+
],
543+
schema=schema,
544+
)
545+
upd = table.upsert(data_without_null, join_cols=["foo"])
546+
assert upd.rows_updated == 1
547+
assert upd.rows_inserted == 0
548+
assert table.scan().to_arrow() == pa.Table.from_pylist(
549+
[
550+
{"foo": "apple", "bar": 7, "baz": False},
551+
{"foo": "banana", "bar": None, "baz": False},
552+
],
553+
schema=schema,
554+
)

0 commit comments

Comments
 (0)