From a57cb0745efba8dd115d1089d70f4a390cd3e99a Mon Sep 17 00:00:00 2001 From: Patrick Ames Date: Sat, 25 Nov 2023 12:51:11 -0800 Subject: [PATCH] Support catalog in identifier to locate existing tables (#150) --- pyiceberg/catalog/__init__.py | 19 +- pyiceberg/catalog/dynamodb.py | 11 +- pyiceberg/catalog/glue.py | 9 +- pyiceberg/catalog/hive.py | 9 +- pyiceberg/catalog/rest.py | 33 ++- pyiceberg/catalog/sql.py | 9 +- tests/catalog/test_base.py | 53 +++- tests/catalog/test_dynamodb.py | 65 +++++ tests/catalog/test_glue.py | 62 +++++ tests/catalog/test_hive.py | 165 ++++++++++- tests/catalog/test_rest.py | 496 +++++++++++---------------------- tests/catalog/test_sql.py | 47 ++++ tests/conftest.py | 127 +++++++++ 13 files changed, 730 insertions(+), 375 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 2577a97bc1..c83fd1c792 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -536,6 +536,20 @@ def identifier_to_database_and_table( return tuple_identifier[0], tuple_identifier[1] + def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier: + """Convert an identifier to a tuple and drop this catalog's name from the first element. + + Args: + identifier (str | Identifier): Table identifier. + + Returns: + Identifier: a tuple of strings with this catalog's name removed + """ + identifier_tuple = Catalog.identifier_to_tuple(identifier) + if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name: + identifier_tuple = identifier_tuple[1:] + return identifier_tuple + def purge_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table and purge all data and metadata files. @@ -547,8 +561,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - table = self.load_table(identifier) - self.drop_table(identifier) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + table = self.load_table(identifier_tuple) + self.drop_table(identifier_tuple) io = load_file_io(self.properties, table.metadata_location) metadata = table.metadata manifest_lists_to_delete = set() diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 848ec03126..3eee95da3c 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -213,7 +213,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name) return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item) @@ -226,7 +227,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) try: self._delete_dynamo_item( @@ -256,7 +258,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U NoSuchPropertyException: When from table miss some required properties. NoSuchNamespaceError: When the destination namespace doesn't exist. """ - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) + from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) + from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name) @@ -287,7 +290,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e try: - self.drop_table(from_identifier) + self.drop_table(from_identifier_tuple) except (NoSuchTableError, GenericDynamoDbError) as e: log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. " diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index e0683632de..723de2f335 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -265,7 +265,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) try: load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name) except self.glue.exceptions.EntityNotFoundException as e: @@ -282,7 +283,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) try: self.glue.delete_table(DatabaseName=database_name, Name=table_name) except self.glue.exceptions.EntityNotFoundException as e: @@ -307,7 +309,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U NoSuchPropertyException: When from table miss some required properties. NoSuchNamespaceError: When the destination namespace doesn't exist. """ - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) + from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) + from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) try: get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 21f171421e..ffc9c5333c 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -347,7 +347,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) try: with self._client as open_client: hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) @@ -366,7 +367,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: Raises: NoSuchTableError: If a table with the name does not exist, or the identifier is invalid. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) try: with self._client as open_client: open_client.drop_table(dbname=database_name, name=table_name, deleteData=False) @@ -393,7 +395,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U NoSuchTableError: When a table with the name does not exist. NoSuchNamespaceError: When the destination namespace doesn't exist. """ - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) + from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) + from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) try: with self._client as open_client: diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 77025b59dd..3dbb5b72a9 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -302,19 +302,20 @@ def _fetch_config(self) -> None: # Update URI based on overrides self.uri = config[URI] - def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties: - if isinstance(identifier, TableIdentifier): - return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name} - + def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier: identifier_tuple = self.identifier_to_tuple(identifier) if len(identifier_tuple) <= 1: raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}") + return identifier_tuple + + def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties: + if isinstance(identifier, TableIdentifier): + return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name} + identifier_tuple = self._identifier_to_validated_tuple(identifier) return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]} def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]: - identifier_tuple = self.identifier_to_tuple(identifier) - if len(identifier_tuple) <= 1: - raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier_tuple}") + identifier_tuple = self._identifier_to_validated_tuple(identifier) return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]} def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None: @@ -499,12 +500,10 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers] def load_table(self, identifier: Union[str, Identifier]) -> Table: - identifier_tuple = self.identifier_to_tuple(identifier) - - if len(identifier_tuple) <= 1: - raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier}") - - response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier))) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + response = self._session.get( + self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple)) + ) try: response.raise_for_status() except HTTPError as exc: @@ -514,8 +513,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: return self._response_to_table(identifier_tuple, table_response) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) response = self._session.delete( - self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)), + self.url( + Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple) + ), ) try: response.raise_for_status() @@ -526,8 +528,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: + from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) payload = { - "source": self._split_identifier_for_json(from_identifier), + "source": self._split_identifier_for_json(from_identifier_tuple), "destination": self._split_identifier_for_json(to_identifier), } response = self._session.post(self.url(Endpoints.rename_table), json=payload) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index bca0fe44da..b0c01eb52e 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -231,7 +231,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: Raises: NoSuchTableError: If a table with the name does not exist. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) with Session(self.engine) as session: stmt = select(IcebergTables).where( IcebergTables.catalog_name == self.name, @@ -252,7 +253,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: Raises: NoSuchTableError: If a table with the name does not exist. """ - database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError) + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) with Session(self.engine) as session: res = session.execute( delete(IcebergTables).where( @@ -280,7 +282,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U TableAlreadyExistsError: If a table with the new name already exist. NoSuchNamespaceError: If the target namespace does not exist. """ - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError) + from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) + from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) if not self._namespace_exists(to_database_name): raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}") diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 1078dd1b0a..c7cd1c4fa1 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -149,14 +149,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) def load_table(self, identifier: Union[str, Identifier]) -> Table: - identifier = Catalog.identifier_to_tuple(identifier) + identifier = self.identifier_to_tuple_without_catalog(identifier) try: return self.__tables[identifier] except KeyError as error: raise NoSuchTableError(f"Table does not exist: {identifier}") from error def drop_table(self, identifier: Union[str, Identifier]) -> None: - identifier = Catalog.identifier_to_tuple(identifier) + identifier = self.identifier_to_tuple_without_catalog(identifier) try: self.__tables.pop(identifier) except KeyError as error: @@ -166,7 +166,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: - from_identifier = Catalog.identifier_to_tuple(from_identifier) + from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) try: table = self.__tables.pop(from_identifier) except KeyError as error: @@ -352,6 +352,16 @@ def test_load_table(catalog: InMemoryCatalog) -> None: assert table == given_table +def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None: + # Given + given_table = given_catalog_has_a_table(catalog) + # When + intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER) + table = catalog.load_table(intermediate.identifier) + # Then + assert table == given_table + + def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None: with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): catalog.load_table(TEST_TABLE_IDENTIFIER) @@ -367,6 +377,18 @@ def test_drop_table(catalog: InMemoryCatalog) -> None: catalog.load_table(TEST_TABLE_IDENTIFIER) +def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None: + # Given + table = given_catalog_has_a_table(catalog) + # When + catalog.drop_table(table.identifier) + # Then + with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): + catalog.load_table(table.identifier) + with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None: with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): catalog.load_table(TEST_TABLE_IDENTIFIER) @@ -405,6 +427,31 @@ def test_rename_table(catalog: InMemoryCatalog) -> None: catalog.load_table(TEST_TABLE_IDENTIFIER) +def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None: + # Given + table = given_catalog_has_a_table(catalog) + + # When + new_table_name = "new.namespace.new_table" + new_table = catalog.rename_table(table.identifier, new_table_name) + + # Then + assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name) + + # And + new_table = catalog.load_table(new_table.identifier) + assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name) + + # And + assert ("new", "namespace") in catalog.list_namespaces() + + # And + with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): + catalog.load_table(table.identifier) + with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR): + catalog.load_table(TEST_TABLE_IDENTIFIER) + + def test_create_namespace(catalog: InMemoryCatalog) -> None: # When catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES) diff --git a/tests/catalog/test_dynamodb.py b/tests/catalog/test_dynamodb.py index 582cb034e8..f03d1d931f 100644 --- a/tests/catalog/test_dynamodb.py +++ b/tests/catalog/test_dynamodb.py @@ -175,6 +175,23 @@ def test_load_table( assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) +@mock_dynamodb +def test_load_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "test_ddb_catalog" + identifier = (database_name, table_name) + test_catalog = DynamoDbCatalog( + catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + ) + test_catalog.create_namespace(namespace=database_name) + test_catalog.create_table(identifier, table_schema_nested) + intermediate = test_catalog.load_table(identifier) + table = test_catalog.load_table(intermediate.identifier) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + + @mock_dynamodb def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: identifier = (database_name, table_name) @@ -203,6 +220,27 @@ def test_drop_table( test_catalog.load_table(identifier) +@mock_dynamodb +def test_drop_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "test_ddb_catalog" + identifier = (database_name, table_name) + test_catalog = DynamoDbCatalog( + catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + ) + test_catalog.create_namespace(namespace=database_name) + test_catalog.create_table(identifier, table_schema_nested) + table = test_catalog.load_table(identifier) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + test_catalog.drop_table(table.identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + + @mock_dynamodb def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: identifier = (database_name, table_name) @@ -236,6 +274,33 @@ def test_rename_table( test_catalog.load_table(identifier) +@mock_dynamodb +def test_rename_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "test_ddb_catalog" + new_table_name = f"{table_name}_new" + identifier = (database_name, table_name) + new_identifier = (database_name, new_table_name) + test_catalog = DynamoDbCatalog( + catalog_name, **{"warehouse": f"s3://{BUCKET_NAME}", "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + ) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier, table_schema_nested) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + test_catalog.rename_table(table.identifier, new_identifier) + new_table = test_catalog.load_table(new_identifier) + assert new_table.identifier == (catalog_name,) + new_identifier + # the metadata_location should not change + assert new_table.metadata_location == table.metadata_location + # old table should be dropped + with pytest.raises(NoSuchTableError): + test_catalog.load_table(identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + + @mock_dynamodb def test_fail_on_rename_table_with_missing_required_params( _bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py index 1d7027a216..f182bf1eda 100644 --- a/tests/catalog/test_glue.py +++ b/tests/catalog/test_glue.py @@ -153,6 +153,22 @@ def test_load_table( assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) +@mock_glue +def test_load_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog( + catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"} + ) + test_catalog.create_namespace(namespace=database_name) + intermediate = test_catalog.create_table(identifier, table_schema_nested) + table = test_catalog.load_table(intermediate.identifier) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + + @mock_glue def test_load_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: identifier = (database_name, table_name) @@ -181,6 +197,27 @@ def test_drop_table( test_catalog.load_table(identifier) +@mock_glue +def test_drop_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + identifier = (database_name, table_name) + test_catalog = GlueCatalog( + catalog_name, **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"} + ) + test_catalog.create_namespace(namespace=database_name) + test_catalog.create_table(identifier, table_schema_nested) + table = test_catalog.load_table(identifier) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + test_catalog.drop_table(table.identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + + @mock_glue def test_drop_non_exist_table(_bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: identifier = (database_name, table_name) @@ -212,6 +249,31 @@ def test_rename_table( test_catalog.load_table(identifier) +@mock_glue +def test_rename_table_from_self_identifier( + _bucket_initialize: None, _patch_aiobotocore: None, table_schema_nested: Schema, database_name: str, table_name: str +) -> None: + catalog_name = "glue" + new_table_name = f"{table_name}_new" + identifier = (database_name, table_name) + new_identifier = (database_name, new_table_name) + test_catalog = GlueCatalog("glue", **{"py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO", "warehouse": f"s3://{BUCKET_NAME}/"}) + test_catalog.create_namespace(namespace=database_name) + table = test_catalog.create_table(identifier, table_schema_nested) + assert table.identifier == (catalog_name,) + identifier + assert TABLE_METADATA_LOCATION_REGEX.match(table.metadata_location) + test_catalog.rename_table(table.identifier, new_identifier) + new_table = test_catalog.load_table(new_identifier) + assert new_table.identifier == (catalog_name,) + new_identifier + # the metadata_location should not change + assert new_table.metadata_location == table.metadata_location + # old table should be dropped + with pytest.raises(NoSuchTableError): + test_catalog.load_table(identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + + @mock_glue def test_rename_table_no_params(_glue, _bucket_initialize: None, _patch_aiobotocore: None, database_name: str, table_name: str) -> None: # type: ignore new_database_name = f"{database_name}_new" diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index c280146463..54336786f9 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -15,8 +15,9 @@ # specific language governing permissions and limitations # under the License. # pylint: disable=protected-access,redefined-outer-name +import copy import uuid -from unittest.mock import MagicMock, patch +from unittest.mock import MagicMock, call, patch import pytest from hive_metastore.ttypes import ( @@ -394,6 +395,155 @@ def test_load_table(hive_table: HiveTable) -> None: assert expected == table.metadata +def test_load_table_from_self_identifier(hive_table: HiveTable) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + + catalog._client = MagicMock() + catalog._client.__enter__().get_table.return_value = hive_table + intermediate = catalog.load_table(("default", "new_tabl2e")) + table = catalog.load_table(intermediate.identifier) + + catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e") + + expected = TableMetadataV2( + location="s3://bucket/test/location", + table_uuid=uuid.UUID("9c12d441-03fe-4693-9a96-a0705ddf69c1"), + last_updated_ms=1602638573590, + last_column_id=3, + schemas=[ + Schema( + NestedField(field_id=1, name="x", field_type=LongType(), required=True), + schema_id=0, + identifier_field_ids=[], + ), + Schema( + NestedField(field_id=1, name="x", field_type=LongType(), required=True), + NestedField(field_id=2, name="y", field_type=LongType(), required=True, doc="comment"), + NestedField(field_id=3, name="z", field_type=LongType(), required=True), + schema_id=1, + identifier_field_ids=[1, 2], + ), + ], + current_schema_id=1, + partition_specs=[ + PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=IdentityTransform(), name="x"), spec_id=0) + ], + default_spec_id=0, + last_partition_id=1000, + properties={"read.split.target.size": "134217728"}, + current_snapshot_id=3055729675574597004, + snapshots=[ + Snapshot( + snapshot_id=3051729675574597004, + parent_snapshot_id=None, + sequence_number=0, + timestamp_ms=1515100955770, + manifest_list="s3://a/b/1.avro", + summary=Summary(operation=Operation.APPEND), + schema_id=None, + ), + Snapshot( + snapshot_id=3055729675574597004, + parent_snapshot_id=3051729675574597004, + sequence_number=1, + timestamp_ms=1555100955770, + manifest_list="s3://a/b/2.avro", + summary=Summary(operation=Operation.APPEND), + schema_id=1, + ), + ], + snapshot_log=[ + SnapshotLogEntry(snapshot_id=3051729675574597004, timestamp_ms=1515100955770), + SnapshotLogEntry(snapshot_id=3055729675574597004, timestamp_ms=1555100955770), + ], + metadata_log=[MetadataLogEntry(metadata_file="s3://bucket/.../v1.json", timestamp_ms=1515100)], + sort_orders=[ + SortOrder( + SortField( + source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC, null_order=NullOrder.NULLS_FIRST + ), + SortField( + source_id=3, + transform=BucketTransform(num_buckets=4), + direction=SortDirection.DESC, + null_order=NullOrder.NULLS_LAST, + ), + order_id=3, + ) + ], + default_sort_order_id=3, + refs={ + "test": SnapshotRef( + snapshot_id=3051729675574597004, + snapshot_ref_type=SnapshotRefType.TAG, + min_snapshots_to_keep=None, + max_snapshot_age_ms=None, + max_ref_age_ms=10000000, + ), + "main": SnapshotRef( + snapshot_id=3055729675574597004, + snapshot_ref_type=SnapshotRefType.BRANCH, + min_snapshots_to_keep=None, + max_snapshot_age_ms=None, + max_ref_age_ms=None, + ), + }, + format_version=2, + last_sequence_number=34, + ) + + assert table.identifier == (HIVE_CATALOG_NAME, "default", "new_tabl2e") + assert expected == table.metadata + + +def test_rename_table(hive_table: HiveTable) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + + renamed_table = copy.deepcopy(hive_table) + renamed_table.dbName = "default" + renamed_table.tableName = "new_tabl3e" + + catalog._client = MagicMock() + catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table] + catalog._client.__enter__().alter_table.return_value = None + + from_identifier = ("default", "new_tabl2e") + to_identifier = ("default", "new_tabl3e") + table = catalog.rename_table(from_identifier, to_identifier) + + assert table.identifier == ("hive",) + to_identifier + + calls = [call(dbname="default", tbl_name="new_tabl2e"), call(dbname="default", tbl_name="new_tabl3e")] + catalog._client.__enter__().get_table.assert_has_calls(calls) + catalog._client.__enter__().alter_table.assert_called_with(dbname="default", tbl_name="new_tabl2e", new_tbl=renamed_table) + + +def test_rename_table_from_self_identifier(hive_table: HiveTable) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + + catalog._client = MagicMock() + catalog._client.__enter__().get_table.return_value = hive_table + + from_identifier = ("default", "new_tabl2e") + from_table = catalog.load_table(from_identifier) + catalog._client.__enter__().get_table.assert_called_with(dbname="default", tbl_name="new_tabl2e") + + renamed_table = copy.deepcopy(hive_table) + renamed_table.dbName = "default" + renamed_table.tableName = "new_tabl3e" + + catalog._client.__enter__().get_table.side_effect = [hive_table, renamed_table] + catalog._client.__enter__().alter_table.return_value = None + to_identifier = ("default", "new_tabl3e") + table = catalog.rename_table(from_table.identifier, to_identifier) + + assert table.identifier == ("hive",) + to_identifier + + calls = [call(dbname="default", tbl_name="new_tabl2e"), call(dbname="default", tbl_name="new_tabl3e")] + catalog._client.__enter__().get_table.assert_has_calls(calls) + catalog._client.__enter__().alter_table.assert_called_with(dbname="default", tbl_name="new_tabl2e", new_tbl=renamed_table) + + def test_rename_table_from_does_not_exists() -> None: catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) @@ -489,6 +639,19 @@ def test_drop_table() -> None: catalog._client.__enter__().drop_table.assert_called_with(dbname="default", name="table", deleteData=False) +def test_drop_table_from_self_identifier(hive_table: HiveTable) -> None: + catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) + + catalog._client = MagicMock() + catalog._client.__enter__().get_table.return_value = hive_table + table = catalog.load_table(("default", "new_tabl2e")) + + catalog._client.__enter__().get_all_databases.return_value = ["namespace1", "namespace2"] + catalog.drop_table(table.identifier) + + catalog._client.__enter__().drop_table.assert_called_with(dbname="default", name="new_tabl2e", deleteData=False) + + def test_drop_table_does_not_exists() -> None: catalog = HiveCatalog(HIVE_CATALOG_NAME, uri=HIVE_METASTORE_FAKE_URL) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 43313c03ce..79cf25f87a 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -16,9 +16,8 @@ # under the License. # pylint: disable=redefined-outer-name,unused-argument import os -from typing import cast +from typing import Any, Dict, cast from unittest import mock -from uuid import UUID import pytest from requests_mock import Mocker @@ -37,17 +36,9 @@ from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table.metadata import TableMetadataV1 -from pyiceberg.table.refs import SnapshotRef, SnapshotRefType -from pyiceberg.table.snapshots import Operation, Snapshot, Summary from pyiceberg.table.sorting import SortField, SortOrder from pyiceberg.transforms import IdentityTransform, TruncateTransform from pyiceberg.typedef import RecursiveDict -from pyiceberg.types import ( - BooleanType, - IntegerType, - NestedField, - StringType, -) from pyiceberg.utils.config import Config TEST_URI = "https://iceberg-test-catalog/" @@ -64,6 +55,30 @@ } +@pytest.fixture +def example_table_metadata_with_snapshot_v1_rest_json(example_table_metadata_with_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]: + return { + "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + "metadata": example_table_metadata_with_snapshot_v1, + "config": { + "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", + "region": "us-west-2", + }, + } + + +@pytest.fixture +def example_table_metadata_no_snapshot_v1_rest_json(example_table_metadata_no_snapshot_v1: Dict[str, Any]) -> Dict[str, Any]: + return { + "metadata-location": "s3://warehouse/database/table/metadata.json", + "metadata": example_table_metadata_no_snapshot_v1, + "config": { + "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", + "region": "us-west-2", + }, + } + + @pytest.fixture def rest_mock(requests_mock: Mocker) -> Mocker: """Takes the default requests_mock and adds the config endpoint to it @@ -339,77 +354,10 @@ def test_update_namespace_properties_404(rest_mock: Mocker) -> None: assert "Namespace does not exist" in str(e.value) -def test_load_table_200(rest_mock: Mocker) -> None: +def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/table", - json={ - "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", - "metadata": { - "format-version": 1, - "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f", - "location": "s3://warehouse/database/table", - "last-updated-ms": 1646787054459, - "last-column-id": 2, - "schema": { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], - }, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "fields": [ - {"id": 1, "name": "id", "required": False, "type": "int"}, - {"id": 2, "name": "data", "required": False, "type": "string"}, - ], - } - ], - "partition-spec": [], - "default-spec-id": 0, - "partition-specs": [{"spec-id": 0, "fields": []}], - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": {"owner": "bryan", "write.metadata.compression-codec": "gzip"}, - "current-snapshot-id": 3497810964824022504, - "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}}, - "snapshots": [ - { - "snapshot-id": 3497810964824022504, - "timestamp-ms": 1646787054459, - "summary": { - "operation": "append", - "spark.app.id": "local-1646787004168", - "added-data-files": "1", - "added-records": "1", - "added-files-size": "697", - "changed-partition-count": "1", - "total-records": "1", - "total-files-size": "697", - "total-data-files": "1", - "total-delete-files": "0", - "total-position-deletes": "0", - "total-equality-deletes": "0", - }, - "manifest-list": "s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro", - "schema-id": 0, - } - ], - "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}], - "metadata-log": [ - { - "timestamp-ms": 1646787031514, - "metadata-file": "s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json", - } - ], - }, - "config": {"client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"}, - }, + json=example_table_metadata_with_snapshot_v1_rest_json, status_code=200, request_headers=TEST_HEADERS, ) @@ -417,78 +365,8 @@ def test_load_table_200(rest_mock: Mocker) -> None: actual = catalog.load_table(("fokko", "table")) expected = Table( identifier=("rest", "fokko", "table"), - metadata_location="s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", - metadata=TableMetadataV1( - location="s3://warehouse/database/table", - table_uuid=UUID("b55d9dda-6561-423a-8bfc-787980ce421f"), - last_updated_ms=1646787054459, - last_column_id=2, - schemas=[ - Schema( - NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), - NestedField(field_id=2, name="data", field_type=StringType(), required=False), - schema_id=0, - identifier_field_ids=[], - ) - ], - current_schema_id=0, - default_spec_id=0, - last_partition_id=999, - properties={"owner": "bryan", "write.metadata.compression-codec": "gzip"}, - current_snapshot_id=3497810964824022504, - snapshots=[ - Snapshot( - snapshot_id=3497810964824022504, - parent_snapshot_id=None, - sequence_number=None, - timestamp_ms=1646787054459, - manifest_list="s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro", - summary=Summary( - operation=Operation.APPEND, - **{ - "spark.app.id": "local-1646787004168", - "added-data-files": "1", - "added-records": "1", - "added-files-size": "697", - "changed-partition-count": "1", - "total-records": "1", - "total-files-size": "697", - "total-data-files": "1", - "total-delete-files": "0", - "total-position-deletes": "0", - "total-equality-deletes": "0", - }, - ), - schema_id=0, - ) - ], - snapshot_log=[{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}], - metadata_log=[ - { - "timestamp-ms": 1646787031514, - "metadata-file": "s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json", - } - ], - sort_orders=[SortOrder(order_id=0)], - default_sort_order_id=0, - refs={ - "main": SnapshotRef( - snapshot_id=3497810964824022504, - snapshot_ref_type=SnapshotRefType.BRANCH, - min_snapshots_to_keep=None, - max_snapshot_age_ms=None, - max_ref_age_ms=None, - ) - }, - format_version=1, - schema_=Schema( - NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), - NestedField(field_id=2, name="data", field_type=StringType(), required=False), - schema_id=0, - identifier_field_ids=[], - ), - partition_spec=[], - ), + metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]), io=load_file_io(), catalog=catalog, ) @@ -497,6 +375,29 @@ def test_load_table_200(rest_mock: Mocker) -> None: assert actual == expected +def test_load_table_from_self_identifier_200( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] +) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/pdames/tables/table", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("pdames", "table")) + actual = catalog.load_table(table.identifier) + expected = Table( + identifier=("rest", "pdames", "table"), + metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]), + io=load_file_io(), + catalog=catalog, + ) + assert actual.metadata.model_dump() == expected.metadata.model_dump() + assert actual == expected + + def test_load_table_404(rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces/fokko/tables/does_not_exists", @@ -535,62 +436,12 @@ def test_drop_table_404(rest_mock: Mocker) -> None: assert "Table does not exist" in str(e.value) -def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None: +def test_create_table_200( + rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any] +) -> None: rest_mock.post( f"{TEST_URI}v1/namespaces/fokko/tables", - json={ - "metadata-location": "s3://warehouse/database/table/metadata.json", - "metadata": { - "format-version": 1, - "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", - "location": "s3://warehouse/database/table", - "last-updated-ms": 1657810967051, - "last-column-id": 3, - "schema": { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": False, "type": "string"}, - {"id": 2, "name": "bar", "required": True, "type": "int"}, - {"id": 3, "name": "baz", "required": False, "type": "boolean"}, - ], - }, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": False, "type": "string"}, - {"id": 2, "name": "bar", "required": True, "type": "int"}, - {"id": 3, "name": "baz", "required": False, "type": "boolean"}, - ], - } - ], - "partition-spec": [], - "default-spec-id": 0, - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": { - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd", - }, - "current-snapshot-id": -1, - "refs": {}, - "snapshots": [], - "snapshot-log": [], - "metadata-log": [], - }, - "config": { - "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", - "region": "us-west-2", - }, - }, + json=example_table_metadata_no_snapshot_v1_rest_json, status_code=200, request_headers=TEST_HEADERS, ) @@ -607,47 +458,8 @@ def test_create_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> Non ) expected = Table( identifier=("rest", "fokko", "fokko2"), - metadata_location="s3://warehouse/database/table/metadata.json", - metadata=TableMetadataV1( - location="s3://warehouse/database/table", - table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"), - last_updated_ms=1657810967051, - last_column_id=3, - schemas=[ - Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), - schema_id=0, - identifier_field_ids=[2], - ) - ], - current_schema_id=0, - default_spec_id=0, - last_partition_id=999, - properties={ - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd", - }, - current_snapshot_id=None, - snapshots=[], - snapshot_log=[], - metadata_log=[], - sort_orders=[SortOrder(order_id=0)], - default_sort_order_id=0, - refs={}, - format_version=1, - schema_=Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), - schema_id=0, - identifier_field_ids=[2], - ), - partition_spec=[], - ), + metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]), io=load_file_io(), catalog=catalog, ) @@ -682,62 +494,12 @@ def test_create_table_409(rest_mock: Mocker, table_schema_simple: Schema) -> Non assert "Table already exists" in str(e.value) -def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> None: +def test_register_table_200( + rest_mock: Mocker, table_schema_simple: Schema, example_table_metadata_no_snapshot_v1_rest_json: Dict[str, Any] +) -> None: rest_mock.post( f"{TEST_URI}v1/namespaces/default/register", - json={ - "metadata-location": "s3://warehouse/database/table/metadata.json", - "metadata": { - "format-version": 1, - "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", - "location": "s3://warehouse/database/table", - "last-updated-ms": 1657810967051, - "last-column-id": 3, - "schema": { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": False, "type": "string"}, - {"id": 2, "name": "bar", "required": True, "type": "int"}, - {"id": 3, "name": "baz", "required": False, "type": "boolean"}, - ], - }, - "current-schema-id": 0, - "schemas": [ - { - "type": "struct", - "schema-id": 0, - "identifier-field-ids": [2], - "fields": [ - {"id": 1, "name": "foo", "required": False, "type": "string"}, - {"id": 2, "name": "bar", "required": True, "type": "int"}, - {"id": 3, "name": "baz", "required": False, "type": "boolean"}, - ], - } - ], - "partition-spec": [], - "default-spec-id": 0, - "last-partition-id": 999, - "default-sort-order-id": 0, - "sort-orders": [{"order-id": 0, "fields": []}], - "properties": { - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd", - }, - "current-snapshot-id": -1, - "refs": {}, - "snapshots": [], - "snapshot-log": [], - "metadata-log": [], - }, - "config": { - "client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", - "region": "us-west-2", - }, - }, + json=example_table_metadata_no_snapshot_v1_rest_json, status_code=200, request_headers=TEST_HEADERS, ) @@ -747,47 +509,8 @@ def test_register_table_200(rest_mock: Mocker, table_schema_simple: Schema) -> N ) expected = Table( identifier=("rest", "default", "registered_table"), - metadata_location="s3://warehouse/database/table/metadata.json", - metadata=TableMetadataV1( - location="s3://warehouse/database/table", - table_uuid=UUID("bf289591-dcc0-4234-ad4f-5c3eed811a29"), - last_updated_ms=1657810967051, - last_column_id=3, - schemas=[ - Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), - schema_id=0, - identifier_field_ids=[2], - ) - ], - current_schema_id=0, - default_spec_id=0, - last_partition_id=999, - properties={ - "write.delete.parquet.compression-codec": "zstd", - "write.metadata.compression-codec": "gzip", - "write.summary.partition-limit": "100", - "write.parquet.compression-codec": "zstd", - }, - current_snapshot_id=None, - snapshots=[], - snapshot_log=[], - metadata_log=[], - sort_orders=[SortOrder(order_id=0)], - default_sort_order_id=0, - refs={}, - format_version=1, - schema_=Schema( - NestedField(field_id=1, name="foo", field_type=StringType(), required=False), - NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True), - NestedField(field_id=3, name="baz", field_type=BooleanType(), required=False), - schema_id=0, - identifier_field_ids=[2], - ), - partition_spec=[], - ), + metadata_location=example_table_metadata_no_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_no_snapshot_v1_rest_json["metadata"]), io=load_file_io(), catalog=catalog, ) @@ -839,6 +562,97 @@ def test_delete_table_204(rest_mock: Mocker) -> None: RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).drop_table(("example", "fokko")) +def test_delete_table_from_self_identifier_204( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] +) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/pdames/tables/table", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("pdames", "table")) + rest_mock.delete( + f"{TEST_URI}v1/namespaces/pdames/tables/table", + json={}, + status_code=204, + request_headers=TEST_HEADERS, + ) + catalog.drop_table(table.identifier) + + +def test_rename_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: + rest_mock.post( + f"{TEST_URI}v1/tables/rename", + json={ + "source": {"namespace": ("pdames",), "name": "source"}, + "destination": {"namespace": ("pdames",), "name": "destination"}, + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces/pdames/tables/destination", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + from_identifier = ("pdames", "source") + to_identifier = ("pdames", "destination") + actual = catalog.rename_table(from_identifier, to_identifier) + expected = Table( + identifier=("rest", "pdames", "destination"), + metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]), + io=load_file_io(), + catalog=catalog, + ) + assert actual.metadata.model_dump() == expected.metadata.model_dump() + assert actual == expected + + +def test_rename_table_from_self_identifier_200( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] +) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces/pdames/tables/source", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + from_identifier = ("pdames", "source") + to_identifier = ("pdames", "destination") + table = catalog.load_table(from_identifier) + rest_mock.post( + f"{TEST_URI}v1/tables/rename", + json={ + "source": {"namespace": ("pdames",), "name": "source"}, + "destination": {"namespace": ("pdames",), "name": "destination"}, + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.get( + f"{TEST_URI}v1/namespaces/pdames/tables/destination", + json=example_table_metadata_with_snapshot_v1_rest_json, + status_code=200, + request_headers=TEST_HEADERS, + ) + actual = catalog.rename_table(table.identifier, to_identifier) + expected = Table( + identifier=("rest", "pdames", "destination"), + metadata_location=example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + metadata=TableMetadataV1(**example_table_metadata_with_snapshot_v1_rest_json["metadata"]), + io=load_file_io(), + catalog=catalog, + ) + assert actual.metadata.model_dump() == expected.metadata.model_dump() + assert actual == expected + + def test_delete_table_404(rest_mock: Mocker) -> None: rest_mock.delete( f"{TEST_URI}v1/namespaces/example/tables/fokko", diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index 4277845633..56d2c16c10 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -188,6 +188,20 @@ def test_load_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando assert table.metadata == loaded_table.metadata +def test_load_table_from_self_identifier( + test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier +) -> None: + database_name, _table_name = random_identifier + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(random_identifier, table_schema_nested) + intermediate = test_catalog.load_table(random_identifier) + assert intermediate.identifier == (test_catalog.name,) + random_identifier + loaded_table = test_catalog.load_table(intermediate.identifier) + assert table.identifier == loaded_table.identifier + assert table.metadata_location == loaded_table.metadata_location + assert table.metadata == loaded_table.metadata + + def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: database_name, _table_name = random_identifier test_catalog.create_namespace(database_name) @@ -198,6 +212,20 @@ def test_drop_table(test_catalog: SqlCatalog, table_schema_nested: Schema, rando test_catalog.load_table(random_identifier) +def test_drop_table_from_self_identifier( + test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier +) -> None: + database_name, _table_name = random_identifier + test_catalog.create_namespace(database_name) + table = test_catalog.create_table(random_identifier, table_schema_nested) + assert table.identifier == (test_catalog.name,) + random_identifier + test_catalog.drop_table(table.identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(random_identifier) + + def test_drop_table_that_does_not_exist(test_catalog: SqlCatalog, random_identifier: Identifier) -> None: with pytest.raises(NoSuchTableError): test_catalog.drop_table(random_identifier) @@ -220,6 +248,25 @@ def test_rename_table( test_catalog.load_table(random_identifier) +def test_rename_table_from_self_identifier( + test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier +) -> None: + from_database_name, _from_table_name = random_identifier + to_database_name, _to_table_name = another_random_identifier + test_catalog.create_namespace(from_database_name) + test_catalog.create_namespace(to_database_name) + table = test_catalog.create_table(random_identifier, table_schema_nested) + assert table.identifier == (test_catalog.name,) + random_identifier + test_catalog.rename_table(table.identifier, another_random_identifier) + new_table = test_catalog.load_table(another_random_identifier) + assert new_table.identifier == (test_catalog.name,) + another_random_identifier + assert new_table.metadata_location == table.metadata_location + with pytest.raises(NoSuchTableError): + test_catalog.load_table(table.identifier) + with pytest.raises(NoSuchTableError): + test_catalog.load_table(random_identifier) + + def test_rename_table_to_existing_one( test_catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier ) -> None: diff --git a/tests/conftest.py b/tests/conftest.py index 4bd1921d76..72a7ad0310 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -354,6 +354,133 @@ def all_avro_types() -> Dict[str, Any]: } +EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1 = { + "format-version": 1, + "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f", + "location": "s3://warehouse/database/table", + "last-updated-ms": 1646787054459, + "last-column-id": 2, + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + }, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": False, "type": "int"}, + {"id": 2, "name": "data", "required": False, "type": "string"}, + ], + } + ], + "partition-spec": [], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": { + "owner": "bryan", + "write.metadata.compression-codec": "gzip", + }, + "current-snapshot-id": 3497810964824022504, + "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}}, + "snapshots": [ + { + "snapshot-id": 3497810964824022504, + "timestamp-ms": 1646787054459, + "summary": { + "operation": "append", + "spark.app.id": "local-1646787004168", + "added-data-files": "1", + "added-records": "1", + "added-files-size": "697", + "changed-partition-count": "1", + "total-records": "1", + "total-files-size": "697", + "total-data-files": "1", + "total-delete-files": "0", + "total-position-deletes": "0", + "total-equality-deletes": "0", + }, + "manifest-list": "s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro", + "schema-id": 0, + } + ], + "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}], + "metadata-log": [ + { + "timestamp-ms": 1646787031514, + "metadata-file": "s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json", + } + ], +} + + +@pytest.fixture +def example_table_metadata_with_snapshot_v1() -> Dict[str, Any]: + return EXAMPLE_TABLE_METADATA_WITH_SNAPSHOT_V1 + + +EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1 = { + "format-version": 1, + "table-uuid": "bf289591-dcc0-4234-ad4f-5c3eed811a29", + "location": "s3://warehouse/database/table", + "last-updated-ms": 1657810967051, + "last-column-id": 3, + "schema": { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": False, "type": "string"}, + {"id": 2, "name": "bar", "required": True, "type": "int"}, + {"id": 3, "name": "baz", "required": False, "type": "boolean"}, + ], + }, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "identifier-field-ids": [2], + "fields": [ + {"id": 1, "name": "foo", "required": False, "type": "string"}, + {"id": 2, "name": "bar", "required": True, "type": "int"}, + {"id": 3, "name": "baz", "required": False, "type": "boolean"}, + ], + } + ], + "partition-spec": [], + "default-spec-id": 0, + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": { + "write.delete.parquet.compression-codec": "zstd", + "write.metadata.compression-codec": "gzip", + "write.summary.partition-limit": "100", + "write.parquet.compression-codec": "zstd", + }, + "current-snapshot-id": -1, + "refs": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], +} + + +@pytest.fixture +def example_table_metadata_no_snapshot_v1() -> Dict[str, Any]: + return EXAMPLE_TABLE_METADATA_NO_SNAPSHOT_V1 + + EXAMPLE_TABLE_METADATA_V2 = { "format-version": 2, "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",