diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index b189b4094d..3b633c3b67 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -35,6 +35,7 @@ Type, Union, cast, + override ) from pyiceberg.exceptions import ( @@ -778,6 +779,7 @@ def __repr__(self) -> str: class MetastoreCatalog(Catalog, ABC): + @override def __init__(self, name: str, **properties: str): super().__init__(name, **properties) @@ -788,6 +790,7 @@ def __init__(self, name: str, **properties: str): help_message=f"The property {DEPRECATED_BOTOCORE_SESSION} is deprecated and will be removed.", ) + @override def create_table_transaction( self, identifier: Union[str, Identifier], @@ -801,6 +804,7 @@ def create_table_transaction( self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) ) + @override def table_exists(self, identifier: Union[str, Identifier]) -> bool: try: self.load_table(identifier) @@ -808,6 +812,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool: except NoSuchTableError: return False + @override def purge_table(self, identifier: Union[str, Identifier]) -> None: identifier_tuple = self._identifier_to_tuple_without_catalog(identifier) table = self.load_table(identifier_tuple) diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 6dfb243a42..62e320cafe 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -25,6 +25,7 @@ Set, Tuple, Union, + override ) import boto3 @@ -140,6 +141,7 @@ def _dynamodb_table_exists(self) -> bool: else: return True + @override def create_table( self, identifier: Union[str, Identifier], @@ -194,6 +196,7 @@ def create_table( return self.load_table(identifier=identifier) + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -209,6 +212,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -228,6 +232,7 @@ def commit_table( """ raise NotImplementedError + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: """ Load the table's metadata and returns the table instance. @@ -249,6 +254,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name) return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item) + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. @@ -270,6 +276,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: except ConditionalCheckFailedException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. @@ -337,6 +344,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U return self.load_table(to_identifier) + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -358,6 +366,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except ConditionalCheckFailedException as e: raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. @@ -385,6 +394,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except ConditionalCheckFailedException as e: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e + @override def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List Iceberg tables under the given namespace in the catalog. @@ -429,6 +439,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: return table_identifiers + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List top-level namespaces from the catalog. @@ -471,6 +482,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi return database_identifiers + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """ Get properties for a namespace. @@ -489,6 +501,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper namespace_dict = _convert_dynamo_item_to_regular_dict(namespace_item) return _get_namespace_properties(namespace_dict=namespace_dict) + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: @@ -526,9 +539,11 @@ def update_namespace_properties( return properties_update_summary + @override def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index 5742173fa6..58af84dead 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -26,6 +26,7 @@ Tuple, Union, cast, + override, ) import boto3 @@ -158,21 +159,27 @@ def _construct_parameters( class _IcebergSchemaToGlueType(SchemaVisitor[str]): + @override def schema(self, schema: Schema, struct_result: str) -> str: return struct_result + @override def struct(self, struct: StructType, field_results: List[str]) -> str: return f"struct<{','.join(field_results)}>" + @override def field(self, field: NestedField, field_result: str) -> str: return f"{field.name}:{field_result}" + @override def list(self, list_type: ListType, element_result: str) -> str: return f"array<{element_result}>" + @override def map(self, map_type: MapType, key_result: str, value_result: str) -> str: return f"map<{key_result},{value_result}>" + @override def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" @@ -376,6 +383,7 @@ def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef: except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e + @override def create_table( self, identifier: Union[str, Identifier], @@ -420,6 +428,7 @@ def create_table( return self.load_table(identifier=identifier) + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -442,6 +451,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) return self.load_table(identifier=identifier) + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -521,6 +531,7 @@ def commit_table( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and returns the table instance. @@ -541,6 +552,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: return self._convert_glue_to_iceberg(self._get_glue_table(database_name=database_name, table_name=table_name)) + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. @@ -557,6 +569,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: except self.glue.exceptions.EntityNotFoundException as e: raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. @@ -618,6 +631,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U return self.load_table(to_identifier) + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -635,6 +649,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except self.glue.exceptions.AlreadyExistsException as e: raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. @@ -658,6 +673,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: self.glue.delete_database(Name=database_name) + @override def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List Iceberg tables under the given namespace in the catalog. @@ -689,6 +705,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NoSuchNamespaceError(f"Database does not exist: {database_name}") from e return [(database_name, table["Name"]) for table in table_list if self.__is_iceberg_table(table)] + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. @@ -711,6 +728,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi return [self.identifier_to_tuple(database["Name"]) for database in database_list] + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -741,6 +759,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper return properties + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: @@ -765,9 +784,11 @@ def update_namespace_properties( return properties_update_summary + @override def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 030470e164..e651966ba3 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -29,6 +29,7 @@ Tuple, Type, Union, + override ) from urllib.parse import urlparse @@ -230,22 +231,28 @@ class SchemaToHiveConverter(SchemaVisitor[str]): def __init__(self, hive2_compatible: bool): self.hive2_compatible = hive2_compatible + @override def schema(self, schema: Schema, struct_result: str) -> str: return struct_result + @override def struct(self, struct: StructType, field_results: List[str]) -> str: return f"struct<{','.join(field_results)}>" + @override def field(self, field: NestedField, field_result: str) -> str: return f"{field.name}:{field_result}" + @override def list(self, list_type: ListType, element_result: str) -> str: return f"array<{element_result}>" + @override def map(self, map_type: MapType, key_result: str, value_result: str) -> str: # Key has to be primitive for Hive return f"map<{key_result},{value_result}>" + @override def primitive(self, primitive: PrimitiveType) -> str: if isinstance(primitive, DecimalType): return f"decimal({primitive.precision},{primitive.scale})" @@ -345,6 +352,7 @@ def _get_hive_table(self, open_client: Client, database_name: str, table_name: s except NoSuchObjectException as e: raise NoSuchTableError(f"Table does not exists: {table_name}") from e + @override def create_table( self, identifier: Union[str, Identifier], @@ -391,6 +399,7 @@ def create_table( return self._convert_hive_into_iceberg(hive_table) + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -406,6 +415,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + @override def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError @@ -438,6 +448,7 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -516,6 +527,7 @@ def commit_table( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and return the table instance. @@ -539,6 +551,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: return self._convert_hive_into_iceberg(hive_table) + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. @@ -557,10 +570,12 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: # When the namespace doesn't exist, it throws the same error raise NoSuchTableError(f"Table does not exists: {table_name}") from e + @override def purge_table(self, identifier: Union[str, Identifier]) -> None: # This requires to traverse the reachability set, and drop all the data files. raise NotImplementedError("Not yet implemented") + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. @@ -591,6 +606,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise NoSuchNamespaceError(f"Database does not exists: {to_database_name}") from e return self.load_table(to_identifier) + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -611,6 +627,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except AlreadyExistsException as e: raise NamespaceAlreadyExistsError(f"Database {database_name} already exists") from e + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. @@ -630,6 +647,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except MetaException as e: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e + @override def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List Iceberg tables under the given namespace in the catalog. @@ -654,6 +672,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: if table.parameters[TABLE_TYPE].lower() == ICEBERG ] + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. @@ -667,6 +686,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi with self._client as open_client: return list(map(self.identifier_to_tuple, open_client.get_all_databases())) + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -691,6 +711,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper except NoSuchObjectException as e: raise NoSuchNamespaceError(f"Database does not exists: {database_name}") from e + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: @@ -733,5 +754,6 @@ def update_namespace_properties( return PropertiesUpdateSummary(removed=list(removed or []), updated=list(updated or []), missing=list(expected_to_change)) + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index caebf1e445..763875be1a 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -20,7 +20,7 @@ Optional, Set, Tuple, - Union, + Union, override, ) from pyiceberg.catalog import Catalog, PropertiesUpdateSummary @@ -43,6 +43,7 @@ class NoopCatalog(Catalog): + @override def create_table( self, identifier: Union[str, Identifier], @@ -54,6 +55,7 @@ def create_table( ) -> Table: raise NotImplementedError + @override def create_table_transaction( self, identifier: Union[str, Identifier], @@ -65,12 +67,15 @@ def create_table_transaction( ) -> CreateTableTransaction: raise NotImplementedError + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: raise NotImplementedError + @override def table_exists(self, identifier: Union[str, Identifier]) -> bool: raise NotImplementedError + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -86,42 +91,54 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError + @override def purge_table(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: raise NotImplementedError + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: raise NotImplementedError + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: raise NotImplementedError + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: raise NotImplementedError + @override def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: raise NotImplementedError + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: raise NotImplementedError + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: raise NotImplementedError + @override def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index bcfa46b7a7..d3053478f5 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -27,6 +27,7 @@ Tuple, Type, Union, + override ) from pydantic import Field, ValidationError, field_validator @@ -602,6 +603,7 @@ def _create_table( return TableResponse(**response.json()) + @override @retry(**_RETRY_ARGS) def create_table( self, @@ -623,6 +625,7 @@ def create_table( ) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) + @override @retry(**_RETRY_ARGS) def create_table_transaction( self, @@ -645,6 +648,7 @@ def create_table_transaction( staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response) return CreateTableTransaction(staged_table) + @override @retry(**_RETRY_ARGS) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -678,6 +682,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: table_response = TableResponse(**response.json()) return self._response_to_table(self.identifier_to_tuple(identifier), table_response) + @override @retry(**_RETRY_ARGS) def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -689,6 +694,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers] + @override @retry(**_RETRY_ARGS) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier_tuple = self._identifier_to_tuple_without_catalog(identifier) @@ -703,6 +709,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: table_response = TableResponse(**response.json()) return self._response_to_table(identifier_tuple, table_response) + @override @retry(**_RETRY_ARGS) def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None: identifier_tuple = self._identifier_to_tuple_without_catalog(identifier) @@ -716,10 +723,12 @@ def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchTableError}) + @override @retry(**_RETRY_ARGS) def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier=identifier, purge_requested=True) + @override @retry(**_RETRY_ARGS) def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: from_identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier) @@ -746,6 +755,7 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm ) return table_request + @override @retry(**_RETRY_ARGS) def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -757,6 +767,7 @@ def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: self._handle_non_200_response(exc, {404: NoSuchNamespaceError}) return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers] + @override @retry(**_RETRY_ARGS) def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] @@ -803,6 +814,7 @@ def commit_table( ) return CommitTableResponse(**response.json()) + @override @retry(**_RETRY_ARGS) def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -813,6 +825,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper except HTTPError as exc: self._handle_non_200_response(exc, {409: NamespaceAlreadyExistsError}) + @override @retry(**_RETRY_ARGS) def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -823,6 +836,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except HTTPError as exc: self._handle_non_200_response(exc, {404: NoSuchNamespaceError, 409: NamespaceNotEmptyError}) + @override @retry(**_RETRY_ARGS) def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: namespace_tuple = self.identifier_to_tuple(namespace) @@ -840,6 +854,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi return ListNamespaceResponse(**response.json()).namespaces + @override @retry(**_RETRY_ARGS) def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace_tuple = self._check_valid_namespace_identifier(namespace) @@ -852,6 +867,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper return NamespaceResponse(**response.json()).properties + @override @retry(**_RETRY_ARGS) def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT @@ -871,6 +887,7 @@ def update_namespace_properties( missing=parsed_response.missing, ) + @override @retry(**_RETRY_ARGS) def table_exists(self, identifier: Union[str, Identifier]) -> bool: """Check if a table exists. @@ -898,6 +915,7 @@ def table_exists(self, identifier: Union[str, Identifier]) -> bool: return False + @override @retry(**_RETRY_ARGS) def drop_view(self, identifier: Union[str]) -> None: identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 6a4318253f..fdac9f8d1f 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -21,7 +21,7 @@ Optional, Set, Tuple, - Union, + Union, override, ) from sqlalchemy import ( @@ -170,6 +170,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table: catalog=self, ) + @override def create_table( self, identifier: Union[str, Identifier], @@ -232,6 +233,7 @@ def create_table( return self.load_table(identifier=identifier) + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -270,6 +272,7 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: return self.load_table(identifier=identifier) + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: """Load the table's metadata and return the table instance. @@ -300,6 +303,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: return self._convert_orm_to_iceberg(result) raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. @@ -341,6 +345,7 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e session.commit() + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: """Rename a fully classified table name. @@ -402,6 +407,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -523,6 +529,7 @@ def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool: return True return False + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -551,6 +558,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper ) session.commit() + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: """Drop a namespace. @@ -577,6 +585,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: ) session.commit() + @override def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: """List tables under the given namespace in the catalog. @@ -598,6 +607,7 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: result = session.scalars(stmt) return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. @@ -626,6 +636,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi with Session(self.engine) as session: return [Catalog.identifier_to_tuple(namespace_col) for namespace_col in session.execute(stmt).scalars()] + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """Get properties for a namespace. @@ -649,6 +660,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper result = session.scalars(stmt) return {props.property_key: props.property_value for props in result} + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: @@ -705,8 +717,10 @@ def update_namespace_properties( session.commit() return properties_update_summary + @override def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]: raise NotImplementedError + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 56b544c99f..6659ee4673 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -21,7 +21,7 @@ Dict, List, Optional, - Tuple, + Tuple, override, ) from uuid import UUID @@ -86,12 +86,14 @@ def __init__(self, **properties: Any) -> None: def _table(self) -> RichTable: return RichTable.grid(padding=(0, 2)) + @override def exception(self, ex: Exception) -> None: if self.verbose: Console(stderr=True).print_exception() else: Console(stderr=True).print(ex) + @override def identifiers(self, identifiers: List[Identifier]) -> None: table = self._table for identifier in identifiers: @@ -99,6 +101,7 @@ def identifiers(self, identifiers: List[Identifier]) -> None: Console().print(table) + @override def describe_table(self, table: Table) -> None: metadata = table.metadata table_properties = self._table @@ -128,6 +131,7 @@ def describe_table(self, table: Table) -> None: output_table.add_row("Properties", table_properties) Console().print(output_table) + @override def files(self, table: Table, history: bool) -> None: if history: snapshots = table.metadata.snapshots @@ -151,30 +155,37 @@ def files(self, table: Table, history: bool) -> None: manifest_tree.add(f"Datafile: {manifest_entry.data_file.file_path}") Console().print(snapshot_tree) + @override def describe_properties(self, properties: Properties) -> None: output_table = self._table for k, v in properties.items(): output_table.add_row(k, v) Console().print(output_table) + @override def text(self, response: str) -> None: Console(soft_wrap=True).print(response) + @override def schema(self, schema: Schema) -> None: output_table = self._table for field in schema.fields: output_table.add_row(field.name, str(field.field_type), field.doc or "") Console().print(output_table) + @override def spec(self, spec: PartitionSpec) -> None: Console().print(str(spec)) + @override def uuid(self, uuid: Optional[UUID]) -> None: Console().print(str(uuid) if uuid else "missing") + @override def version(self, version: str) -> None: Console().print(version) + @override def describe_refs(self, ref_details: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: refs_table = RichTable(title="Snapshot Refs") refs_table.add_column("Ref") @@ -200,12 +211,15 @@ def __init__(self, **properties: Any) -> None: def _out(self, d: Any) -> None: print(json.dumps(d)) + @override def exception(self, ex: Exception) -> None: self._out({"type": ex.__class__.__name__, "message": str(ex)}) + @override def identifiers(self, identifiers: List[Identifier]) -> None: self._out([".".join(identifier) for identifier in identifiers]) + @override def describe_table(self, table: Table) -> None: class FauxTable(IcebergBaseModel): """Just to encode it using Pydantic.""" @@ -220,27 +234,35 @@ class FauxTable(IcebergBaseModel): ).model_dump_json() ) + @override def describe_properties(self, properties: Properties) -> None: self._out(properties) + @override def text(self, response: str) -> None: print(json.dumps(response)) + @override def schema(self, schema: Schema) -> None: print(schema.model_dump_json()) + @override def files(self, table: Table, history: bool) -> None: pass + @override def spec(self, spec: PartitionSpec) -> None: print(spec.model_dump_json()) + @override def uuid(self, uuid: Optional[UUID]) -> None: self._out({"uuid": str(uuid) if uuid else "missing"}) + @override def version(self, version: str) -> None: self._out({"version": version}) + @override def describe_refs(self, refs: List[Tuple[str, SnapshotRefType, Dict[str, str]]]) -> None: self._out([ {"name": name, "type": type, detail_key: detail_val} diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index e212854ee2..7da365fe20 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -26,6 +26,7 @@ Set, Tuple, Union, + override, ) import pyarrow as pa @@ -80,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: self.__namespaces = {} self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION) + @override def create_table( self, identifier: Union[str, Identifier], @@ -127,9 +129,11 @@ def create_table( self.__tables[identifier] = table return table + @override def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: raise NotImplementedError + @override def commit_table( self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -155,6 +159,7 @@ def commit_table( return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + @override def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier_tuple = self._identifier_to_tuple_without_catalog(identifier) try: @@ -162,6 +167,7 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: except KeyError as error: raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error + @override def drop_table(self, identifier: Union[str, Identifier]) -> None: identifier_tuple = self._identifier_to_tuple_without_catalog(identifier) try: @@ -169,9 +175,11 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: except KeyError as error: raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error + @override def purge_table(self, identifier: Union[str, Identifier]) -> None: self.drop_table(identifier) + @override def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier) try: @@ -193,6 +201,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U ) return self.__tables[to_identifier] + @override def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: namespace = Catalog.identifier_to_tuple(namespace) if namespace in self.__namespaces: @@ -200,6 +209,7 @@ def create_namespace(self, namespace: Union[str, Identifier], properties: Proper else: self.__namespaces[namespace] = properties if properties else {} + @override def drop_namespace(self, namespace: Union[str, Identifier]) -> None: namespace = Catalog.identifier_to_tuple(namespace) if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: @@ -209,6 +219,7 @@ def drop_namespace(self, namespace: Union[str, Identifier]) -> None: except KeyError as error: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error + @override def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: if namespace: namespace = Catalog.identifier_to_tuple(namespace) @@ -218,6 +229,7 @@ def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> Lis return list_tables + @override def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: # Hierarchical namespace is not supported. Return an empty list if namespace: @@ -225,6 +237,7 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi return list(self.__namespaces.keys()) + @override def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: namespace = Catalog.identifier_to_tuple(namespace) try: @@ -232,6 +245,7 @@ def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Proper except KeyError as error: raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error + @override def update_namespace_properties( self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: @@ -258,9 +272,11 @@ def update_namespace_properties( removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) ) + @override def list_views(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: raise NotImplementedError + @override def drop_view(self, identifier: Union[str, Identifier]) -> None: raise NotImplementedError