Skip to content

Commit

Permalink
make hive and sql use _update_and_stage_table
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Apr 25, 2024
1 parent eed16e0 commit a41b4bb
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 46 deletions.
2 changes: 0 additions & 2 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)

if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
Expand Down
36 changes: 13 additions & 23 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@
StagedTable,
Table,
TableProperties,
update_table_metadata,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -413,50 +412,41 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
hive_table = None
current_table = None

for requirement in table_request.requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
)

if current_table and updated_metadata == current_table.metadata:
updated_staged_table = self._update_and_stage_table(current_table, table_request)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
io = self._load_file_io(updated_metadata.properties, new_metadata_location)
self._write_metadata(
metadata=updated_metadata,
io=io,
metadata_path=new_metadata_location,
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

if hive_table and current_table:
# Table exists, update it.
hive_table.parameters = _construct_parameters(
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
)
open_client.alter_table(dbname=database_name, tbl_name=table_name, new_tbl=hive_table)
else:
# Table does not exist, create it.
hive_table = self._convert_iceberg_into_hive(
StagedTable(
identifier=(self.name, database_name, table_name),
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=io,
metadata=updated_staged_table.metadata,
metadata_location=updated_staged_table.metadata_location,
io=updated_staged_table.io,
catalog=self,
)
)
self._create_hive_table(open_client, hive_table)
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and return the table instance.
Expand Down
34 changes: 13 additions & 21 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -384,25 +384,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
except NoSuchTableError:
current_table = None

for requirement in table_request.requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
)
if current_table and updated_metadata == current_table.metadata:
updated_staged_table = self._update_and_stage_table(current_table, table_request)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
metadata=updated_metadata,
io=self._load_file_io(updated_metadata.properties, new_metadata_location),
metadata_path=new_metadata_location,
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

with Session(self.engine) as session:
Expand All @@ -418,7 +407,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
IcebergTables.metadata_location == current_table.metadata_location,
)
.values(
metadata_location=new_metadata_location, previous_metadata_location=current_table.metadata_location
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=current_table.metadata_location,
)
)
result = session.execute(stmt)
Expand All @@ -437,7 +427,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
.one()
)
tbl.metadata_location = new_metadata_location
tbl.metadata_location = updated_staged_table.metadata_location
tbl.previous_metadata_location = current_table.metadata_location
except NoResultFound as e:
raise CommitFailedException(
Expand All @@ -452,15 +442,17 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
catalog_name=self.name,
table_namespace=database_name,
table_name=table_name,
metadata_location=new_metadata_location,
metadata_location=updated_staged_table.metadata_location,
previous_metadata_location=None,
)
)
session.commit()
except IntegrityError as e:
raise TableAlreadyExistsError(f"Table {database_name}.{table_name} already exists") from e

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

def _namespace_exists(self, identifier: Union[str, Identifier]) -> bool:
namespace = self.identifier_to_database(identifier)
Expand Down

0 comments on commit a41b4bb

Please sign in to comment.