
Table commit retries based on table properties #330

Open
wants to merge 1 commit into base: main
Conversation

Contributor
@Buktoria Buktoria commented Jan 30, 2024

Created a decorator which, when applied to a function that performs a commit on a table, retries the function if execution fails. It looks at the table properties to configure the retries.

  • Created a Decorator / Descriptor Class that can wrap a function and retry it using the Tenacity retry library
  • The class configures defaults based on the documented defaults found in the Iceberg docs https://iceberg.apache.org/docs/latest/configuration/#table-behavior-properties
    • commit.retry.num-retries
    • commit.retry.min-wait-ms
    • commit.retry.max-wait-ms
    • commit.retry.total-timeout-ms
  • Config is parsed from a configured "properties" attribute/property on the instance class that is accessed within the decorator at runtime
  • A separate function table_commit_retry is used to capture the name of the attribute on the caller that should be used when looking up table configs.
  • Access to the caller instance is obtained by implementing the __get__ method of the class (the descriptor protocol)
  • Un-parsable config will be ignored and defaults will be used
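The bullet points above can be sketched as a minimal, stdlib-only descriptor (the PR uses Tenacity for the actual retrying, and the real class lives in pyiceberg/table/__init__.py; the names `TableCommitRetry`, `to_int`, and the wait handling being omitted are simplifications for illustration):

```python
import functools


class CommitFailedException(Exception):  # stand-in for PyIceberg's exception
    pass


DEFAULTS = {  # documented Iceberg table-behavior defaults
    "commit.retry.num-retries": 4,
    "commit.retry.min-wait-ms": 100,
    "commit.retry.max-wait-ms": 60_000,
    "commit.retry.total-timeout-ms": 1_800_000,
}


def to_int(value, default):
    try:
        return int(value)
    except (TypeError, ValueError):
        return default  # un-parsable config is ignored; the default wins


class TableCommitRetry:
    """Descriptor wrapping a method; retries based on instance properties."""

    def __init__(self, func, properties_attr):
        self.func = func
        self.properties_attr = properties_attr

    def __get__(self, instance, owner=None):
        # __get__ hands us the caller instance, and thus its properties.
        props = getattr(instance, self.properties_attr, {})
        retries = to_int(props.get("commit.retry.num-retries"),
                         DEFAULTS["commit.retry.num-retries"])

        @functools.wraps(self.func)
        def wrapper(*args, **kwargs):
            for attempt in range(retries + 1):
                try:
                    return self.func(instance, *args, **kwargs)
                except CommitFailedException:
                    if attempt == retries:
                        raise
                    # real code sleeps between min/max wait here

        return wrapper


def table_commit_retry(properties_attr):
    """Capture the attribute name holding table properties on the caller."""
    def decorator(func):
        return TableCommitRetry(func, properties_attr)
    return decorator


class Table:  # hypothetical usage
    properties = {"commit.retry.num-retries": "2"}

    def __init__(self):
        self.calls = 0

    @table_commit_retry("properties")
    def commit(self):
        self.calls += 1
        if self.calls < 3:
            raise CommitFailedException("conflict")
        return "committed"
```

Because the retry count is read inside `__get__`, a change to the table's properties takes effect on the very next call without re-decorating anything.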

Closes: #269

@Buktoria Buktoria force-pushed the vicky/catalog-commit-retries branch 3 times, most recently from edd1aad to af3f54b Compare January 31, 2024 16:14
@Buktoria Buktoria marked this pull request as ready for review January 31, 2024 16:25
pyiceberg/table/__init__.py — 3 resolved review threads (outdated)

def get_config(self, config: str, default: int) -> int:
"""Get config out of the properties."""
return self.to_int(self.table_properties.get(config, ""), default)
Contributor

If the key doesn't exist, we try to convert the empty string. How about throwing in some walrus (:=):

Suggested change
return self.to_int(self.table_properties.get(config, ""), default)
return self.to_int(value) if (value := self.table_properties.get(config)) else default
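For context, a self-contained sketch of the two lookups (the `PropsExample` class and the one-argument `to_int` default are assumptions for illustration, not PyIceberg's actual API):

```python
class PropsExample:
    def __init__(self, table_properties):
        self.table_properties = table_properties

    def to_int(self, value, default=0):
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def get_config(self, config, default):
        # The walrus skips int() entirely when the key is absent or empty.
        return self.to_int(value) if (value := self.table_properties.get(config)) else default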

@@ -994,6 +1065,7 @@ def refs(self) -> Dict[str, SnapshotRef]:
"""Return the snapshot references in the table."""
return self.metadata.refs

@table_commit_retry("properties")
Contributor

It depends on what we are trying to do here. There are two types of retries that we want to support:

  • Intermittent network issues, catalog temporarily not available, etc.
  • Retrying of commits because the table changed.

The first one should probably be done at the catalog level, because we also need to differentiate between the different errors and see if they are retriable.

For the second case, the one that you are solving here, we need some more logic around loading the latest version of the table. The retry is being done on the CommitFailedException, which is thrown on an HTTP 409 from the REST catalog. A 409 means a conflict: the table has changed. At this point, we only support append and overwrite operations, which don't need any conflict detection. Retrying the exact same commit would just hit the same conflict again; I believe that's Einstein's definition of insanity :)

Do we want to refresh the table metadata, and reapply the changes? I would expect like in

@pytest.mark.parametrize(
    'catalog',
    [
        lazy_fixture('catalog_memory'),
        lazy_fixture('catalog_sqlite'),
        lazy_fixture('catalog_sqlite_without_rowcount'),
    ],
)
def test_concurrent_commit_table(catalog: SqlCatalog, table_schema_simple: Schema, random_identifier: Identifier) -> None:
    database_name, _table_name = random_identifier
    catalog.create_namespace(database_name)
    table_a = catalog.create_table(random_identifier, table_schema_simple)
    table_b = catalog.load_table(random_identifier)

    with table_a.update_schema() as update:
        update.add_column(path="b", field_type=IntegerType())

    with pytest.raises(CommitFailedException, match="Requirement failed: current schema id has changed: expected 0, found 1"):
        # This one should fail since it already has been updated
        with table_b.update_schema() as update:
            update.add_column(path="c", field_type=IntegerType())

to fail after this change.

Contributor Author

My take on this is that if we implement retries in multiple places, it will be harder to prevent compounding retries, resulting in a multiplying effect on the total number of retries, which is not a behaviour we want.

I would argue we should have just one place where we apply retries and it should keep the state of the number of attempts.

We could catch multiple errors here instead of just CommitFailedException to account for network errors.

In terms of accounting for table changes, that is a good point I had not originally considered. If the table has changed we would continue seeing the same error, in which case we need to perform a refresh. Refresh is an operation on the Table class, not the Catalog. So we need to have access to Table. With the current positioning of the retry decorator, we have access to the table instance so we can refresh the table.

What we can do is refresh the table after every attempt. This is not optimal when the failure was due to a network issue, since the table may not have changed, but I would argue the penalty in that case is minimal and a reasonable price to pay for a simpler approach.
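The refresh-after-every-attempt idea can be sketched as a single retry loop (the function name `commit_with_retries` and the stand-in exception are hypothetical; the real implementation wraps `_do_commit` and uses Tenacity):

```python
import time


class CommitFailedException(Exception):  # stand-in for PyIceberg's exception
    pass


def commit_with_retries(table, do_commit, num_retries=4, min_wait_ms=100):
    """Sketch: retry the commit, refreshing the table after every failure."""
    for attempt in range(num_retries + 1):
        try:
            return do_commit()
        except CommitFailedException:
            if attempt == num_retries:
                raise
            # Refresh unconditionally: wasteful when the failure was a
            # network blip, but it keeps all retry state in one place.
            table.refresh()
            time.sleep(min_wait_ms / 1000.0)
```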

@Fokko Fokko added this to the PyIceberg 0.7.0 release milestone Feb 7, 2024
@Buktoria Buktoria force-pushed the vicky/catalog-commit-retries branch from af3f54b to 0696c76 Compare March 18, 2024 14:08
@Buktoria Buktoria force-pushed the vicky/catalog-commit-retries branch from 0696c76 to b3c468e Compare March 18, 2024 18:24
@Buktoria
Contributor Author

Buktoria commented Mar 18, 2024

So I made a fundamental change to the original design: catalogs now need to implement a function where they declare which exceptions are retryable. This becomes the bridge between the Table and the Catalog. Since Table contains an instance of Catalog, our retry wrapper can grab this list of exceptions through the Table instance.

Retrying happens within the Table object and wraps the _do_commit function.

  • Since Table calls this function, we can grab a reference to the Table object which we can then use to load the table's properties and commit_retry_exceptions.
  • With this information we can build the retry controller
  • To support executing refresh before a new attempt but after sleeping, we grab the exception the attempt received, hold on to it, and then on the next attempt but before running _do_commit we check to see if the exception requires a refresh of the table.
    • I had to do this because Tenacity does not have an after_sleep parameter, even though it supports a before_sleep parameter.
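The hold-the-exception workaround described above can be sketched roughly like this (the class name `RetryController` and method names are illustrative; only the pattern — record on failure, check before the next attempt — reflects the comment):

```python
class RetryController:
    """Hold the previous attempt's exception and, right before the next
    _do_commit runs (i.e. after sleeping, since Tenacity exposes
    before_sleep but no after_sleep hook), refresh the table if that
    exception is one the catalog marked as requiring a refresh."""

    def __init__(self, table, refresh_exceptions):
        self.table = table
        # e.g. the tuple the catalog declares as its retryable exceptions
        self.refresh_exceptions = refresh_exceptions
        self.last_exception = None

    def record(self, exc):
        # Called when an attempt fails.
        self.last_exception = exc

    def before_attempt(self):
        # Called immediately before the next _do_commit.
        if isinstance(self.last_exception, self.refresh_exceptions):
            self.table.refresh()
            self.last_exception = None
```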

Collaborator

@sungwy sungwy left a comment


Hi @Buktoria - thank you for working on this PR. The TableCommitRetry looks well organized and I think this will be a great feature enhancement to PyIceberg!

I think it will be helpful to add integration tests that simulate different retry scenarios. I think retries in Iceberg can be complex, and this will help us understand if the current implementation does successfully handle retries for a typical Iceberg commit.

As a concrete example, when a concurrent snapshot update is made to the Iceberg table, the expectation is that the next retry will be based on the new metadata as well as the new snapshot-id. In the current implementation, it looks like the table updates and requirements remain unchanged, which may lead to the commit being retried multiple times and simply failing in the end.

We use the AssertRefSnapshotId requirement when we are producing a new snapshot, to ensure that the table's snapshot ID hasn't changed. If there's a concurrent snapshot update, this condition will fail multiple times, unless we update the stored Table Requirement as well.
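A rough sketch of what updating the stored requirement could look like (the dataclass is a simplified stand-in for PyIceberg's real AssertRefSnapshotId, and `rebase_requirement` is a hypothetical helper, not existing API):

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class AssertRefSnapshotId:  # simplified stand-in for the real requirement
    ref: str
    snapshot_id: int


def rebase_requirement(req, refreshed_refs):
    """After a refresh, point the requirement at the *current* snapshot so
    the next retry doesn't re-assert the stale one and fail again."""
    if isinstance(req, AssertRefSnapshotId):
        return replace(req, snapshot_id=refreshed_refs[req.ref])
    return req
```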

class CustomException(Exception):
    pass


class TestTableCommitRetiesCustomError:
Collaborator

typo:

Suggested change
class TestTableCommitRetiesCustomError:
class TestTableCommitRetriesCustomError:

@sungwy sungwy removed this from the PyIceberg 0.8.0 release milestone Sep 24, 2024
@kevinjqliu kevinjqliu added this to the PyIceberg 0.9.0 release milestone Oct 30, 2024
@kevinjqliu kevinjqliu removed this from the PyIceberg 0.9.0 release milestone Feb 1, 2025
Development

Successfully merging this pull request may close these issues.

Support commit retries
4 participants