Skip to content

Table commit retries based on table properties #330

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
)
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -417,6 +418,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""

@abstractmethod
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return the table commit exceptions that are retryable for this catalog.

    Returns:
        CommitTableRetryableExceptions: Collection of commit exceptions to handle.
    """

@abstractmethod
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
Expand Down
10 changes: 9 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -195,6 +195,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures a table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``GenericDynamoDbError`` is listed as a
            plain retry; no exception is listed as requiring a refresh.
    """
    return CommitTableRetryableExceptions(
        retry_exceptions=(GenericDynamoDbError,),
        retry_refresh_exceptions=(),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
10 changes: 9 additions & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -417,6 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures a table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``NoSuchTableError`` is listed as a plain
            retry; ``CommitFailedException`` is listed as requiring a refresh.
    """
    return CommitTableRetryableExceptions(
        retry_exceptions=(NoSuchTableError,),
        retry_refresh_exceptions=(CommitFailedException,),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
12 changes: 11 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
TableProperties,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -351,6 +358,9 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque

return lock_request

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures a table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``NoSuchTableError`` and
            ``NoSuchIcebergTableError`` are listed as plain retries; no exception
            is listed as requiring a refresh.
    """
    return CommitTableRetryableExceptions(
        retry_exceptions=(NoSuchTableError, NoSuchIcebergTableError),
        retry_refresh_exceptions=(),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -73,6 +74,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return an empty collection: this catalog lists no retryable commit exceptions."""
    return CommitTableRetryableExceptions(
        retry_exceptions=(),
        retry_refresh_exceptions=(),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
raise NotImplementedError

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
TableIdentifier,
)
Expand Down Expand Up @@ -616,6 +617,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures a table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``CommitStateUnknownException`` is listed
            as a plain retry; ``CommitFailedException`` is listed as requiring a
            refresh.
    """
    # NOTE(review): CommitStateUnknownException suggests the commit outcome is not
    # known; confirm that re-sending the same commit in that case is intended.
    return CommitTableRetryableExceptions(
        retry_exceptions=(CommitStateUnknownException,),
        retry_refresh_exceptions=(CommitFailedException,),
    )

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from pyiceberg.exceptions import (
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
Expand All @@ -59,7 +60,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -360,6 +361,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures a table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``CommitStateUnknownException`` and
            ``NoSuchTableError`` are listed as plain retries;
            ``CommitFailedException`` is listed as requiring a refresh.
    """
    # NOTE(review): CommitStateUnknownException suggests the commit outcome is not
    # known; confirm that re-sending the same commit in that case is intended.
    return CommitTableRetryableExceptions(
        retry_exceptions=(CommitStateUnknownException, NoSuchTableError),
        retry_refresh_exceptions=(CommitFailedException,),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.

Expand Down
117 changes: 114 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from copy import copy
from dataclasses import dataclass
from enum import Enum
from functools import cached_property, singledispatch
from functools import cached_property, partial, singledispatch
from itertools import chain
from typing import (
TYPE_CHECKING,
Expand All @@ -39,17 +39,30 @@
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)

from pydantic import Field, field_validator
from sortedcontainers import SortedList
from tenacity import (
RetryError,
Retrying,
retry_if_exception_type,
stop_after_attempt,
stop_after_delay,
wait_exponential,
)
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
import pyiceberg.expressions.visitors as visitors
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.exceptions import (
CommitFailedException,
ResolveError,
ValidationError,
)
from pyiceberg.expressions import (
AlwaysTrue,
And,
Expand Down Expand Up @@ -947,6 +960,97 @@ class CommitTableResponse(IcebergBaseModel):
metadata_location: str = Field(alias="metadata-location")


class CommitTableRetryableExceptions:
    """A catalog's commit exceptions that are retryable.

    Attributes are set in ``__init__``:
        retry_exceptions: exception types given as the first argument.
        retry_refresh_exceptions: exception types given as the second argument.
        all: de-duplicated union of both groups (order is not significant).
    """

    def __init__(self, retry_exceptions: tuple[Type[Exception], ...], retry_refresh_exceptions: tuple[Type[Exception], ...]):
        # Merge both groups through a set so a type listed twice appears once.
        combined = {*retry_exceptions, *retry_refresh_exceptions}

        self.retry_exceptions: tuple[Type[Exception], ...] = retry_exceptions
        self.retry_refresh_exceptions: tuple[Type[Exception], ...] = retry_refresh_exceptions
        self.all: tuple[Type[Exception], ...] = tuple(combined)


class TableCommitRetry:
    """Decorator that runs a table method under a property-driven retry policy.

    The decorated method is looked up through the descriptor protocol, so each
    call receives the owning instance. On every call the policy is rebuilt from
    the instance's ``properties`` (the ``commit.retry.*`` keys below) and from
    the exception groups exposed by its ``commit_retry_exceptions`` attribute.
    """

    # Property keys and the defaults used when a key is absent or not an integer.
    num_retries = "commit.retry.num-retries"
    num_retries_default: int = 4
    min_wait_ms = "commit.retry.min-wait-ms"
    min_wait_ms_default: int = 100
    max_wait_ms = "commit.retry.max-wait-ms"
    max_wait_ms_default: int = 60000  # 1 min
    total_timeout_ms = "commit.retry.total-timeout-ms"
    total_timeout_ms_default: int = 1800000  # 30 mins

    # Names of the attributes read from the decorated method's instance.
    properties_attr: str = "properties"
    refresh_attr: str = "refresh"
    commit_retry_exceptions_attr: str = "commit_retry_exceptions"

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func: Callable[..., Any] = func
        # Snapshot of the most recent call, kept for backward compatibility only.
        # A single decorator object is shared by every instance of the decorated
        # class, so the retry logic itself works on per-call values instead.
        self.loaded_properties: Properties = {}
        self.loaded_exceptions: CommitTableRetryableExceptions = CommitTableRetryableExceptions((), ())

    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
        """Return the __call__ method bound to the accessing instance."""
        return partial(self.__call__, instance)

    def __call__(self, instance: Table, *args: Any, **kwargs: Any) -> Any:
        """Run the wrapped function on ``instance`` under the retry controller.

        Args:
            instance: Object owning the decorated method; its properties, refresh
                callable and retryable exceptions are read by attribute name.
            *args: Positional arguments forwarded to the wrapped function.
            **kwargs: Keyword arguments forwarded to the wrapped function.

        Returns:
            Whatever the wrapped function returns on the successful attempt.

        Raises:
            Exception: The last attempt's exception once the controller stops
                retrying, or immediately for an exception that is not retryable.
        """
        # Per-call policy inputs; never read back from ``self`` during the loop.
        properties = getattr(instance, self.properties_attr)
        exceptions = getattr(instance, self.commit_retry_exceptions_attr)
        self.loaded_properties = properties
        self.loaded_exceptions = exceptions

        previous_attempt_error: Optional[Type[BaseException]] = None
        try:
            for attempt in self.build_retry_controller(properties, exceptions):
                with attempt:
                    # Refresh the table if the previous exception requires a refresh.
                    # issubclass mirrors the retry predicate, which also accepts
                    # subclasses of the configured exception types.
                    if previous_attempt_error is not None and issubclass(
                        previous_attempt_error, exceptions.retry_refresh_exceptions
                    ):
                        self.refresh_table(instance)

                    result = self.func(instance, *args, **kwargs)

                # Grab the exception type recorded for the attempt just finished.
                outcome = attempt.retry_state.outcome
                previous_attempt_error = type(outcome.exception()) if outcome.failed else None
        except RetryError as err:
            # reraise() raises the last attempt's own exception, so callers see
            # the catalog error rather than the RetryError wrapper.
            raise err.reraise()

        return result

    def build_retry_controller(
        self,
        properties: Optional[Properties] = None,
        exceptions: Optional[CommitTableRetryableExceptions] = None,
    ) -> Retrying:
        """Build the retry controller.

        Args:
            properties: Properties to read the ``commit.retry.*`` keys from.
                Defaults to the properties loaded by the most recent call.
            exceptions: Retryable exception groups. Defaults to the exceptions
                loaded by the most recent call.

        Returns:
            Retrying: Controller that stops on the attempt limit or the total
                timeout, waits exponentially between the min and max wait, and
                retries only on ``exceptions.all``.
        """
        if properties is None:
            properties = self.loaded_properties
        if exceptions is None:
            exceptions = self.loaded_exceptions

        # NOTE(review): the num-retries value is passed straight to
        # stop_after_attempt — confirm whether it should count retries or attempts.
        attempts = self.get_config(self.num_retries, self.num_retries_default, properties)
        total_timeout_ms = self.get_config(self.total_timeout_ms, self.total_timeout_ms_default, properties)
        min_wait_ms = self.get_config(self.min_wait_ms, self.min_wait_ms_default, properties)
        max_wait_ms = self.get_config(self.max_wait_ms, self.max_wait_ms_default, properties)

        return Retrying(
            stop=(stop_after_attempt(attempts) | stop_after_delay(datetime.timedelta(milliseconds=total_timeout_ms))),
            # Properties are in milliseconds; the wait arguments are divided down to seconds.
            wait=wait_exponential(min=min_wait_ms / 1000.0, max=max_wait_ms / 1000.0),
            retry=retry_if_exception_type(exceptions.all),
        )

    def get_config(self, config: str, default: int, properties: Optional[Properties] = None) -> int:
        """Get an integer config out of the properties.

        Args:
            config: Property key to look up.
            default: Value returned when the key is missing or not an integer.
            properties: Mapping to read from. Defaults to the properties loaded
                by the most recent call.

        Returns:
            int: The parsed property value, or ``default``.
        """
        if properties is None:
            properties = self.loaded_properties

        value = properties.get(config)
        if value is None:
            return default
        return self.to_int(value, default, config)

    def refresh_table(self, instance: Table) -> None:
        """Call the instance's refresh callable (looked up via ``refresh_attr``)."""
        getattr(instance, self.refresh_attr)()

    @staticmethod
    def to_int(v: str, default: int, config: str) -> int:
        """Convert str value to int, otherwise warn and return the default.

        Args:
            v: Raw property value.
            default: Fallback returned when ``v`` cannot be converted.
            config: Property key, used only in the warning message.

        Returns:
            int: ``int(v)`` on success, else ``default``.
        """
        try:
            return int(v)
        except (ValueError, TypeError):
            warnings.warn(f"Expected an integer for table property {config}, got: {v}", category=UserWarning)
            return default


class Table:
identifier: Identifier = Field()
metadata: TableMetadata
Expand Down Expand Up @@ -1188,6 +1292,12 @@ def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
return self.metadata.refs

@property
def commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Expose the retryable commit exceptions declared by this table's catalog."""
    catalog = self.catalog
    return catalog._accepted_commit_retry_exceptions()  # pylint: disable=W0212

@TableCommitRetry
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
Expand Down Expand Up @@ -1702,7 +1812,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
visit_with_partner(
Catalog._convert_schema_if_needed(new_schema),
-1,
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
)
return self
Expand Down
6 changes: 6 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
Expand All @@ -50,6 +52,7 @@
AddSchemaUpdate,
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Namespace,
SetCurrentSchemaUpdate,
Table,
Expand Down Expand Up @@ -130,6 +133,9 @@ def create_table(
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return the retryable commit exceptions used by this test catalog."""
    return CommitTableRetryableExceptions(
        retry_exceptions=(CommitStateUnknownException, NoSuchTableError),
        retry_refresh_exceptions=(CommitFailedException,),
    )

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
Expand Down
Loading