From f7a7a8795fa67644ed4abc14563e89de9e9d2640 Mon Sep 17 00:00:00 2001 From: Yingjian Wu Date: Wed, 23 Oct 2024 02:11:00 -0700 Subject: [PATCH] abort the whole transaction if any update on the chain has failed --- pyiceberg/table/__init__.py | 12 +++++++++--- tests/catalog/test_base.py | 23 +++++++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 66b22a7a79..c852aee565 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -23,6 +23,7 @@ from dataclasses import dataclass from functools import cached_property from itertools import chain +from types import TracebackType from typing import ( TYPE_CHECKING, Any, @@ -33,6 +34,7 @@ Optional, Set, Tuple, + Type, TypeVar, Union, ) @@ -231,9 +233,13 @@ def __enter__(self) -> Transaction: """Start a transaction to update the table.""" return self - def __exit__(self, _: Any, value: Any, traceback: Any) -> None: - """Close and commit the transaction.""" - self.commit_transaction() + def __exit__( + self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] + ) -> None: + """Close and commit the transaction, or handle exceptions.""" + # Only commit the full transaction, if there is no exception in all updates on the chain + if exctype is None and excinst is None and exctb is None: + self.commit_transaction() def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction: """Check if the requirements are met, and applies the updates to the metadata.""" diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index e212854ee2..df64a7b2c6 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -766,3 +766,26 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None with pytest.raises(ValidationError) as exc_info: _ = given_catalog_has_a_table(catalog, properties=property_with_none) assert "None type is not a supported value in properties: property_name" in str(exc_info.value) + + +def test_abort_table_transaction_on_exception(catalog: InMemoryCatalog) -> None: + tbl = given_catalog_has_a_table(catalog) + # Populate some initial data + data = pa.Table.from_pylist( + [{"x": 1, "y": 2, "z": 3}, {"x": 4, "y": 5, "z": 6}], + schema=TEST_TABLE_SCHEMA.as_arrow(), + ) + tbl.append(data) + + # Data to overwrite + data = pa.Table.from_pylist( + [{"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}], + schema=TEST_TABLE_SCHEMA.as_arrow(), + ) + + with pytest.raises(ValueError): + with tbl.transaction() as txn: + txn.overwrite(data) + raise ValueError + + assert len(tbl.scan().to_pandas()) == 2 # type: ignore