|
27 | 27 | from pyiceberg.catalog import Identifier
|
28 | 28 | from pyiceberg.catalog.sql import SqlCatalog
|
29 | 29 | from pyiceberg.exceptions import (
|
| 30 | + CommitFailedException, |
30 | 31 | NamespaceAlreadyExistsError,
|
31 | 32 | NamespaceNotEmptyError,
|
32 | 33 | NoSuchNamespaceError,
|
@@ -719,3 +720,26 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i
|
719 | 720 | assert new_schema
|
720 | 721 | assert new_schema == update._apply()
|
721 | 722 | assert new_schema.find_field("b").field_type == IntegerType()
|
| 723 | + |
| 724 | + |
| 725 | +@pytest.mark.parametrize( |
| 726 | + 'catalog', |
| 727 | + [ |
| 728 | + lazy_fixture('catalog_memory'), |
| 729 | + lazy_fixture('catalog_sqlite'), |
| 730 | + lazy_fixture('catalog_sqlite_without_rowcount'), |
| 731 | + ], |
| 732 | +) |
| 733 | +def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None: |
| 734 | + database_name, _table_name = random_identifier |
| 735 | + catalog.create_namespace(database_name) |
| 736 | + table_a = catalog.create_table(random_identifier, table_schema_simple) |
| 737 | + table_b = catalog.load_table(random_identifier) |
| 738 | + |
| 739 | + with table_a.update_schema() as update: |
| 740 | + update.add_column(path="b", field_type=IntegerType()) |
| 741 | + |
| 742 | + with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"): |
| 743 | + # This one should fail since it already has been updated |
| 744 | + with table_b.update_schema() as update: |
| 745 | + update.add_column(path="c", field_type=IntegerType()) |
0 commit comments