Skip to content

Commit 4fb6a34

Browse files
authored
bootstrap benchmarks (#122)
1 parent 14435dc commit 4fb6a34

File tree

6 files changed

+225
-3
lines changed

6 files changed

+225
-3
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,8 @@
22
resolver = "2"
33
members = [
44
"aws-sdk-s3-transfer-manager",
5-
"s3-mock-server"
5+
6+
# internal
7+
"s3-mock-server",
8+
"benches"
69
]

benches/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
[package]
name = "benches"
version = "0.0.0"
edition = "2021"
authors = ["AWS Rust SDK Team <[email protected]>"]
license = "Apache-2.0"
# Internal-only benchmark crate; never published to crates.io.
publish = false
description = "Internal benchmarks for aws-sdk-s3-transfer-manager"

[[bench]]
name = "throughput"
path = "throughput.rs"
# Criterion supplies its own `main`; disable the default libtest harness
# so `cargo bench` runs the custom entry point in throughput.rs.
harness = false

[dependencies]
criterion = { version = "0.7", features = ["html_reports"] }
tokio = { version = "1", features = ["full"] }
aws-sdk-s3 = "1"
aws-config = "1"
s3-mock-server = { path = "../s3-mock-server" }
aws-sdk-s3-transfer-manager = { path = "../aws-sdk-s3-transfer-manager" }
bytes = "1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

benches/throughput.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
use aws_sdk_s3_transfer_manager::error::Error;
6+
use aws_sdk_s3_transfer_manager::operation::download::{Body, DownloadHandle};
7+
use bytes::Bytes;
8+
use criterion::{criterion_group, BenchmarkId, Criterion, Throughput};
9+
use s3_mock_server::S3MockServer;
10+
use std::hint::black_box;
11+
use tokio::fs;
12+
use tokio::io::AsyncWriteExt;
13+
14+
/// drain/consume the body
15+
pub async fn drain(handle: &mut DownloadHandle) -> Result<(), Error> {
16+
let body = handle.body_mut();
17+
while let Some(chunk) = body.next().await {
18+
match chunk {
19+
Ok(_chunk) => {}
20+
Err(err) => return Err(err),
21+
}
22+
}
23+
24+
Ok(())
25+
}
26+
27+
/// write body to file
28+
pub async fn write_body(
29+
body: &mut Body,
30+
mut dest: fs::File,
31+
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
32+
while let Some(chunk) = body.next().await {
33+
let chunk = chunk?.data;
34+
for segment in chunk.into_segments() {
35+
dest.write_all(segment.as_ref()).await?;
36+
}
37+
}
38+
Ok(())
39+
}
40+
41+
/// Setup shared test infrastructure
42+
async fn setup_test(
43+
size: usize,
44+
) -> (
45+
S3MockServer,
46+
s3_mock_server::ServerHandle,
47+
aws_sdk_s3_transfer_manager::Client,
48+
) {
49+
let mock_server = S3MockServer::builder()
50+
.with_in_memory_store()
51+
.build()
52+
.unwrap();
53+
54+
let data = Bytes::from(vec![0u8; size]);
55+
mock_server
56+
.add_object("test-key", data, None)
57+
.await
58+
.unwrap();
59+
60+
let handle = mock_server.start().await.unwrap();
61+
let s3_client = handle.client().await;
62+
63+
let tm_config = aws_sdk_s3_transfer_manager::Config::builder()
64+
.client(s3_client)
65+
.build();
66+
let tm = aws_sdk_s3_transfer_manager::Client::new(tm_config);
67+
68+
(mock_server, handle, tm)
69+
}
70+
71+
fn download_throughput_benchmark(c: &mut Criterion) {
72+
let rt = tokio::runtime::Runtime::new().unwrap();
73+
74+
let mut group = c.benchmark_group("download_throughput");
75+
group.sample_size(10);
76+
77+
// Test sizes relevant for multipart downloads (5MB minimum part size)
78+
let sizes = vec![
79+
("5GB", 5 * 1024 * 1024 * 1024),
80+
("10GB", 10 * 1024 * 1024 * 1024),
81+
];
82+
83+
for (name, size) in sizes {
84+
group.throughput(Throughput::Bytes(size as u64));
85+
86+
// Benchmark Transfer Manager - drain to memory
87+
group.bench_with_input(
88+
BenchmarkId::new("transfer_manager_ram", name),
89+
&size,
90+
|b, &size| {
91+
b.iter_custom(|iters| {
92+
rt.block_on(async {
93+
let (_mock_server, handle, tm) = setup_test(size).await;
94+
95+
let start = std::time::Instant::now();
96+
for _ in 0..iters {
97+
let mut dl_handle = tm
98+
.download()
99+
.bucket("test-bucket")
100+
.key("test-key")
101+
.initiate()
102+
.expect("successful transfer initiate");
103+
black_box(drain(&mut dl_handle).await.unwrap());
104+
}
105+
let elapsed = start.elapsed();
106+
107+
handle.shutdown().await.unwrap();
108+
elapsed
109+
})
110+
});
111+
},
112+
);
113+
114+
// Benchmark Transfer Manager - write to file
115+
group.bench_with_input(
116+
BenchmarkId::new("transfer_manager_tmpfs", name),
117+
&size,
118+
|b, &size| {
119+
b.iter_custom(|iters| {
120+
rt.block_on(async {
121+
let (_mock_server, handle, tm) = setup_test(size).await;
122+
123+
let mut tmp_files = Vec::new();
124+
let start = std::time::Instant::now();
125+
for i in 0..iters {
126+
let mut dl_handle = tm
127+
.download()
128+
.bucket("test-bucket")
129+
.key("test-key")
130+
.initiate()
131+
.expect("successful transfer initiate");
132+
133+
let temp_file = format!("/tmp/benchmark_output_{}.dat", i);
134+
tmp_files.push(temp_file.clone());
135+
let file = fs::File::create(&temp_file).await.unwrap();
136+
write_body(dl_handle.body_mut(), file).await.unwrap();
137+
}
138+
let elapsed = start.elapsed();
139+
140+
// cleanup
141+
for temp_file in tmp_files {
142+
let _ = fs::remove_file(&temp_file).await;
143+
}
144+
handle.shutdown().await.unwrap();
145+
elapsed
146+
})
147+
});
148+
},
149+
);
150+
}
151+
152+
group.finish();
153+
}
154+
155+
criterion_group!(benches, download_throughput_benchmark);

// Custom `main` instead of `criterion_main!` so a tracing subscriber can be
// installed before any benchmark runs.
fn main() {
    tracing_subscriber::fmt::init();
    // `benches()` is the harness function generated by `criterion_group!`.
    benches();

    Criterion::default().configure_from_args().final_summary();
}

s3-mock-server/src/s3s.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use aws_smithy_checksums::ChecksumAlgorithm;
1414
use base64::Engine;
1515
use bytes::BytesMut;
1616
use futures_util::StreamExt;
17-
use s3s::dto::StreamingBlob;
1817
use s3s::dto::Timestamp;
18+
use s3s::dto::{HeadBucketInput, HeadBucketOutput, StreamingBlob};
1919
use s3s::{S3Request, S3Response, S3Result};
2020
use std::str::FromStr;
2121

@@ -529,6 +529,13 @@ impl<S: StorageBackend + 'static> s3s::S3 for Inner<S> {
529529

530530
Ok(S3Response::new(output))
531531
}
532+
533+
/// Minimal HeadBucket support for the mock server: every bucket is treated
/// as existing, so any request succeeds with a default (empty) output.
/// The request input is ignored entirely.
async fn head_bucket(
    &self,
    _req: S3Request<HeadBucketInput>,
) -> S3Result<S3Response<HeadBucketOutput>> {
    Ok(S3Response::new(HeadBucketOutput::default()))
}
532539
}
533540

534541
fn validate_checksum_and_type(

s3-mock-server/src/server.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,29 @@ impl S3MockServer {
169169
S3MockServerBuilder::new()
170170
}
171171

172+
/// Add an object to the mock server storage.
///
/// `key` is the object key, `content` is the complete object body, and
/// `metadata`, when present, becomes the object's user metadata (an empty
/// map is used when `None`).
///
/// # Errors
/// Returns an error if the underlying storage backend rejects the write.
pub async fn add_object(
    &self,
    key: &str,
    content: impl Into<bytes::Bytes>,
    metadata: Option<std::collections::HashMap<String, String>>,
) -> Result<()> {
    use crate::storage::StoreObjectRequest;
    use crate::types::ObjectIntegrityChecks;
    use futures::stream;

    // The storage layer consumes a byte stream; wrap the whole payload
    // as a single-chunk stream.
    let bytes = content.into();
    let stream = stream::once(async move { Ok(bytes) });
    let boxed_stream = Box::pin(stream);

    let request =
        StoreObjectRequest::new(key.to_string(), boxed_stream, ObjectIntegrityChecks::new())
            .with_user_metadata(metadata.unwrap_or_default());

    self.storage.put_object(request).await?;
    Ok(())
}
194+
172195
/// Start the server.
173196
pub async fn start(&self) -> Result<ServerHandle> {
174197
// Create the address to bind to

s3-mock-server/src/storage.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ pub(crate) struct CompleteMultipartUploadResponse {
109109
}
110110

111111
impl StoreObjectRequest {
112-
#[cfg(test)]
113112
pub(crate) fn new(
114113
key: impl Into<String>,
115114
body: Pin<Box<dyn Stream<Item = std::result::Result<Bytes, std::io::Error>> + Send>>,
@@ -123,6 +122,11 @@ impl StoreObjectRequest {
123122
user_metadata: HashMap::new(),
124123
}
125124
}
125+
126+
/// Builder-style setter: replace this request's user metadata map and
/// return the modified request.
pub(crate) fn with_user_metadata(mut self, metadata: HashMap<String, String>) -> Self {
    self.user_metadata = metadata;
    self
}
126130
}
127131

128132
impl From<s3s::dto::PutObjectInput> for StoreObjectRequest {

0 commit comments

Comments
 (0)