Skip to content

Commit

Permalink
Add list_views to rest catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
ndrluis committed Jun 18, 2024
1 parent de2b299 commit 6259d3c
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 0 deletions.
16 changes: 16 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,22 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
"""List views under the given namespace in the catalog.
If namespace not provided, will list all views in the catalog.
Args:
namespace (str | Identifier): Namespace identifier to search.
Returns:
List[Identifier]: list of table identifiers.
Raises:
NoSuchNamespaceError: If a namespace with the given name does not exist.
"""

@abstractmethod
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
"""Get properties for a namespace.
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,9 @@ def update_namespace_properties(

return properties_update_summary

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

def _get_iceberg_table_item(self, database_name: str, table_name: str) -> Dict[str, Any]:
try:
return self._get_dynamo_item(identifier=f"{database_name}.{table_name}", namespace=database_name)
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,3 +746,6 @@ def update_namespace_properties(
self.glue.update_database(Name=database_name, DatabaseInput=_construct_database_input(database_name, updated_properties))

return properties_update_summary

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError
3 changes: 3 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError

def _create_lock_request(self, database_name: str, table_name: str) -> LockRequest:
lock_component: LockComponent = LockComponent(
level=LockLevel.TABLE, type=LockType.EXCLUSIVE, dbname=database_name, tablename=table_name, isTransactional=True
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,6 @@ def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
raise NotImplementedError

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError
21 changes: 21 additions & 0 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class Endpoints:
table_exists: str = "namespaces/{namespace}/tables/{table}"
get_token: str = "oauth/tokens"
rename_table: str = "tables/rename"
list_views: str = "namespaces/{namespace}/views"


AUTHORIZATION_HEADER = "Authorization"
Expand Down Expand Up @@ -196,10 +197,19 @@ class ListTableResponseEntry(IcebergBaseModel):
namespace: Identifier = Field()


class ListViewResponseEntry(IcebergBaseModel):
name: str = Field()
namespace: Identifier = Field()


class ListTablesResponse(IcebergBaseModel):
identifiers: List[ListTableResponseEntry] = Field()


class ListViewsResponse(IcebergBaseModel):
identifiers: List[ListViewResponseEntry] = Field()


class ErrorResponseMessage(IcebergBaseModel):
message: str = Field()
type: str = Field()
Expand Down Expand Up @@ -674,6 +684,17 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

@retry(**_RETRY_ARGS)
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
namespace_tuple = self._check_valid_namespace_identifier(namespace)
namespace_concat = NAMESPACE_SEPARATOR.join(namespace_tuple)
response = self._session.get(self.url(Endpoints.list_views, namespace=namespace_concat))
try:
response.raise_for_status()
except HTTPError as exc:
self._handle_non_200_response(exc, {404: NoSuchNamespaceError})
return [(*view.namespace, view.name) for view in ListViewsResponse(**response.json()).identifiers]

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,6 @@ def update_namespace_properties(
session.execute(insert_stmt)
session.commit()
return properties_update_summary

def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
raise NotImplementedError
3 changes: 3 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ def update_namespace_properties(
removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change)
)

def list_views(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
raise NotImplementedError


@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
Expand Down
46 changes: 46 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,52 @@ def test_list_tables_404(rest_mock: Mocker) -> None:
assert "Namespace does not exist" in str(e.value)


def test_list_views_200(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
f"{TEST_URI}v1/namespaces/{namespace}/views",
json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]},
status_code=200,
request_headers=TEST_HEADERS,
)

assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace) == [("examples", "fooshare")]


def test_list_views_200_sigv4(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
f"{TEST_URI}v1/namespaces/{namespace}/views",
json={"identifiers": [{"namespace": ["examples"], "name": "fooshare"}]},
status_code=200,
request_headers=TEST_HEADERS,
)

assert RestCatalog("rest", **{"uri": TEST_URI, "token": TEST_TOKEN, "rest.sigv4-enabled": "true"}).list_views(namespace) == [
("examples", "fooshare")
]
assert rest_mock.called


def test_list_views_404(rest_mock: Mocker) -> None:
namespace = "examples"
rest_mock.get(
f"{TEST_URI}v1/namespaces/{namespace}/views",
json={
"error": {
"message": "Namespace does not exist: personal in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
"type": "NoSuchNamespaceException",
"code": 404,
}
},
status_code=404,
request_headers=TEST_HEADERS,
)
with pytest.raises(NoSuchNamespaceError) as e:
RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_views(namespace)
assert "Namespace does not exist" in str(e.value)


def test_list_namespaces_200(rest_mock: Mocker) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces",
Expand Down

0 comments on commit 6259d3c

Please sign in to comment.