Skip to content

Commit

Permalink
Refactor GlueCatalog _commit_table (#653)
Browse files Browse the repository at this point in the history
* refactor _commit_table

* small refactor

* extract common logic of _commit_table

* reformat
  • Loading branch information
HonahX authored Apr 25, 2024
1 parent f2acf1d commit f72e363
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 50 deletions.
22 changes: 22 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
CreateTableTransaction,
StagedTable,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -728,6 +729,27 @@ def _create_staged_table(
catalog=self,
)

def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
for requirement in table_request.requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)

return StagedTable(
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
catalog=self,
)

def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
) -> Tuple[PropertiesUpdateSummary, Properties]:
Expand Down
79 changes: 35 additions & 44 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
Expand All @@ -67,7 +66,6 @@
CommitTableResponse,
PropertyUtil,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -321,7 +319,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
)
metadata_location = properties[METADATA_LOCATION]

io = load_file_io(properties=self.properties, location=metadata_location)
io = self._load_file_io(location=metadata_location)
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
Expand Down Expand Up @@ -439,71 +437,64 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table: Optional[TableTypeDef]
glue_table_version_id: Optional[str]
current_table: Optional[Table]
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
except NoSuchTableError:
current_glue_table = None
glue_table_version_id = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

if current_table:
# table exists, update the table
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
metadata_location=updated_staged_table.metadata_location,
properties=updated_staged_table.properties,
metadata=updated_staged_table.metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
except NoSuchTableError:
# Create the table
updated_metadata = update_table_metadata(
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

else:
# table does not exist, create the table
create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
metadata_location=updated_staged_table.metadata_location,
properties=updated_staged_table.properties,
metadata=updated_staged_table.metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
14 changes: 11 additions & 3 deletions tests/catalog/integration_test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,21 +462,29 @@ def test_commit_table_update_schema(
]


def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
def test_commit_table_properties(
test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})

assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0

transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()

updated_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}

table_info = glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"


@pytest.mark.parametrize("format_version", [1, 2])
Expand Down
17 changes: 14 additions & 3 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,12 @@ def test_commit_table_update_schema(

@mock_aws
def test_commit_table_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
Expand All @@ -688,13 +693,19 @@ def test_commit_table_properties(
assert test_catalog._parse_metadata_version(table.metadata_location) == 0

transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()

updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}

table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"


@mock_aws
Expand Down

0 comments on commit f72e363

Please sign in to comment.