Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CreateTableTransaction in Glue and Rest #498

Merged
merged 28 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
998c6f1
initial example
HonahX Mar 5, 2024
8eace7c
add integration test
HonahX Mar 5, 2024
64e6346
lint
HonahX Mar 5, 2024
ffb8ff6
fix test
HonahX Mar 5, 2024
3a579cd
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 5, 2024
049e0e2
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 7, 2024
df0c5ed
move create_staged catalog to init
HonahX Mar 7, 2024
755aebf
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 12, 2024
c98b3b4
Add IncompleteMetadata
HonahX Mar 12, 2024
09b60ca
Add support for rest
HonahX Mar 13, 2024
04ef8df
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
211de32
Fix merge issue
HonahX Mar 20, 2024
d57ac1c
get rid of IncompleteTableMetadata
HonahX Mar 20, 2024
978a0aa
simplify code
HonahX Mar 20, 2024
a413c2e
fix small issue
HonahX Mar 20, 2024
ad840d5
remove extra line
HonahX Mar 20, 2024
47ce986
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
1f5cc28
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 24, 2024
9ac2f7f
accumulates initial updates in self._updates in transaction directly
HonahX Mar 24, 2024
d2617fb
update comments
HonahX Mar 24, 2024
44df2d7
update tablemetadata v1 update
HonahX Mar 24, 2024
1a4d262
add doc
HonahX Mar 25, 2024
f7c04cf
fix lint
HonahX Mar 25, 2024
cecf1c0
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 27, 2024
2152542
revert some change
HonahX Mar 27, 2024
c449fb0
refactor catalog interface, add MetastoreCatalog
HonahX Mar 28, 2024
8fc1562
Merge branch 'main' into create_table_transaction_experiment
HonahX Apr 4, 2024
b99c619
update api doc
HonahX Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableMetadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import (
EMPTY_DICT,
Expand Down Expand Up @@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)

def _create_staged_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> StagedTable:
"""Create a table and return the table instance without committing the changes.

Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.

Returns:
Table: the created table instance.

Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
io = load_file_io(properties=self.properties, location=metadata_location)
return StagedTable(
identifier=(self.name, database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=io,
catalog=self,
)

def create_table_transaction(
Fokko marked this conversation as resolved.
Show resolved Hide resolved
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
"""Create a CreateTableTransaction.

Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.

Returns:
CreateTableTransaction: createTableTransaction instance.

Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
return CreateTableTransaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

@abstractmethod
def create_table(
self,
Expand Down
131 changes: 79 additions & 52 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,14 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
construct_table_metadata,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
Expand Down Expand Up @@ -384,20 +390,18 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
location=location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)

table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
database_name, table_name = self.identifier_to_database_and_table(identifier)

self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

return self.load_table(identifier=identifier)
Expand Down Expand Up @@ -435,46 +439,69 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
HonahX marked this conversation as resolved.
Show resolved Hide resolved
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
except NoSuchTableError:
# Create the table
updated_metadata = construct_table_metadata(table_request.updates)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect create_table to be called from the _commit from the Transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of it, and my rationale for the current implementation centers around ensuring a uniform transaction creation and commit process for both RestCatalogs and other types of catalogs. Specifically, for RestCatalogs, it's required to initiate CreateTableTransaction with _create_table(staged_create=True) and to use _commit_table with both initial and subsequent updates during transaction commitment. On the other hand, alternative catalogs offer more flexibility, allowing for either the use of _commit_table to reconstruct table metadata upon commitment or a modified _create_table API to create table during the transaction commitment.

Considering pyiceberg's alignment with Rest API principles, where _commit_table aggregates metadata updates to construct the revised metadata for table updates within a transaction, it seems prudent to maintain consistency with Rest API practices for table creation within transactions. This approach simplifies the process by relying on _commit_table to generate and commit metadata from scratch, eliminating the need to distinguish between RestCatalogs and other catalog types during transaction commitments.

Additionally, I've noted that the existing create_table and new_table_metadata APIs lack support for initializing metadata with snapshot information. I think that responsibility should belong to AddSnapshotUpdate and update_table_metadata. Thus, I've opted to maintain the current approach of utilizing _commit_table for both functions.

Does this approach sound reasonable to you? Please feel free to correct me if I've misunderstood any aspect of this process. Thanks for your input!

new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
Loading