Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CreateTableTransaction in Glue and Rest #498

Merged
merged 28 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
998c6f1
initial example
HonahX Mar 5, 2024
8eace7c
add integration test
HonahX Mar 5, 2024
64e6346
lint
HonahX Mar 5, 2024
ffb8ff6
fix test
HonahX Mar 5, 2024
3a579cd
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 5, 2024
049e0e2
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 7, 2024
df0c5ed
move create_staged catalog to init
HonahX Mar 7, 2024
755aebf
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 12, 2024
c98b3b4
Add IncompleteMetadata
HonahX Mar 12, 2024
09b60ca
Add support for rest
HonahX Mar 13, 2024
04ef8df
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
211de32
Fix merge issue
HonahX Mar 20, 2024
d57ac1c
get rid of IncompleteTableMetadata
HonahX Mar 20, 2024
978a0aa
simplify code
HonahX Mar 20, 2024
a413c2e
fix small issue
HonahX Mar 20, 2024
ad840d5
remove extra line
HonahX Mar 20, 2024
47ce986
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 20, 2024
1f5cc28
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 24, 2024
9ac2f7f
accumulates initial updates in self._updates in transaction directly
HonahX Mar 24, 2024
d2617fb
update comments
HonahX Mar 24, 2024
44df2d7
update tablemetadata v1 update
HonahX Mar 24, 2024
1a4d262
add doc
HonahX Mar 25, 2024
f7c04cf
fix lint
HonahX Mar 25, 2024
cecf1c0
Merge branch 'main' into create_table_transaction_experiment
HonahX Mar 27, 2024
2152542
revert some change
HonahX Mar 27, 2024
c449fb0
refactor catalog interface, add MetastoreCatalog
HonahX Mar 28, 2024
8fc1562
Merge branch 'main' into create_table_transaction_experiment
HonahX Apr 4, 2024
b99c619
update api doc
HonahX Apr 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@ catalog.create_table(
)
```

To create a table with some subsequent changes in a transaction:
HonahX marked this conversation as resolved.
Show resolved Hide resolved

```python
with catalog.create_table_transaction(
    identifier="docs_example.bids",
    schema=schema,
    location="s3://pyiceberg",
    partition_spec=partition_spec,
    sort_order=sort_order,
) as txn:
    with txn.update_schema() as update_schema:
        update_schema.add_column(path="new_column", field_type=StringType())

    with txn.update_spec() as update_spec:
        update_spec.add_identity("symbol")

    txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
```

## Load a table

### Catalog table
Expand Down
298 changes: 203 additions & 95 deletions pyiceberg/catalog/__init__.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand Down Expand Up @@ -79,7 +79,7 @@
ITEM = "Item"


class DynamoDbCatalog(Catalog):
class DynamoDbCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
session = boto3.Session(
Expand Down
136 changes: 82 additions & 54 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
METADATA_LOCATION,
PREVIOUS_METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand All @@ -62,8 +62,13 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
from pyiceberg.types import (
Expand Down Expand Up @@ -273,7 +278,7 @@ def add_glue_catalog_id(params: Dict[str, str], **kwargs: Any) -> None:
event_system.register("provide-client-params.glue", add_glue_catalog_id)


class GlueCatalog(Catalog):
class GlueCatalog(MetastoreCatalog):
def __init__(self, name: str, **properties: Any):
super().__init__(name, **properties)

Expand Down Expand Up @@ -384,20 +389,18 @@ def create_table(
ValueError: If the identifier is invalid, or no path is given to store metadata.

"""
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
location=location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)
io = load_file_io(properties=self.properties, location=metadata_location)
self._write_metadata(metadata, io, metadata_location)

table_input = _construct_table_input(table_name, metadata_location, properties, metadata)
database_name, table_name = self.identifier_to_database_and_table(identifier)

self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

return self.load_table(identifier=identifier)
Expand Down Expand Up @@ -435,46 +438,71 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
try:
current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
HonahX marked this conversation as resolved.
Show resolved Hide resolved
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata=base_metadata, updates=table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
except NoSuchTableError:
# Create the table
updated_metadata = update_table_metadata(
base_metadata=self._empty_table_metadata(), updates=table_request.updates, enforce_validation=True
)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
LOCATION,
METADATA_LOCATION,
TABLE_TYPE,
Catalog,
MetastoreCatalog,
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
Expand Down Expand Up @@ -230,7 +230,7 @@ def primitive(self, primitive: PrimitiveType) -> str:
return HIVE_PRIMITIVE_TYPES[type(primitive)]


class HiveCatalog(Catalog):
class HiveCatalog(MetastoreCatalog):
_client: _HiveClient

def __init__(self, name: str, **properties: str):
Expand Down
18 changes: 18 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand All @@ -49,9 +50,23 @@ def create_table(
) -> Table:
raise NotImplementedError

def create_table_transaction(
    self,
    identifier: Union[str, Identifier],
    schema: Union[Schema, "pa.Schema"],
    location: Optional[str] = None,
    partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
    sort_order: SortOrder = UNSORTED_SORT_ORDER,
    properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
    """Stub for starting a create-table transaction; it is not implemented.

    None of the arguments are inspected.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def load_table(self, identifier: Union[str, Identifier]) -> Table:
    """Stub for loading a table; it is not implemented.

    Raises:
        NotImplementedError: Always; `identifier` is not inspected.
    """
    raise NotImplementedError

def table_exists(self, identifier: Union[str, Identifier]) -> bool:
    """Stub for checking whether a table exists; it is not implemented.

    Raises:
        NotImplementedError: Always; `identifier` is not inspected.
    """
    raise NotImplementedError

def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
"""Register a new table using existing metadata.

Expand All @@ -70,6 +85,9 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
def drop_table(self, identifier: Union[str, Identifier]) -> None:
    """Stub for dropping a table; it is not implemented.

    Raises:
        NotImplementedError: Always; `identifier` is not inspected.
    """
    raise NotImplementedError

def purge_table(self, identifier: Union[str, Identifier]) -> None:
    """Stub for purging a table; it is not implemented.

    Raises:
        NotImplementedError: Always; `identifier` is not inspected.
    """
    raise NotImplementedError

def rename_table(
    self,
    from_identifier: Union[str, Identifier],
    to_identifier: Union[str, Identifier],
) -> Table:
    """Stub for renaming a table; it is not implemented.

    Raises:
        NotImplementedError: Always; neither identifier is inspected.
    """
    raise NotImplementedError

Expand Down
Loading