From 4efc975db0812f379c98e209d7e88962ce0af82c Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 23 Apr 2024 23:21:19 -0700 Subject: [PATCH 1/8] refactor _commit_table --- pyiceberg/catalog/glue.py | 74 +++++++++++++------------- tests/catalog/integration_test_glue.py | 14 +++-- tests/catalog/test_glue.py | 17 ++++-- 3 files changed, 63 insertions(+), 42 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index c3c2fdafc6..636eb89ad2 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -439,71 +439,73 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + current_glue_table: Optional[TableTypeDef] + glue_table_version_id: Optional[str] + current_table: Optional[Table] try: current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) - # Update the table glue_table_version_id = current_glue_table.get("VersionId") - if not glue_table_version_id: - raise CommitFailedException( - f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" - ) current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) - base_metadata = current_table.metadata + except NoSuchTableError: + current_glue_table = None + glue_table_version_id = None + current_table = None + + for requirement in table_request.requirements: + requirement.validate(current_table.metadata if current_table else None) - # Validate the update requirements - for requirement in table_request.requirements: - requirement.validate(base_metadata) + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) - updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + if current_table and updated_metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + self._write_metadata( + metadata=updated_metadata, + io=self._load_file_io(updated_metadata.properties, new_metadata_location), + metadata_path=new_metadata_location, + ) + + if current_table: + # table exists, update the table + if not glue_table_version_id: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" + ) + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, metadata_location=new_metadata_location, - properties=current_table.properties, + properties=updated_metadata.properties, metadata=updated_metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) - - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent - # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking self._update_glue_table( database_name=database_name, table_name=table_name, table_input=update_table_input, version_id=glue_table_version_id, ) - - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) - except NoSuchTableError: - # Create the table - updated_metadata = update_table_metadata( - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True - ) - new_metadata_version = 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) - self._write_metadata( - updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location - ) - + else: + # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, metadata_location=new_metadata_location, properties=updated_metadata.properties, metadata=updated_metadata, ) - self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 5cd60225c4..393271c644 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -462,7 +462,9 @@ def test_commit_table_update_schema( ] -def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: +def test_commit_table_properties( + test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: identifier = (database_name, table_name) test_catalog.create_namespace(namespace=database_name) table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"}) @@ -470,13 +472,19 @@ def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Sch assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 8aa4918636..7b12261bb6 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -677,7 +677,12 @@ def test_commit_table_update_schema( @mock_aws def test_commit_table_properties( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _glue: boto3.client, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, ) -> None: catalog_name = "glue" identifier = (database_name, table_name) @@ -688,13 +693,19 @@ def test_commit_table_properties( assert test_catalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = _glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @mock_aws From 93dfcc4f554dce14a1c304f91785c63fbe232dc1 Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 16 Apr 2024 23:46:34 -0700 Subject: [PATCH 2/8] support createTableTransaction in hive --- pyiceberg/catalog/__init__.py | 2 +- pyiceberg/catalog/hive.py | 91 +++++++++++++------- tests/integration/test_writes/test_writes.py | 9 +- 3 files changed, 68 insertions(+), 34 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f104aa94da..f8de9a58d1 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -719,7 +719,7 @@ def _create_staged_table( metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties ) - io = load_file_io(properties=self.properties, location=metadata_location) + io = self._load_file_io(properties=properties, location=metadata_location) return StagedTable( identifier=(self.name, database_name, table_name), metadata=metadata, diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 804b1105cc..bcb99d6a60 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -74,8 +74,15 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, PropertyUtil, Table, TableProperties, update_table_metadata -from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + PropertyUtil, + StagedTable, + Table, + TableProperties, + update_table_metadata, +) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties from pyiceberg.types import ( @@ -266,6 +273,26 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: catalog=self, ) + def _convert_iceberg_into_hive(self, table: Table) -> HiveTable: + identifier_tuple = self.identifier_to_tuple_without_catalog(table.identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + current_time_millis = int(time.time() * 1000) + + return HiveTable( + dbName=database_name, + tableName=table_name, + owner=table.properties[OWNER] if table.properties and OWNER in table.properties else getpass.getuser(), + createTime=current_time_millis // 1000, + lastAccessTime=current_time_millis // 1000, + sd=_construct_hive_storage_descriptor( + table.schema(), + table.location(), + PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), + ), + tableType=EXTERNAL_TABLE, + parameters=_construct_parameters(table.metadata_location), + ) + def create_table( self, identifier: Union[str, Identifier], @@ -292,37 +319,19 @@ def create_table( AlreadyExistsError: If a table with the name already exists. ValueError: If the identifier is invalid. """ - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore - properties = {**DEFAULT_PROPERTIES, **properties} - database_name, table_name = self.identifier_to_database_and_table(identifier) - current_time_millis = int(time.time() * 1000) - - location = self._resolve_table_location(location, database_name, table_name) - - metadata_location = self._get_metadata_location(location=location) - metadata = new_table_metadata( - location=location, + staged_table = self._create_staged_table( + identifier=identifier, schema=schema, + location=location, partition_spec=partition_spec, sort_order=sort_order, properties=properties, ) - io = load_file_io({**self.properties, **properties}, location=location) - self._write_metadata(metadata, io, metadata_location) + database_name, table_name = self.identifier_to_database_and_table(identifier) - tbl = HiveTable( - dbName=database_name, - tableName=table_name, - owner=properties[OWNER] if properties and OWNER in properties else getpass.getuser(), - createTime=current_time_millis // 1000, - lastAccessTime=current_time_millis // 1000, - sd=_construct_hive_storage_descriptor( - schema, location, PropertyUtil.property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT) - ), - tableType=EXTERNAL_TABLE, - parameters=_construct_parameters(metadata_location), - ) + self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) + tbl = self._convert_iceberg_into_hive(staged_table) try: with self._client as open_client: open_client.create_table(tbl) @@ -330,7 +339,7 @@ def create_table( except AlreadyExistsException as e: raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e - return self._convert_hive_into_iceberg(hive_table, io) + return self._convert_hive_into_iceberg(hive_table, staged_table.io) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -404,8 +413,32 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location ) open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) - except NoSuchObjectException as e: - raise NoSuchTableError(f"Table does not exist: {table_name}") from e + except NoSuchObjectException: + updated_metadata = update_table_metadata( + base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True + ) + new_metadata_version = 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + io = self._load_file_io(updated_metadata.properties, new_metadata_location) + self._write_metadata( + updated_metadata, + io, + new_metadata_location, + ) + + tbl = self._convert_iceberg_into_hive( + StagedTable( + identifier=(self.name, database_name, table_name), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=io, + catalog=self, + ) + ) + try: + open_client.create_table(tbl) + except AlreadyExistsException as e: + raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2cf2c9ef5c..4145847ded 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -610,7 +610,8 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None @pytest.mark.integration @pytest.mark.parametrize("format_version", [2]) -def test_create_table_transaction(session_catalog: Catalog, format_version: int) -> None: +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) +def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: if format_version == 1: pytest.skip( "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" @@ -619,7 +620,7 @@ def test_create_table_transaction(session_catalog: Catalog, format_version: int) identifier = f"default.arrow_create_table_transaction{format_version}" try: - session_catalog.drop_table(identifier=identifier) + catalog.drop_table(identifier=identifier) except NoSuchTableError: pass @@ -641,7 +642,7 @@ def test_create_table_transaction(session_catalog: Catalog, format_version: int) ]), ) - with session_catalog.create_table_transaction( + with catalog.create_table_transaction( identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)} ) as txn: with txn.update_snapshot().fast_append() as snapshot_update: @@ -657,7 +658,7 @@ def test_create_table_transaction(session_catalog: Catalog, format_version: int) ): snapshot_update.append_data_file(data_file) - tbl = session_catalog.load_table(identifier=identifier) + tbl = catalog.load_table(identifier=identifier) assert tbl.format_version == format_version assert len(tbl.scan().to_arrow()) == 6 From 99e43717c2897bf115bd7167cf190a91180408c1 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 17 Apr 2024 00:10:03 -0700 Subject: [PATCH 3/8] refactor hive --- pyiceberg/catalog/hive.py | 27 +++++++++----------- tests/integration/test_writes/test_writes.py | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index bcb99d6a60..1d011dd5d9 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -293,6 +293,12 @@ def _convert_iceberg_into_hive(self, table: Table) -> HiveTable: parameters=_construct_parameters(table.metadata_location), ) + def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None: + try: + open_client.create_table(hive_table) + except AlreadyExistsException as e: + raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e + def create_table( self, identifier: Union[str, Identifier], @@ -332,12 +338,10 @@ def create_table( self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) tbl = self._convert_iceberg_into_hive(staged_table) - try: - with self._client as open_client: - open_client.create_table(tbl) - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - except AlreadyExistsException as e: - raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + + with self._client as open_client: + self._create_hive_table(open_client, tbl) + hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) return self._convert_hive_into_iceberg(hive_table, staged_table.io) @@ -420,11 +424,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons new_metadata_version = 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) io = self._load_file_io(updated_metadata.properties, new_metadata_location) - self._write_metadata( - updated_metadata, - io, - new_metadata_location, - ) + self._write_metadata(updated_metadata, io, new_metadata_location) tbl = self._convert_iceberg_into_hive( StagedTable( @@ -435,10 +435,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons catalog=self, ) ) - try: - open_client.create_table(tbl) - except AlreadyExistsException as e: - raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + self._create_hive_table(open_client, tbl) finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 4145847ded..46b68b1eff 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -617,7 +617,7 @@ def test_create_table_transaction(catalog: Catalog, format_version: int) -> None "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" ) - identifier = f"default.arrow_create_table_transaction{format_version}" + identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" try: catalog.drop_table(identifier=identifier) From 3910e5e91c24f2ea8c39109a64f030455dd31814 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 00:36:15 -0700 Subject: [PATCH 4/8] refactor hive's _commit_table --- pyiceberg/catalog/hive.py | 102 +++++++++++-------- tests/integration/test_writes/test_writes.py | 5 +- 2 files changed, 60 insertions(+), 47 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1d011dd5d9..f635492d9a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -67,10 +67,10 @@ NamespaceNotEmptyError, NoSuchIcebergTableError, NoSuchNamespaceError, + NoSuchPropertyException, NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import FileIO, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile @@ -247,10 +247,12 @@ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) self._client = _HiveClient(properties["uri"], properties.get("ugi")) - def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: + def _convert_hive_into_iceberg(self, table: HiveTable) -> Table: properties: Dict[str, str] = table.parameters if TABLE_TYPE not in properties: - raise NoSuchTableError(f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}") + raise NoSuchPropertyException( + f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}" + ) table_type = properties[TABLE_TYPE] if table_type.lower() != ICEBERG: @@ -261,8 +263,9 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: if prop_metadata_location := properties.get(METADATA_LOCATION): metadata_location = prop_metadata_location else: - raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing") + raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing") + io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( @@ -299,6 +302,12 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None except AlreadyExistsException as e: raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e + def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable: + try: + return open_client.get_table(dbname=database_name, tbl_name=table_name) + except NoSuchObjectException as e: + raise NoSuchTableError(f"Table does not exists: {table_name}") from e + def create_table( self, identifier: Union[str, Identifier], @@ -343,7 +352,7 @@ def create_table( self._create_hive_table(open_client, tbl) hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - return self._convert_hive_into_iceberg(hive_table, staged_table.io) + return self._convert_hive_into_iceberg(hive_table) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -395,47 +404,53 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons if lock.state != LockState.ACQUIRED: raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location) - current_table = self._convert_hive_into_iceberg(hive_table, io) + hive_table: Optional[HiveTable] + current_table: Optional[Table] + try: + hive_table = self._get_hive_table(open_client, database_name, table_name) + current_table = self._convert_hive_into_iceberg(hive_table) + except NoSuchTableError: + hive_table = None + current_table = None - base_metadata = current_table.metadata for requirement in table_request.requirements: - requirement.validate(base_metadata) + requirement.validate(current_table.metadata if current_table else None) - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - hive_table.parameters = _construct_parameters( - metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location - ) - open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) - except NoSuchObjectException: updated_metadata = update_table_metadata( - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, ) - new_metadata_version = 0 + + if current_table and updated_metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) io = self._load_file_io(updated_metadata.properties, new_metadata_location) - self._write_metadata(updated_metadata, io, new_metadata_location) - - tbl = self._convert_iceberg_into_hive( - StagedTable( - identifier=(self.name, database_name, table_name), - metadata=updated_metadata, - metadata_location=new_metadata_location, - io=io, - catalog=self, - ) + self._write_metadata( + metadata=updated_metadata, + io=io, + metadata_path=new_metadata_location, ) - self._create_hive_table(open_client, tbl) + + if hive_table and current_table: + hive_table.parameters = _construct_parameters( + metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location + ) + open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) + else: + hive_table = self._convert_iceberg_into_hive( + StagedTable( + identifier=(self.name, database_name, table_name), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=io, + catalog=self, + ) + ) + self._create_hive_table(open_client, hive_table) finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) @@ -458,14 +473,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: """ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) - try: - with self._client as open_client: - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - except NoSuchObjectException as e: - raise NoSuchTableError(f"Table does not exists: {table_name}") from e - io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location) - return self._convert_hive_into_iceberg(hive_table, io) + with self._client as open_client: + hive_table = self._get_hive_table(open_client, database_name, table_name) + + return self._convert_hive_into_iceberg(hive_table) def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 46b68b1eff..5e7487c8a9 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -34,6 +34,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.catalog.hive import HiveCatalog +from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.table import TableProperties, _dataframe_to_data_files @@ -609,10 +610,10 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None @pytest.mark.integration -@pytest.mark.parametrize("format_version", [2]) +@pytest.mark.parametrize("format_version", [1, 2]) @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: - if format_version == 1: + if format_version == 1 and isinstance(catalog, RestCatalog): pytest.skip( "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" ) From 12b94589177dcf10113af2994eb63c3c344c8ce6 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 01:02:28 -0700 Subject: [PATCH 5/8] support CreateTableTransaction for SQL --- pyiceberg/catalog/sql.py | 104 +++++++++++++++++++++++++------------- tests/catalog/test_sql.py | 60 ++++++++++++++++++++++ 2 files changed, 129 insertions(+), 35 deletions(-) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 978109b2a3..18d66c6017 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -376,55 +376,89 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons identifier_tuple = self.identifier_to_tuple_without_catalog( tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) - current_table = self.load_table(identifier_tuple) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) - base_metadata = current_table.metadata + + current_table: Optional[Table] + try: + current_table = self.load_table(identifier_tuple) + except NoSuchTableError: + current_table = None + for requirement in table_request.requirements: - requirement.validate(base_metadata) + requirement.validate(current_table.metadata if current_table else None) - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) + if current_table and updated_metadata == current_table.metadata: # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + self._write_metadata( + metadata=updated_metadata, + io=self._load_file_io(updated_metadata.properties, new_metadata_location), + metadata_path=new_metadata_location, + ) with Session(self.engine) as session: - if self.engine.dialect.supports_sane_rowcount: - stmt = ( - update(IcebergTables) - .where( - IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, - IcebergTables.table_name == table_name, - IcebergTables.metadata_location == current_table.metadata_location, - ) - .values(metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location) - ) - result = session.execute(stmt) - if result.rowcount < 1: - raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") - else: - try: - tbl = ( - session.query(IcebergTables) - .with_for_update(of=IcebergTables) - .filter( + if current_table: + # table exists, update it + if self.engine.dialect.supports_sane_rowcount: + stmt = ( + update(IcebergTables) + .where( IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == database_name, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) - .one() + .values( + metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location + ) ) - tbl.metadata_location = new_metadata_location - tbl.previous_metadata_location = current_table.metadata_location - except NoResultFound as e: - raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") from e - session.commit() + result = session.execute(stmt) + if result.rowcount < 1: + raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") + else: + try: + tbl = ( + session.query(IcebergTables) + .with_for_update(of=IcebergTables) + .filter( + IcebergTables.catalog_name == self.name, + IcebergTables.table_namespace == database_name, + IcebergTables.table_name == table_name, + IcebergTables.metadata_location == current_table.metadata_location, + ) + .one() + ) + tbl.metadata_location = new_metadata_location + tbl.previous_metadata_location = current_table.metadata_location + except NoResultFound as e: + raise CommitFailedException( + f"Table has been updated by another process: {database_name}.{table_name}" + ) from e + session.commit() + else: + # table does not exist, create it + try: + session.add( + IcebergTables( + catalog_name=self.name, + table_namespace=database_name, + table_name=table_name, + metadata_location=new_metadata_location, + previous_metadata_location=None, + ) + ) + session.commit() + except IntegrityError as e: + raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 99b8550602..9f2f370413 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -948,6 +948,66 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None: snapshot_update.append_data_file(data_file) +@pytest.mark.parametrize( + 'catalog', + [ + lazy_fixture('catalog_memory'), + lazy_fixture('catalog_sqlite'), + lazy_fixture('catalog_sqlite_without_rowcount'), + ], +) +@pytest.mark.parametrize("format_version", [1, 2]) +def test_create_table_transaction(catalog: SqlCatalog, format_version: int) -> None: + identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" + try: + catalog.create_namespace("default") + except NamespaceAlreadyExistsError: + pass + + try: + catalog.drop_table(identifier=identifier) + except NoSuchTableError: + pass + + pa_table = pa.Table.from_pydict( + { + 'foo': ['a', None, 'z'], + }, + schema=pa.schema([pa.field("foo", pa.string(), nullable=True)]), + ) + + pa_table_with_column = pa.Table.from_pydict( + { + 'foo': ['a', None, 'z'], + 'bar': [19, None, 25], + }, + schema=pa.schema([ + pa.field("foo", pa.string(), nullable=True), + pa.field("bar", pa.int32(), nullable=True), + ]), + ) + + with catalog.create_table_transaction( + identifier=identifier, schema=pa_table.schema, properties={"format-version": str(format_version)} + ) as txn: + with txn.update_snapshot().fast_append() as snapshot_update: + for data_file in _dataframe_to_data_files(table_metadata=txn.table_metadata, df=pa_table, io=txn._table.io): + snapshot_update.append_data_file(data_file) + + with txn.update_schema() as schema_txn: + schema_txn.union_by_name(pa_table_with_column.schema) + + with txn.update_snapshot().fast_append() as snapshot_update: + for data_file in _dataframe_to_data_files( + table_metadata=txn.table_metadata, df=pa_table_with_column, io=txn._table.io + ): + snapshot_update.append_data_file(data_file) + + tbl = catalog.load_table(identifier=identifier) + assert tbl.format_version == format_version + assert len(tbl.scan().to_arrow()) == 6 + + @pytest.mark.parametrize( 'catalog', [ From d35990a59481155771e5a554bf4a2c3418c84fd6 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 21:35:21 -0700 Subject: [PATCH 6/8] fix hive side bug --- pyiceberg/catalog/hive.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index f635492d9a..de10ca1c53 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -436,11 +436,13 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) if hive_table and current_table: + # Table exists, update it. hive_table.parameters = _construct_parameters( metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location ) open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) else: + # Table does not exist, create it. hive_table = self._convert_iceberg_into_hive( StagedTable( identifier=(self.name, database_name, table_name), @@ -450,7 +452,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons catalog=self, ) ) - self._create_hive_table(open_client, hive_table) + self._create_hive_table(open_client, hive_table) finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) From eed16e023dcfe2a1c9edf455c76e6d1b3073caf5 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 22:03:06 -0700 Subject: [PATCH 7/8] extract common logic of _commit_table --- pyiceberg/catalog/__init__.py | 22 +++++++++++++++++++++ pyiceberg/catalog/glue.py | 36 ++++++++++++++--------------------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f8de9a58d1..287ff848cf 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -48,6 +48,7 @@ CreateTableTransaction, StagedTable, Table, + update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -728,6 +729,27 @@ def _create_staged_table( catalog=self, ) + def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable: + for requirement in table_request.requirements: + requirement.validate(current_table.metadata if current_table else None) + + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) + + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + + return StagedTable( + identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), + catalog=self, + ) + def _get_updated_props_and_update_summary( self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties ) -> Tuple[PropertiesUpdateSummary, Properties]: diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 636eb89ad2..238b53a3a4 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -67,7 +67,6 @@ CommitTableResponse, PropertyUtil, Table, - update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -451,25 +450,16 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons glue_table_version_id = None current_table = None - for requirement in table_request.requirements: - requirement.validate(current_table.metadata if current_table else None) + updated_staged_table = self._update_and_stage_table(current_table, table_request) - updated_metadata = update_table_metadata( - base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, - enforce_validation=current_table is None, - ) - - if current_table and updated_metadata == current_table.metadata: + if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) self._write_metadata( - metadata=updated_metadata, - io=self._load_file_io(updated_metadata.properties, new_metadata_location), - metadata_path=new_metadata_location, + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, ) if current_table: @@ -483,9 +473,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=updated_metadata.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) @@ -499,13 +489,15 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=updated_metadata.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, ) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. From a41b4bb2c0d6c6dda93aea8b79eb1ff56669ed70 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 22:09:08 -0700 Subject: [PATCH 8/8] make hive and sql use _update_and_stage_table --- pyiceberg/catalog/glue.py | 2 -- pyiceberg/catalog/hive.py | 36 +++++++++++++----------------------- pyiceberg/catalog/sql.py | 34 +++++++++++++--------------------- 3 files changed, 26 insertions(+), 46 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 238b53a3a4..e7e9dd643a 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -451,11 +451,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons current_table = None updated_staged_table = self._update_and_stage_table(current_table, table_request) - if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - self._write_metadata( metadata=updated_staged_table.metadata, io=updated_staged_table.io, diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index de10ca1c53..760617e6cd 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -81,7 +81,6 @@ StagedTable, Table, TableProperties, - update_table_metadata, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -413,32 +412,21 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons hive_table = None current_table = None - for requirement in table_request.requirements: - requirement.validate(current_table.metadata if current_table else None) - - updated_metadata = update_table_metadata( - base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, - enforce_validation=current_table is None, - ) - - if current_table and updated_metadata == current_table.metadata: + updated_staged_table = self._update_and_stage_table(current_table, table_request) + if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) - io = self._load_file_io(updated_metadata.properties, new_metadata_location) self._write_metadata( - metadata=updated_metadata, - io=io, - metadata_path=new_metadata_location, + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, ) if hive_table and current_table: # Table exists, update it. hive_table.parameters = _construct_parameters( - metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location + metadata_location=updated_staged_table.metadata_location, + previous_metadata_location=current_table.metadata_location, ) open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) else: @@ -446,9 +434,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons hive_table = self._convert_iceberg_into_hive( StagedTable( identifier=(self.name, database_name, table_name), - metadata=updated_metadata, - metadata_location=new_metadata_location, - io=io, + metadata=updated_staged_table.metadata, + metadata_location=updated_staged_table.metadata_location, + io=updated_staged_table.io, catalog=self, ) ) @@ -456,7 +444,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and return the table instance. diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 18d66c6017..9dc17a4d85 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -59,7 +59,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -384,25 +384,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons except NoSuchTableError: current_table = None - for requirement in table_request.requirements: - requirement.validate(current_table.metadata if current_table else None) - - updated_metadata = update_table_metadata( - base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, - enforce_validation=current_table is None, - ) - if current_table and updated_metadata == current_table.metadata: + updated_staged_table = self._update_and_stage_table(current_table, table_request) + if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) self._write_metadata( - metadata=updated_metadata, - io=self._load_file_io(updated_metadata.properties, new_metadata_location), - metadata_path=new_metadata_location, + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, ) with Session(self.engine) as session: @@ -418,7 +407,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons IcebergTables.metadata_location == current_table.metadata_location, ) .values( - metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location + metadata_location=updated_staged_table.metadata_location, + previous_metadata_location=current_table.metadata_location, ) ) result = session.execute(stmt) @@ -437,7 +427,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) .one() ) - tbl.metadata_location = new_metadata_location + tbl.metadata_location = updated_staged_table.metadata_location tbl.previous_metadata_location = current_table.metadata_location except NoResultFound as e: raise CommitFailedException( @@ -452,7 +442,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons catalog_name=self.name, table_namespace=database_name, table_name=table_name, - metadata_location=new_metadata_location, + metadata_location=updated_staged_table.metadata_location, previous_metadata_location=None, ) ) @@ -460,7 +450,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons except IntegrityError as e: raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool: namespace = self.identifier_to_database(identifier)