diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 0b70fe32e1..ea2bc65760 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -588,7 +588,7 @@ def identifier_to_tuple(identifier: Union[str, Identifier]) -> Identifier: If the identifier is a string, it is split into a tuple on '.'. If it is a tuple, it is used as-is. Args: - identifier (str | Identifier: an identifier, either a string or tuple of strings. + identifier (str | Identifier): an identifier, either a string or tuple of strings. Returns: Identifier: a tuple of strings. @@ -619,6 +619,29 @@ def namespace_from(identifier: Union[str, Identifier]) -> Identifier: """ return Catalog.identifier_to_tuple(identifier)[:-1] + @staticmethod + def namespace_to_string( + identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError + ) -> str: + """Transform a namespace identifier into a string. + + Args: + identifier (Union[str, Identifier]): a namespace identifier. + err (Union[Type[ValueError], Type[NoSuchNamespaceError]]): the error type to raise when identifier is empty. + + Returns: + str: Namespace identifier as a dot-separated string. 
+ """ + tuple_identifier = Catalog.identifier_to_tuple(identifier) + if len(tuple_identifier) < 1: + raise err("Empty namespace identifier") + + # Check if any segment of the tuple is an empty string + if any(segment.strip() == "" for segment in tuple_identifier): + raise err("Namespace identifier contains an empty segment or a segment with only whitespace") + + return ".".join(segment.strip() for segment in tuple_identifier) + @staticmethod def identifier_to_database( identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 978109b2a3..6c198767e7 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -43,6 +43,7 @@ from pyiceberg.catalog import ( METADATA_LOCATION, + Catalog, MetastoreCatalog, PropertiesUpdateSummary, ) @@ -94,6 +95,16 @@ class IcebergNamespaceProperties(SqlCatalogBaseTable): class SqlCatalog(MetastoreCatalog): + """Implementation of a SQL based catalog. + + In the `JDBCCatalog` implementation, a `Namespace` is composed of a list of strings separated by dots: `'ns1.ns2.ns3'`. + And you can have as many levels as you want, but you need at least one. The `SqlCatalog` honors the same convention. + + In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an optional `Namespace` and a table name. + When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`. A valid `TableIdentifier` could be `'name'` (no namespace). + The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`. 
+ """ + def __init__(self, name: str, **properties: str): super().__init__(name, **properties) @@ -136,7 +147,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table: file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( - identifier=(self.name, table_namespace, table_name), + identifier=(self.name,) + Catalog.identifier_to_tuple(table_namespace) + (table_name,), metadata=metadata, metadata_location=metadata_location, io=self._load_file_io(metadata.properties, metadata_location), @@ -173,11 +184,14 @@ def create_table( """ schema: Schema = self._convert_schema_if_needed(schema) # type: ignore - database_name, table_name = self.identifier_to_database_and_table(identifier) - if not self._namespace_exists(database_name): - raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}") + identifier_nocatalog = self.identifier_to_tuple_without_catalog(identifier) + namespace_identifier = Catalog.namespace_from(identifier_nocatalog) + table_name = Catalog.table_name_from(identifier_nocatalog) + if not self._namespace_exists(namespace_identifier): + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace_identifier}") - location = self._resolve_table_location(location, database_name, table_name) + namespace = Catalog.namespace_to_string(namespace_identifier) + location = self._resolve_table_location(location, namespace, table_name) metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties @@ -190,7 +204,7 @@ def create_table( session.add( IcebergTables( catalog_name=self.name, - table_namespace=database_name, + table_namespace=namespace, table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, @@ -198,7 +212,7 @@ def create_table( ) session.commit() except IntegrityError as e: - raise 
TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e return self.load_table(identifier=identifier) @@ -216,16 +230,19 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: TableAlreadyExistsError: If the table already exists NoSuchNamespaceError: If namespace does not exist """ - database_name, table_name = self.identifier_to_database_and_table(identifier) - if not self._namespace_exists(database_name): - raise NoSuchNamespaceError(f"Namespace does not exist: {database_name}") + identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) + namespace_tuple = Catalog.namespace_from(identifier_tuple) + namespace = Catalog.namespace_to_string(namespace_tuple) + table_name = Catalog.table_name_from(identifier_tuple) + if not self._namespace_exists(namespace): + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") with Session(self.engine) as session: try: session.add( IcebergTables( catalog_name=self.name, - table_namespace=database_name, + table_namespace=namespace, table_name=table_name, metadata_location=metadata_location, previous_metadata_location=None, @@ -233,7 +250,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: ) session.commit() except IntegrityError as e: - raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + raise TableAlreadyExistsError(f"Table {namespace}.{table_name} already exists") from e return self.load_table(identifier=identifier) @@ -253,17 +270,19 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: NoSuchTableError: If a table with the name does not exist. 
""" identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + namespace_tuple = Catalog.namespace_from(identifier_tuple) + namespace = Catalog.namespace_to_string(namespace_tuple) + table_name = Catalog.table_name_from(identifier_tuple) with Session(self.engine) as session: stmt = select(IcebergTables).where( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, + IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) result = session.scalar(stmt) if result: return self._convert_orm_to_iceberg(result) - raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") + raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. @@ -275,18 +294,20 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: NoSuchTableError: If a table with the name does not exist. 
""" identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + namespace_tuple = Catalog.namespace_from(identifier_tuple) + namespace = Catalog.namespace_to_string(namespace_tuple) + table_name = Catalog.table_name_from(identifier_tuple) with Session(self.engine) as session: if self.engine.dialect.supports_sane_rowcount: res = session.execute( delete(IcebergTables).where( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, + IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) ) if res.rowcount < 1: - raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") + raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") else: try: tbl = ( @@ -294,14 +315,14 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, + IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, ) .one() ) session.delete(tbl) except NoResultFound as e: - raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e + raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e session.commit() def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: @@ -320,10 +341,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U NoSuchNamespaceError: If the target namespace does not exist. 
""" from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier) - from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError) - to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier) - if not self._namespace_exists(to_database_name): - raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}") + to_identifier_tuple = self.identifier_to_tuple_without_catalog(to_identifier) + from_namespace_tuple = Catalog.namespace_from(from_identifier_tuple) + from_namespace = Catalog.namespace_to_string(from_namespace_tuple) + from_table_name = Catalog.table_name_from(from_identifier_tuple) + to_namespace_tuple = Catalog.namespace_from(to_identifier_tuple) + to_namespace = Catalog.namespace_to_string(to_namespace_tuple) + to_table_name = Catalog.table_name_from(to_identifier_tuple) + if not self._namespace_exists(to_namespace): + raise NoSuchNamespaceError(f"Namespace does not exist: {to_namespace}") with Session(self.engine) as session: try: if self.engine.dialect.supports_sane_rowcount: @@ -331,10 +357,10 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U update(IcebergTables) .where( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == from_database_name, + IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, ) - .values(table_namespace=to_database_name, table_name=to_table_name) + .values(table_namespace=to_namespace, table_name=to_table_name) ) result = session.execute(stmt) if result.rowcount < 1: @@ -346,18 +372,18 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == from_database_name, + IcebergTables.table_namespace == from_namespace, IcebergTables.table_name == from_table_name, ) .one() ) - 
tbl.table_namespace = to_database_name + tbl.table_namespace = to_namespace tbl.table_name = to_table_name except NoResultFound as e: raise NoSuchTableError(f"Table does not exist: {from_table_name}") from e session.commit() except IntegrityError as e: - raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e + raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: @@ -377,7 +403,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) current_table = self.load_table(identifier_tuple) - database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) + namespace_tuple = Catalog.namespace_from(identifier_tuple) + namespace = Catalog.namespace_to_string(namespace_tuple) + table_name = Catalog.table_name_from(identifier_tuple) base_metadata = current_table.metadata for requirement in table_request.requirements: requirement.validate(base_metadata) @@ -398,7 +426,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons update(IcebergTables) .where( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, + IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) @@ -406,7 +434,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) result = session.execute(stmt) if result.rowcount < 1: - raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") + raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}") else: try: tbl = ( @@ -414,7 +442,7 @@ def 
_commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons .with_for_update(of=IcebergTables) .filter( IcebergTables.catalog_name == self.name, - IcebergTables.table_namespace == database_name, + IcebergTables.table_namespace == namespace, IcebergTables.table_name == table_name, IcebergTables.metadata_location == current_table.metadata_location, ) @@ -423,13 +451,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons tbl.metadata_location = new_metadata_location tbl.previous_metadata_location = current_table.metadata_location except NoResultFound as e: - raise CommitFailedException(f"Table has been updated by another process: {database_name}.{table_name}") from e + raise CommitFailedException(f"Table has been updated by another process: {namespace}.{table_name}") from e session.commit() return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool: - namespace = self.identifier_to_database(identifier) + namespace_tuple = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_to_string(namespace_tuple, NoSuchNamespaceError) with Session(self.engine) as session: stmt = ( select(IcebergTables) @@ -462,18 +491,20 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper Raises: NamespaceAlreadyExistsError: If a namespace with the given name already exists. 
""" + if self._namespace_exists(namespace): + raise NamespaceAlreadyExistsError(f"Namespace {namespace} already exists") + if not properties: properties = IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES - database_name = self.identifier_to_database(namespace) - if self._namespace_exists(database_name): - raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") - create_properties = properties if properties else IcebergNamespaceProperties.NAMESPACE_MINIMAL_PROPERTIES with Session(self.engine) as session: for key, value in create_properties.items(): session.add( IcebergNamespaceProperties( - catalog_name=self.name, namespace=database_name, property_key=key, property_value=value + catalog_name=self.name, + namespace=Catalog.namespace_to_string(namespace, NoSuchNamespaceError), + property_key=key, + property_value=value, ) ) session.commit() @@ -488,16 +519,16 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: NoSuchNamespaceError: If a namespace with the given name does not exist. NamespaceNotEmptyError: If the namespace is not empty. """ - database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) - if self._namespace_exists(database_name): - if tables := self.list_tables(database_name): - raise NamespaceNotEmptyError(f"Database {database_name} is not empty. {len(tables)} tables exist.") + if self._namespace_exists(namespace): + namespace_str = Catalog.namespace_to_string(namespace) + if tables := self.list_tables(namespace): + raise NamespaceNotEmptyError(f"Namespace {namespace_str} is not empty. 
{len(tables)} tables exist.") with Session(self.engine) as session: session.execute( delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, - IcebergNamespaceProperties.namespace == database_name, + IcebergNamespaceProperties.namespace == namespace_str, ) ) session.commit() @@ -516,14 +547,14 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. """ - database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) + if namespace and not self._namespace_exists(namespace): + raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") - stmt = select(IcebergTables).where( - IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == database_name - ) + namespace = Catalog.namespace_to_string(namespace) + stmt = select(IcebergTables).where(IcebergTables.catalog_name == self.name, IcebergTables.table_namespace == namespace) with Session(self.engine) as session: result = session.scalars(stmt) - return [(table.table_namespace, table.table_name) for table in result] + return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. 
@@ -543,15 +574,15 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi table_stmt = select(IcebergTables.table_namespace).where(IcebergTables.catalog_name == self.name) namespace_stmt = select(IcebergNamespaceProperties.namespace).where(IcebergNamespaceProperties.catalog_name == self.name) if namespace: - database_name = self.identifier_to_database(namespace, NoSuchNamespaceError) - table_stmt = table_stmt.where(IcebergTables.table_namespace.like(database_name)) - namespace_stmt = namespace_stmt.where(IcebergNamespaceProperties.namespace.like(database_name)) + namespace_str = Catalog.namespace_to_string(namespace, NoSuchNamespaceError) + table_stmt = table_stmt.where(IcebergTables.table_namespace.like(namespace_str)) + namespace_stmt = namespace_stmt.where(IcebergNamespaceProperties.namespace.like(namespace_str)) stmt = union( table_stmt, namespace_stmt, ) with Session(self.engine) as session: - return [self.identifier_to_tuple(namespace_col) for namespace_col in session.execute(stmt).scalars()] + return [Catalog.identifier_to_tuple(namespace_col) for namespace_col in session.execute(stmt).scalars()] def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -565,12 +596,12 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper Raises: NoSuchNamespaceError: If a namespace with the given name does not exist. 
""" - database_name = self.identifier_to_database(namespace) - if not self._namespace_exists(database_name): - raise NoSuchNamespaceError(f"Database {database_name} does not exists") + namespace_str = Catalog.namespace_to_string(namespace) + if not self._namespace_exists(namespace): + raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists") stmt = select(IcebergNamespaceProperties).where( - IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == database_name + IcebergNamespaceProperties.catalog_name == self.name, IcebergNamespaceProperties.namespace == namespace_str ) with Session(self.engine) as session: result = session.scalars(stmt) @@ -590,9 +621,9 @@ def update_namespace_properties( NoSuchNamespaceError: If a namespace with the given name does not exist. ValueError: If removals and updates have overlapping keys. """ - database_name = self.identifier_to_database(namespace) - if not self._namespace_exists(database_name): - raise NoSuchNamespaceError(f"Database {database_name} does not exists") + namespace_str = Catalog.namespace_to_string(namespace) + if not self._namespace_exists(namespace): + raise NoSuchNamespaceError(f"Namespace {namespace_str} does not exists") current_properties = self.load_namespace_properties(namespace=namespace) properties_update_summary = self._get_updated_props_and_update_summary( @@ -603,7 +634,7 @@ def update_namespace_properties( if removals: delete_stmt = delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, - IcebergNamespaceProperties.namespace == database_name, + IcebergNamespaceProperties.namespace == namespace_str, IcebergNamespaceProperties.property_key.in_(removals), ) session.execute(delete_stmt) @@ -614,14 +645,14 @@ def update_namespace_properties( # This is not a problem since it runs in a single transaction delete_stmt = delete(IcebergNamespaceProperties).where( IcebergNamespaceProperties.catalog_name == self.name, - 
IcebergNamespaceProperties.namespace == database_name, + IcebergNamespaceProperties.namespace == namespace_str, IcebergNamespaceProperties.property_key.in_(set(updates.keys())), ) session.execute(delete_stmt) insert_stmt = insert(IcebergNamespaceProperties) for property_key, property_value in updates.items(): insert_stmt = insert_stmt.values( - catalog_name=self.name, namespace=database_name, property_key=property_key, property_value=property_value + catalog_name=self.name, namespace=namespace_str, property_key=property_key, property_value=property_value ) session.execute(insert_stmt) session.commit() diff --git a/pyiceberg/cli/console.py b/pyiceberg/cli/console.py index 0fbda10960..d1833df081 100644 --- a/pyiceberg/cli/console.py +++ b/pyiceberg/cli/console.py @@ -112,9 +112,13 @@ def list(ctx: Context, parent: Optional[str]) -> None: # pylint: disable=redefi """List tables or namespaces.""" catalog, output = _catalog_and_output(ctx) - identifiers = catalog.list_namespaces(parent or ()) - if not identifiers and parent: + identifiers = [] + if parent: + # Do we have tables under parent namespace? identifiers = catalog.list_tables(parent) + if not identifiers: + # List hierarchical namespaces if parent, root namespaces otherwise. 
+ identifiers = catalog.list_namespaces(parent or ()) output.identifiers(identifiers) diff --git a/tests/catalog/test_sql.py b/tests/catalog/test_sql.py index efa7b746a9..285cfd9ab9 100644 --- a/tests/catalog/test_sql.py +++ b/tests/catalog/test_sql.py @@ -17,7 +17,7 @@ import os from pathlib import Path -from typing import Generator, List +from typing import Any, Generator, List import pyarrow as pa import pytest @@ -25,6 +25,9 @@ from pytest_lazyfixture import lazy_fixture from sqlalchemy.exc import ArgumentError, IntegrityError +from pyiceberg.catalog import ( + Catalog, +) from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import ( CommitFailedException, @@ -52,51 +55,90 @@ from pyiceberg.types import IntegerType -@pytest.fixture(name="random_identifier") -def fixture_random_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: +@pytest.fixture(scope="module") +def catalog_name() -> str: + return "test_sql_catalog" + + +@pytest.fixture(name="random_table_identifier") +def fixture_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", exist_ok=True) return database_name, table_name -@pytest.fixture(name="another_random_identifier") -def fixture_another_random_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: +@pytest.fixture(name="random_table_identifier_with_catalog") +def fixture_random_table_identifier_with_catalog( + warehouse: Path, catalog_name: str, database_name: str, table_name: str +) -> Identifier: + os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", exist_ok=True) + return catalog_name, database_name, table_name + + +@pytest.fixture(name="another_random_table_identifier") +def fixture_another_random_table_identifier(warehouse: Path, database_name: str, table_name: str) -> Identifier: database_name = database_name + "_new" table_name = table_name + 
"_new" os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", exist_ok=True) return database_name, table_name +@pytest.fixture(name="another_random_table_identifier_with_catalog") +def fixture_another_random_table_identifier_with_catalog( + warehouse: Path, catalog_name: str, database_name: str, table_name: str +) -> Identifier: + database_name = database_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{database_name}.db/{table_name}/metadata/", exist_ok=True) + return catalog_name, database_name, table_name + + +@pytest.fixture(name="random_hierarchical_identifier") +def fixture_random_hierarchical_identifier(warehouse: Path, hierarchical_namespace_name: str, table_name: str) -> Identifier: + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + +@pytest.fixture(name="another_random_hierarchical_identifier") +def fixture_another_random_hierarchical_identifier( + warehouse: Path, hierarchical_namespace_name: str, table_name: str +) -> Identifier: + hierarchical_namespace_name = hierarchical_namespace_name + "_new" + table_name = table_name + "_new" + os.makedirs(f"{warehouse}/{hierarchical_namespace_name}.db/{table_name}/metadata/", exist_ok=True) + return Catalog.identifier_to_tuple(".".join((hierarchical_namespace_name, table_name))) + + @pytest.fixture(scope="module") -def catalog_memory(warehouse: Path) -> Generator[SqlCatalog, None, None]: +def catalog_memory(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: props = { "uri": "sqlite:///:memory:", "warehouse": f"file://{warehouse}", } - catalog = SqlCatalog("test_sql_catalog", **props) + catalog = SqlCatalog(catalog_name, **props) catalog.create_tables() yield catalog catalog.destroy_tables() @pytest.fixture(scope="module") -def catalog_sqlite(warehouse: Path) -> Generator[SqlCatalog, None, None]: +def 
catalog_sqlite(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: props = { "uri": f"sqlite:////{warehouse}/sql-catalog.db", "warehouse": f"file://{warehouse}", } - catalog = SqlCatalog("test_sql_catalog", **props) + catalog = SqlCatalog(catalog_name, **props) catalog.create_tables() yield catalog catalog.destroy_tables() @pytest.fixture(scope="module") -def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, None, None]: +def catalog_sqlite_without_rowcount(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: props = { "uri": f"sqlite:////{warehouse}/sql-catalog.db", "warehouse": f"file://{warehouse}", } - catalog = SqlCatalog("test_sql_catalog", **props) + catalog = SqlCatalog(catalog_name, **props) catalog.engine.dialect.supports_sane_rowcount = False catalog.create_tables() yield catalog @@ -104,26 +146,26 @@ def catalog_sqlite_without_rowcount(warehouse: Path) -> Generator[SqlCatalog, No @pytest.fixture(scope="module") -def catalog_sqlite_fsspec(warehouse: Path) -> Generator[SqlCatalog, None, None]: +def catalog_sqlite_fsspec(catalog_name: str, warehouse: Path) -> Generator[SqlCatalog, None, None]: props = { "uri": f"sqlite:////{warehouse}/sql-catalog.db", "warehouse": f"file://{warehouse}", PY_IO_IMPL: FSSPEC_FILE_IO, } - catalog = SqlCatalog("test_sql_catalog", **props) + catalog = SqlCatalog(catalog_name, **props) catalog.create_tables() yield catalog catalog.destroy_tables() -def test_creation_with_no_uri() -> None: +def test_creation_with_no_uri(catalog_name: str) -> None: with pytest.raises(NoSuchPropertyException): - SqlCatalog("test_ddb_catalog", not_uri="unused") + SqlCatalog(catalog_name, not_uri="unused") -def test_creation_with_unsupported_uri() -> None: +def test_creation_with_unsupported_uri(catalog_name: str) -> None: with pytest.raises(ArgumentError): - SqlCatalog("test_ddb_catalog", uri="unsupported:xxx") + SqlCatalog(catalog_name, uri="unsupported:xxx") 
@pytest.mark.parametrize( @@ -146,13 +188,22 @@ def test_create_tables_idempotency(catalog: SqlCatalog) -> None: lazy_fixture('catalog_sqlite'), ], ) -def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) assert table.sort_order().order_id == 0, "Order ID must match" assert table.sort_order().is_unsorted is True, "Order must be unsorted" - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -162,15 +213,24 @@ def test_create_table_default_sort_order(catalog: SqlCatalog, table_schema_neste lazy_fixture('catalog_sqlite'), ], ) -def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested, properties={"format-version": "1"}) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_v1_table(catalog: 
SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested, properties={"format-version": "1"}) assert table.sort_order().order_id == 0, "Order ID must match" assert table.sort_order().is_unsorted is True, "Order must be unsorted" assert table.format_version == 1 assert table.spec() == UNPARTITIONED_PARTITION_SPEC - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -180,17 +240,26 @@ def test_create_v1_table(catalog: SqlCatalog, table_schema_nested: Schema, rando lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) def test_create_table_with_pyarrow_schema( catalog: SqlCatalog, pyarrow_schema_simple_without_ids: pa.Schema, iceberg_table_schema_simple: Schema, - random_identifier: Identifier, + table_identifier: Identifier, ) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, pyarrow_schema_simple_without_ids) + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, pyarrow_schema_simple_without_ids) assert table.schema() == iceberg_table_schema_simple - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -200,7 +269,15 @@ def test_create_table_with_pyarrow_schema( lazy_fixture('catalog_sqlite'), ], ) -def 
test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier) -> None: +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_write_pyarrow_schema(catalog: SqlCatalog, table_identifier: Identifier) -> None: import pyarrow as pa pyarrow_table = pa.Table.from_arrays( @@ -217,9 +294,10 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier pa.field('large', pa.large_string(), nullable=True), ]), ) - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, pyarrow_table.schema) + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, pyarrow_table.schema) table.overwrite(pyarrow_table) @@ -230,18 +308,27 @@ def test_write_pyarrow_schema(catalog: SqlCatalog, random_identifier: Identifier lazy_fixture('catalog_sqlite'), ], ) -def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) order = 
SortOrder(SortField(source_id=2, transform=IdentityTransform(), null_order=NullOrder.NULLS_FIRST)) - table = catalog.create_table(random_identifier, table_schema_nested, sort_order=order) + table = catalog.create_table(table_identifier, table_schema_nested, sort_order=order) given_sort_order = table.sort_order() assert given_sort_order.order_id == 1, "Order ID must match" assert len(given_sort_order.fields) == 1, "Order must have 1 field" assert given_sort_order.fields[0].direction == SortDirection.ASC, "Direction must match" assert given_sort_order.fields[0].null_order == NullOrder.NULLS_FIRST, "Null order must match" assert isinstance(given_sort_order.fields[0].transform, IdentityTransform), "Transform must match" - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -251,17 +338,26 @@ def test_create_table_custom_sort_order(catalog: SqlCatalog, table_schema_nested lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) def test_create_table_with_default_warehouse_location( - warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier + warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier ) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - catalog.create_table(random_identifier, table_schema_nested) - table = catalog.load_table(random_identifier) - assert table.identifier == (catalog.name,) + random_identifier + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + catalog.create_table(table_identifier, table_schema_nested) + table = 
catalog.load_table(table_identifier) + assert table.identifier == (catalog.name,) + table_identifier_nocatalog assert table.metadata_location.startswith(f"file://{warehouse}") assert os.path.exists(table.metadata_location[len("file://") :]) - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -271,19 +367,29 @@ def test_create_table_with_default_warehouse_location( lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) def test_create_table_with_given_location_removes_trailing_slash( - warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier + warehouse: Path, catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier ) -> None: - database_name, table_name = random_identifier - location = f"file://{warehouse}/{database_name}.db/{table_name}-given" - catalog.create_namespace(database_name) - catalog.create_table(random_identifier, table_schema_nested, location=f"{location}/") - table = catalog.load_table(random_identifier) - assert table.identifier == (catalog.name,) + random_identifier + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + table_name = Catalog.table_name_from(table_identifier_nocatalog) + location = f"file://{warehouse}/{catalog.name}.db/{table_name}-given" + catalog.create_namespace(namespace) + catalog.create_table(table_identifier, table_schema_nested, location=f"{location}/") + table = catalog.load_table(table_identifier) + assert table.identifier == (catalog.name,) + table_identifier_nocatalog assert table.metadata_location.startswith(f"file://{warehouse}") assert os.path.exists(table.metadata_location[len("file://") :]) assert 
table.location() == location - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -293,12 +399,21 @@ def test_create_table_with_given_location_removes_trailing_slash( lazy_fixture('catalog_sqlite'), ], ) -def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - catalog.create_table(random_identifier, table_schema_nested) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + catalog.create_table(table_identifier, table_schema_nested) with pytest.raises(TableAlreadyExistsError): - catalog.create_table(random_identifier, table_schema_nested) + catalog.create_table(table_identifier, table_schema_nested) @pytest.mark.parametrize( @@ -308,13 +423,22 @@ def test_create_duplicated_table(catalog: SqlCatalog, table_schema_nested: Schem lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) def test_create_table_if_not_exists_duplicated_table( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier ) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - 
table1 = catalog.create_table(random_identifier, table_schema_nested) - table2 = catalog.create_table_if_not_exists(random_identifier, table_schema_nested) + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table1 = catalog.create_table(table_identifier, table_schema_nested) + table2 = catalog.create_table_if_not_exists(table_identifier, table_schema_nested) assert table1.identifier == table2.identifier @@ -339,7 +463,7 @@ def test_create_table_with_non_existing_namespace(catalog: SqlCatalog, table_sch ], ) def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_name: str) -> None: - with pytest.raises(ValueError): + with pytest.raises(NoSuchNamespaceError): catalog.create_table(table_name, table_schema_nested) @@ -350,14 +474,23 @@ def test_create_table_without_namespace(catalog: SqlCatalog, table_schema_nested lazy_fixture('catalog_sqlite'), ], ) -def test_register_table(catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.register_table(random_identifier, metadata_location) - assert table.identifier == (catalog.name,) + random_identifier +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_register_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.register_table(table_identifier, metadata_location) + assert table.identifier == 
(catalog.name,) + table_identifier_nocatalog assert table.metadata_location == metadata_location assert os.path.exists(metadata_location) - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -367,12 +500,21 @@ def test_register_table(catalog: SqlCatalog, random_identifier: Identifier, meta lazy_fixture('catalog_sqlite'), ], ) -def test_register_existing_table(catalog: SqlCatalog, random_identifier: Identifier, metadata_location: str) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - catalog.register_table(random_identifier, metadata_location) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_register_existing_table(catalog: SqlCatalog, table_identifier: Identifier, metadata_location: str) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + catalog.register_table(table_identifier, metadata_location) with pytest.raises(TableAlreadyExistsError): - catalog.register_table(random_identifier, metadata_location) + catalog.register_table(table_identifier, metadata_location) @pytest.mark.parametrize( @@ -407,11 +549,20 @@ def test_register_table_without_namespace(catalog: SqlCatalog, metadata_location lazy_fixture('catalog_sqlite'), ], ) -def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - loaded_table = catalog.load_table(random_identifier) +@pytest.mark.parametrize( + "table_identifier", + [ + 
lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) + loaded_table = catalog.load_table(table_identifier) assert table.identifier == loaded_table.identifier assert table.metadata_location == loaded_table.metadata_location assert table.metadata == loaded_table.metadata @@ -424,12 +575,21 @@ def test_load_table(catalog: SqlCatalog, table_schema_nested: Schema, random_ide lazy_fixture('catalog_sqlite'), ], ) -def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - intermediate = catalog.load_table(random_identifier) - assert intermediate.identifier == (catalog.name,) + random_identifier +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) + intermediate = catalog.load_table(table_identifier) + assert 
intermediate.identifier == (catalog.name,) + table_identifier_nocatalog loaded_table = catalog.load_table(intermediate.identifier) assert table.identifier == loaded_table.identifier assert table.metadata_location == loaded_table.metadata_location @@ -444,14 +604,23 @@ def test_load_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier - catalog.drop_table(random_identifier) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + table_identifier_nocatalog + catalog.drop_table(table_identifier) with pytest.raises(NoSuchTableError): - catalog.load_table(random_identifier) + catalog.load_table(table_identifier) @pytest.mark.parametrize( @@ -462,16 +631,25 @@ def test_drop_table(catalog: SqlCatalog, table_schema_nested: Schema, random_ide lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - 
catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + table_identifier_nocatalog catalog.drop_table(table.identifier) with pytest.raises(NoSuchTableError): catalog.load_table(table.identifier) with pytest.raises(NoSuchTableError): - catalog.load_table(random_identifier) + catalog.load_table(table_identifier) @pytest.mark.parametrize( @@ -482,9 +660,17 @@ def test_drop_table_from_self_identifier(catalog: SqlCatalog, table_schema_neste lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier: Identifier) -> None: +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_drop_table_that_does_not_exist(catalog: SqlCatalog, table_identifier: Identifier) -> None: with pytest.raises(NoSuchTableError): - catalog.drop_table(random_identifier) + catalog.drop_table(table_identifier) @pytest.mark.parametrize( @@ -495,21 +681,39 @@ def test_drop_table_that_does_not_exist(catalog: SqlCatalog, random_identifier: lazy_fixture('catalog_sqlite_without_rowcount'), ], ) 
+@pytest.mark.parametrize( + "from_table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "to_table_identifier", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) def test_rename_table( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier ) -> None: - from_database_name, _from_table_name = random_identifier - to_database_name, _to_table_name = another_random_identifier - catalog.create_namespace(from_database_name) - catalog.create_namespace(to_database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier - catalog.rename_table(random_identifier, another_random_identifier) - new_table = catalog.load_table(another_random_identifier) - assert new_table.identifier == (catalog.name,) + another_random_identifier + from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier) + to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier) + from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog) + to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(from_table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog + catalog.rename_table(from_table_identifier, to_table_identifier) + new_table = catalog.load_table(to_table_identifier) + assert 
new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog assert new_table.metadata_location == table.metadata_location with pytest.raises(NoSuchTableError): - catalog.load_table(random_identifier) + catalog.load_table(from_table_identifier) @pytest.mark.parametrize( @@ -520,23 +724,41 @@ def test_rename_table( lazy_fixture('catalog_sqlite_without_rowcount'), ], ) +@pytest.mark.parametrize( + "from_table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "to_table_identifier", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) def test_rename_table_from_self_identifier( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier ) -> None: - from_database_name, _from_table_name = random_identifier - to_database_name, _to_table_name = another_random_identifier - catalog.create_namespace(from_database_name) - catalog.create_namespace(to_database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier - catalog.rename_table(table.identifier, another_random_identifier) - new_table = catalog.load_table(another_random_identifier) - assert new_table.identifier == (catalog.name,) + another_random_identifier + from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier) + to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier) + from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog) + to_namespace = 
Catalog.namespace_from(to_table_identifier_nocatalog) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(from_table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog + catalog.rename_table(table.identifier, to_table_identifier) + new_table = catalog.load_table(to_table_identifier) + assert new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog assert new_table.metadata_location == table.metadata_location with pytest.raises(NoSuchTableError): catalog.load_table(table.identifier) with pytest.raises(NoSuchTableError): - catalog.load_table(random_identifier) + catalog.load_table(from_table_identifier) @pytest.mark.parametrize( @@ -547,19 +769,37 @@ def test_rename_table_from_self_identifier( lazy_fixture('catalog_sqlite_without_rowcount'), ], ) +@pytest.mark.parametrize( + "from_table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "to_table_identifier", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) def test_rename_table_to_existing_one( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier ) -> None: - from_database_name, _from_table_name = random_identifier - to_database_name, _to_table_name = another_random_identifier - catalog.create_namespace(from_database_name) - catalog.create_namespace(to_database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier - new_table = 
catalog.create_table(another_random_identifier, table_schema_nested) - assert new_table.identifier == (catalog.name,) + another_random_identifier + from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier) + to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier) + from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog) + to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog) + catalog.create_namespace(from_namespace) + catalog.create_namespace(to_namespace) + table = catalog.create_table(from_table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog + new_table = catalog.create_table(to_table_identifier, table_schema_nested) + assert new_table.identifier == (catalog.name,) + to_table_identifier_nocatalog with pytest.raises(TableAlreadyExistsError): - catalog.rename_table(random_identifier, another_random_identifier) + catalog.rename_table(from_table_identifier, to_table_identifier) @pytest.mark.parametrize( @@ -570,11 +810,28 @@ def test_rename_table_to_existing_one( lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier, another_random_identifier: Identifier) -> None: - to_database_name, _to_table_name = another_random_identifier - catalog.create_namespace(to_database_name) +@pytest.mark.parametrize( + "from_table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "to_table_identifier", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) +def test_rename_missing_table(catalog: SqlCatalog, from_table_identifier: Identifier, 
to_table_identifier: Identifier) -> None: + to_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(to_table_identifier) + to_namespace = Catalog.namespace_from(to_table_identifier_nocatalog) + catalog.create_namespace(to_namespace) with pytest.raises(NoSuchTableError): - catalog.rename_table(random_identifier, another_random_identifier) + catalog.rename_table(from_table_identifier, to_table_identifier) @pytest.mark.parametrize( @@ -585,15 +842,32 @@ def test_rename_missing_table(catalog: SqlCatalog, random_identifier: Identifier lazy_fixture('catalog_sqlite_without_rowcount'), ], ) +@pytest.mark.parametrize( + "from_table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "to_table_identifier", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) def test_rename_table_to_missing_namespace( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, from_table_identifier: Identifier, to_table_identifier: Identifier ) -> None: - from_database_name, _from_table_name = random_identifier - catalog.create_namespace(from_database_name) - table = catalog.create_table(random_identifier, table_schema_nested) - assert table.identifier == (catalog.name,) + random_identifier + from_table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(from_table_identifier) + from_namespace = Catalog.namespace_from(from_table_identifier_nocatalog) + catalog.create_namespace(from_namespace) + table = catalog.create_table(from_table_identifier, table_schema_nested) + assert table.identifier == (catalog.name,) + from_table_identifier_nocatalog with 
pytest.raises(NoSuchNamespaceError): - catalog.rename_table(random_identifier, another_random_identifier) + catalog.rename_table(from_table_identifier, to_table_identifier) @pytest.mark.parametrize( @@ -603,22 +877,40 @@ def test_rename_table_to_missing_namespace( lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize( + "table_identifier_1", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +@pytest.mark.parametrize( + "table_identifier_2", + [ + lazy_fixture("another_random_table_identifier"), + lazy_fixture("another_random_hierarchical_identifier"), + lazy_fixture("another_random_table_identifier_with_catalog"), + ], +) def test_list_tables( - catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier, another_random_identifier: Identifier + catalog: SqlCatalog, table_schema_nested: Schema, table_identifier_1: Identifier, table_identifier_2: Identifier ) -> None: - database_name_1, _table_name_1 = random_identifier - database_name_2, _table_name_2 = another_random_identifier - catalog.create_namespace(database_name_1) - catalog.create_namespace(database_name_2) - catalog.create_table(random_identifier, table_schema_nested) - catalog.create_table(another_random_identifier, table_schema_nested) - identifier_list = catalog.list_tables(database_name_1) + table_identifier_1_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier_1) + table_identifier_2_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier_2) + namespace_1 = Catalog.namespace_from(table_identifier_1_nocatalog) + namespace_2 = Catalog.namespace_from(table_identifier_2_nocatalog) + catalog.create_namespace(namespace_1) + catalog.create_namespace(namespace_2) + catalog.create_table(table_identifier_1, table_schema_nested) + catalog.create_table(table_identifier_2, table_schema_nested) + identifier_list = 
catalog.list_tables(namespace_1) assert len(identifier_list) == 1 - assert random_identifier in identifier_list + assert table_identifier_1_nocatalog in identifier_list - identifier_list = catalog.list_tables(database_name_2) + identifier_list = catalog.list_tables(namespace_2) assert len(identifier_list) == 1 - assert another_random_identifier in identifier_list + assert table_identifier_2_nocatalog in identifier_list @pytest.mark.parametrize( @@ -628,9 +920,10 @@ def test_list_tables( lazy_fixture('catalog_sqlite'), ], ) -def test_create_namespace(catalog: SqlCatalog, database_name: str) -> None: - catalog.create_namespace(database_name) - assert (database_name,) in catalog.list_namespaces() +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_list_tables_when_missing_namespace(catalog: SqlCatalog, namespace: str) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.list_tables(namespace) @pytest.mark.parametrize( @@ -654,10 +947,24 @@ def test_create_namespace_if_not_exists(catalog: SqlCatalog, database_name: str) lazy_fixture('catalog_sqlite'), ], ) -def test_create_duplicate_namespace(catalog: SqlCatalog, database_name: str) -> None: - catalog.create_namespace(database_name) +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_create_namespace(catalog: SqlCatalog, namespace: str) -> None: + catalog.create_namespace(namespace) + assert (Catalog.identifier_to_tuple(namespace)) in catalog.list_namespaces() + + +@pytest.mark.parametrize( + 'catalog', + [ + lazy_fixture('catalog_memory'), + lazy_fixture('catalog_sqlite'), + ], +) +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_create_duplicate_namespace(catalog: SqlCatalog, namespace: str) -> None: + catalog.create_namespace(namespace) with 
pytest.raises(NamespaceAlreadyExistsError): - catalog.create_namespace(database_name) + catalog.create_namespace(namespace) @pytest.mark.parametrize( @@ -667,10 +974,11 @@ def test_create_duplicate_namespace(catalog: SqlCatalog, database_name: str) -> lazy_fixture('catalog_sqlite'), ], ) -def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, database_name: str) -> None: - catalog.create_namespace(database_name + "_1") +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, namespace: str) -> None: + catalog.create_namespace(namespace + "_1") # Second namespace is a prefix of the first one, make sure it can be added. - catalog.create_namespace(database_name) + catalog.create_namespace(namespace) @pytest.mark.parametrize( @@ -680,16 +988,17 @@ def test_create_namespaces_sharing_same_prefix(catalog: SqlCatalog, database_nam lazy_fixture('catalog_sqlite'), ], ) -def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, database_name: str) -> None: +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, namespace: str) -> None: test_location = "/test/location" test_properties = { "comment": "this is a test description", "location": test_location, } - catalog.create_namespace(namespace=database_name, properties=test_properties) + catalog.create_namespace(namespace=namespace, properties=test_properties) loaded_database_list = catalog.list_namespaces() - assert (database_name,) in loaded_database_list - properties = catalog.load_namespace_properties(database_name) + assert Catalog.identifier_to_tuple(namespace) in loaded_database_list + properties = catalog.load_namespace_properties(namespace) assert properties["comment"] == "this is a test description" assert properties["location"] 
== test_location @@ -701,13 +1010,27 @@ def test_create_namespace_with_comment_and_location(catalog: SqlCatalog, databas lazy_fixture('catalog_sqlite'), ], ) +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) @pytest.mark.filterwarnings("ignore") -def test_create_namespace_with_null_properties(catalog: SqlCatalog, database_name: str) -> None: +def test_create_namespace_with_null_properties(catalog: SqlCatalog, namespace: str) -> None: with pytest.raises(IntegrityError): - catalog.create_namespace(namespace=database_name, properties={None: "value"}) # type: ignore + catalog.create_namespace(namespace=namespace, properties={None: "value"}) # type: ignore with pytest.raises(IntegrityError): - catalog.create_namespace(namespace=database_name, properties={"key": None}) + catalog.create_namespace(namespace=namespace, properties={"key": None}) + + +@pytest.mark.parametrize( + 'catalog', + [ + lazy_fixture('catalog_memory'), + lazy_fixture('catalog_sqlite'), + ], +) +@pytest.mark.parametrize("empty_namespace", ["", (), ("",), ("", ""), " ", (" ",)]) +def test_create_namespace_with_empty_identifier(catalog: SqlCatalog, empty_namespace: Any) -> None: + with pytest.raises(NoSuchNamespaceError): + catalog.create_namespace(empty_namespace) @pytest.mark.parametrize( @@ -717,13 +1040,17 @@ def test_create_namespace_with_null_properties(catalog: SqlCatalog, database_nam lazy_fixture('catalog_sqlite'), ], ) -def test_list_namespaces(catalog: SqlCatalog, database_list: List[str]) -> None: - for database_name in database_list: - catalog.create_namespace(database_name) - db_list = catalog.list_namespaces() - for database_name in database_list: - assert (database_name,) in db_list - assert len(catalog.list_namespaces(database_name)) == 1 +@pytest.mark.parametrize("namespace_list", [lazy_fixture("database_list"), lazy_fixture("hierarchical_namespace_list")]) +def test_list_namespaces(catalog: SqlCatalog, namespace_list: 
List[str]) -> None: + for namespace in namespace_list: + catalog.create_namespace(namespace) + # Test global list + ns_list = catalog.list_namespaces() + for namespace in namespace_list: + assert Catalog.identifier_to_tuple(namespace) in ns_list + # Test individual namespace list + assert len(one_namespace := catalog.list_namespaces(namespace)) == 1 + assert Catalog.identifier_to_tuple(namespace) == one_namespace[0] @pytest.mark.parametrize( @@ -745,16 +1072,25 @@ def test_list_non_existing_namespaces(catalog: SqlCatalog) -> None: lazy_fixture('catalog_sqlite'), ], ) -def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, table_name = random_identifier - catalog.create_namespace(database_name) - assert (database_name,) in catalog.list_namespaces() - catalog.create_table((database_name, table_name), table_schema_nested) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + assert namespace in catalog.list_namespaces() + catalog.create_table(table_identifier, table_schema_nested) with pytest.raises(NamespaceNotEmptyError): - catalog.drop_namespace(database_name) - catalog.drop_table((database_name, table_name)) - catalog.drop_namespace(database_name) - assert (database_name,) not in catalog.list_namespaces() + catalog.drop_namespace(namespace) + catalog.drop_table(table_identifier) + catalog.drop_namespace(namespace) + assert namespace not in catalog.list_namespaces() @pytest.mark.parametrize( @@ -764,18 +1100,19 @@ def 
test_drop_namespace(catalog: SqlCatalog, table_schema_nested: Schema, random lazy_fixture('catalog_sqlite'), ], ) -def test_load_namespace_properties(catalog: SqlCatalog, database_name: str) -> None: +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_load_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: warehouse_location = "/test/location" test_properties = { "comment": "this is a test description", - "location": f"{warehouse_location}/{database_name}.db", + "location": f"{warehouse_location}/{namespace}.db", "test_property1": "1", "test_property2": "2", "test_property3": "3", } - catalog.create_namespace(database_name, test_properties) - listed_properties = catalog.load_namespace_properties(database_name) + catalog.create_namespace(namespace, test_properties) + listed_properties = catalog.load_namespace_properties(namespace) for k, v in listed_properties.items(): assert k in test_properties assert v == test_properties[k] @@ -788,9 +1125,10 @@ def test_load_namespace_properties(catalog: SqlCatalog, database_name: str) -> N lazy_fixture('catalog_sqlite'), ], ) -def test_load_empty_namespace_properties(catalog: SqlCatalog, database_name: str) -> None: - catalog.create_namespace(database_name) - listed_properties = catalog.load_namespace_properties(database_name) +@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_load_empty_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: + catalog.create_namespace(namespace) + listed_properties = catalog.load_namespace_properties(namespace) assert listed_properties == {"exists": "true"} @@ -813,19 +1151,20 @@ def test_load_namespace_properties_non_existing_namespace(catalog: SqlCatalog) - lazy_fixture('catalog_sqlite'), ], ) -def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) -> None: 
+@pytest.mark.parametrize("namespace", [lazy_fixture("database_name"), lazy_fixture("hierarchical_namespace_name")]) +def test_update_namespace_properties(catalog: SqlCatalog, namespace: str) -> None: warehouse_location = "/test/location" test_properties = { "comment": "this is a test description", - "location": f"{warehouse_location}/{database_name}.db", + "location": f"{warehouse_location}/{namespace}.db", "test_property1": "1", "test_property2": "2", "test_property3": "3", } removals = {"test_property1", "test_property2", "test_property3", "should_not_removed"} updates = {"test_property4": "4", "test_property5": "5", "comment": "updated test description"} - catalog.create_namespace(database_name, test_properties) - update_report = catalog.update_namespace_properties(database_name, removals, updates) + catalog.create_namespace(namespace, test_properties) + update_report = catalog.update_namespace_properties(namespace, removals, updates) for k in updates.keys(): assert k in update_report.updated for k in removals: @@ -833,7 +1172,7 @@ def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) -> assert k in update_report.missing else: assert k in update_report.removed - assert "updated test description" == catalog.load_namespace_properties(database_name)["comment"] + assert "updated test description" == catalog.load_namespace_properties(namespace)["comment"] @pytest.mark.parametrize( @@ -844,10 +1183,19 @@ def test_update_namespace_properties(catalog: SqlCatalog, database_name: str) -> lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_nested) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + 
lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_nested) assert catalog._parse_metadata_version(table.metadata_location) == 0 assert table.metadata.current_schema_id == 0 @@ -878,10 +1226,19 @@ def test_commit_table(catalog: SqlCatalog, table_schema_nested: Schema, random_i lazy_fixture('catalog_sqlite_fsspec'), ], ) -def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - table = catalog.create_table(random_identifier, table_schema_simple) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table = catalog.create_table(table_identifier, table_schema_simple) df = pa.Table.from_pydict( { @@ -918,11 +1275,20 @@ def test_append_table(catalog: SqlCatalog, table_schema_simple: Schema, random_i lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - 
catalog.create_namespace(database_name) - table_a = catalog.create_table(random_identifier, table_schema_simple) - table_b = catalog.load_table(random_identifier) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + table_a = catalog.create_table(table_identifier, table_schema_simple) + table_b = catalog.load_table(table_identifier) with table_a.update_schema() as update: update.add_column(path="b", field_type=IntegerType()) @@ -992,12 +1358,21 @@ def test_write_and_evolve(catalog: SqlCatalog, format_version: int) -> None: lazy_fixture('catalog_sqlite_without_rowcount'), ], ) -def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None: +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: # table properties can be set to int, but still serialized to string - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) property_with_int = {"property_name": 42} - table = catalog.create_table(random_identifier, table_schema_simple, 
properties=property_with_int) + table = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_int) assert isinstance(table.properties["property_name"], str) @@ -1009,14 +1384,23 @@ def test_table_properties_int_value(catalog: SqlCatalog, table_schema_simple: Sc lazy_fixture('catalog_sqlite_without_rowcount'), ], ) +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) def test_table_properties_raise_for_none_value( - catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier + catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier ) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) property_with_none = {"property_name": None} with pytest.raises(ValidationError) as exc_info: - _ = catalog.create_table(random_identifier, table_schema_simple, properties=property_with_none) + _ = catalog.create_table(table_identifier, table_schema_simple, properties=property_with_none) assert "None type is not a supported value in properties: property_name" in str(exc_info.value) @@ -1027,11 +1411,20 @@ def test_table_properties_raise_for_none_value( lazy_fixture('catalog_sqlite'), ], ) -def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None: - database_name, _table_name = random_identifier - catalog.create_namespace(database_name) - catalog.create_table(random_identifier, table_schema_simple, properties={"format-version": "2"}) - existing_table = random_identifier +@pytest.mark.parametrize( + "table_identifier", + [ + lazy_fixture("random_table_identifier"), + 
lazy_fixture("random_hierarchical_identifier"), + lazy_fixture("random_table_identifier_with_catalog"), + ], +) +def test_table_exists(catalog: SqlCatalog, table_schema_simple: Schema, table_identifier: Identifier) -> None: + table_identifier_nocatalog = catalog.identifier_to_tuple_without_catalog(table_identifier) + namespace = Catalog.namespace_from(table_identifier_nocatalog) + catalog.create_namespace(namespace) + catalog.create_table(table_identifier, table_schema_simple, properties={"format-version": "2"}) + existing_table = table_identifier # Act and Assert for an existing table assert catalog.table_exists(existing_table) is True diff --git a/tests/conftest.py b/tests/conftest.py index 6679543694..4baefafef4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1878,6 +1878,19 @@ def database_list(database_name: str) -> List[str]: return [f"{database_name}_{idx}" for idx in range(NUM_TABLES)] +@pytest.fixture() +def hierarchical_namespace_name() -> str: + prefix = "my_iceberg_ns-" + random_tag1 = "".join(choice(string.ascii_letters) for _ in range(RANDOM_LENGTH)) + random_tag2 = "".join(choice(string.ascii_letters) for _ in range(RANDOM_LENGTH)) + return ".".join([prefix + random_tag1, prefix + random_tag2]).lower() + + +@pytest.fixture() +def hierarchical_namespace_list(hierarchical_namespace_name: str) -> List[str]: + return [f"{hierarchical_namespace_name}_{idx}" for idx in range(NUM_TABLES)] + + BUCKET_NAME = "test_bucket" TABLE_METADATA_LOCATION_REGEX = re.compile( r"""s3://test_bucket/my_iceberg_database-[a-z]{20}.db/