Skip to content

Commit

Permalink
Table commit retries based on table properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Buktoria committed Mar 18, 2024
1 parent c311dac commit b3c468e
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 7 deletions.
9 changes: 9 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
)
from pyiceberg.table.metadata import TableMetadata
Expand Down Expand Up @@ -417,6 +418,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""

@abstractmethod
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return the table commit exceptions that are retryable for this catalog.

    Returns:
        CommitTableRetryableExceptions: Collection of commit exceptions to handle.
    """

@abstractmethod
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.
Expand Down
10 changes: 9 additions & 1 deletion pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -195,6 +195,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures the table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``GenericDynamoDbError`` as a plain
            retryable exception, and no exceptions that require a refresh.
    """
    retry_as_is = (GenericDynamoDbError,)
    retry_after_refresh = ()
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
10 changes: 9 additions & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -417,6 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
"""
raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Describe which commit failures the table may retry against this catalog.

    Returns:
        CommitTableRetryableExceptions: ``NoSuchTableError`` as a plain retryable
            exception and ``CommitFailedException`` as a retry-with-refresh exception.
    """
    retry_as_is = (NoSuchTableError,)
    retry_after_refresh = (CommitFailedException,)
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
12 changes: 11 additions & 1 deletion pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema, SchemaVisitor, visit
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
TableProperties,
update_table_metadata,
)
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -351,6 +358,9 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque

return lock_request

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return commit exceptions that can be retried by the table.

    Returns:
        CommitTableRetryableExceptions: ``NoSuchTableError`` and
            ``NoSuchIcebergTableError`` as plain retryable exceptions, and no
            exceptions that require a refresh.
    """
    retry_as_is = (NoSuchTableError, NoSuchIcebergTableError)
    retry_after_refresh = ()
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/catalog/noop.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
)
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
Expand Down Expand Up @@ -73,6 +74,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
    """Rename a table; this implementation does not support it.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return commit exceptions that can be retried by the table.

    Returns:
        CommitTableRetryableExceptions: An empty collection (both groups are empty).
    """
    nothing_retryable = ()
    return CommitTableRetryableExceptions(nothing_retryable, ())

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
    """Commit a table update; this implementation does not support it.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

Expand Down
4 changes: 4 additions & 0 deletions pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from pyiceberg.table import (
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Table,
TableIdentifier,
)
Expand Down Expand Up @@ -616,6 +617,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U

return self.load_table(to_identifier)

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return commit exceptions that can be retried by the table.

    Returns:
        CommitTableRetryableExceptions: ``CommitStateUnknownException`` as a plain
            retryable exception and ``CommitFailedException`` as a
            retry-with-refresh exception.
    """
    retry_as_is = (CommitStateUnknownException,)
    retry_after_refresh = (CommitFailedException,)
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

@retry(**_RETRY_ARGS)
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update the table.
Expand Down
6 changes: 5 additions & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from pyiceberg.exceptions import (
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
Expand All @@ -59,7 +60,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.serializers import FromInputFile
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
from pyiceberg.table.metadata import new_table_metadata
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
Expand Down Expand Up @@ -360,6 +361,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return commit exceptions that can be retried by the table.

    Returns:
        CommitTableRetryableExceptions: ``CommitStateUnknownException`` and
            ``NoSuchTableError`` as plain retryable exceptions, and
            ``CommitFailedException`` as a retry-with-refresh exception.
    """
    retry_as_is = (CommitStateUnknownException, NoSuchTableError)
    retry_after_refresh = (CommitFailedException,)
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
"""Update one or more tables.
Expand Down
117 changes: 114 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from copy import copy
from dataclasses import dataclass
from enum import Enum
from functools import cached_property, singledispatch
from functools import cached_property, partial, singledispatch
from itertools import chain
from typing import (
TYPE_CHECKING,
Expand All @@ -39,17 +39,30 @@
Optional,
Set,
Tuple,
Type,
TypeVar,
Union,
)

from pydantic import Field, field_validator
from sortedcontainers import SortedList
from tenacity import (
RetryError,
Retrying,
retry_if_exception_type,
stop_after_attempt,
stop_after_delay,
wait_exponential,
)
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
import pyiceberg.expressions.visitors as visitors
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.exceptions import (
CommitFailedException,
ResolveError,
ValidationError,
)
from pyiceberg.expressions import (
AlwaysTrue,
And,
Expand Down Expand Up @@ -947,6 +960,97 @@ class CommitTableResponse(IcebergBaseModel):
metadata_location: str = Field(alias="metadata-location")


class CommitTableRetryableExceptions:
    """A catalog's commit exceptions that are retryable.

    Attributes are plain tuples of exception classes:

    * ``retry_exceptions``: exceptions after which the commit is simply retried.
    * ``retry_refresh_exceptions``: exceptions after which the commit is retried
      following a refresh.
    * ``all``: both groups combined, de-duplicated, in first-seen order.
    """

    def __init__(self, retry_exceptions: Tuple[Type[Exception], ...], retry_refresh_exceptions: Tuple[Type[Exception], ...]):
        """Store the two exception groups and pre-compute their union.

        Args:
            retry_exceptions: Exception types that are retried as-is.
            retry_refresh_exceptions: Exception types that are retried after a refresh.
        """
        # Coerce to tuples so the attributes are always usable with isinstance/issubclass.
        self.retry_exceptions: Tuple[Type[Exception], ...] = tuple(retry_exceptions)
        self.retry_refresh_exceptions: Tuple[Type[Exception], ...] = tuple(retry_refresh_exceptions)
        # dict.fromkeys de-duplicates while keeping a deterministic (insertion) order,
        # unlike a set whose iteration order is arbitrary.
        self.all: Tuple[Type[Exception], ...] = tuple(
            dict.fromkeys((*self.retry_exceptions, *self.retry_refresh_exceptions))
        )


class TableCommitRetry:
    """Decorator that runs a table commit method under a tenacity retry controller.

    On every call the retry limits are read from the decorated instance's
    ``properties`` attribute (falling back to the ``*_default`` class attributes)
    and the retryable exception types are read from the instance's
    ``commit_retry_exceptions`` attribute.
    """

    # Table property keys and the defaults used when a property is absent or invalid.
    num_retries = "commit.retry.num-retries"
    num_retries_default: int = 4
    min_wait_ms = "commit.retry.min-wait-ms"
    min_wait_ms_default: int = 100
    max_wait_ms = "commit.retry.max-wait-ms"
    max_wait_ms_default: int = 60000  # 1 min
    total_timeout_ms = "commit.retry.total-timeout-ms"
    total_timeout_ms_default: int = 1800000  # 30 mins

    # Names of the attributes looked up on the decorated instance.
    properties_attr: str = "properties"
    refresh_attr: str = "refresh"
    commit_retry_exceptions_attr: str = "commit_retry_exceptions"

    def __init__(self, func: Callable[..., Any]) -> None:
        """Wrap ``func``; the per-call configuration starts out empty."""
        self.func: Callable[..., Any] = func
        # NOTE(review): both fields are overwritten on every call and live on the
        # decorator object itself, not on the decorated instance.
        self.loaded_properties: Properties = {}
        self.loaded_exceptions: CommitTableRetryableExceptions = CommitTableRetryableExceptions((), ())

    def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
        """Return the __call__ method bound to the instance caller.

        When accessed on the class (``instance`` is ``None``) the decorator itself
        is returned instead of a callable bound to ``None``.
        """
        if instance is None:
            return self
        return partial(self.__call__, instance)

    def __call__(self, instance: Table, *args: Any, **kwargs: Any) -> Any:
        """Run the wrapped function with the retrying controller on the caller instance.

        Args:
            instance: Object the wrapped method is bound to.
            *args: Positional arguments forwarded to the wrapped function.
            **kwargs: Keyword arguments forwarded to the wrapped function.

        Returns:
            Whatever the wrapped function returns on its first successful attempt.

        Raises:
            Exception: The exception of the last attempt once retrying stops, or any
                non-retryable exception raised by an attempt.
        """
        self.loaded_properties = getattr(instance, self.properties_attr)
        self.loaded_exceptions = getattr(instance, self.commit_retry_exceptions_attr)
        refresh_exceptions = tuple(self.loaded_exceptions.retry_refresh_exceptions)
        previous_attempt_error: Optional[Type[BaseException]] = None
        try:
            for attempt in self.build_retry_controller():
                with attempt:
                    # Refresh the table if the previous exception requires a refresh.
                    # issubclass mirrors the isinstance-based retry predicate, so
                    # subclasses of a refresh exception also trigger the refresh.
                    if previous_attempt_error is not None and issubclass(previous_attempt_error, refresh_exceptions):
                        self.refresh_table(instance)

                    result = self.func(instance, *args, **kwargs)

                # The outcome is only recorded once the attempt context has exited.
                outcome = attempt.retry_state.outcome
                previous_attempt_error = type(outcome.exception()) if outcome.failed else None

        except RetryError as err:
            # Surface the last attempt's own exception instead of the RetryError wrapper.
            err.reraise()
        else:
            return result

    def build_retry_controller(self) -> Retrying:
        """Build the retry controller from the currently loaded properties.

        Retrying stops at whichever limit is reached first: the configured number of
        attempts or the total timeout. Only the loaded retryable exception types
        trigger another attempt.
        """
        attempt_limit = self.get_config(self.num_retries, self.num_retries_default)
        total_timeout = datetime.timedelta(
            milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default)
        )
        # Properties are expressed in milliseconds; tenacity waits are in seconds.
        # NOTE(review): wait_exponential's multiplier is left at its default, so `min`
        # presumably only acts as a floor on the computed wait - confirm intended backoff.
        min_wait_seconds = self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0
        max_wait_seconds = self.get_config(self.max_wait_ms, self.max_wait_ms_default) / 1000.0
        return Retrying(
            stop=(stop_after_attempt(attempt_limit) | stop_after_delay(total_timeout)),
            wait=wait_exponential(min=min_wait_seconds, max=max_wait_seconds),
            retry=retry_if_exception_type(self.loaded_exceptions.all),
        )

    def get_config(self, config: str, default: int) -> int:
        """Get an integer config out of the loaded properties.

        Args:
            config: Property key to look up.
            default: Value used when the property is missing, empty, or not an integer.

        Returns:
            The property converted to ``int``, or ``default``.
        """
        value = self.loaded_properties.get(config)
        if not value:
            return default
        return self.to_int(value, default, config)

    def refresh_table(self, instance: Table) -> None:
        """Call the instance's refresh method (looked up via ``refresh_attr``)."""
        getattr(instance, self.refresh_attr)()

    @staticmethod
    def to_int(v: str, default: int, config: str) -> int:
        """Convert str value to int, otherwise warn and return a default.

        Args:
            v: Raw property value.
            default: Value returned when ``v`` cannot be converted.
            config: Property key, used only in the warning message.
        """
        try:
            return int(v)
        except (ValueError, TypeError):
            warnings.warn(f"Expected an integer for table property {config}, got: {v}", category=UserWarning)
            return default


class Table:
identifier: Identifier = Field()
metadata: TableMetadata
Expand Down Expand Up @@ -1188,6 +1292,12 @@ def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
return self.metadata.refs

@property
def commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Expose the retryable commit exceptions declared by this table's catalog."""
    owning_catalog = self.catalog
    return owning_catalog._accepted_commit_retry_exceptions()  # pylint: disable=W0212

@TableCommitRetry
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
response = self.catalog._commit_table( # pylint: disable=W0212
CommitTableRequest(
Expand Down Expand Up @@ -1702,7 +1812,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
visit_with_partner(
Catalog._convert_schema_if_needed(new_schema),
-1,
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
# type: ignore
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
)
return self
Expand Down
6 changes: 6 additions & 0 deletions tests/catalog/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
PropertiesUpdateSummary,
)
from pyiceberg.exceptions import (
CommitFailedException,
CommitStateUnknownException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
Expand All @@ -50,6 +52,7 @@
AddSchemaUpdate,
CommitTableRequest,
CommitTableResponse,
CommitTableRetryableExceptions,
Namespace,
SetCurrentSchemaUpdate,
Table,
Expand Down Expand Up @@ -130,6 +133,9 @@ def create_table(
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
    """Register a table from a metadata location; not supported by this catalog.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
    """Return commit exceptions that can be retried by the table.

    Returns:
        CommitTableRetryableExceptions: ``CommitStateUnknownException`` and
            ``NoSuchTableError`` as plain retryable exceptions, and
            ``CommitFailedException`` as a retry-with-refresh exception.
    """
    retry_as_is = (CommitStateUnknownException, NoSuchTableError)
    retry_after_refresh = (CommitFailedException,)
    return CommitTableRetryableExceptions(retry_as_is, retry_after_refresh)

def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
Expand Down
Loading

0 comments on commit b3c468e

Please sign in to comment.