diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1d011dd5d9..f635492d9a 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -67,10 +67,10 @@ NamespaceNotEmptyError, NoSuchIcebergTableError, NoSuchNamespaceError, + NoSuchPropertyException, NoSuchTableError, TableAlreadyExistsError, ) -from pyiceberg.io import FileIO, load_file_io from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile @@ -247,10 +247,12 @@ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) self._client = _HiveClient(properties["uri"], properties.get("ugi")) - def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: + def _convert_hive_into_iceberg(self, table: HiveTable) -> Table: properties: Dict[str, str] = table.parameters if TABLE_TYPE not in properties: - raise NoSuchTableError(f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}") + raise NoSuchPropertyException( + f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}" + ) table_type = properties[TABLE_TYPE] if table_type.lower() != ICEBERG: @@ -261,8 +263,9 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table: if prop_metadata_location := properties.get(METADATA_LOCATION): metadata_location = prop_metadata_location else: - raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing") + raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing") + io = self._load_file_io(location=metadata_location) file = io.new_input(metadata_location) metadata = FromInputFile.table_metadata(file) return Table( @@ -299,6 +302,12 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None except AlreadyExistsException as e: raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e + def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable: + try: + return open_client.get_table(dbname=database_name, tbl_name=table_name) + except NoSuchObjectException as e: + raise NoSuchTableError(f"Table does not exists: {table_name}") from e + def create_table( self, identifier: Union[str, Identifier], @@ -343,7 +352,7 @@ def create_table( self._create_hive_table(open_client, tbl) hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - return self._convert_hive_into_iceberg(hive_table, staged_table.io) + return self._convert_hive_into_iceberg(hive_table) def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: """Register a new table using existing metadata. @@ -395,47 +404,53 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons if lock.state != LockState.ACQUIRED: raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location) - current_table = self._convert_hive_into_iceberg(hive_table, io) + hive_table: Optional[HiveTable] + current_table: Optional[Table] + try: + hive_table = self._get_hive_table(open_client, database_name, table_name) + current_table = self._convert_hive_into_iceberg(hive_table) + except NoSuchTableError: + hive_table = None + current_table = None - base_metadata = current_table.metadata for requirement in table_request.requirements: - requirement.validate(base_metadata) + requirement.validate(current_table.metadata if current_table else None) - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - hive_table.parameters = _construct_parameters( - metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location - ) - open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) - except NoSuchObjectException: updated_metadata = update_table_metadata( - base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True + base_metadata=current_table.metadata if current_table else self._empty_table_metadata(), + updates=table_request.updates, + enforce_validation=current_table is None, ) - new_metadata_version = 0 + + if current_table and updated_metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) io = self._load_file_io(updated_metadata.properties, new_metadata_location) - self._write_metadata(updated_metadata, io, new_metadata_location) - - tbl = self._convert_iceberg_into_hive( - StagedTable( - identifier=(self.name, database_name, table_name), - metadata=updated_metadata, - metadata_location=new_metadata_location, - io=io, - catalog=self, - ) + self._write_metadata( + metadata=updated_metadata, + io=io, + metadata_path=new_metadata_location, ) - self._create_hive_table(open_client, tbl) + + if hive_table and current_table: + hive_table.parameters = _construct_parameters( + metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location + ) + open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table) + else: + hive_table = self._convert_iceberg_into_hive( + StagedTable( + identifier=(self.name, database_name, table_name), + metadata=updated_metadata, + metadata_location=new_metadata_location, + io=io, + catalog=self, + ) + ) + self._create_hive_table(open_client, hive_table) finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) @@ -458,14 +473,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table: """ identifier_tuple = self.identifier_to_tuple_without_catalog(identifier) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError) - try: - with self._client as open_client: - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - except NoSuchObjectException as e: - raise NoSuchTableError(f"Table does not exists: {table_name}") from e - io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location) - return self._convert_hive_into_iceberg(hive_table, io) + with self._client as open_client: + hive_table = self._get_hive_table(open_client, database_name, table_name) + + return self._convert_hive_into_iceberg(hive_table) def drop_table(self, identifier: Union[str, Identifier]) -> None: """Drop a table. diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 46b68b1eff..5e7487c8a9 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -34,6 +34,7 @@ from pyiceberg.catalog import Catalog from pyiceberg.catalog.hive import HiveCatalog +from pyiceberg.catalog.rest import RestCatalog from pyiceberg.catalog.sql import SqlCatalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.table import TableProperties, _dataframe_to_data_files @@ -609,10 +610,10 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None @pytest.mark.integration -@pytest.mark.parametrize("format_version", [2]) +@pytest.mark.parametrize("format_version", [1, 2]) @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')]) def test_create_table_transaction(catalog: Catalog, format_version: int) -> None: - if format_version == 1: + if format_version == 1 and isinstance(catalog, RestCatalog): pytest.skip( "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" )