Skip to content

Commit d2e1e16

Browse files
authored
retry download part failures (#49)
1 parent f57573c commit d2e1e16

File tree

6 files changed

+325
-6
lines changed

6 files changed

+325
-6
lines changed

aws-s3-transfer-manager/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ aws-smithy-runtime-api = "1.7.1"
1717
aws-smithy-types = "1.2.0"
1818
aws-types = "1.3.3"
1919
bytes = "1"
20+
futures-util = "0.3.30"
2021
# FIXME - upgrade to hyper 1.x
2122
hyper = { version = "0.14.29", features = ["client"] }
2223
path-clean = "1.0.1"
24+
pin-project-lite = "0.2.14"
2325
tokio = { version = "1.38.0", features = ["rt-multi-thread", "io-util", "sync", "fs", "macros"] }
24-
tower = { version = "0.5.0", features = ["limit", "util"] }
26+
tower = { version = "0.5.0", features = ["limit", "retry", "util"] }
2527
tracing = "0.1"
2628

2729
[dev-dependencies]
@@ -31,10 +33,13 @@ aws-smithy-runtime = { version = "1.7.1", features = ["client", "connector-hyper
3133
clap = { version = "4.5.7", default-features = false, features = ["derive", "std", "help"] }
3234
console-subscriber = "0.3.0"
3335
http-02x = { package = "http", version = "0.2.9" }
36+
http-body-1x = { package = "http-body", version = "1" }
3437
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
3538
tempfile = "3.10.1"
3639
fastrand = "2.1.0"
3740
walkdir = "2"
41+
tower-test = "0.4.0"
42+
tokio-test = "0.4.4"
3843

3944
[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
4045
jemallocator = "0.5.4"

aws-s3-transfer-manager/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,8 @@ pub mod operation;
4343
/// Transfer manager configuration
4444
pub mod config;
4545

46+
/// Tower related middleware and components
47+
pub(crate) mod middleware;
48+
4649
pub use self::client::Client;
4750
pub use self::config::Config;
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
pub(crate) mod retry;
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use aws_smithy_types::byte_stream::error::Error as ByteStreamError;
7+
use futures_util::future;
8+
use std::sync::Arc;
9+
use tower::retry::budget::{Budget, TpsBudget};
10+
11+
/// A `tower::retry::Policy` implementation for retrying requests
12+
#[derive(Debug, Clone)]
13+
pub(crate) struct RetryPolicy {
14+
budget: Arc<TpsBudget>,
15+
remaining_attempts: usize,
16+
}
17+
18+
impl Default for RetryPolicy {
19+
fn default() -> Self {
20+
Self {
21+
budget: Arc::new(TpsBudget::default()),
22+
remaining_attempts: 2,
23+
}
24+
}
25+
}
26+
27+
fn find_source<'a, E: std::error::Error + 'static>(
28+
err: &'a (dyn std::error::Error + 'static),
29+
) -> Option<&'a E> {
30+
let mut next = Some(err);
31+
while let Some(err) = next {
32+
if let Some(matching_err) = err.downcast_ref::<E>() {
33+
return Some(matching_err);
34+
}
35+
next = err.source();
36+
}
37+
None
38+
}
39+
40+
impl<Req, Res, E> tower::retry::Policy<Req, Res, E> for RetryPolicy
41+
where
42+
Req: Clone,
43+
E: std::error::Error + 'static,
44+
{
45+
type Future = future::Ready<()>;
46+
47+
fn retry(&mut self, _req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future> {
48+
match result {
49+
Ok(_) => {
50+
self.budget.deposit();
51+
None
52+
}
53+
Err(err) => {
54+
// the only type of error we care about at this point is errors that come from
55+
// reading the body, all other errors go through the SDK retry implementation
56+
// already
57+
find_source::<ByteStreamError>(err)?;
58+
if self.remaining_attempts == 0 || !self.budget.withdraw() {
59+
return None;
60+
}
61+
self.remaining_attempts -= 1;
62+
Some(future::ready(()))
63+
}
64+
}
65+
}
66+
67+
fn clone_request(&mut self, req: &Req) -> Option<Req> {
68+
Some(req.clone())
69+
}
70+
}

aws-s3-transfer-manager/src/operation/download/service.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55
use crate::error;
6+
use crate::middleware::retry;
67
use crate::operation::download::header;
78
use crate::operation::download::DownloadContext;
89
use aws_smithy_types::body::SdkBody;
@@ -61,6 +62,7 @@ pub(super) fn chunk_service(
6162

6263
ServiceBuilder::new()
6364
.concurrency_limit(ctx.handle.num_workers())
65+
.retry(retry::RetryPolicy::default())
6466
.service(svc)
6567
}
6668

0 commit comments

Comments
 (0)