Commit 60133a9

Normalize UploadContext with the other OperationContext types (#123)
1 parent 4fb6a34 commit 60133a9

File tree

4 files changed: +47 -33 lines


aws-sdk-s3-transfer-manager/src/operation/upload.rs

Lines changed: 9 additions & 9 deletions
```diff
@@ -120,13 +120,13 @@ async fn put_object(
         .scheduler
         .acquire_permit(PermitType::Network(NetworkPermitContext {
             payload_size_estimate: content_length as u64,
-            bucket_type: ctx.bucket_type(),
+            bucket_type: ctx.state.bucket_type(),
             direction: TransferDirection::Upload,
         }))
         .await?;

     let req = copy_fields_to_put_object_request(
-        &ctx.request,
+        &ctx.state.request,
         ctx.client()
             .put_object()
             .body(body)
@@ -139,8 +139,8 @@ async fn put_object(
         .send()
         .instrument(tracing::info_span!(
             "send-upload-part",
-            bucket = ctx.request.bucket().unwrap_or_default(),
-            key = ctx.request.key().unwrap_or_default()
+            bucket = ctx.state.request.bucket().unwrap_or_default(),
+            key = ctx.state.request.key().unwrap_or_default()
         ))
         .await?;
     Ok(UploadOutputBuilder::from(resp).build()?)
@@ -189,16 +189,16 @@ async fn try_start_mpu_upload(
 }

 fn new_context(handle: Arc<crate::client::Handle>, req: UploadInput) -> UploadContext {
-    UploadContext {
+    UploadContext::new(
         handle,
-        bucket_type: BucketType::from_bucket_name(req.bucket().expect("bucket is availabe")),
-        request: Arc::new(req),
-    }
+        BucketType::from_bucket_name(req.bucket().expect("bucket is availabe")),
+        req,
+    )
 }

 /// start a new multipart upload by invoking `CreateMultipartUpload`
 async fn start_mpu(ctx: &UploadContext) -> Result<UploadOutputBuilder, crate::error::Error> {
-    let req = ctx.request();
+    let req = ctx.state.request();
     let client = ctx.client();

     let req = copy_fields_to_mpu_request(req, client.create_multipart_upload());
```

aws-sdk-s3-transfer-manager/src/operation/upload/context.rs

Lines changed: 19 additions & 9 deletions
```diff
@@ -4,28 +4,38 @@
  */

 use crate::operation::upload::UploadInput;
+use crate::operation::TransferContext;
 use crate::types::BucketType;
 use std::ops::Deref;
 use std::sync::Arc;

+pub(crate) type UploadContext = TransferContext<UploadState>;
+
+impl UploadContext {
+    pub(crate) fn new(
+        handle: Arc<crate::client::Handle>,
+        bucket_type: BucketType,
+        req: UploadInput,
+    ) -> Self {
+        let state = Arc::new(UploadState {
+            request: Arc::new(req),
+            bucket_type,
+        });
+        TransferContext { handle, state }
+    }
+}
+
 /// Internal context used to drive a single Upload operation
 #[derive(Debug, Clone)]
-pub(crate) struct UploadContext {
-    /// reference to client handle used to do actual work
-    pub(crate) handle: Arc<crate::client::Handle>,
+pub(crate) struct UploadState {
     /// the original request (NOTE: the body will have been taken for processing, only the other fields remain)
     pub(crate) request: Arc<UploadInput>,

     /// Type of S3 bucket targeted by this operation
     pub(crate) bucket_type: BucketType,
 }

-impl UploadContext {
-    /// The S3 client to use for SDK operations
-    pub(crate) fn client(&self) -> &aws_sdk_s3::Client {
-        self.handle.config.client()
-    }
-
+impl UploadState {
     /// The original request (sans the body as it will have been taken for processing)
     pub(crate) fn request(&self) -> &UploadInput {
         self.request.deref()
```

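The generic `TransferContext` imported here is the shared context type the commit title refers to; its definition lives elsewhere in the crate and is not part of this diff. As orientation only, a minimal sketch of its inferred shape, assuming nothing beyond what the usage above implies (a `handle` field, an `Arc`-wrapped `state` field, and a `client()` accessor):

```rust
// Hypothetical sketch -- the actual TransferContext is defined elsewhere in
// the crate and is not shown in this commit; inferred from its usage above.
use std::sync::Arc;

/// Generic per-operation context: a shared client handle plus
/// operation-specific state (here, `UploadState`).
#[derive(Debug, Clone)]
pub(crate) struct TransferContext<State> {
    /// reference to client handle used to do actual work
    pub(crate) handle: Arc<crate::client::Handle>,
    /// operation-specific state, cheaply cloneable across spawned tasks
    pub(crate) state: Arc<State>,
}

impl<State> TransferContext<State> {
    /// The S3 client to use for SDK operations
    /// (previously defined directly on UploadContext)
    pub(crate) fn client(&self) -> &aws_sdk_s3::Client {
        self.handle.config.client()
    }
}
```

This shape explains the pattern in the remaining diffs: `ctx.client()` call sites are untouched because handle-level concerns stay on the generic context, while every `request`/`bucket_type` access gains a `.state` hop because the operation-specific fields move behind the `State` parameter.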
aws-sdk-s3-transfer-manager/src/operation/upload/handle.rs

Lines changed: 3 additions & 2 deletions
```diff
@@ -126,6 +126,7 @@ async fn abort_multipart_upload(
     while (tasks.join_next().await).is_some() {}

     let abort_policy = ctx
+        .state
         .request
         .failed_multipart_upload_policy
         .clone()
@@ -134,7 +135,7 @@
         FailedMultipartUploadPolicy::Retain => Ok(AbortedUpload::default()),
         FailedMultipartUploadPolicy::AbortUpload => {
             let abort_mpu_resp = copy_fields_to_abort_mpu_request(
-                &ctx.request,
+                &ctx.state.request,
                 ctx.client()
                     .abort_multipart_upload()
                     .set_upload_id(Some(mpu_data.upload_id.clone())),
@@ -210,7 +211,7 @@ async fn complete_upload(handle: UploadHandle) -> Result<UploadOutput, crate::er

     // complete the multipart upload
     let req = copy_fields_to_complete_mpu_request(
-        &handle.ctx.request,
+        &handle.ctx.state.request,
         handle
             .ctx
             .client()
```

aws-sdk-s3-transfer-manager/src/operation/upload/service.rs

Lines changed: 16 additions & 13 deletions
```diff
@@ -41,7 +41,7 @@ impl ProvideNetworkPermitContext for UploadPartRequest {
     fn network_permit_context(&self) -> NetworkPermitContext {
         NetworkPermitContext {
             payload_size_estimate: self.part_data.data.len() as u64,
-            bucket_type: self.ctx.bucket_type(),
+            bucket_type: self.ctx.state.bucket_type(),
             direction: TransferDirection::Upload,
         }
     }
@@ -52,7 +52,7 @@ pub(crate) struct UploadHedgePolicy;

 impl Policy<UploadPartRequest> for UploadHedgePolicy {
     fn clone_request(&self, req: &UploadPartRequest) -> Option<UploadPartRequest> {
-        if req.ctx.bucket_type() == BucketType::Standard {
+        if req.ctx.state.bucket_type() == BucketType::Standard {
             Some(req.clone())
         } else {
             None
@@ -71,7 +71,7 @@ async fn upload_part_handler(request: UploadPartRequest) -> Result<CompletedPart
     let part_number = part_data.part_number as i32;

     let req = copy_fields_to_upload_part_request(
-        &ctx.request,
+        &ctx.state.request,
         ctx.client()
             .upload_part()
             .set_upload_id(Some(request.upload_id))
@@ -139,8 +139,8 @@ pub(super) fn distribute_work(
     // group all spawned tasks together
     let parent_span_for_all_tasks = tracing::debug_span!(
         parent: None, "upload-tasks", // TODO: for upload_objects, parent should be upload-objects-tasks
-        bucket = ctx.request.bucket().unwrap_or_default(),
-        key = ctx.request.key().unwrap_or_default(),
+        bucket = ctx.state.request.bucket().unwrap_or_default(),
+        key = ctx.state.request.key().unwrap_or_default(),
     );
     parent_span_for_all_tasks.follows_from(tracing::Span::current());

@@ -225,6 +225,7 @@
 mod tests {
     use super::*;
     use crate::client::Handle;
+    use crate::operation::upload::context::UploadState;
     use crate::operation::upload::UploadInput;
     use crate::runtime::scheduler::Scheduler;
     use crate::types::ConcurrencyMode;
@@ -240,14 +241,16 @@
             config: Config::builder().client(s3_client).build(),
             scheduler: Scheduler::new(ConcurrencyMode::Explicit(1)),
         }),
-        request: Arc::new(
-            UploadInput::builder()
-                .bucket(bucket_name)
-                .key("test-key")
-                .build()
-                .unwrap(),
-        ),
-        bucket_type: BucketType::from_bucket_name(bucket_name),
+        state: Arc::new(UploadState {
+            request: Arc::new(
+                UploadInput::builder()
+                    .bucket(bucket_name)
+                    .key("test-key")
+                    .build()
+                    .unwrap(),
+            ),
+            bucket_type: BucketType::from_bucket_name(bucket_name),
+        }),
     },
     part_data: PartData::new(1, Bytes::default()),
     upload_id: "test-id".to_string(),
```

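Taken together, the post-change call-site pattern looks like this. A sketch composed only from items that appear in the diffs above; everything here is crate-internal, so this is orientation, not a standalone program:

```rust
// Sketch of the normalized call-site pattern after this commit, built from
// items shown in the diffs above; not runnable outside the crate.
use std::sync::Arc;

fn demo(handle: Arc<crate::client::Handle>, req: UploadInput) {
    // Construction goes through the shared constructor instead of a
    // struct literal (see new_context in upload.rs).
    let bucket_type =
        BucketType::from_bucket_name(req.bucket().expect("bucket is available"));
    let ctx: UploadContext = UploadContext::new(handle, bucket_type, req);

    // Handle-level concerns remain on the context itself...
    let _client = ctx.client();

    // ...while operation-specific state is reached through `ctx.state`,
    // matching the other OperationContext types in the crate.
    let _key = ctx.state.request.key().unwrap_or_default();
    let _bucket_type = ctx.state.bucket_type();
}
```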
0 commit comments