Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor GlueCatalog's _commit_table #653

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
CreateTableTransaction,
StagedTable,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -728,6 +729,27 @@ def _create_staged_table(
catalog=self,
)

def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
for requirement in table_request.requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
enforce_validation=current_table is None,
)

new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 if current_table else 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)

return StagedTable(
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
catalog=self,
)

def _get_updated_props_and_update_summary(
self, current_properties: Properties, removals: Optional[Set[str]], updates: Properties
) -> Tuple[PropertiesUpdateSummary, Properties]:
Expand Down
79 changes: 35 additions & 44 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
Expand All @@ -67,7 +66,6 @@
CommitTableResponse,
PropertyUtil,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -321,7 +319,7 @@ def _convert_glue_to_iceberg(self, glue_table: TableTypeDef) -> Table:
)
metadata_location = properties[METADATA_LOCATION]

io = load_file_io(properties=self.properties, location=metadata_location)
io = self._load_file_io(location=metadata_location)
file = io.new_input(metadata_location)
metadata = FromInputFile.table_metadata(file)
return Table(
Expand Down Expand Up @@ -439,71 +437,64 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table: Optional[TableTypeDef]
glue_table_version_id: Optional[str]
current_table: Optional[Table]
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
except NoSuchTableError:
current_glue_table = None
glue_table_version_id = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
self._write_metadata(
metadata=updated_staged_table.metadata,
io=updated_staged_table.io,
metadata_path=updated_staged_table.metadata_location,
)

if current_table:
# table exists, update the table
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
metadata_location=updated_staged_table.metadata_location,
properties=updated_staged_table.properties,
metadata=updated_staged_table.metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
except NoSuchTableError:
# Create the table
updated_metadata = update_table_metadata(
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

else:
# table does not exist, create the table
create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
metadata_location=updated_staged_table.metadata_location,
properties=updated_staged_table.properties,
metadata=updated_staged_table.metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
14 changes: 11 additions & 3 deletions tests/catalog/integration_test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,21 +462,29 @@ def test_commit_table_update_schema(
]


def test_commit_table_properties(test_catalog: Catalog, table_schema_nested: Schema, database_name: str, table_name: str) -> None:
def test_commit_table_properties(
test_catalog: Catalog, glue: boto3.client, table_schema_nested: Schema, database_name: str, table_name: str
) -> None:
identifier = (database_name, table_name)
test_catalog.create_namespace(namespace=database_name)
table = test_catalog.create_table(identifier=identifier, schema=table_schema_nested, properties={"test_a": "test_a"})

assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 0

transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()

updated_table_metadata = table.metadata
assert MetastoreCatalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}

table_info = glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"


@pytest.mark.parametrize("format_version", [1, 2])
Expand Down
17 changes: 14 additions & 3 deletions tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,12 @@ def test_commit_table_update_schema(

@mock_aws
def test_commit_table_properties(
_bucket_initialize: None, moto_endpoint_url: str, table_schema_nested: Schema, database_name: str, table_name: str
_glue: boto3.client,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
catalog_name = "glue"
identifier = (database_name, table_name)
Expand All @@ -688,13 +693,19 @@ def test_commit_table_properties(
assert test_catalog._parse_metadata_version(table.metadata_location) == 0

transaction = table.transaction()
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
transaction.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c", Description="test_description")
transaction.remove_properties("test_b")
transaction.commit_transaction()

updated_table_metadata = table.metadata
assert test_catalog._parse_metadata_version(table.metadata_location) == 1
assert updated_table_metadata.properties == {"test_a": "test_aa", "test_c": "test_c"}
assert updated_table_metadata.properties == {'Description': 'test_description', "test_a": "test_aa", "test_c": "test_c"}

table_info = _glue.get_table(
DatabaseName=database_name,
Name=table_name,
)
assert table_info["Table"]["Description"] == "test_description"


@mock_aws
Expand Down