Skip to content

Commit 0696c76

Browse files
committed
Table commit retries based on table properties
1 parent b447461 commit 0696c76

File tree

10 files changed

+293
-7
lines changed

10 files changed

+293
-7
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from pyiceberg.table import (
4646
CommitTableRequest,
4747
CommitTableResponse,
48+
CommitTableRetryableExceptions,
4849
Table,
4950
)
5051
from pyiceberg.table.metadata import TableMetadata
@@ -417,6 +418,14 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
417418
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
418419
"""
419420

421+
@abstractmethod
422+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
423+
"""Return list of retryable table commit exception for the catalog.
424+
425+
Returns:
426+
CommitTableRetryableExceptions: Collection of commit exceptions to handle.
427+
"""
428+
420429
@abstractmethod
421430
def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None:
422431
"""Create a namespace in the catalog.

pyiceberg/catalog/dynamodb.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
5252
from pyiceberg.schema import Schema
5353
from pyiceberg.serializers import FromInputFile
54-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table
54+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table
5555
from pyiceberg.table.metadata import new_table_metadata
5656
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
5757
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -195,6 +195,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
195195
"""
196196
raise NotImplementedError
197197

198+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
199+
"""Return commit exceptions that can be retried by the table.
200+
201+
Returns:
202+
CommitTableRetryableExceptions: The retryable exceptions.
203+
"""
204+
return CommitTableRetryableExceptions((GenericDynamoDbError,), ())
205+
198206
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
199207
"""Update the table.
200208

pyiceberg/catalog/glue.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6363
from pyiceberg.schema import Schema, SchemaVisitor, visit
6464
from pyiceberg.serializers import FromInputFile
65-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
65+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
6666
from pyiceberg.table.metadata import TableMetadata, new_table_metadata
6767
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6868
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -417,6 +417,14 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
417417
"""
418418
raise NotImplementedError
419419

420+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
421+
"""Return commit exceptions that can be retried by the table.
422+
423+
Returns:
424+
CommitTableRetryableExceptions: The retryable exceptions.
425+
"""
426+
return CommitTableRetryableExceptions((NoSuchTableError,), (CommitFailedException,))
427+
420428
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
421429
"""Update the table.
422430

pyiceberg/catalog/hive.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,14 @@
7474
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
7575
from pyiceberg.schema import Schema, SchemaVisitor, visit
7676
from pyiceberg.serializers import FromInputFile
77-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, TableProperties, update_table_metadata
77+
from pyiceberg.table import (
78+
CommitTableRequest,
79+
CommitTableResponse,
80+
CommitTableRetryableExceptions,
81+
Table,
82+
TableProperties,
83+
update_table_metadata,
84+
)
7885
from pyiceberg.table.metadata import new_table_metadata
7986
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8087
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -351,6 +358,9 @@ def _create_lock_request(self, database_name: str, table_name: str) -> LockReque
351358

352359
return lock_request
353360

361+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
362+
return CommitTableRetryableExceptions((NoSuchTableError, NoSuchIcebergTableError), ())
363+
354364
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
355365
"""Update the table.
356366

pyiceberg/catalog/noop.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from pyiceberg.table import (
2929
CommitTableRequest,
3030
CommitTableResponse,
31+
CommitTableRetryableExceptions,
3132
Table,
3233
)
3334
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
@@ -73,6 +74,9 @@ def drop_table(self, identifier: Union[str, Identifier]) -> None:
7374
def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: Union[str, Identifier]) -> Table:
7475
raise NotImplementedError
7576

77+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
78+
return CommitTableRetryableExceptions((), ())
79+
7680
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
7781
raise NotImplementedError
7882

pyiceberg/catalog/rest.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
from pyiceberg.table import (
6262
CommitTableRequest,
6363
CommitTableResponse,
64+
CommitTableRetryableExceptions,
6465
Table,
6566
TableIdentifier,
6667
)
@@ -616,6 +617,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
616617

617618
return self.load_table(to_identifier)
618619

620+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
621+
return CommitTableRetryableExceptions((CommitStateUnknownException,), (CommitFailedException,))
622+
619623
@retry(**_RETRY_ARGS)
620624
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
621625
"""Update the table.

pyiceberg/catalog/sql.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
)
4949
from pyiceberg.exceptions import (
5050
CommitFailedException,
51+
CommitStateUnknownException,
5152
NamespaceAlreadyExistsError,
5253
NamespaceNotEmptyError,
5354
NoSuchNamespaceError,
@@ -59,7 +60,7 @@
5960
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
6061
from pyiceberg.schema import Schema
6162
from pyiceberg.serializers import FromInputFile
62-
from pyiceberg.table import CommitTableRequest, CommitTableResponse, Table, update_table_metadata
63+
from pyiceberg.table import CommitTableRequest, CommitTableResponse, CommitTableRetryableExceptions, Table, update_table_metadata
6364
from pyiceberg.table.metadata import new_table_metadata
6465
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
6566
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
@@ -360,6 +361,9 @@ def rename_table(self, from_identifier: Union[str, Identifier], to_identifier: U
360361
raise TableAlreadyExistsError(f"Table {to_database_name}.{to_table_name} already exists") from e
361362
return self.load_table(to_identifier)
362363

364+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
365+
return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,))
366+
363367
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
364368
"""Update one or more tables.
365369

pyiceberg/table/__init__.py

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from copy import copy
2525
from dataclasses import dataclass
2626
from enum import Enum
27-
from functools import cached_property, singledispatch
27+
from functools import cached_property, partial, singledispatch
2828
from itertools import chain
2929
from typing import (
3030
TYPE_CHECKING,
@@ -38,17 +38,30 @@
3838
Optional,
3939
Set,
4040
Tuple,
41+
Type,
4142
TypeVar,
4243
Union,
4344
)
4445

4546
from pydantic import Field, SerializeAsAny, field_validator
4647
from sortedcontainers import SortedList
48+
from tenacity import (
49+
RetryError,
50+
Retrying,
51+
retry_if_exception_type,
52+
stop_after_attempt,
53+
stop_after_delay,
54+
wait_exponential,
55+
)
4756
from typing_extensions import Annotated
4857

4958
import pyiceberg.expressions.parser as parser
5059
import pyiceberg.expressions.visitors as visitors
51-
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
60+
from pyiceberg.exceptions import (
61+
CommitFailedException,
62+
ResolveError,
63+
ValidationError,
64+
)
5265
from pyiceberg.expressions import (
5366
AlwaysTrue,
5467
And,
@@ -934,6 +947,97 @@ class CommitTableResponse(IcebergBaseModel):
934947
metadata_location: str = Field(alias="metadata-location")
935948

936949

950+
class CommitTableRetryableExceptions:
951+
"""A catalogs commit exceptions that are retryable."""
952+
953+
def __init__(self, retry_exceptions: tuple[Type[Exception], ...], retry_refresh_exceptions: tuple[Type[Exception], ...]):
954+
self.retry_exceptions: tuple[Type[Exception], ...] = retry_exceptions
955+
self.retry_refresh_exceptions: tuple[Type[Exception], ...] = retry_refresh_exceptions
956+
self.all: tuple[Type[Exception], ...] = tuple(set(retry_exceptions).union(retry_refresh_exceptions))
957+
958+
959+
class TableCommitRetry:
960+
"""Decorator for building the table commit retry controller."""
961+
962+
num_retries = "commit.retry.num-retries"
963+
num_retries_default: int = 4
964+
min_wait_ms = "commit.retry.min-wait-ms"
965+
min_wait_ms_default: int = 100
966+
max_wait_ms = "commit.retry.max-wait-ms"
967+
max_wait_ms_default: int = 60000 # 1 min
968+
total_timeout_ms = "commit.retry.total-timeout-ms"
969+
total_timeout_ms_default: int = 1800000 # 30 mins
970+
971+
properties_attr: str = "properties"
972+
refresh_attr: str = "refresh"
973+
commit_retry_exceptions_attr: str = "commit_retry_exceptions"
974+
975+
def __init__(self, func: Callable[..., Any]) -> None:
976+
self.func: Callable[..., Any] = func
977+
self.loaded_properties: Properties = {}
978+
self.loaded_exceptions: CommitTableRetryableExceptions = CommitTableRetryableExceptions((), ())
979+
980+
def __get__(self, instance: Any, owner: Any) -> Callable[..., Any]:
981+
"""Return the __call__ method with the instance caller."""
982+
return partial(self.__call__, instance)
983+
984+
def __call__(self, instance: Table, *args: Any, **kwargs: Any) -> Any:
985+
"""Run function with the retrying controller on the caller instance."""
986+
self.loaded_properties = getattr(instance, self.properties_attr)
987+
self.loaded_exceptions = getattr(instance, self.commit_retry_exceptions_attr)
988+
previous_attempt_error = None
989+
try:
990+
for attempt in self.build_retry_controller():
991+
with attempt:
992+
# Refresh table is previous exception requires a refresh
993+
if previous_attempt_error in self.loaded_exceptions.retry_refresh_exceptions:
994+
self.refresh_table(instance)
995+
996+
result = self.func(instance, *args, **kwargs)
997+
998+
# Grab exception from the attempt
999+
outcome = attempt.retry_state.outcome
1000+
previous_attempt_error = type(outcome.exception()) if outcome.failed else None
1001+
1002+
except RetryError as err:
1003+
raise Exception from err.reraise()
1004+
else:
1005+
return result
1006+
1007+
def build_retry_controller(self) -> Retrying:
1008+
"""Build the retry controller."""
1009+
return Retrying(
1010+
stop=(
1011+
stop_after_attempt(self.get_config(self.num_retries, self.num_retries_default))
1012+
| stop_after_delay(
1013+
datetime.timedelta(milliseconds=self.get_config(self.total_timeout_ms, self.total_timeout_ms_default))
1014+
)
1015+
),
1016+
wait=wait_exponential(
1017+
min=self.get_config(self.min_wait_ms, self.min_wait_ms_default) / 1000.0,
1018+
max=self.get_config(self.max_wait_ms, self.max_wait_ms_default) / 1000.0,
1019+
),
1020+
retry=retry_if_exception_type(self.loaded_exceptions.all),
1021+
)
1022+
1023+
def get_config(self, config: str, default: int) -> int:
1024+
"""Get config out of the properties."""
1025+
return self.to_int(value, default, config) if (value := self.loaded_properties.get(config)) else default
1026+
1027+
def refresh_table(self, instance: Table) -> None:
1028+
getattr(instance, self.refresh_attr)()
1029+
return
1030+
1031+
@staticmethod
1032+
def to_int(v: str, default: int, config: str) -> int:
1033+
"""Convert str value to int, otherwise return a default."""
1034+
try:
1035+
return int(v)
1036+
except (ValueError, TypeError):
1037+
warnings.warn(f"Expected an integer for table property {config}, got: {v}", category=UserWarning)
1038+
return default
1039+
1040+
9371041
class Table:
9381042
identifier: Identifier = Field()
9391043
metadata: TableMetadata
@@ -1154,6 +1258,12 @@ def refs(self) -> Dict[str, SnapshotRef]:
11541258
"""Return the snapshot references in the table."""
11551259
return self.metadata.refs
11561260

1261+
@property
1262+
def commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
1263+
"""Return the commit exceptions that can be retried on the catalog."""
1264+
return self.catalog._accepted_commit_retry_exceptions() # pylint: disable=W0212
1265+
1266+
@TableCommitRetry
11571267
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
11581268
response = self.catalog._commit_table( # pylint: disable=W0212
11591269
CommitTableRequest(
@@ -1668,7 +1778,8 @@ def union_by_name(self, new_schema: Union[Schema, "pa.Schema"]) -> UpdateSchema:
16681778
visit_with_partner(
16691779
Catalog._convert_schema_if_needed(new_schema),
16701780
-1,
1671-
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive), # type: ignore
1781+
UnionByNameVisitor(update_schema=self, existing_schema=self._schema, case_sensitive=self._case_sensitive),
1782+
# type: ignore
16721783
PartnerIdByNameAccessor(partner_schema=self._schema, case_sensitive=self._case_sensitive),
16731784
)
16741785
return self

tests/catalog/test_base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
PropertiesUpdateSummary,
3838
)
3939
from pyiceberg.exceptions import (
40+
CommitFailedException,
41+
CommitStateUnknownException,
4042
NamespaceAlreadyExistsError,
4143
NamespaceNotEmptyError,
4244
NoSuchNamespaceError,
@@ -50,6 +52,7 @@
5052
AddSchemaUpdate,
5153
CommitTableRequest,
5254
CommitTableResponse,
55+
CommitTableRetryableExceptions,
5356
Namespace,
5457
SetCurrentSchemaUpdate,
5558
Table,
@@ -130,6 +133,9 @@ def create_table(
130133
def register_table(self, identifier: Union[str, Identifier], metadata_location: str) -> Table:
131134
raise NotImplementedError
132135

136+
def _accepted_commit_retry_exceptions(self) -> CommitTableRetryableExceptions:
137+
return CommitTableRetryableExceptions((CommitStateUnknownException, NoSuchTableError), (CommitFailedException,))
138+
133139
def _commit_table(self, table_request: CommitTableRequest) -> CommitTableResponse:
134140
identifier_tuple = self.identifier_to_tuple_without_catalog(
135141
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])

0 commit comments

Comments
 (0)