Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CreateTableTransaction in Glue and Rest #498

Merged
merged 28 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
998c6f1
initial example
HonahX Mar 5, 2024
8eace7c
add integration test
HonahX Mar 5, 2024
64e6346
lint
HonahX Mar 5, 2024
ffb8ff6
fix test
HonahX Mar 5, 2024
3a579cd
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 5, 2024
049e0e2
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 7, 2024
df0c5ed
move create_staged catalog to init
HonahX Mar 7, 2024
755aebf
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 12, 2024
c98b3b4
Add IncompleteMetadata
HonahX Mar 12, 2024
09b60ca
Add support for rest
HonahX Mar 13, 2024
04ef8df
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
211de32
Fix merge issue
HonahX Mar 20, 2024
d57ac1c
get rid of IncompleteTableMetadata
HonahX Mar 20, 2024
978a0aa
simplify code
HonahX Mar 20, 2024
a413c2e
fix small issue
HonahX Mar 20, 2024
ad840d5
remove extra line
HonahX Mar 20, 2024
47ce986
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
1f5cc28
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 24, 2024
9ac2f7f
accumulates initial updates in self._updates in transaction directly
HonahX Mar 24, 2024
d2617fb
update comments
HonahX Mar 24, 2024
44df2d7
update tablemetadata v1 update
HonahX Mar 24, 2024
1a4d262
add doc
HonahX Mar 25, 2024
f7c04cf
fix lint
HonahX Mar 25, 2024
cecf1c0
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 27, 2024
2152542
revert some change
HonahX Mar 27, 2024
c449fb0
refactor catalog interface, add MetastoreCatalog
HonahX Mar 28, 2024
8fc1562
Merge branch 'main' into create_table_transaction_experiment
HonahX Apr 4, 2024
b99c619
update api doc
HonahX Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableMetadata,
)
Expand Down Expand Up @@ -288,6 +290,62 @@ def __init__(self, name: str, **properties: str):
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)

def _create_staged_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> StagedTable:
"""Create a table and return the table instance without committing the changes.

Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.

Returns:
Table: the created table instance.

Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
raise NotImplementedError

def create_table_transaction(
Fokko marked this conversation as resolved.
Show resolved Hide resolved
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
"""Create a CreateTableTransaction.

Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.

Returns:
CreateTableTransaction: createTableTransaction instance.

Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
return CreateTableTransaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

@abstractmethod
def create_table(
self,
Expand Down
139 changes: 98 additions & 41 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
construct_initial_table_metadata,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
Expand Down Expand Up @@ -325,12 +332,39 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
) from e

def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
def _get_glue_table(self, database_name: str, table_name: str) -> Optional[TableTypeDef]:
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
return load_table_response["Table"]
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
except self.glue.exceptions.EntityNotFoundException:
return None

def _create_staged_table(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might have an opportunity here to move _create_staged_table function into pyiceberg/catalog/init.py and refactor the existing create_table functions on the different catalog implementations to all use _create_staged_table to create an instance of the StagedTable, and then commit that StagedTable to the catalog backend.

I think what sets each catalog's implementation of create_table apart is how it handles the commit against the catalog backened, but they all seem to share the same sequence of operations in how it instantiates its notion of a new table.

What are your thoughts on this idea @HonahX ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion! In the initial implementation I did not pay much attention to the catalog code organization. Let me refactor it.

self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> StagedTable:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
io = load_file_io(properties=self.properties, location=metadata_location)
return StagedTable(
identifier=(self.name, database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=io,
catalog=self,
)

def create_table(
self,
Expand Down Expand Up @@ -412,45 +446,68 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
if current_glue_table is not None:
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
HonahX marked this conversation as resolved.
Show resolved Hide resolved
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
else:
# Create the table
updated_metadata = construct_initial_table_metadata(table_request.updates)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
Loading
Loading