From ce562822d348b649e52af002b2fdc495859af7a6 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 28 Aug 2024 23:35:45 +0200 Subject: [PATCH 1/4] Make `commit_table` public --- pyiceberg/catalog/__init__.py | 27 +++++++++++++++++++-------- pyiceberg/catalog/dynamodb.py | 13 +++++++++---- pyiceberg/catalog/glue.py | 22 +++++++++++++--------- pyiceberg/catalog/hive.py | 28 +++++++++++++++++----------- pyiceberg/catalog/noop.py | 8 ++++++-- pyiceberg/catalog/rest.py | 17 ++++++++++++----- pyiceberg/catalog/sql.py | 25 ++++++++++++++----------- pyiceberg/table/__init__.py | 8 ++------ tests/catalog/test_base.py | 33 ++++++++++++++++----------------- tests/catalog/test_rest.py | 24 +----------------------- 10 files changed, 109 insertions(+), 96 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 7b15a60c65..a23c70ea4a 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -51,11 +51,12 @@ from pyiceberg.serializers import ToOutputFile from pyiceberg.table import ( DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, - CommitTableRequest, CommitTableResponse, CreateTableTransaction, StagedTable, Table, + TableRequirement, + TableUpdate, update_table_metadata, ) from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata @@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U """ @abstractmethod - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update one or more tables. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -854,13 +859,19 @@ def _create_staged_table( catalog=self, ) - def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable: - for requirement in table_request.requirements: + def _update_and_stage_table( + self, + current_table: Optional[Table], + table_identifier: Identifier, + requirements: Tuple[TableRequirement, ...], + updates: Tuple[TableUpdate, ...], + ) -> StagedTable: + for requirement in requirements: requirement.validate(current_table.metadata if current_table else None) updated_metadata = update_table_metadata( base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), - updates=table_request.updates, + updates=updates, enforce_validation=current_table is None, metadata_location=current_table.metadata_location if current_table else None, ) @@ -869,7 +880,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request: new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) return StagedTable( - identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]), + identifier=table_identifier, metadata=updated_metadata, metadata_location=new_metadata_location, io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location), diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index f0ef7c6b4f..88b606a624 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -23,6 +23,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -57,7 +58,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 148b02f902..45be080f49 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -23,6 +23,7 @@ List, Optional, Set, + Tuple, Union, cast, ) @@ -69,9 +70,10 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, Table, + TableRequirement, + TableUpdate, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) return self.load_table(identifier=identifier) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) + table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + database_name, table_name = table_identifier current_glue_table: Optional[TableTypeDef] glue_table_version_id: Optional[str] @@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons glue_table_version_id = None current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 755cd34c80..35e590a6e1 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -26,6 +26,7 @@ List, Optional, Set, + Tuple, Type, Union, ) @@ -79,11 +80,12 @@ from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, StagedTable, Table, TableProperties, + TableRequirement, + TableUpdate, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -418,11 +420,15 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -431,10 +437,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + database_name, table_name = table_identifier # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: @@ -445,7 +449,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons if lock.state == LockState.WAITING: self._wait_for_lock(database_name, table_name, lock.lockid, open_client) else: - raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") + raise CommitFailedException( + f"Failed to acquire lock for {table_identifier.identifier}, state: {lock.state}" + ) hive_table: Optional[HiveTable] current_table: Optional[Table] @@ -456,7 +462,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons hive_table = None current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) @@ -486,7 +492,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) self._create_hive_table(open_client, hive_table) except WaitingForLockException as e: - raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e + raise CommitFailedException(f"Failed to acquire lock for {table.identifier}, state: {lock.state}") from e finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index 1dfeb952f9..04d40cdfa5 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -19,6 +19,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -26,10 +27,11 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( - CommitTableRequest, CommitTableResponse, CreateTableTransaction, Table, + TableRequirement, + TableUpdate, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: raise NotImplementedError def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index d2acc70136..6be14063d7 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -66,6 +66,8 @@ StagedTable, Table, TableIdentifier, + TableRequirement, + TableUpdate, ) from pyiceberg.table.metadata import TableMetadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids @@ -719,12 +721,15 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm ) return table_request - @retry(**_RETRY_ARGS) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update the table. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -734,9 +739,11 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ + identifier = TableIdentifier(namespace=table.identifier[1:-1], name=table.identifier[-1]) + table_request = CommitTableRequest(identifier=identifier, requirements=requirements, updates=updates) response = self._session.post( self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), - data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8), + data=table_request.model_dump_json().encode(UTF8), ) try: response.raise_for_status() diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 229124ad3f..a008fdfa6b 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -20,6 +20,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -60,7 +61,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - """Update one or more tables. + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. Args: - table_request (CommitTableRequest): The table requests to be carried out. + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. Returns: CommitTableResponse: The updated metadata. @@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - namespace_tuple = Catalog.namespace_from(identifier_tuple) + table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + namespace_tuple = Catalog.namespace_from(table_identifier) namespace = Catalog.namespace_to_string(namespace_tuple) - table_name = Catalog.table_name_from(identifier_tuple) + table_name = Catalog.table_name_from(table_identifier) current_table: Optional[Table] try: - current_table = self.load_table(identifier_tuple) + current_table = self.load_table(table_identifier) except NoSuchTableError: current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table_request) + updated_staged_table = self._update_and_stage_table(current_table, table.identifier[1:], requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 0316b40443..6e1e53ba28 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1673,12 +1673,8 @@ def refs(self) -> Dict[str, SnapshotRef]: return self.metadata.refs def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: - response = self.catalog._commit_table( # pylint: disable=W0212 - CommitTableRequest( - identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]), - updates=updates, - requirements=requirements, - ) + response = self.catalog.commit_table( # pylint: disable=W0212 + self, requirements, updates ) # pylint: disable=W0212 self.metadata = response.metadata self.metadata_location = response.metadata_location diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index af5c67a955..608f1e049e 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -24,6 +24,7 @@ List, Optional, Set, + Tuple, Union, ) @@ -45,12 +46,11 @@ from pyiceberg.schema import Schema from pyiceberg.table import ( AddSchemaUpdate, - CommitTableRequest, CommitTableResponse, - Namespace, SetCurrentSchemaUpdate, Table, - TableIdentifier, + TableRequirement, + TableUpdate, update_table_metadata, ) from pyiceberg.table.metadata import new_table_metadata @@ -128,17 +128,17 @@ def create_table( def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: raise NotImplementedError - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier_tuple = self._identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier) current_table = self.load_table(identifier_tuple) base_metadata = current_table.metadata - for requirement in table_request.requirements: + for requirement in requirements: requirement.validate(base_metadata) - updated_metadata = update_table_metadata(base_metadata, table_request.updates) + updated_metadata = update_table_metadata(base_metadata, updates) if updated_metadata == base_metadata: # no changes, do nothing return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) @@ -667,14 +667,13 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: ) # When - response = given_table.catalog._commit_table( # pylint: disable=W0212 - CommitTableRequest( - identifier=TableIdentifier(namespace=Namespace(given_table._identifier[:-1]), name=given_table._identifier[-1]), - updates=[ - AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), - SetCurrentSchemaUpdate(schema_id=-1), - ], - ) + response = given_table.catalog.commit_table( # pylint: disable=W0212 + given_table, + updates=( + AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), + SetCurrentSchemaUpdate(schema_id=-1), + ), + requirements=(), ) # Then diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index bb609b4142..4c14056098 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -38,7 +38,7 @@ from pyiceberg.io import load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import CommitTableRequest, Table, TableIdentifier +from pyiceberg.table import Table from pyiceberg.table.metadata import TableMetadataV1 from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import IdentityTransform, TruncateTransform @@ -1239,25 +1239,3 @@ def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None: catalog = cast(RestCatalog, load_catalog("production", uri="https://other-service.io/api")) assert catalog.uri == "https://other-service.io/api" - - -def test_table_identifier_in_commit_table_request(rest_mock: Mocker, example_table_metadata_v2: Dict[str, Any]) -> None: - test_table_request = CommitTableRequest( - identifier=TableIdentifier(namespace=("catalog_name", "namespace"), name="table_name"), - updates=[], - requirements=[], - ) - rest_mock.post( - url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name", - json={ - "metadata": example_table_metadata_v2, - "metadata-location": "test", - }, - status_code=200, - request_headers=TEST_HEADERS, - ) - RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)._commit_table(test_table_request) - assert ( - rest_mock.last_request.text - == """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}""" - ) From c647256b1d9a278a88f5e757bde4db76017d47fd Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 29 Aug 2024 22:43:12 +0200 Subject: [PATCH 2/4] Comments --- pyiceberg/catalog/glue.py | 2 +- pyiceberg/catalog/hive.py | 8 +++----- pyiceberg/catalog/rest.py | 5 +++-- pyiceberg/catalog/sql.py | 2 +- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 7ced144568..fb7d05bb1f 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -468,7 +468,7 @@ def commit_table( NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) database_name, table_name = table_identifier current_glue_table: Optional[TableTypeDef] diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 94516c7aff..5c1acbbd36 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -440,7 +440,7 @@ def commit_table( NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) database_name, table_name = table_identifier # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 @@ -452,9 +452,7 @@ def commit_table( if lock.state == LockState.WAITING: self._wait_for_lock(database_name, table_name, lock.lockid, open_client) else: - raise CommitFailedException( - f"Failed to acquire lock for {table_identifier.identifier}, state: {lock.state}" - ) + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") hive_table: Optional[HiveTable] current_table: Optional[Table] @@ -495,7 +493,7 @@ def commit_table( ) self._create_hive_table(open_client, hive_table) except WaitingForLockException as e: - raise CommitFailedException(f"Failed to acquire lock for {table.identifier}, state: {lock.state}") from e + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 70808df9ba..428a58b63e 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -750,8 +750,9 @@ def commit_table( CommitFailedException: Requirement not met, or a conflict with a concurrent commit. CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ - identifier = TableIdentifier(namespace=table.identifier[1:-1], name=table.identifier[-1]) - table_request = CommitTableRequest(identifier=identifier, requirements=requirements, updates=updates) + identifier = self._identifier_to_tuple_without_catalog(table.identifier) + table_identifier = TableIdentifier(namespace=identifier[0:-1], name=identifier[-1]) + table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates) response = self._session.post( self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), data=table_request.model_dump_json().encode(UTF8), diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index b9cdca7647..edd10c9788 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -412,7 +412,7 @@ def commit_table( NoSuchTableError: If a table with the given identifier does not exist. CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ - table_identifier = self.identifier_to_tuple_without_catalog(table.identifier) + table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) namespace_tuple = Catalog.namespace_from(table_identifier) namespace = Catalog.namespace_to_string(namespace_tuple) table_name = Catalog.table_name_from(table_identifier) From 1ce5289516293cc56ada443b079c108cedafcfa7 Mon Sep 17 00:00:00 2001 From: Fokko Date: Sun, 1 Sep 2024 07:23:52 +0200 Subject: [PATCH 3/4] Thanks Kevin! --- pyiceberg/catalog/glue.py | 2 +- pyiceberg/catalog/hive.py | 2 +- pyiceberg/catalog/rest.py | 24 ++++++++++++------------ pyiceberg/catalog/sql.py | 2 +- pyiceberg/table/__init__.py | 4 +--- tests/catalog/test_base.py | 2 +- 6 files changed, 17 insertions(+), 19 deletions(-) diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index fb7d05bb1f..f5f36c2620 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -469,7 +469,7 @@ def commit_table( CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) - database_name, table_name = table_identifier + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) current_glue_table: Optional[TableTypeDef] glue_table_version_id: Optional[str] diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 5c1acbbd36..c0c8a3d77e 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -441,7 +441,7 @@ def commit_table( CommitFailedException: Requirement not met, or a conflict with a concurrent commit. """ table_identifier = self._identifier_to_tuple_without_catalog(table.identifier) - database_name, table_name = table_identifier + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 428a58b63e..c9363b434b 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -731,6 +731,17 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm ) return table_request + @retry(**_RETRY_ARGS) + def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: + namespace_tuple = self._check_valid_namespace_identifier(namespace) + namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) + response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) + try: + response.raise_for_status() + except HTTPError as exc: + self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) + return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers] + @retry(**_RETRY_ARGS) def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] @@ -751,7 +762,7 @@ def commit_table( CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ identifier = self._identifier_to_tuple_without_catalog(table.identifier) - table_identifier = TableIdentifier(namespace=identifier[0:-1], name=identifier[-1]) + table_identifier = TableIdentifier(namespace=identifier[:-1], name=identifier[-1]) table_request = CommitTableRequest(identifier=table_identifier, requirements=requirements, updates=updates) response = self._session.post( self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)), @@ -771,17 +782,6 @@ def commit_table( ) return CommitTableResponse(**response.json()) - @retry(**_RETRY_ARGS) - def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: - namespace_tuple = self._check_valid_namespace_identifier(namespace) - namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple) - response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat)) - try: - response.raise_for_status() - except HTTPError as exc: - self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) - return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers] - @retry(**_RETRY_ARGS) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index edd10c9788..0a104a1704 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -423,7 +423,7 @@ def commit_table( except NoSuchTableError: current_table = None - updated_staged_table = self._update_and_stage_table(current_table, table.identifier[1:], requirements, updates) + updated_staged_table = self._update_and_stage_table(current_table, table.identifier, requirements, updates) if current_table and updated_staged_table.metadata == current_table.metadata: # no changes, do nothing return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 6e1e53ba28..43e79fb1cf 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1673,9 +1673,7 @@ def refs(self) -> Dict[str, SnapshotRef]: return self.metadata.refs def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: - response = self.catalog.commit_table( # pylint: disable=W0212 - self, requirements, updates - ) # pylint: disable=W0212 + response = self.catalog.commit_table(self, requirements, updates) self.metadata = response.metadata self.metadata_location = response.metadata_location diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 71f355786e..ca2ce4a946 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -670,7 +670,7 @@ def test_commit_table(catalog: InMemoryCatalog) -> None: ) # When - response = given_table.catalog.commit_table( # pylint: disable=W0212 + response = given_table.catalog.commit_table( given_table, updates=( AddSchemaUpdate(schema=new_schema, last_column_id=new_schema.highest_field_id), From 15b8b47a385e765e6031dda5645ac9aa6c95f1b4 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 5 Sep 2024 16:53:59 +0200 Subject: [PATCH 4/4] Update tests --- tests/catalog/test_rest.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index a622148125..f05e15df38 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -1289,22 +1289,28 @@ def test_catalog_from_parameters_empty_env(rest_mock: Mocker) -> None: assert catalog.uri == "https://other-service.io/api" -def test_table_identifier_in_commit_table_request(rest_mock: Mocker, example_table_metadata_v2: Dict[str, Any]) -> None: - test_table_request = CommitTableRequest( - identifier=TableIdentifier(namespace=("catalog_name", "namespace"), name="table_name"), - updates=[], - requirements=[], - ) +def test_table_identifier_in_commit_table_request( + rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_v2: Dict[str, Any] +) -> None: + metadata_location = "s3://some_bucket/metadata.json" rest_mock.post( url=f"{TEST_URI}v1/namespaces/namespace/tables/table_name", json={ "metadata": example_table_metadata_v2, - "metadata-location": "test", + "metadata-location": metadata_location, }, status_code=200, request_headers=TEST_HEADERS, ) - RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN)._commit_table(test_table_request) + catalog = RestCatalog("catalog_name", uri=TEST_URI, token=TEST_TOKEN) + table = Table( + identifier=("namespace", "table_name"), + metadata=None, # type: ignore + metadata_location=metadata_location, + io=None, # type: ignore + catalog=catalog, + ) + catalog.commit_table(table, (), ()) assert ( rest_mock.last_request.text == """{"identifier":{"namespace":["namespace"],"name":"table_name"},"requirements":[],"updates":[]}"""