From b3c468efe3d6ed79e0ebd8b03cfdca5a581fdd6a Mon Sep 17 00:00:00 2001 From: Victoria Bukta Date: Thu, 25 Jan 2024 15:54:26 -0500 Subject: [PATCH] Table commit retries based on table properties --- pyiceberg/catalog/__init__.py | 9 +++ pyiceberg/catalog/dynamodb.py | 10 ++- pyiceberg/catalog/glue.py | 10 ++- pyiceberg/catalog/hive.py | 12 +++- pyiceberg/catalog/noop.py | 4 ++ pyiceberg/catalog/rest.py | 4 ++ pyiceberg/catalog/sql.py | 6 +- pyiceberg/table/__init__.py | 117 +++++++++++++++++++++++++++++++- tests/catalog/test_base.py | 6 ++ tests/table/test_init.py | 122 ++++++++++++++++++++++++++++++++++ 10 files changed, 293 insertions(+), 7 deletions(-) diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index f2b46fcde7..9432579d49 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -45,6 +45,7 @@ from pyiceberg.table import ( CommitTableRequest, CommitTableResponse, + CommitTableRetryableExceptions, Table, ) from pyiceberg.table.metadata import TableMetadata @@ -417,6 +418,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons CommitStateUnknownException: Failed due to an internal exception on the side of the catalog. """ + @abstractmethod + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + """Return the retryable table commit exceptions for the catalog. + + Returns: + CommitTableRetryableExceptions: Collection of commit exceptions to handle. + """ + @abstractmethod def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. 
diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 266dd6353d..2fb6b320ea 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -51,7 +51,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table +from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -195,6 +195,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + """Return commit exceptions that can be retried by the table. + + Returns: + CommitTableRetryableExceptions: The retryable exceptions. + """ + return CommitTableRetryableExceptions((GenericDynamoDbError,), ()) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. 
diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py index adec150d84..4bd69f1c9e 100644 --- a/pyiceberg/catalog/glue.py +++ b/pyiceberg/catalog/glue.py @@ -62,7 +62,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata from pyiceberg.table.metadata import TableMetadata, new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -417,6 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location: """ raise NotImplementedError + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + """Return commit exceptions that can be retried by the table. + + Returns: + CommitTableRetryableExceptions: The retryable exceptions. + """ + return CommitTableRetryableExceptions((NoSuchTableError,), (CommitFailedException,)) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. 
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 18bbcfe084..f6dc9cdef0 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -74,7 +74,14 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema, SchemaVisitor, visit from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + CommitTableRetryableExceptions, + Table, + TableProperties, + update_table_metadata, +) from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -351,6 +358,9 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque return lock_request + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + return CommitTableRetryableExceptions((NoSuchTableError, NoSuchIcebergTableError), ()) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. 
diff --git a/pyiceberg/catalog/noop.py b/pyiceberg/catalog/noop.py index e294390e61..30816c4dd0 100644 --- a/pyiceberg/catalog/noop.py +++ b/pyiceberg/catalog/noop.py @@ -28,6 +28,7 @@ from pyiceberg.table import ( CommitTableRequest, CommitTableResponse, + CommitTableRetryableExceptions, Table, ) from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder @@ -73,6 +74,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None: def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table: raise NotImplementedError + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + return CommitTableRetryableExceptions((), ()) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: raise NotImplementedError diff --git a/pyiceberg/catalog/rest.py b/pyiceberg/catalog/rest.py index 9f0d054493..c475f82dfe 100644 --- a/pyiceberg/catalog/rest.py +++ b/pyiceberg/catalog/rest.py @@ -61,6 +61,7 @@ from pyiceberg.table import ( CommitTableRequest, CommitTableResponse, + CommitTableRetryableExceptions, Table, TableIdentifier, ) @@ -616,6 +617,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U return self.load_table(to_identifier) + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + return CommitTableRetryableExceptions((CommitStateUnknownException,), (CommitFailedException,)) + @retry(**_RETRY_ARGS) def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update the table. 
diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index d44d4996b6..6d012ee2a2 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -48,6 +48,7 @@ ) from pyiceberg.exceptions import ( CommitFailedException, + CommitStateUnknownException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, @@ -59,7 +60,7 @@ from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.serializers import FromInputFile -from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata +from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata from pyiceberg.table.metadata import new_table_metadata from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties @@ -360,6 +361,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e return self.load_table(to_identifier) + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,)) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: """Update one or more tables. 
diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 517e6c86df..47ffb01fa8 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -24,7 +24,7 @@ from copy import copy from dataclasses import dataclass from enum import Enum -from functools import cached_property, singledispatch +from functools import cached_property, partial, singledispatch from itertools import chain from typing import ( TYPE_CHECKING, @@ -39,17 +39,30 @@ Optional, Set, Tuple, + Type, TypeVar, Union, ) from pydantic import Field, field_validator from sortedcontainers import SortedList +from tenacity import ( + RetryError, + Retrying, + retry_if_exception_type, + stop_after_attempt, + stop_after_delay, + wait_exponential, +) from typing_extensions import Annotated import pyiceberg.expressions.parser as parser import pyiceberg.expressions.visitors as visitors -from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError +from pyiceberg.exceptions import ( + CommitFailedException, + ResolveError, + ValidationError, +) from pyiceberg.expressions import ( AlwaysTrue, And, @@ -947,6 +960,97 @@ class CommitTableResponse(IcebergBaseModel): metadata_location: str = Field(alias="metadata-location") +class CommitTableRetryableExceptions: + """A catalog's commit exceptions that are retryable.""" + + def __init__(self, retry_exceptions: tuple[Type[Exception], ...], retry_refresh_exceptions: tuple[Type[Exception], ...]): + self.retry_exceptions: tuple[Type[Exception], ...] = retry_exceptions + self.retry_refresh_exceptions: tuple[Type[Exception], ...] = retry_refresh_exceptions + self.all: tuple[Type[Exception], ...] 
= tuple(set(retry_exceptions).union(retry_refresh_exceptions)) + + +class TableCommitRetry: + """Decorator for building the table commit retry controller.""" + + num_retries = "commit.retry.num-retries" + num_retries_default: int = 4 + min_wait_ms = "commit.retry.min-wait-ms" + min_wait_ms_default: int = 100 + max_wait_ms = "commit.retry.max-wait-ms" + max_wait_ms_default: int = 60000 # 1 min + total_timeout_ms = "commit.retry.total-timeout-ms" + total_timeout_ms_default: int = 1800000 # 30 mins + + properties_attr: str = "properties" + refresh_attr: str = "refresh" + commit_retry_exceptions_attr: str = "commit_retry_exceptions" + + def __init__(self, func: Callable[..., Any]) -> None: + self.func: Callable[..., Any] = func + self.loaded_properties: Properties = {} + self.loaded_exceptions: CommitTableRetryableExceptions = CommitTableRetryableExceptions((), ()) + + def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]: + """Return the __call__ method with the instance caller.""" + return partial(self.__call__, instance) + + def __call__(self, instance: Table, *args: Any, **kwargs: Any) -> Any: + """Run function with the retrying controller on the caller instance.""" + self.loaded_properties = getattr(instance, self.properties_attr) + self.loaded_exceptions = getattr(instance, self.commit_retry_exceptions_attr) + previous_attempt_error = None + try: + for attempt in self.build_retry_controller(): + with attempt: + # Refresh the table if the previous exception requires a refresh + if previous_attempt_error in self.loaded_exceptions.retry_refresh_exceptions: + self.refresh_table(instance) + + result = self.func(instance, *args, **kwargs) + + # Grab exception from the attempt + outcome = attempt.retry_state.outcome + previous_attempt_error = type(outcome.exception()) if outcome.failed else None + + except RetryError as err: + raise Exception from err.reraise() + else: + return result + + def build_retry_controller(self) -> Retrying: + """Build the retry 
controller.""" + return Retrying( + stop=( + stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default)) + | stop_after_delay( + datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default)) + ) + ), + wait=wait_exponential( + min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0, + max=self.get_config(self.max_wait_ms, self.max_wait_ms_default) / 1000.0, + ), + retry=retry_if_exception_type(self.loaded_exceptions.all), + ) + + def get_config(self, config: str, default: int) -> int: + """Get config out of the properties.""" + return self.to_int(value, default, config) if (value := self.loaded_properties.get(config)) else default + + def refresh_table(self, instance: Table) -> None: + getattr(instance, self.refresh_attr)() + return + + @staticmethod + def to_int(v: str, default: int, config: str) -> int: + """Convert str value to int, otherwise return a default.""" + try: + return int(v) + except (ValueError, TypeError): + warnings.warn(f"Expected an integer for table property {config}, got: {v}", category=UserWarning) + return default + + class Table: identifier: Identifier = Field() metadata: TableMetadata @@ -1188,6 +1292,12 @@ def refs(self) -> Dict[str, SnapshotRef]: """Return the snapshot references in the table.""" return self.metadata.refs + @property + def commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + """Return the commit exceptions that can be retried on the catalog.""" + return self.catalog._accepted_commit_retry_exceptions() # pylint: disable=W0212 + + @TableCommitRetry def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None: response = self.catalog._commit_table( # pylint: disable=W0212 CommitTableRequest( @@ -1702,7 +1812,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema: visit_with_partner( Catalog._convert_schema_if_needed(new_schema), -1, - 
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore + UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), + # type: ignore PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive), ) return self diff --git a/tests/catalog/test_base.py b/tests/catalog/test_base.py index 5f78eb3bc4..63a0b42349 100644 --- a/tests/catalog/test_base.py +++ b/tests/catalog/test_base.py @@ -37,6 +37,8 @@ PropertiesUpdateSummary, ) from pyiceberg.exceptions import ( + CommitFailedException, + CommitStateUnknownException, NamespaceAlreadyExistsError, NamespaceNotEmptyError, NoSuchNamespaceError, @@ -50,6 +52,7 @@ AddSchemaUpdate, CommitTableRequest, CommitTableResponse, + CommitTableRetryableExceptions, Namespace, SetCurrentSchemaUpdate, Table, @@ -130,6 +133,9 @@ def create_table( def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table: raise NotImplementedError + def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions: + return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,)) + def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse: identifier_tuple = self.identifier_to_tuple_without_catalog( tuple(table_request.identifier.namespace.root + [table_request.identifier.name]) diff --git a/tests/table/test_init.py b/tests/table/test_init.py index bb212d696e..e64053cecd 100644 --- a/tests/table/test_init.py +++ b/tests/table/test_init.py @@ -54,12 +54,14 @@ AssertRefSnapshotId, AssertTableUUID, CommitTableRequest, + CommitTableRetryableExceptions, RemovePropertiesUpdate, SetDefaultSortOrderUpdate, SetPropertiesUpdate, SetSnapshotRefUpdate, StaticTable, Table, + TableCommitRetry, TableIdentifier, UpdateSchema, _apply_table_update, @@ -83,6 +85,7 @@ SortOrder, ) from pyiceberg.transforms import 
BucketTransform, IdentityTransform +from pyiceberg.typedef import Properties from pyiceberg.types import ( BinaryType, BooleanType, @@ -1125,3 +1128,122 @@ def test_serialize_commit_table_request() -> None: deserialized_request = CommitTableRequest.model_validate_json(request.model_dump_json()) assert request == deserialized_request + + +def test_non_commit_failure_retry() -> None: + class CustomException(Exception): + pass + + class TestTableCommitRetiesCustomError: + def __init__(self) -> None: + self.count: int = 0 + self.refresh_count: int = 0 + self.properties: Properties = { + "commit.retry.num-retries": "3", + "commit.retry.max-wait-ms": "0", + "commit.retry.min-wait-ms": "0", + } + self.commit_retry_exceptions = CommitTableRetryableExceptions((), ()) + + def refresh(self) -> None: + self.refresh_count += 1 + + @TableCommitRetry + def my_function(self) -> None: + self.count += 1 + raise CustomException + + test_table_commits_retry = TestTableCommitRetiesCustomError() + + with pytest.raises(CustomException): + test_table_commits_retry.my_function() + assert test_table_commits_retry.count == 1 + assert test_table_commits_retry.refresh_count == 0 + + +def test_custom_retry_commit_config() -> None: + class TestTableCommitReties: + def __init__(self) -> None: + self.count: int = 0 + self.refresh_count: int = 0 + self.properties: Properties = { + "commit.retry.num-retries": "3", + "commit.retry.max-wait-ms": "0", + "commit.retry.min-wait-ms": "0", + } + self.commit_retry_exceptions = CommitTableRetryableExceptions((), (CommitFailedException,)) + + def refresh(self) -> None: + self.refresh_count += 1 + + @TableCommitRetry + def my_function(self) -> None: + self.count += 1 + raise CommitFailedException + + test_table_commits_retry = TestTableCommitReties() + + with pytest.raises(CommitFailedException): + test_table_commits_retry.my_function() + assert test_table_commits_retry.count == 3 + assert test_table_commits_retry.refresh_count == 2 + + +def 
test_invalid_commit_retry_config() -> None: + class TestTableCommitReties: + def __init__(self) -> None: + self.count: int = 0 + self.refresh_count: int = 0 + self.properties: Properties = { + "commit.retry.num-retries": "I AM INVALID", + "commit.retry.max-wait-ms": "0", + "commit.retry.min-wait-ms": "0", + } + self.commit_retry_exceptions = CommitTableRetryableExceptions((), (CommitFailedException,)) + + def refresh(self) -> None: + self.refresh_count += 1 + + @TableCommitRetry + def my_function(self) -> None: + self.count += 1 + raise CommitFailedException + + test_table_commits_retry = TestTableCommitReties() + + with pytest.raises(CommitFailedException): + with pytest.warns( + UserWarning, match="Expected an integer for table property commit.retry.num-retries, got: I AM INVALID" + ): + test_table_commits_retry.my_function() + assert test_table_commits_retry.count == 4 + assert test_table_commits_retry.refresh_count == 3 + + +def test_table_commit_retry() -> None: + class TestTableCommitReties: + def __init__(self) -> None: + self.count: int = 0 + self.refresh_count: int = 0 + self.properties: Properties = { + "commit.retry.max-wait-ms": "0", + "commit.retry.min-wait-ms": "0", + } + self.commit_retry_exceptions = CommitTableRetryableExceptions((), (CommitFailedException,)) + + def refresh(self) -> None: + self.refresh_count += 1 + + @TableCommitRetry + def my_function(self) -> str: + self.count += 1 + if self.count < 4: + raise CommitFailedException + else: + return "PASS" + + test_table_commits_retry = TestTableCommitReties() + + assert test_table_commits_retry.my_function() == "PASS" + assert test_table_commits_retry.count == 4 + assert test_table_commits_retry.refresh_count == 3