diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f104aa94da..18d803fe1c 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -48,6 +48,7 @@ CreateTableTransaction, StagedTable, Table, + update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -728,6 +729,27 @@ def _create_staged_table( catalog=self, ) + def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable: + for requirement in table_request.requirements: + requirement.validate(current_table.metadata if current_table else None) + + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) + + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + + return StagedTable( + identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), + catalog=self, + ) + def _get_updated_props_and_update_summary( self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties ) -> Tuple[PropertiesUpdateSummary, Properties]: diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index c3c2fdafc6..275cda7ed0 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -58,7 +58,6 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile @@ -67,7 +66,6 @@ CommitTableResponse, PropertyUtil, Table, - update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -321,7 +319,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table: ) metadata_location = properties[METADATA_LOCATION] - io = load_file_io(properties=self.properties, location=metadata_location) + io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( @@ -439,71 +437,64 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + current_glue_table: Optional[TableTypeDef] + glue_table_version_id: Optional[str] + current_table: Optional[Table] try: current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) - # Update the table glue_table_version_id = current_glue_table.get("VersionId") + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + except NoSuchTableError: + current_glue_table = None + glue_table_version_id = None + current_table = None + + updated_staged_table = self._update_and_stage_table(current_table, table_request) + if current_table and updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + ) + + if current_table: + # table exists, update the table if not glue_table_version_id: raise CommitFailedException( f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" ) - current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) - base_metadata = current_table.metadata - - # Validate the update requirements - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=current_table.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) - - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent - # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking self._update_glue_table( database_name=database_name, table_name=table_name, table_input=update_table_input, version_id=glue_table_version_id, ) - - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) - except NoSuchTableError: - # Create the table - updated_metadata = update_table_metadata( - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True - ) - new_metadata_version = 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) - self._write_metadata( - updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location - ) - + else: + # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=updated_metadata.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, ) - self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 5cd60225c4..393271c644 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -462,7 +462,9 @@ def test_commit_table_update_schema( ] -def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: +def test_commit_table_properties( + test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: identifier = (database_name, table_name) test_catalog.create_namespace(namespace=database_name) table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"}) @@ -470,13 +472,19 @@ def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Sch assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 8aa4918636..7b12261bb6 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -677,7 +677,12 @@ def test_commit_table_update_schema( @mock_aws def test_commit_table_properties( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _glue: boto3.client, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, ) -> None: catalog_name = "glue" identifier = (database_name, table_name) @@ -688,13 +693,19 @@ def test_commit_table_properties( assert test_catalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = _glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @mock_aws