From c6a1a0c1bf52043c2a6ae2c25b2d1b539b5095d7 Mon Sep 17 00:00:00 2001 From: HonahX Date: Wed, 17 Apr 2024 00:10:03 -0700 Subject: [PATCH] refactor hive --- pyiceberg/catalog/hive.py | 27 +++++++++----------- tests/integration/test_writes/test_writes.py | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index bcb99d6a60..1d011dd5d9 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -293,6 +293,12 @@ def _convert_iceberg_into_hive(self, table: Table) -> HiveTable: parameters=_construct_parameters(table.metadata_location), ) + def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None: + try: + open_client.create_table(hive_table) + except AlreadyExistsException as e: + raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e + def create_table( self, identifier: Union[str, Identifier], @@ -332,12 +338,10 @@ def create_table( self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) tbl = self._convert_iceberg_into_hive(staged_table) - try: - with self._client as open_client: - open_client.create_table(tbl) - hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) - except AlreadyExistsException as e: - raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + + with self._client as open_client: + self._create_hive_table(open_client, tbl) + hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name) return self._convert_hive_into_iceberg(hive_table, staged_table.io) @@ -420,11 +424,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons new_metadata_version = 0 new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version) io = self._load_file_io(updated_metadata.properties, new_metadata_location) - self._write_metadata( - updated_metadata, - io, - new_metadata_location, - ) + self._write_metadata(updated_metadata, io, new_metadata_location) tbl = self._convert_iceberg_into_hive( StagedTable( @@ -435,10 +435,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons catalog=self, ) ) - try: - open_client.create_table(tbl) - except AlreadyExistsException as e: - raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e + self._create_hive_table(open_client, tbl) finally: open_client.unlock(UnlockRequest(lockid=lock.lockid)) diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 951932ecef..4af29e0ca5 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -599,7 +599,7 @@ def test_create_table_transaction(catalog: Catalog, format_version: int) -> None "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" ) - identifier = f"default.arrow_create_table_transaction{format_version}" + identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}" try: catalog.drop_table(identifier=identifier)