From 4efc975db0812f379c98e209d7e88962ce0af82c Mon Sep 17 00:00:00 2001 From: HonahX Date: Tue, 23 Apr 2024 23:21:19 -0700 Subject: [PATCH 1/4] refactor _commit_table --- pyiceberg/catalog/glue.py | 74 +++++++++++++------------- tests/catalog/integration_test_glue.py | 14 +++-- tests/catalog/test_glue.py | 17 ++++-- 3 files changed, 63 insertions(+), 42 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index c3c2fdafc6..636eb89ad2 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -439,71 +439,73 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + current_glue_table: Optional[TableTypeDef] + glue_table_version_id: Optional[str] + current_table: Optional[Table] try: current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) - # Update the table glue_table_version_id = current_glue_table.get("VersionId") - if not glue_table_version_id: - raise CommitFailedException( - f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" - ) current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) - base_metadata = current_table.metadata + except NoSuchTableError: + current_glue_table = None + glue_table_version_id = None + current_table = None + + for requirement in table_request.requirements: + requirement.validate(current_table.metadata if current_table else None) - # Validate the update requirements - for requirement in table_request.requirements: - requirement.validate(base_metadata) + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) - updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + if current_table and updated_metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + self._write_metadata( + metadata=updated_metadata, + io=self._load_file_io(updated_metadata.properties, new_metadata_location), + metadata_path=new_metadata_location, + ) + + if current_table: + # table exists, update the table + if not glue_table_version_id: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" + ) + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, metadata_location=new_metadata_location, - properties=current_table.properties, + properties=updated_metadata.properties, metadata=updated_metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) - - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent - # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking self._update_glue_table( database_name=database_name, table_name=table_name, table_input=update_table_input, version_id=glue_table_version_id, ) - - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) - except NoSuchTableError: - # Create the table - updated_metadata = update_table_metadata( - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True - ) - new_metadata_version = 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) - self._write_metadata( - updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location - ) - + else: + # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, metadata_location=new_metadata_location, properties=updated_metadata.properties, metadata=updated_metadata, ) - self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. diff --git a/tests/catalog/integration_test_glue.py b/tests/catalog/integration_test_glue.py index 5cd60225c4..393271c644 100644 --- a/tests/catalog/integration_test_glue.py +++ b/tests/catalog/integration_test_glue.py @@ -462,7 +462,9 @@ def test_commit_table_update_schema( ] -def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None: +def test_commit_table_properties( + test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: identifier = (database_name, table_name) test_catalog.create_namespace(namespace=database_name) table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"}) @@ -470,13 +472,19 @@ def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Sch assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @pytest.mark.parametrize("format_version", [1, 2]) diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 8aa4918636..7b12261bb6 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -677,7 +677,12 @@ def test_commit_table_update_schema( @mock_aws def test_commit_table_properties( - _bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str + _glue: boto3.client, + _bucket_initialize: None, + moto_endpoint_url: str, + table_schema_nested: Schema, + database_name: str, + table_name: str, ) -> None: catalog_name = "glue" identifier = (database_name, table_name) @@ -688,13 +693,19 @@ def test_commit_table_properties( assert test_catalog._parse_metadata_version(table.metadata_location) == 0 transaction = table.transaction() - transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") + transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description") transaction.remove_properties("test_b") transaction.commit_transaction() updated_table_metadata = table.metadata assert test_catalog._parse_metadata_version(table.metadata_location) == 1 - assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"} + assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"} + + table_info = _glue.get_table( + DatabaseName=database_name, + Name=table_name, + ) + assert table_info["Table"]["Description"] == "test_description" @mock_aws From f5e5a5b9d8971207e8c1125cfebdb7876cff968e Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 01:08:14 -0700 Subject: [PATCH 2/4] small refactor --- pyiceberg/catalog/glue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 636eb89ad2..0e46d27a1e 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -58,7 +58,6 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile @@ -321,7 +320,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table: ) metadata_location = properties[METADATA_LOCATION] - io = load_file_io(properties=self.properties, location=metadata_location) + io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( From f2e3a0daf70920589515d7c0d62700f46f22f9bd Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 22:03:06 -0700 Subject: [PATCH 3/4] extract common logic of _commit_table --- pyiceberg/catalog/__init__.py | 22 +++++++++++++++++++++ pyiceberg/catalog/glue.py | 36 ++++++++++++++--------------------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f104aa94da..18d803fe1c 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -48,6 +48,7 @@ CreateTableTransaction, StagedTable, Table, + update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -728,6 +729,27 @@ def _create_staged_table( catalog=self, ) + def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable: + for requirement in table_request.requirements: + requirement.validate(current_table.metadata if current_table else None) + + updated_metadata = update_table_metadata( + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, + ) + + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 + new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) + + return StagedTable( + identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), + catalog=self, + ) + def _get_updated_props_and_update_summary( self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties ) -> Tuple[PropertiesUpdateSummary, Properties]: diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 0e46d27a1e..af5954f8b5 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -66,7 +66,6 @@ CommitTableResponse, PropertyUtil, Table, - update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -450,25 +449,16 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons glue_table_version_id = None current_table = None - for requirement in table_request.requirements: - requirement.validate(current_table.metadata if current_table else None) + updated_staged_table = self._update_and_stage_table(current_table, table_request) - updated_metadata = update_table_metadata( - base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, - enforce_validation=current_table is None, - ) - - if current_table and updated_metadata == current_table.metadata: + if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 - new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) self._write_metadata( - metadata=updated_metadata, - io=self._load_file_io(updated_metadata.properties, new_metadata_location), - metadata_path=new_metadata_location, + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, ) if current_table: @@ -482,9 +472,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking update_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=updated_metadata.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, glue_table=current_glue_table, prev_metadata_location=current_table.metadata_location, ) @@ -498,13 +488,15 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons # table does not exist, create the table create_table_input = _construct_table_input( table_name=table_name, - metadata_location=new_metadata_location, - properties=updated_metadata.properties, - metadata=updated_metadata, + metadata_location=updated_staged_table.metadata_location, + properties=updated_staged_table.properties, + metadata=updated_staged_table.metadata, ) self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input) - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. From 6bdeeabb2d467350072e10bd1fe040666288a967 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 24 Apr 2024 22:13:42 -0700 Subject: [PATCH 4/4] reformat --- pyiceberg/catalog/glue.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index af5954f8b5..275cda7ed0 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -450,11 +450,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons current_table = None updated_staged_table = self._update_and_stage_table(current_table, table_request) - if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - self._write_metadata( metadata=updated_staged_table.metadata, io=updated_staged_table.io,