Skip to content

Commit

Permalink
refactor hive's _commit_table
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Apr 24, 2024
1 parent 7971593 commit 3910e5e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 47 deletions.
102 changes: 57 additions & 45 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@
NamespaceNotEmptyError,
NoSuchIcebergTableError,
NoSuchNamespaceError,
NoSuchPropertyException,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import FileIO, load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
Expand Down Expand Up @@ -247,10 +247,12 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = _HiveClient(properties["uri"], properties.get("ugi"))

def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
def _convert_hive_into_iceberg(self, table: HiveTable) -> Table:
properties: Dict[str, str] = table.parameters
if TABLE_TYPE not in properties:
raise NoSuchTableError(f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}")
raise NoSuchPropertyException(
f"Property table_type missing, could not determine type: {table.dbName}.{table.tableName}"
)

table_type = properties[TABLE_TYPE]
if table_type.lower() != ICEBERG:
Expand All @@ -261,8 +263,9 @@ def _convert_hive_into_iceberg(self, table: HiveTable, io: FileIO) -> Table:
if prop_metadata_location := properties.get(METADATA_LOCATION):
metadata_location = prop_metadata_location
else:
raise NoSuchTableError(f"Table property {METADATA_LOCATION} is missing")
raise NoSuchPropertyException(f"Table property {METADATA_LOCATION} is missing")

io = self._load_file_io(location=metadata_location)
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
Expand Down Expand Up @@ -299,6 +302,12 @@ def _create_hive_table(self, open_client: Client, hive_table: HiveTable) -> None
except AlreadyExistsException as e:
raise TableAlreadyExistsError(f"Table {hive_table.dbName}.{hive_table.tableName} already exists") from e

def _get_hive_table(self, open_client: Client, database_name: str, table_name: str) -> HiveTable:
    """Fetch a table definition from the Hive Metastore.

    Args:
        open_client: An open Thrift client connected to the Hive Metastore.
        database_name: Name of the database (Hive "dbname") containing the table.
        table_name: Name of the table to fetch.

    Returns:
        The raw HiveTable object as stored in the metastore.

    Raises:
        NoSuchTableError: If the metastore has no table with that name
            (translated from the Thrift-level NoSuchObjectException so callers
            only deal with catalog exceptions).
    """
    try:
        return open_client.get_table(dbname=database_name, tbl_name=table_name)
    except NoSuchObjectException as e:
        # Fix grammar in the user-facing message ("does not exists" -> "does not exist")
        raise NoSuchTableError(f"Table does not exist: {table_name}") from e

def create_table(
self,
identifier: Union[str, Identifier],
Expand Down Expand Up @@ -343,7 +352,7 @@ def create_table(
self._create_hive_table(open_client, tbl)
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)

return self._convert_hive_into_iceberg(hive_table, staged_table.io)
return self._convert_hive_into_iceberg(hive_table)

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down Expand Up @@ -395,47 +404,53 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
if lock.state != LockState.ACQUIRED:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")

hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
current_table = self._convert_hive_into_iceberg(hive_table, io)
hive_table: Optional[HiveTable]
current_table: Optional[Table]
try:
hive_table = self._get_hive_table(open_client, database_name, table_name)
current_table = self._convert_hive_into_iceberg(hive_table)
except NoSuchTableError:
hive_table = None
current_table = None

base_metadata = current_table.metadata
for requirement in table_request.requirements:
requirement.validate(base_metadata)
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

hive_table.parameters = _construct_parameters(
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
except NoSuchObjectException:
updated_metadata = update_table_metadata(
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
)
new_metadata_version = 0

if current_table and updated_metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
io = self._load_file_io(updated_metadata.properties, new_metadata_location)
self._write_metadata(updated_metadata, io, new_metadata_location)

tbl = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=io,
catalog=self,
)
self._write_metadata(
metadata=updated_metadata,
io=io,
metadata_path=new_metadata_location,
)
self._create_hive_table(open_client, tbl)

if hive_table and current_table:
hive_table.parameters = _construct_parameters(
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
else:
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=io,
catalog=self,
)
)
self._create_hive_table(open_client, hive_table)
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand All @@ -458,14 +473,11 @@ def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(identifier)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
try:
with self._client as open_client:
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
except NoSuchObjectException as e:
raise NoSuchTableError(f"Table does not exists: {table_name}") from e

io = load_file_io({**self.properties, **hive_table.parameters}, hive_table.sd.location)
return self._convert_hive_into_iceberg(hive_table, io)
with self._client as open_client:
hive_table = self._get_hive_table(open_client, database_name, table_name)

return self._convert_hive_into_iceberg(hive_table)

def drop_table(self, identifier: Union[str, Identifier]) -> None:
"""Drop a table.
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_writes/test_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from pyiceberg.catalog import Catalog
from pyiceberg.catalog.hive import HiveCatalog
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.table import TableProperties, _dataframe_to_data_files
Expand Down Expand Up @@ -609,10 +610,10 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [2])
@pytest.mark.parametrize("format_version", [1, 2])
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('session_catalog_hive'), pytest.lazy_fixture('session_catalog')])
def test_create_table_transaction(catalog: Catalog, format_version: int) -> None:
if format_version == 1:
if format_version == 1 and isinstance(catalog, RestCatalog):
pytest.skip(
"There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table"
)
Expand Down

0 comments on commit 3910e5e

Please sign in to comment.