Skip to content

Commit

Permalink
initial example
Browse files Browse the repository at this point in the history
  • Loading branch information
HonahX committed Mar 5, 2024
1 parent 3c225a7 commit 998c6f1
Show file tree
Hide file tree
Showing 5 changed files with 401 additions and 45 deletions.
58 changes: 58 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableMetadata,
)
Expand Down Expand Up @@ -288,6 +290,62 @@ def __init__(self, name: str, **properties: str):
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)

def _create_staged_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> StagedTable:
"""Create a table and return the table instance without committing the changes.
Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
Returns:
Table: the created table instance.
Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
raise NotImplementedError

def create_table_transaction(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> CreateTableTransaction:
"""Create a CreateTableTransaction.
Args:
identifier (str | Identifier): Table identifier.
schema (Schema): Table's schema.
location (str | None): Location for the table. Optional Argument.
partition_spec (PartitionSpec): PartitionSpec for the table.
sort_order (SortOrder): SortOrder for the table.
properties (Properties): Table properties that can be a string based dictionary.
Returns:
CreateTableTransaction: createTableTransaction instance.
Raises:
TableAlreadyExistsError: If a table with the name already exists.
"""
return CreateTableTransaction(
self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties)
)

@abstractmethod
def create_table(
self,
Expand Down
139 changes: 98 additions & 41 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
construct_initial_table_metadata,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT
Expand Down Expand Up @@ -325,12 +332,39 @@ def _update_glue_table(self, database_name: str, table_name: str, table_input: T
f"Cannot commit {database_name}.{table_name} because Glue detected concurrent update to table version {version_id}"
) from e

def _get_glue_table(self, database_name: str, table_name: str) -> TableTypeDef:
def _get_glue_table(self, database_name: str, table_name: str) -> Optional[TableTypeDef]:
try:
load_table_response = self.glue.get_table(DatabaseName=database_name, Name=table_name)
return load_table_response["Table"]
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
except self.glue.exceptions.EntityNotFoundException:
return None

def _create_staged_table(
self,
identifier: Union[str, Identifier],
schema: Union[Schema, "pa.Schema"],
location: Optional[str] = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
properties: Properties = EMPTY_DICT,
) -> StagedTable:
schema: Schema = self._convert_schema_if_needed(schema) # type: ignore

database_name, table_name = self.identifier_to_database_and_table(identifier)

location = self._resolve_table_location(location, database_name, table_name)
metadata_location = self._get_metadata_location(location=location)
metadata = new_table_metadata(
location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties
)
io = load_file_io(properties=self.properties, location=metadata_location)
return StagedTable(
identifier=(self.name, database_name, table_name),
metadata=metadata,
metadata_location=metadata_location,
io=io,
catalog=self,
)

def create_table(
self,
Expand Down Expand Up @@ -412,45 +446,68 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)

current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing")
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)
if current_glue_table is not None:
# Update the table
glue_table_version_id = current_glue_table.get("VersionId")
if not glue_table_version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table)
base_metadata = current_table.metadata

# Validate the update requirements
for requirement in table_request.requirements:
requirement.validate(base_metadata)

updated_metadata = update_table_metadata(base_metadata, table_request.updates)
if updated_metadata == base_metadata:
# no changes, do nothing
return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location)

# write new metadata
new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1
new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version)
self._write_metadata(updated_metadata, current_table.io, new_metadata_location)

update_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=current_table.properties,
metadata=updated_metadata,
glue_table=current_glue_table,
prev_metadata_location=current_table.metadata_location,
)

# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)
# Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent
# modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=update_table_input,
version_id=glue_table_version_id,
)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
else:
# Create the table
updated_metadata = construct_initial_table_metadata(table_request.updates)
new_metadata_version = 0
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)
self._write_metadata(
updated_metadata, self._load_file_io(updated_metadata.properties, new_metadata_location), new_metadata_location
)

create_table_input = _construct_table_input(
table_name=table_name,
metadata_location=new_metadata_location,
properties=updated_metadata.properties,
metadata=updated_metadata,
)

self._create_glue_table(database_name=database_name, table_name=table_name, table_input=create_table_input)

return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)
return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location)

def load_table(self, identifier: Union[str, Identifier]) -> Table:
"""Load the table's metadata and returns the table instance.
Expand Down
Loading

0 comments on commit 998c6f1

Please sign in to comment.