Skip to content

Commit

Permalink
Support catalog in identifier to locate existing tables (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdames authored Nov 25, 2023
1 parent c55fe5e commit a57cb07
Show file tree
Hide file tree
Showing 13 changed files with 730 additions and 375 deletions.
19 changes: 17 additions & 2 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,20 @@ def identifier_to_database_and_table(

return tuple_identifier[0], tuple_identifier[1]

def identifier_to_tuple_without_catalog(self, identifier: Union[str, Identifier]) -> Identifier:
    """Return the identifier as a tuple, without a leading reference to this catalog.

    The first element is dropped only when the tuple has at least three
    elements and that first element equals ``self.name``; otherwise the
    tuple is returned as produced by ``Catalog.identifier_to_tuple``.

    Args:
        identifier (str | Identifier): Table identifier.

    Returns:
        Identifier: a tuple of strings with this catalog's name removed
    """
    parts = Catalog.identifier_to_tuple(identifier)
    starts_with_catalog_name = len(parts) >= 3 and parts[0] == self.name
    return parts[1:] if starts_with_catalog_name else parts

def purge_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table and purge all data and metadata files.
Expand All @@ -547,8 +561,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
table = self.load_table(identifier)
self.drop_table(identifier)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
table = self.load_table(identifier_tuple)
self.drop_table(identifier_tuple)
io = load_file_io(self.properties, table.metadata_location)
metadata = table.metadata
manifest_lists_to_delete = set()
Expand Down
11 changes: 7 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
dynamo_table_item = self._get_iceberg_table_item(database_name=database_name, table_name=table_name)
return self._convert_dynamo_table_item_to_iceberg_table(dynamo_table_item=dynamo_table_item)

Expand All @@ -226,7 +227,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)

try:
self._delete_dynamo_item(
Expand Down Expand Up @@ -256,7 +258,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)

from_table_item = self._get_iceberg_table_item(database_name=from_database_name, table_name=from_table_name)
Expand Down Expand Up @@ -287,7 +290,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e

try:
self.drop_table(from_identifier)
self.drop_table(from_identifier_tuple)
except (NoSuchTableError, GenericDynamoDbError) as e:
log_message = f"Failed to drop old table {from_database_name}.{from_table_name}. "

Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -282,7 +283,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except self.glue.exceptions.EntityNotFoundException as e:
Expand All @@ -307,7 +309,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchPropertyException: When from table miss some required properties.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
get_table_response = self.glue.get_table(DatabaseName=from_database_name, Name=from_table_name)
Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
Expand All @@ -366,7 +367,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
open_client.drop_table(dbname=database_name, name=table_name, deleteData=False)
Expand All @@ -393,7 +395,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
NoSuchTableError: When a table with the name does not exist.
NoSuchNamespaceError: When the destination namespace doesn't exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
try:
with self._client as open_client:
Expand Down
33 changes: 18 additions & 15 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,20 @@ def _fetch_config(self) -> None:
# Update URI based on overrides
self.uri = config[URI]

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
if isinstance(identifier, TableIdentifier):
return {"namespace": NAMESPACE_SEPARATOR.join(identifier.namespace.root[1:]), "table": identifier.name}

def _identifier_to_validated_tuple(self, identifier: Union[str, Identifier]) -> Identifier:
    """Convert an identifier to a tuple and require that it has more than one element.

    Args:
        identifier (str | Identifier): Table identifier.

    Returns:
        Identifier: the tuple form of the identifier.

    Raises:
        NoSuchTableError: If the tuple has one element or fewer.
    """
    parts = self.identifier_to_tuple(identifier)
    if len(parts) > 1:
        return parts
    raise NoSuchTableError(f"Missing namespace or invalid identifier: {'.'.join(parts)}")

def _split_identifier_for_path(self, identifier: Union[str, Identifier, TableIdentifier]) -> Properties:
    """Split an identifier into the "namespace" and "table" values used to build a URL path.

    For a ``TableIdentifier`` the namespace is built from ``namespace.root``
    with its first element left out, and the table is ``identifier.name``.
    Any other identifier is validated via ``_identifier_to_validated_tuple``;
    its last element is the table and everything before it is the namespace.
    """
    if isinstance(identifier, TableIdentifier):
        namespace_parts = identifier.namespace.root[1:]
        table_name = identifier.name
    else:
        # The validated tuple always has at least two elements, so the
        # unpacking below cannot leave the namespace empty.
        *namespace_parts, table_name = self._identifier_to_validated_tuple(identifier)
    return {"namespace": NAMESPACE_SEPARATOR.join(namespace_parts), "table": table_name}

def _split_identifier_for_json(self, identifier: Union[str, Identifier]) -> Dict[str, Union[Identifier, str]]:
identifier_tuple = self.identifier_to_tuple(identifier)
if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier_tuple}")
identifier_tuple = self._identifier_to_validated_tuple(identifier)
return {"namespace": identifier_tuple[:-1], "name": identifier_tuple[-1]}

def _handle_non_200_response(self, exc: HTTPError, error_handler: Dict[int, Type[Exception]]) -> None:
Expand Down Expand Up @@ -499,12 +500,10 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]:
return [(*table.namespace, table.name) for table in ListTablesResponse(**response.json()).identifiers]

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self.identifier_to_tuple(identifier)

if len(identifier_tuple) <= 1:
raise NoSuchTableError(f"Missing namespace or invalid identifier: {identifier}")

response = self._session.get(self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier)))
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
response = self._session.get(
self.url(Endpoints.load_table, prefixed=True, **self._split_identifier_for_path(identifier_tuple))
)
try:
response.raise_for_status()
except HTTPError as exc:
Expand All @@ -514,8 +513,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
return self._response_to_table(identifier_tuple, table_response)

def drop_table(self, identifier: Union[str, Identifier], purge_requested: bool = False) -> None:
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
response = self._session.delete(
self.url(Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier)),
self.url(
Endpoints.drop_table, prefixed=True, purge=purge_requested, **self._split_identifier_for_path(identifier_tuple)
),
)
try:
response.raise_for_status()
Expand All @@ -526,8 +528,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier=identifier, purge_requested=True)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
payload = {
"source": self._split_identifier_for_json(from_identifier),
"source": self._split_identifier_for_json(from_identifier_tuple),
"destination": self._split_identifier_for_json(to_identifier),
}
response = self._session.post(self.url(Endpoints.rename_table), json=payload)
Expand Down
9 changes: 6 additions & 3 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
stmt = select(IcebergTables).where(
IcebergTables.catalog_name == self.name,
Expand All @@ -252,7 +253,8 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
Raises:
NoSuchTableError: If a table with the name does not exist.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier, NoSuchTableError)
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
with Session(self.engine) as session:
res = session.execute(
delete(IcebergTables).where(
Expand Down Expand Up @@ -280,7 +282,8 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
TableAlreadyExistsError: If a table with the new name already exist.
NoSuchNamespaceError: If the target namespace does not exist.
"""
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier, NoSuchTableError)
from_identifier_tuple = self.identifier_to_tuple_without_catalog(from_identifier)
from_database_name, from_table_name = self.identifier_to_database_and_table(from_identifier_tuple, NoSuchTableError)
to_database_name, to_table_name = self.identifier_to_database_and_table(to_identifier)
if not self._namespace_exists(to_database_name):
raise NoSuchNamespaceError(f"Namespace does not exist: {to_database_name}")
Expand Down
53 changes: 50 additions & 3 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier = Catalog.identifier_to_tuple(identifier)
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
return self.__tables[identifier]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier}") from error

def drop_table(self, identifier: Union[str, Identifier]) -> None:
identifier = Catalog.identifier_to_tuple(identifier)
identifier = self.identifier_to_tuple_without_catalog(identifier)
try:
self.__tables.pop(identifier)
except KeyError as error:
Expand All @@ -166,7 +166,7 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
from_identifier = Catalog.identifier_to_tuple(from_identifier)
from_identifier = self.identifier_to_tuple_without_catalog(from_identifier)
try:
table = self.__tables.pop(from_identifier)
except KeyError as error:
Expand Down Expand Up @@ -352,6 +352,16 @@ def test_load_table(catalog: InMemoryCatalog) -> None:
assert table == given_table


def test_load_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
    """Loading a table by the identifier of an already loaded table yields the same table."""
    # Given
    expected = given_catalog_has_a_table(catalog)
    # When
    first_load = catalog.load_table(TEST_TABLE_IDENTIFIER)
    reloaded = catalog.load_table(first_load.identifier)
    # Then
    assert reloaded == expected


def test_table_raises_error_on_table_not_found(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
Expand All @@ -367,6 +377,18 @@ def test_drop_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_drop_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
    """Dropping a table via its own identifier makes it unloadable under either identifier."""
    # Given
    created = given_catalog_has_a_table(catalog)
    # When
    catalog.drop_table(created.identifier)
    # Then
    for missing_identifier in (created.identifier, TEST_TABLE_IDENTIFIER):
        with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
            catalog.load_table(missing_identifier)


def test_drop_table_that_does_not_exist_raise_error(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
catalog.load_table(TEST_TABLE_IDENTIFIER)
Expand Down Expand Up @@ -405,6 +427,31 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:
catalog.load_table(TEST_TABLE_IDENTIFIER)


def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:
    """Renaming a table via its own identifier moves it to the new name and removes the old one."""
    # Given
    source_table = given_catalog_has_a_table(catalog)

    # When
    new_table_name = "new.namespace.new_table"
    renamed = catalog.rename_table(source_table.identifier, new_table_name)

    # Then
    assert renamed.identifier == Catalog.identifier_to_tuple(new_table_name)

    # And
    reloaded = catalog.load_table(renamed.identifier)
    assert reloaded.identifier == Catalog.identifier_to_tuple(new_table_name)

    # And
    assert ("new", "namespace") in catalog.list_namespaces()

    # And
    for stale_identifier in (source_table.identifier, TEST_TABLE_IDENTIFIER):
        with pytest.raises(NoSuchTableError, match=NO_SUCH_TABLE_ERROR):
            catalog.load_table(stale_identifier)


def test_create_namespace(catalog: InMemoryCatalog) -> None:
# When
catalog.create_namespace(TEST_TABLE_NAMESPACE, TEST_TABLE_PROPERTIES)
Expand Down
Loading

0 comments on commit a57cb07

Please sign in to comment.