Skip to content

Commit

Permalink
feat: implement InMemoryCatalog as a subclass of SqlCatalog
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Sep 6, 2024
1 parent 052a9cd commit aa6efc6
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 271 deletions.
263 changes: 30 additions & 233 deletions tests/catalog/test_base.py → tests/catalog/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,251 +17,35 @@
# pylint:disable=redefined-outer-name


import uuid
from pathlib import PosixPath
from typing import (
Dict,
List,
Optional,
Set,
Tuple,
Union,
)
from typing import Union

import pyarrow as pa
import pytest
from pydantic_core import ValidationError
from pytest_lazyfixture import lazy_fixture

from pyiceberg.catalog import Catalog, MetastoreCatalog, PropertiesUpdateSummary, load_catalog
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
from pyiceberg.io import WAREHOUSE
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
AddSchemaUpdate,
CommitTableResponse,
SetCurrentSchemaUpdate,
Table,
TableRequirement,
TableUpdate,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.transforms import IdentityTransform
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.typedef import EMPTY_DICT, Properties
from pyiceberg.types import IntegerType, LongType, NestedField

DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse"


class InMemoryCatalog(MetastoreCatalog):
"""
An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables.
This is useful for test, demo, and playground but not in production as data is not persisted.
"""

__tables: Dict[Identifier, Table]
__namespaces: Dict[Identifier, Properties]

def __init__(self, name: str, **properties: str) -> None:
super().__init__(name, **properties)
self.__tables = {}
self.__namespaces = {}
self._warehouse_location = properties.get(WAREHOUSE, DEFAULT_WAREHOUSE_LOCATION)

def create_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
table_uuid: Optional[uuid.UUID] = None,
) -> Table:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

identifier = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_from(identifier)

if identifier in self.__tables:
raise TableAlreadyExistsError(f"Table already exists: {identifier}")
else:
if namespace not in self.__namespaces:
self.__namespaces[namespace] = {}

if not location:
location = f'{self._warehouse_location}/{"/".join(identifier)}'
location = location.rstrip("/")

metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
schema=schema,
partition_spec=partition_spec,
sort_order=sort_order,
location=location,
properties=properties,
table_uuid=table_uuid,
)
io = load_file_io({**self.properties, **properties}, location=location)
self._write_metadata(metadata, io, metadata_location)

table = Table(
identifier=identifier,
metadata=metadata,
metadata_location=metadata_location,
io=io,
catalog=self,
)
self.__tables[identifier] = table
return table

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
raise NotImplementedError

def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
identifier_tuple = self._identifier_to_tuple_without_catalog(table.identifier)
current_table = self.load_table(identifier_tuple)
base_metadata = current_table.metadata

for requirement in requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# update table state
current_table.metadata = updated_metadata

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
try:
return self.__tables[identifier_tuple]
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error

def drop_table(self, identifier: Union[str, Identifier]) -> None:
identifier_tuple = self._identifier_to_tuple_without_catalog(identifier)
try:
self.__tables.pop(identifier_tuple)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error

def purge_table(self, identifier: Union[str, Identifier]) -> None:
self.drop_table(identifier)

def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
identifier_tuple = self._identifier_to_tuple_without_catalog(from_identifier)
try:
table = self.__tables.pop(identifier_tuple)
except KeyError as error:
raise NoSuchTableError(f"Table does not exist: {identifier_tuple}") from error

to_identifier = Catalog.identifier_to_tuple(to_identifier)
to_namespace = Catalog.namespace_from(to_identifier)
if to_namespace not in self.__namespaces:
self.__namespaces[to_namespace] = {}

self.__tables[to_identifier] = Table(
identifier=to_identifier,
metadata=table.metadata,
metadata_location=table.metadata_location,
io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location),
catalog=self,
)
return self.__tables[to_identifier]

def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
namespace = Catalog.identifier_to_tuple(namespace)
if namespace in self.__namespaces:
raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}")
else:
self.__namespaces[namespace] = properties if properties else {}

def drop_namespace(self, namespace: Union[str, Identifier]) -> None:
namespace = Catalog.identifier_to_tuple(namespace)
if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]:
raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}")
try:
self.__namespaces.pop(namespace)
except KeyError as error:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error

def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
if namespace:
namespace = Catalog.identifier_to_tuple(namespace)
list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]
else:
list_tables = list(self.__tables.keys())

return list_tables

def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]:
# Hierarchical namespace is not supported. Return an empty list
if namespace:
return []

return list(self.__namespaces.keys())

def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
namespace = Catalog.identifier_to_tuple(namespace)
try:
return self.__namespaces[namespace]
except KeyError as error:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error

def update_namespace_properties(
self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
removed: Set[str] = set()
updated: Set[str] = set()

namespace = Catalog.identifier_to_tuple(namespace)
if namespace in self.__namespaces:
if removals:
for key in removals:
if key in self.__namespaces[namespace]:
del self.__namespaces[namespace][key]
removed.add(key)
if updates:
for key, value in updates.items():
self.__namespaces[namespace][key] = value
updated.add(key)
else:
raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}")

expected_to_change = removed.difference(removals or set())

return PropertiesUpdateSummary(
removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change)
)

def list_views(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]:
raise NotImplementedError

def drop_view(self, identifier: Union[str, Identifier]) -> None:
raise NotImplementedError


@pytest.fixture
def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
Expand All @@ -278,17 +62,20 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog:
)
TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000))
TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"}
NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)"
TABLE_ALREADY_EXISTS_ERROR = "Table already exists: \\('com', 'organization', 'department', 'my_table'\\)"
NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace already exists: \\('com', 'organization', 'department'\\)"
NO_SUCH_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)"
NAMESPACE_NOT_EMPTY_ERROR = "Namespace is not empty: \\('com', 'organization', 'department'\\)"
NO_SUCH_TABLE_ERROR = "Table does not exist: com.organization.department.my_table"
TABLE_ALREADY_EXISTS_ERROR = "Table com.organization.department.my_table already exists"
NAMESPACE_ALREADY_EXISTS_ERROR = "Namespace \\('com', 'organization', 'department'\\) already exists"
# TODO: consolidate namespace error messages then remove this
DROP_NOT_EXISTING_NAMESPACE_ERROR = "Namespace does not exist: \\('com', 'organization', 'department'\\)"
NO_SUCH_NAMESPACE_ERROR = "Namespace com.organization.department does not exists"
NAMESPACE_NOT_EMPTY_ERROR = "Namespace com.organization.department is not empty"


def given_catalog_has_a_table(
catalog: InMemoryCatalog,
properties: Properties = EMPTY_DICT,
) -> Table:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
return catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
Expand Down Expand Up @@ -358,6 +145,7 @@ def test_name_from_str() -> None:


def test_create_table(catalog: InMemoryCatalog) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
Expand All @@ -369,6 +157,7 @@ def test_create_table(catalog: InMemoryCatalog) -> None:

def test_create_table_location_override(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
Expand All @@ -382,6 +171,7 @@ def test_create_table_location_override(catalog: InMemoryCatalog) -> None:

def test_create_table_removes_trailing_slash_from_location(catalog: InMemoryCatalog) -> None:
new_location = f"{catalog._warehouse_location}/new_location"
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=TEST_TABLE_SCHEMA,
Expand Down Expand Up @@ -411,6 +201,7 @@ def test_convert_schema_if_needed(


def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_simple_without_ids: pa.Schema) -> None:
catalog.create_namespace(TEST_TABLE_NAMESPACE)
table = catalog.create_table(
identifier=TEST_TABLE_IDENTIFIER,
schema=pyarrow_schema_simple_without_ids,
Expand Down Expand Up @@ -508,6 +299,7 @@ def test_rename_table(catalog: InMemoryCatalog) -> None:

# When
new_table = "new.namespace.new_table"
catalog.create_namespace(("new", "namespace"))
table = catalog.rename_table(TEST_TABLE_IDENTIFIER, new_table)

# Then
Expand All @@ -531,6 +323,7 @@ def test_rename_table_from_self_identifier(catalog: InMemoryCatalog) -> None:

# When
new_table_name = "new.namespace.new_table"
catalog.create_namespace(("new", "namespace"))
new_table = catalog.rename_table(table._identifier, new_table_name)

# Then
Expand Down Expand Up @@ -591,7 +384,7 @@ def test_drop_namespace(catalog: InMemoryCatalog) -> None:


def test_drop_namespace_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
with pytest.raises(NoSuchNamespaceError, match=NO_SUCH_NAMESPACE_ERROR):
with pytest.raises(NoSuchNamespaceError, match=DROP_NOT_EXISTING_NAMESPACE_ERROR):
catalog.drop_namespace(TEST_TABLE_NAMESPACE)


Expand All @@ -607,7 +400,7 @@ def test_list_tables(catalog: InMemoryCatalog) -> None:
# Given
given_catalog_has_a_table(catalog)
# When
tables = catalog.list_tables()
tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
# Then
assert tables
assert TEST_TABLE_IDENTIFIER in tables
Expand All @@ -619,7 +412,7 @@ def test_list_tables_under_a_namespace(catalog: InMemoryCatalog) -> None:
new_namespace = ("new", "namespace")
catalog.create_namespace(new_namespace)
# When
all_tables = catalog.list_tables()
all_tables = catalog.list_tables(namespace=TEST_TABLE_NAMESPACE)
new_namespace_tables = catalog.list_tables(new_namespace)
# Then
assert all_tables
Expand All @@ -638,7 +431,9 @@ def test_update_namespace_metadata(catalog: InMemoryCatalog) -> None:
# Then
assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
assert summary == PropertiesUpdateSummary(removed=[], updated=["key3", "key4"], missing=[])
assert summary.removed == []
assert sorted(summary.updated) == ["key3", "key4"]
assert summary.missing == []


def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
Expand All @@ -654,7 +449,9 @@ def test_update_namespace_metadata_removals(catalog: InMemoryCatalog) -> None:
assert TEST_TABLE_NAMESPACE in catalog.list_namespaces()
assert new_metadata.items() <= catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).items()
assert remove_metadata.isdisjoint(catalog.load_namespace_properties(TEST_TABLE_NAMESPACE).keys())
assert summary == PropertiesUpdateSummary(removed=["key1"], updated=["key3", "key4"], missing=[])
assert summary.removed == ["key1"]
assert sorted(summary.updated) == ["key3", "key4"]
assert summary.missing == []


def test_update_namespace_metadata_raises_error_when_namespace_does_not_exist(catalog: InMemoryCatalog) -> None:
Expand Down Expand Up @@ -749,7 +546,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None:

def test_catalog_repr(catalog: InMemoryCatalog) -> None:
s = repr(catalog)
assert s == "test.in_memory.catalog (<class 'test_base.InMemoryCatalog'>)"
assert s == "test.in_memory.catalog (<class 'pyiceberg.catalog.memory.InMemoryCatalog'>)"


def test_table_properties_int_value(catalog: InMemoryCatalog) -> None:
Expand Down
Loading

0 comments on commit aa6efc6

Please sign in to comment.