Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make commit_table public #1112

Merged
merged 7 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@
from pyiceberg.serializers import ToOutputFile
from pyiceberg.table import (
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE,
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
StagedTable,
Table,
TableRequirement,
TableUpdate,
update_table_metadata,
)
from pyiceberg.table.metadata import TableMetadata, TableMetadataV1, new_table_metadata
Expand Down Expand Up @@ -502,11 +503,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
"""

@abstractmethod
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand Down Expand Up @@ -854,13 +859,19 @@ def _create_staged_table(
catalog=self,
)

def _update_and_stage_table(self, current_table: Optional[Table], table_request: CommitTableRequest) -> StagedTable:
for requirement in table_request.requirements:
def _update_and_stage_table(
self,
current_table: Optional[Table],
table_identifier: Identifier,
requirements: Tuple[TableRequirement, ...],
updates: Tuple[TableUpdate, ...],
) -> StagedTable:
for requirement in requirements:
requirement.validate(current_table.metadata if current_table else None)

updated_metadata = update_table_metadata(
base_metadata=current_table.metadata if current_table else self._empty_table_metadata(),
updates=table_request.updates,
updates=updates,
enforce_validation=current_table is None,
metadata_location=current_table.metadata_location if current_table else None,
)
Expand All @@ -869,7 +880,7 @@ def _update_and_stage_table(self, current_table: Optional[Table], table_request:
new_metadata_location = self._get_metadata_location(updated_metadata.location, new_metadata_version)

return StagedTable(
identifier=tuple(table_request.identifier.namespace.root + [table_request.identifier.name]),
identifier=table_identifier,
metadata=updated_metadata,
metadata_location=new_metadata_location,
io=self._load_file_io(properties=updated_metadata.properties, location=new_metadata_location),
Expand Down
13 changes: 9 additions & 4 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -57,7 +58,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -215,11 +216,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand Down
22 changes: 13 additions & 9 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
List,
Optional,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -69,9 +70,10 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -449,11 +451,15 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
return self.load_table(identifier=identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -462,10 +468,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple)
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
Fokko marked this conversation as resolved.
Show resolved Hide resolved
database_name, table_name = table_identifier
Fokko marked this conversation as resolved.
Show resolved Hide resolved

current_glue_table: Optional[TableTypeDef]
glue_table_version_id: Optional[str]
Expand All @@ -479,7 +483,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
glue_table_version_id = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
28 changes: 17 additions & 11 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
List,
Optional,
Set,
Tuple,
Type,
Union,
)
Expand Down Expand Up @@ -79,11 +80,12 @@
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
StagedTable,
Table,
TableProperties,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -418,11 +420,15 @@ def _do_wait_for_lock() -> LockResponse:

return _do_wait_for_lock()

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -431,10 +437,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
database_name, table_name = self.identifier_to_database_and_table(identifier_tuple, NoSuchTableError)
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
database_name, table_name = table_identifier
Fokko marked this conversation as resolved.
Show resolved Hide resolved
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
Expand All @@ -445,7 +449,9 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
if lock.state == LockState.WAITING:
self._wait_for_lock(database_name, table_name, lock.lockid, open_client)
else:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}")
raise CommitFailedException(
f"Failed to acquire lock for {table.identifier}, state: {lock.state}"
Fokko marked this conversation as resolved.
Show resolved Hide resolved
)

hive_table: Optional[HiveTable]
current_table: Optional[Table]
Expand All @@ -456,7 +462,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
hive_table = None
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down Expand Up @@ -486,7 +492,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
)
self._create_hive_table(open_client, hive_table)
except WaitingForLockException as e:
raise CommitFailedException(f"Failed to acquire lock for {table_request.identifier}, state: {lock.state}") from e
raise CommitFailedException(f"Failed to acquire lock for {table.identifier}, state: {lock.state}") from e
Fokko marked this conversation as resolved.
Show resolved Hide resolved
finally:
open_client.unlock(UnlockRequest(lockid=lock.lockid))

Expand Down
8 changes: 6 additions & 2 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@
List,
Optional,
Set,
Tuple,
Union,
)

from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CreateTableTransaction,
Table,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -91,7 +93,9 @@ def purge_table(self, identifier: Union[str, Identifier]) -> None:
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
raise NotImplementedError

def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
Expand Down
17 changes: 12 additions & 5 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
StagedTable,
Table,
TableIdentifier,
TableRequirement,
TableUpdate,
)
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder, assign_fresh_sort_order_ids
Expand Down Expand Up @@ -719,12 +721,15 @@ def _remove_catalog_name_from_table_request_identifier(self, table_request: Comm
)
return table_request

@retry(**_RETRY_ARGS)
Fokko marked this conversation as resolved.
Show resolved Hide resolved
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -734,9 +739,11 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""
identifier = TableIdentifier(namespace=table.identifier[1:-1], name=table.identifier[-1])
Fokko marked this conversation as resolved.
Show resolved Hide resolved
table_request = CommitTableRequest(identifier=identifier, requirements=requirements, updates=updates)
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
data=self._remove_catalog_name_from_table_request_identifier(table_request).model_dump_json().encode(UTF8),
data=table_request.model_dump_json().encode(UTF8),
)
try:
response.raise_for_status()
Expand Down
25 changes: 14 additions & 11 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
List,
Optional,
Set,
Tuple,
Union,
)

Expand Down Expand Up @@ -60,7 +61,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableResponse, Table, TableRequirement, TableUpdate
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -394,11 +395,15 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
def commit_table(
self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...]
) -> CommitTableResponse:
"""Commit updates to a table.

Args:
table_request (CommitTableRequest): The table requests to be carried out.
table (Table): The table to be updated.
requirements (Tuple[TableRequirement, ...]): Table requirements.
updates (Tuple[TableUpdate, ...]): Table updates.

Returns:
CommitTableResponse: The updated metadata.
Expand All @@ -407,20 +412,18 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self._identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
)
namespace_tuple = Catalog.namespace_from(identifier_tuple)
table_identifier = self.identifier_to_tuple_without_catalog(table.identifier)
namespace_tuple = Catalog.namespace_from(table_identifier)
namespace = Catalog.namespace_to_string(namespace_tuple)
table_name = Catalog.table_name_from(identifier_tuple)
table_name = Catalog.table_name_from(table_identifier)

current_table: Optional[Table]
try:
current_table = self.load_table(identifier_tuple)
current_table = self.load_table(table_identifier)
except NoSuchTableError:
current_table = None

updated_staged_table = self._update_and_stage_table(current_table, table_request)
updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates)
Fokko marked this conversation as resolved.
Show resolved Hide resolved
if current_table and updated_staged_table.metadata == current_table.metadata:
# no changes, do nothing
return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location)
Expand Down
8 changes: 2 additions & 6 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,12 +1673,8 @@ def refs(self) -> Dict[str, SnapshotRef]:
return self.metadata.refs

def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
identifier=TableIdentifier(namespace=self._identifier[:-1], name=self._identifier[-1]),
updates=updates,
requirements=requirements,
)
response = self.catalog.commit_table( # pylint: disable=W0212
Fokko marked this conversation as resolved.
Show resolved Hide resolved
self, requirements, updates
) # pylint: disable=W0212
Fokko marked this conversation as resolved.
Show resolved Hide resolved
self.metadata = response.metadata
self.metadata_location = response.metadata_location
Expand Down
Loading