From 96ba8defbd152933c8a13443b405329e5288154f Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 30 Jan 2024 22:07:17 -0800 Subject: [PATCH 1/3] `create_table` write metadata file --- pyiceberg/catalog/in_memory.py | 10 +++-- pyiceberg/cli/output.py | 2 +- tests/catalog/test_base.py | 16 ++++++-- tests/cli/test_console.py | 69 ++++------------------------------ 4 files changed, 28 insertions(+), 69 deletions(-) diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py index 1840f492db..a7fb517fe4 100644 --- a/pyiceberg/catalog/in_memory.py +++ b/pyiceberg/catalog/in_memory.py @@ -38,7 +38,7 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import WAREHOUSE +from pyiceberg.io import WAREHOUSE, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( @@ -94,8 +94,7 @@ def create_table( if not location: location = f'{self._warehouse_location}/{"/".join(identifier)}' - metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' - + metadata_location = self._get_metadata_location(location=location) metadata = new_table_metadata( schema=schema, partition_spec=partition_spec, @@ -104,11 +103,14 @@ def create_table( properties=properties, table_uuid=table_uuid, ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + table = Table( identifier=identifier, metadata=metadata, metadata_location=metadata_location, - io=self._load_file_io(properties=metadata.properties, location=metadata_location), + io=io, catalog=self, ) self.__tables[identifier] = table diff --git a/pyiceberg/cli/output.py b/pyiceberg/cli/output.py index 18cdab1556..3377ef3d2a 100644 --- a/pyiceberg/cli/output.py +++ b/pyiceberg/cli/output.py @@ -157,7 +157,7 @@ def describe_properties(self, properties: Properties) -> None: Console().print(output_table) def text(self, response: str) -> None: - Console().print(response) + Console(soft_wrap=True).print(response) def schema(self, schema: Schema) -> None: output_table = self._table diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 0fd7d9de55..66140b8947 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -64,7 +64,6 @@ def catalog(tmp_path: PosixPath) -> InMemoryCatalog: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "protocol://some/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"key1": "value1", "key2": "value2"} NO_SUCH_TABLE_ERROR = "Table does not exist: \\('com', 'organization', 'department', 'my_table'\\)" @@ -123,13 +122,25 @@ def test_create_table(catalog: InMemoryCatalog) -> None: table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table +def test_create_table_location_override(catalog: InMemoryCatalog) -> None: + new_location = f"{catalog._warehouse_location}/new_location" + table = catalog.create_table( + identifier=TEST_TABLE_IDENTIFIER, + schema=TEST_TABLE_SCHEMA, + location=new_location, + partition_spec=TEST_TABLE_PARTITION_SPEC, + properties=TEST_TABLE_PROPERTIES, + ) + assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table + assert table.location() == new_location + + @pytest.mark.parametrize( "schema,expected", [ @@ -151,7 +162,6 @@ def test_create_table_pyarrow_schema(catalog: InMemoryCatalog, pyarrow_schema_si table = catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=pyarrow_schema_simple_without_ids, - location=TEST_TABLE_LOCATION, properties=TEST_TABLE_PROPERTIES, ) assert catalog.load_table(TEST_TABLE_IDENTIFIER) == table diff --git a/tests/cli/test_console.py b/tests/cli/test_console.py index 1f9f831d21..ec3199a4db 100644 --- a/tests/cli/test_console.py +++ b/tests/cli/test_console.py @@ -17,14 +17,15 @@ import datetime import os import uuid +from pathlib import PosixPath from unittest.mock import MagicMock import pytest from click.testing import CliRunner from pytest_mock import MockFixture -from pyiceberg.catalog.in_memory import DEFAULT_WAREHOUSE_LOCATION from pyiceberg.cli.console import run +from pyiceberg.io import WAREHOUSE from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.transforms import IdentityTransform @@ -52,8 +53,10 @@ def env_vars(mocker: MockFixture) -> None: @pytest.fixture(name="catalog") -def fixture_catalog(mocker: MockFixture) -> InMemoryCatalog: - in_memory_catalog = InMemoryCatalog("test.in_memory.catalog", **{"test.key": "test.value"}) +def fixture_catalog(mocker: MockFixture, tmp_path: PosixPath) -> InMemoryCatalog: + in_memory_catalog = InMemoryCatalog( + "test.in_memory.catalog", **{WAREHOUSE: tmp_path.absolute().as_posix(), "test.key": "test.value"} + ) mocker.patch("pyiceberg.cli.console.load_catalog", return_value=in_memory_catalog) return in_memory_catalog @@ -79,7 +82,6 @@ def mock_datetime_now(monkeypatch: pytest.MonkeyPatch) -> None: NestedField(2, "y", LongType(), doc="comment"), NestedField(3, "z", LongType()), ) -TEST_TABLE_LOCATION = "s3://bucket/test/location" TEST_TABLE_PARTITION_SPEC = PartitionSpec(PartitionField(name="x", transform=IdentityTransform(), source_id=1, field_id=1000)) TEST_TABLE_PROPERTIES = {"read.split.target.size": "134217728"} TEST_TABLE_UUID = uuid.UUID("d20125c8-7284-442c-9aea-15fee620737c") @@ -101,7 +103,6 @@ def test_list_namespace(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -138,7 +139,6 @@ def test_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) -> No catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -182,7 +182,6 @@ def test_schema(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -211,7 +210,6 @@ def test_spec(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -240,7 +238,6 @@ def test_uuid(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -266,25 +263,10 @@ def test_location(catalog: InMemoryCatalog) -> None: schema=TEST_TABLE_SCHEMA, partition_spec=TEST_TABLE_PARTITION_SPEC, ) - - runner = CliRunner() - result = runner.invoke(run, ["location", "default.my_table"]) - assert result.exit_code == 0 - assert result.output == f"""{DEFAULT_WAREHOUSE_LOCATION}/default/my_table\n""" - - -def test_location_override(catalog: InMemoryCatalog) -> None: - catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, - ) - runner = CliRunner() result = runner.invoke(run, ["location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f"""{TEST_TABLE_LOCATION}\n""" + assert result.output == f"""{catalog._warehouse_location}/default/my_table\n""" def test_location_does_not_exists(catalog: InMemoryCatalog) -> None: @@ -300,7 +282,6 @@ def test_drop_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -341,7 +322,6 @@ def test_rename_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -364,7 +344,6 @@ def test_properties_get_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -379,7 +358,6 @@ def test_properties_get_table_specific_property(catalog: InMemoryCatalog) -> Non catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -394,7 +372,6 @@ def test_properties_get_table_specific_property_that_doesnt_exist(catalog: InMem catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -463,7 +440,6 @@ def test_properties_set_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -504,7 +480,6 @@ def test_properties_remove_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -519,7 +494,6 @@ def test_properties_remove_table_property_does_not_exists(catalog: InMemoryCatal catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -551,7 +525,6 @@ def test_json_list_namespace(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -584,7 +557,6 @@ def test_json_describe_table(catalog: InMemoryCatalog, mock_datetime_now: None) catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -614,7 +586,6 @@ def test_json_schema(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -640,7 +611,6 @@ def test_json_spec(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -663,7 +633,6 @@ def test_json_uuid(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, table_uuid=TEST_TABLE_UUID, ) @@ -693,21 +662,7 @@ def test_json_location(catalog: InMemoryCatalog) -> None: runner = CliRunner() result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) assert result.exit_code == 0 - assert result.output == f'"{DEFAULT_WAREHOUSE_LOCATION}/default/my_table"\n' - - -def test_json_location_override(catalog: InMemoryCatalog) -> None: - catalog.create_table( - identifier=TEST_TABLE_IDENTIFIER, - schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, - partition_spec=TEST_TABLE_PARTITION_SPEC, - ) - - runner = CliRunner() - result = runner.invoke(run, ["--output=json", "location", "default.my_table"]) - assert result.exit_code == 0 - assert result.output == f'"{TEST_TABLE_LOCATION}"\n' + assert result.output == f'"{catalog._warehouse_location}/default/my_table"\n' def test_json_location_does_not_exists(catalog: InMemoryCatalog) -> None: @@ -723,7 +678,6 @@ def test_json_drop_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -764,7 +718,6 @@ def test_json_rename_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, ) @@ -787,7 +740,6 @@ def test_json_properties_get_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -802,7 +754,6 @@ def test_json_properties_get_table_specific_property(catalog: InMemoryCatalog) - catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -817,7 +768,6 @@ def test_json_properties_get_table_specific_property_that_doesnt_exist(catalog: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -891,7 +841,6 @@ def test_json_properties_set_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -937,7 +886,6 @@ def test_json_properties_remove_table(catalog: InMemoryCatalog) -> None: catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) @@ -952,7 +900,6 @@ def test_json_properties_remove_table_property_does_not_exists(catalog: InMemory catalog.create_table( identifier=TEST_TABLE_IDENTIFIER, schema=TEST_TABLE_SCHEMA, - location=TEST_TABLE_LOCATION, partition_spec=TEST_TABLE_PARTITION_SPEC, properties=TEST_TABLE_PROPERTIES, ) From 8a7b8768239a9bcfef20cfa44414413615c757b6 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 29 Feb 2024 20:03:14 -0800 Subject: [PATCH 2/3] move InMemoryCatalog back to test_base --- pyiceberg/catalog/__init__.py | 7 - pyiceberg/catalog/in_memory.py | 248 --------------------------------- tests/catalog/test_base.py | 97 ++++++++----- 3 files changed, 66 insertions(+), 286 deletions(-) delete mode 100644 pyiceberg/catalog/in_memory.py diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 3250b2426a..58a3d5999f 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -144,19 +144,12 @@ def load_sql(name: str, conf: Properties) -> Catalog: ) from exc -def load_in_memory(name: str, conf: Properties) -> Catalog: - from pyiceberg.catalog.in_memory import InMemoryCatalog - - return InMemoryCatalog(name, **conf) - - AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, - CatalogType.IN_MEMORY: load_in_memory, } diff --git a/pyiceberg/catalog/in_memory.py b/pyiceberg/catalog/in_memory.py deleted file mode 100644 index a7fb517fe4..0000000000 --- a/pyiceberg/catalog/in_memory.py +++ /dev/null @@ -1,248 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -import uuid -from typing import ( - Dict, - List, - Optional, - Set, - Union, -) - -import pyarrow as pa - -from pyiceberg.catalog import ( - Catalog, - Identifier, - Properties, - PropertiesUpdateSummary, -) -from pyiceberg.exceptions import ( - NamespaceAlreadyExistsError, - NamespaceNotEmptyError, - NoSuchNamespaceError, - NoSuchTableError, - TableAlreadyExistsError, -) -from pyiceberg.io import WAREHOUSE, load_file_io -from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec -from pyiceberg.schema import Schema -from pyiceberg.table import ( - CommitTableRequest, - CommitTableResponse, - Table, - update_table_metadata, -) -from pyiceberg.table.metadata import new_table_metadata -from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder -from pyiceberg.typedef import EMPTY_DICT - -DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" - - -class InMemoryCatalog(Catalog): - """ - An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. - - This is useful for test, demo, and playground but not in production as data is not persisted. - """ - - __tables: Dict[Identifier, Table] - __namespaces: Dict[Identifier, Properties] - - def __init__(self, name: str, **properties: str) -> None: - super().__init__(name, **properties) - self.__tables = {} - self.__namespaces = {} - self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION - - def create_table( - self, - identifier: Union[str, Identifier], - schema: Union[Schema, "pa.Schema"], - location: Optional[str] = None, - partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, - sort_order: SortOrder = UNSORTED_SORT_ORDER, - properties: Properties = EMPTY_DICT, - table_uuid: Optional[uuid.UUID] = None, - ) -> Table: - schema: Schema = self._convert_schema_if_needed(schema) # type: ignore - - identifier = Catalog.identifier_to_tuple(identifier) - namespace = Catalog.namespace_from(identifier) - - if identifier in self.__tables: - raise TableAlreadyExistsError(f"Table already exists: {identifier}") - else: - if namespace not in self.__namespaces: - self.__namespaces[namespace] = {} - - if not location: - location = f'{self._warehouse_location}/{"/".join(identifier)}' - - metadata_location = self._get_metadata_location(location=location) - metadata = new_table_metadata( - schema=schema, - partition_spec=partition_spec, - sort_order=sort_order, - location=location, - properties=properties, - table_uuid=table_uuid, - ) - io = load_file_io({**self.properties, **properties}, location=location) - self._write_metadata(metadata, io, metadata_location) - - table = Table( - identifier=identifier, - metadata=metadata, - metadata_location=metadata_location, - io=io, - catalog=self, - ) - self.__tables[identifier] = table - return table - - def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: - raise NotImplementedError - - def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier_tuple = self.identifier_to_tuple_without_catalog( - tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) - ) - current_table = self.load_table(identifier_tuple) - base_metadata = current_table.metadata - - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - # update table state - current_table.metadata = updated_metadata - - return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) - - def load_table(self, identifier: Union[str, Identifier]) -> Table: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - return self.__tables[identifier] - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def drop_table(self, identifier: Union[str, Identifier]) -> None: - identifier = self.identifier_to_tuple_without_catalog(identifier) - try: - self.__tables.pop(identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {identifier}") from error - - def purge_table(self, identifier: Union[str, Identifier]) -> None: - self.drop_table(identifier) - - def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: - from_identifier = self.identifier_to_tuple_without_catalog(from_identifier) - try: - table = self.__tables.pop(from_identifier) - except KeyError as error: - raise NoSuchTableError(f"Table does not exist: {from_identifier}") from error - - to_identifier = Catalog.identifier_to_tuple(to_identifier) - to_namespace = Catalog.namespace_from(to_identifier) - if to_namespace not in self.__namespaces: - self.__namespaces[to_namespace] = {} - - self.__tables[to_identifier] = Table( - identifier=to_identifier, - metadata=table.metadata, - metadata_location=table.metadata_location, - io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), - catalog=self, - ) - return self.__tables[to_identifier] - - def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - raise NamespaceAlreadyExistsError(f"Namespace already exists: {namespace}") - else: - self.__namespaces[namespace] = properties if properties else {} - - def drop_namespace(self, namespace: Union[str, Identifier]) -> None: - namespace = Catalog.identifier_to_tuple(namespace) - if [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]]: - raise NamespaceNotEmptyError(f"Namespace is not empty: {namespace}") - try: - self.__namespaces.pop(namespace) - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def list_tables(self, namespace: Optional[Union[str, Identifier]] = None) -> List[Identifier]: - if namespace: - namespace = Catalog.identifier_to_tuple(namespace) - list_tables = [table_identifier for table_identifier in self.__tables.keys() if namespace == table_identifier[:-1]] - else: - list_tables = list(self.__tables.keys()) - - return list_tables - - def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: - # Hierarchical namespace is not supported. Return an empty list - if namespace: - return [] - - return list(self.__namespaces.keys()) - - def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: - namespace = Catalog.identifier_to_tuple(namespace) - try: - return self.__namespaces[namespace] - except KeyError as error: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") from error - - def update_namespace_properties( - self, namespace: Union[str, Identifier], removals: Optional[Set[str]] = None, updates: Properties = EMPTY_DICT - ) -> PropertiesUpdateSummary: - removed: Set[str] = set() - updated: Set[str] = set() - - namespace = Catalog.identifier_to_tuple(namespace) - if namespace in self.__namespaces: - if removals: - for key in removals: - if key in self.__namespaces[namespace]: - del self.__namespaces[namespace][key] - removed.add(key) - if updates: - for key, value in updates.items(): - self.__namespaces[namespace][key] = value - updated.add(key) - else: - raise NoSuchNamespaceError(f"Namespace does not exist: {namespace}") - - expected_to_change = removed.difference(removals or set()) - - return PropertiesUpdateSummary( - removed=list(removed or []), updated=list(updates.keys() if updates else []), missing=list(expected_to_change) - ) diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 7e38d8f793..ae943c384b 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -17,8 +17,15 @@ # pylint:disable=redefined-outer-name +import uuid from pathlib import PosixPath -from typing import Union +from typing import ( + Dict, + List, + Optional, + Set, + Union, +) import pyarrow as pa import pytest @@ -27,9 +34,10 @@ from pyiceberg.catalog import ( Catalog, + Identifier, + Properties, PropertiesUpdateSummary, ) -from pyiceberg.catalog.in_memory import InMemoryCatalog from pyiceberg.exceptions import ( NamespaceAlreadyExistsError, NamespaceNotEmptyError, @@ -37,26 +45,34 @@ NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import WAREHOUSE -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.io import WAREHOUSE, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import ( AddSchemaUpdate, CommitTableRequest, + CommitTableResponse, Namespace, SetCurrentSchemaUpdate, Table, TableIdentifier, update_table_metadata, ) -from pyiceberg.table.metadata import TableMetadataV1 +from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.transforms import IdentityTransform +from pyiceberg.typedef import EMPTY_DICT from pyiceberg.types import IntegerType, LongType, NestedField +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + class InMemoryCatalog(Catalog): - """An in-memory catalog implementation for testing purposes.""" + """ + An in-memory catalog implementation that uses in-memory data-structures to store the namespaces and tables. + + This is useful for test, demo, and playground but not in production as data is not persisted. + """ __tables: Dict[Identifier, Table] __namespaces: Dict[Identifier, Properties] @@ -65,6 +81,7 @@ def __init__(self, name: str, **properties: str) -> None: super().__init__(name, **properties) self.__tables = {} self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION def create_table( self, @@ -74,6 +91,7 @@ def create_table( partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, sort_order: SortOrder = UNSORTED_SORT_ORDER, properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, ) -> Table: schema: Schema = self._convert_schema_if_needed(schema) # type: ignore @@ -86,24 +104,26 @@ def create_table( if namespace not in self.__namespaces: self.__namespaces[namespace] = {} - new_location = location or f's3://warehouse/{"/".join(identifier)}/data' - metadata = TableMetadataV1(**{ - "format-version": 1, - "table-uuid": "d20125c8-7284-442c-9aea-15fee620737c", - "location": new_location, - "last-updated-ms": 1602638573874, - "last-column-id": schema.highest_field_id, - "schema": schema.model_dump(), - "partition-spec": partition_spec.model_dump()["fields"], - "properties": properties, - "current-snapshot-id": -1, - "snapshots": [{"snapshot-id": 1925, "timestamp-ms": 1602638573822}], - }) + if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' + + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + location=location, + properties=properties, + table_uuid=table_uuid, + ) + io = load_file_io({**self.properties, **properties}, location=location) + self._write_metadata(metadata, io, metadata_location) + table = Table( identifier=identifier, metadata=metadata, - metadata_location=f's3://warehouse/{"/".join(identifier)}/metadata/metadata.json', - io=load_file_io(), + metadata_location=metadata_location, + io=io, catalog=self, ) self.__tables[identifier] = table @@ -113,14 +133,29 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: raise NotImplementedError def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: - identifier = tuple(table_request.identifier.namespace.root) + (table_request.identifier.name,) - table = self.__tables[identifier] - table.metadata = update_table_metadata(base_metadata=table.metadata, updates=table_request.updates) - - return CommitTableResponse( - metadata=table.metadata.model_dump(), - metadata_location=table.location(), + identifier_tuple = self.identifier_to_tuple_without_catalog( + tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) ) + current_table = self.load_table(identifier_tuple) + base_metadata = current_table.metadata + + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + # update table state + current_table.metadata = updated_metadata + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) def load_table(self, identifier: Union[str, Identifier]) -> Table: identifier = self.identifier_to_tuple_without_catalog(identifier) @@ -155,7 +190,7 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U identifier=to_identifier, metadata=table.metadata, metadata_location=table.metadata_location, - io=load_file_io(), + io=self._load_file_io(properties=table.metadata.properties, location=table.metadata_location), catalog=self, ) return self.__tables[to_identifier] @@ -666,7 +701,7 @@ def test_add_column_with_statement(catalog: InMemoryCatalog) -> None: def test_catalog_repr(catalog: InMemoryCatalog) -> None: s = repr(catalog) - assert s == "test.in_memory.catalog ()" + assert s == "test.in_memory.catalog ()" def test_table_properties_int_value(catalog: InMemoryCatalog) -> None: @@ -680,4 +715,4 @@ def test_table_properties_raise_for_none_value(catalog: InMemoryCatalog) -> None property_with_none = {"property_name": None} with pytest.raises(ValidationError) as exc_info: _ = given_catalog_has_a_table(catalog, properties=property_with_none) - assert "None type is not a supported value in properties: property_name" in str(exc_info.value) \ No newline at end of file + assert "None type is not a supported value in properties: property_name" in str(exc_info.value) From 10adb1cb03d4fd2b6ec457b5d8689b24e353bd00 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Thu, 29 Feb 2024 20:05:38 -0800 Subject: [PATCH 3/3] remove unused references --- mkdocs/docs/configuration.md | 9 --------- pyiceberg/catalog/__init__.py | 1 - 2 files changed, 10 deletions(-) diff --git a/mkdocs/docs/configuration.md b/mkdocs/docs/configuration.md index e1a6d3281b..4a461ccc0d 100644 --- a/mkdocs/docs/configuration.md +++ b/mkdocs/docs/configuration.md @@ -265,15 +265,6 @@ catalog: The In-Memory catalog uses in-memory data-structures to store information. This is useful for test, demo, and playground. Do not use in production as the data is not persisted. -While you can specify In-Memory catalog in the configuration file like this, it is not recommended since information is only persisted for the duration of the function call. - -```yaml -catalog: - default: - type: in_memory - warehouse: /tmp/warehouse # default warehouse location -``` - # Concurrency PyIceberg uses multiple threads to parallelize operations. The number of workers can be configured by supplying a `max-workers` entry in the configuration file, or by setting the `PYICEBERG_MAX_WORKERS` environment variable. The default value depends on the system hardware and Python version. See [the Python documentation](https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor) for more details. diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index 58a3d5999f..db83658f1f 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -97,7 +97,6 @@ class CatalogType(Enum): GLUE = "glue" DYNAMODB = "dynamodb" SQL = "sql" - IN_MEMORY = "in_memory" def load_rest(name: str, conf: Properties) -> Catalog: