Skip to content

Commit

Permalink
abort the whole transaction if any update on the chain has failed
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingjian Wu committed Oct 27, 2024
1 parent ff3a249 commit f7a7a87
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 3 deletions.
12 changes: 9 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dataclasses import dataclass
from functools import cached_property
from itertools import chain
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -33,6 +34,7 @@
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)
Expand Down Expand Up @@ -231,9 +233,13 @@ def __enter__(self) -> Transaction:
"""Start a transaction to update the table."""
return self

def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
"""Close and commit the transaction."""
self.commit_transaction()
def __exit__(
self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
) -> None:
"""Close and commit the transaction, or handle exceptions."""
# Only commit the full transaction, if there is no exception in all updates on the chain
if exctype is None and excinst is None and exctb is None:
self.commit_transaction()

def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...] = ()) -> Transaction:
"""Check if the requirements are met, and applies the updates to the metadata."""
Expand Down
23 changes: 23 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,3 +766,26 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None
with pytest.raises(ValidationError) as exc_info:
_ = given_catalog_has_a_table(catalog, properties=property_with_none)
assert "None type is not a supported value in properties: property_name" in str(exc_info.value)


def test_abort_table_transaction_on_exception(catalog: InMemoryCatalog) -> None:
tbl = given_catalog_has_a_table(catalog)
# Populate some initial data
data = pa.Table.from_pylist(
[{"x": 1, "y": 2, "z": 3}, {"x": 4, "y": 5, "z": 6}],
schema=TEST_TABLE_SCHEMA.as_arrow(),
)
tbl.append(data)

# Data to overwrite
data = pa.Table.from_pylist(
[{"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}, {"x": 7, "y": 8, "z": 9}],
schema=TEST_TABLE_SCHEMA.as_arrow(),
)

with pytest.raises(ValueError):
with tbl.transaction() as txn:
txn.overwrite(data)
raise ValueError

assert len(tbl.scan().to_pandas()) == 2 # type: ignore

0 comments on commit f7a7a87

Please sign in to comment.