Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support catalog name in table identifier during load, rename, drop, and purge #150

Merged
merged 1 commit into from
Nov 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,20 @@ def identifier_to_database_and_table(

return tuple_identifier[0], tuple_identifier[1]

def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
"""Convert an identifier to a tuple and drop this catalog's name from the first element.

Args:
identifier (str | Identifier): Table identifier.

Returns:
Identifier: a tuple of strings with this catalog's name removed
"""
identifier_tuple = Catalog.identifier_to_tuple(identifier)
if len(identifier_tuple) >= 3 and identifier_tuple[0] == self.name:
identifier_tuple = identifier_tuple[1:]
return identifier_tuple

def purge_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table and purge all data and metadata files.

Expand All @@ -547,8 +561,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
table = self.load_table(identifier)
self.drop_table(identifier)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
Expand Down
11 changes: 7 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

Expand All @@ -226,7 +227,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

try:
self._delete_dynamo_item(
Expand Down Expand Up @@ -256,7 +258,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
Expand Down Expand Up @@ -287,7 +290,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

try:
self.drop_table(from_identifier)
self.drop_table(from_identifier_tuple)
except (NoSuchTableError, GenericDynamoDbError) as e:
log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -282,7 +283,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -307,7 +309,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
Expand All @@ -366,7 +367,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
Expand All @@ -393,7 +395,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
Expand Down
33 changes: 18 additions & 15 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,20 @@ def _fetch_config(self) -> None:
# Update URI based on overrides
self.uri = config[URI]

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
if isinstance(identifier, TableIdentifier):
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}

def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier:
identifier_tuple = self.identifier_to_tuple(identifier)
if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(identifier_tuple)}")
return identifier_tuple

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
if isinstance(identifier, TableIdentifier):
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": NAMESPACE_SEPARATOR.join(identifier_tuple[:-1]), "table": identifier_tuple[-1]}

def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]:
identifier_tuple = self.identifier_to_tuple(identifier)
if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier_tuple}")
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}

def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
Expand Down Expand Up @@ -499,12 +500,10 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers]

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self.identifier_to_tuple(identifier)

if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier}")

response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)))
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
try:
response.raise_for_status()
except HTTPError as exc:
Expand All @@ -514,8 +513,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
return self._response_to_table(identifier_tuple, table_response)

def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
),
)
try:
response.raise_for_status()
Expand All @@ -526,8 +528,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier=identifier, purge_requested=True)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier),
"source": self._split_identifier_for_json(from_identifier_tuple),
"destination": self._split_identifier_for_json(to_identifier),
}
response = self._session.post(self.url(Endpoints.rename_table), json=payload)
Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
Expand All @@ -252,7 +253,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
res = session.execute(
delete(IcebergTables).where(
Expand Down Expand Up @@ -280,7 +282,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
if not self._namespace_exists(to_database_name):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}")
Expand Down
53 changes: 50 additions & 3 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = Catalog.identifier_to_tuple(identifier)
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
return self.__tables[identifier]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier}") from error

def drop_table(self, identifier: Union[str, Identifier]) -> None:
identifier = Catalog.identifier_to_tuple(identifier)
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
self.__tables.pop(identifier)
except KeyError as error:
Expand All @@ -166,7 +166,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier = Catalog.identifier_to_tuple(from_identifier)
from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
try:
table = self.__tables.pop(from_identifier)
except KeyError as error:
Expand Down Expand Up @@ -352,6 +352,16 @@ def test_load_table(catalog: InMemoryCatalog) -> None:
assert table == given_table


def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
given_table = given_catalog_has_a_table(catalog)
# When
intermediate = catalog.load_table(TEST_TABLE_IDENTIFIER)
table = catalog.load_table(intermediate.identifier)
# Then
assert table == given_table


def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
Expand All @@ -367,6 +377,18 @@ def test_drop_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
table = given_catalog_has_a_table(catalog)
# When
catalog.drop_table(table.identifier)
# Then
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(table.identifier)
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
Expand Down Expand Up @@ -405,6 +427,31 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
# Given
table = given_catalog_has_a_table(catalog)

# When
new_table_name = "new.namespace.new_table"
new_table = catalog.rename_table(table.identifier, new_table_name)

# Then
assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)

# And
new_table = catalog.load_table(new_table.identifier)
assert new_table.identifier == Catalog.identifier_to_tuple(new_table_name)

# And
assert ("new", "namespace") in catalog.list_namespaces()

# And
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(table.identifier)
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_create_namespace(catalog: InMemoryCatalog) -> None:
# When
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
Expand Down
Loading