Skip to content

Commit fa5239a

Browse files
author
Wenqi Mou
authored
Issue 72: Micro benchmark (#98)
Micro benchmark
1 parent 0cbfab0 commit fa5239a

File tree

14 files changed

+627
-131
lines changed

14 files changed

+627
-131
lines changed

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ authors = ["Tom Kaitchuck <[email protected]>", "Wenqi Mou <wenqi.mou@dell.
1515

1616
[workspace]
1717
members = [
18-
"controller-client", "shared", "wire_protocol", "retry", "integration_test", "connection_pool", "channel"
18+
"controller-client", "shared", "wire_protocol", "retry", "integration_test", "connection_pool", "channel",
1919
]
2020

2121
[dependencies]
@@ -49,6 +49,8 @@ async-stream = "0.2.1"
4949
pravega-rust-client-integration-test = { path = "./integration_test"}
5050
mockall = "0.7.0"
5151
ordered-float = { version= "1.0.2", features = ["serde"]}
52+
criterion = "0.3"
53+
byteorder = "1.3"
5254

5355
[[bin]]
5456
name = "server-cli"
@@ -60,5 +62,7 @@ required-features = ["cli"]
6062
default = ["cli"]
6163
cli = ["clap", "structopt"]
6264

63-
65+
[[bench]]
66+
name = "benchmark"
67+
harness = false
6468

benches/benchmark.rs

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
//
2+
// Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
11+
use criterion::{criterion_group, criterion_main, Criterion};
12+
13+
use byteorder::BigEndian;
14+
use log::info;
15+
use pravega_client_rust::client_factory::ClientFactory;
16+
use pravega_client_rust::error::EventStreamWriterError;
17+
use pravega_client_rust::event_stream_writer::EventStreamWriter;
18+
use pravega_controller_client::ControllerClient;
19+
use pravega_rust_client_shared::*;
20+
use pravega_wire_protocol::client_config::{ClientConfig, ClientConfigBuilder};
21+
use pravega_wire_protocol::client_connection::{LENGTH_FIELD_LENGTH, LENGTH_FIELD_OFFSET};
22+
use pravega_wire_protocol::commands::{AppendSetupCommand, DataAppendedCommand};
23+
use pravega_wire_protocol::connection_factory::ConnectionType;
24+
use pravega_wire_protocol::wire_commands::{Decode, Encode, Replies, Requests};
25+
use std::io::Cursor;
26+
use std::net::SocketAddr;
27+
use tokio::io::AsyncReadExt;
28+
use tokio::io::AsyncWriteExt;
29+
use tokio::net::TcpListener;
30+
31+
static EVENT_NUM: usize = 1000;
32+
static EVENT_SIZE: usize = 100;
33+
34+
struct MockServer {
35+
address: SocketAddr,
36+
listener: TcpListener,
37+
}
38+
39+
impl MockServer {
40+
pub async fn new() -> Self {
41+
let listener = TcpListener::bind("127.0.0.1:0").await.expect("local server");
42+
let address = listener.local_addr().unwrap();
43+
MockServer { address, listener }
44+
}
45+
46+
pub async fn run(mut self) {
47+
let (mut stream, _addr) = self.listener.accept().await.expect("get incoming stream");
48+
loop {
49+
let mut header: Vec<u8> = vec![0; LENGTH_FIELD_OFFSET as usize + LENGTH_FIELD_LENGTH as usize];
50+
stream
51+
.read_exact(&mut header[..])
52+
.await
53+
.expect("read header from incoming stream");
54+
let mut rdr = Cursor::new(&header[4..8]);
55+
let payload_length =
56+
byteorder::ReadBytesExt::read_u32::<BigEndian>(&mut rdr).expect("exact size");
57+
let mut payload: Vec<u8> = vec![0; payload_length as usize];
58+
stream
59+
.read_exact(&mut payload[..])
60+
.await
61+
.expect("read payload from incoming stream");
62+
let concatenated = [&header[..], &payload[..]].concat();
63+
let request: Requests = Requests::read_from(&concatenated).expect("decode wirecommand");
64+
match request {
65+
Requests::Hello(cmd) => {
66+
let reply = Replies::Hello(cmd).write_fields().expect("encode reply");
67+
stream
68+
.write_all(&reply)
69+
.await
70+
.expect("write reply back to client");
71+
}
72+
Requests::SetupAppend(cmd) => {
73+
let reply = Replies::AppendSetup(AppendSetupCommand {
74+
request_id: cmd.request_id,
75+
segment: cmd.segment,
76+
writer_id: cmd.writer_id,
77+
last_event_number: -9223372036854775808, // when there is no previous event in this segment
78+
})
79+
.write_fields()
80+
.expect("encode reply");
81+
stream
82+
.write_all(&reply)
83+
.await
84+
.expect("write reply back to client");
85+
}
86+
Requests::AppendBlockEnd(cmd) => {
87+
let reply = Replies::DataAppended(DataAppendedCommand {
88+
writer_id: cmd.writer_id,
89+
event_number: cmd.last_event_number,
90+
previous_event_number: 0, //not used in event stream writer
91+
request_id: cmd.request_id,
92+
current_segment_write_offset: 0, //not used in event stream writer
93+
})
94+
.write_fields()
95+
.expect("encode reply");
96+
stream
97+
.write_all(&reply)
98+
.await
99+
.expect("write reply back to client");
100+
}
101+
_ => {
102+
panic!("unsupported request {:?}", request);
103+
}
104+
}
105+
}
106+
}
107+
}
108+
109+
// This benchmark test uses a mock server that replies ok to any requests instantly. It involves
110+
// kernel latency.
111+
fn mock_server(c: &mut Criterion) {
112+
let mut rt = tokio::runtime::Runtime::new().unwrap();
113+
let mock_server = rt.block_on(MockServer::new());
114+
let config = ClientConfigBuilder::default()
115+
.controller_uri(mock_server.address)
116+
.mock(true)
117+
.build()
118+
.expect("creating config");
119+
let mut writer = rt.block_on(set_up(config));
120+
rt.spawn(async { MockServer::run(mock_server).await });
121+
122+
info!("start mock server performance testing");
123+
c.bench_function("mock server", |b| {
124+
b.iter(|| {
125+
rt.block_on(run(&mut writer));
126+
});
127+
});
128+
info!("mock server performance testing finished");
129+
}
130+
131+
// This benchmark test uses a mock server that replies ok to any requests instantly. It involves
132+
// kernel latency. It does not wait for reply.
133+
fn mock_server_no_block(c: &mut Criterion) {
134+
let mut rt = tokio::runtime::Runtime::new().unwrap();
135+
let mock_server = rt.block_on(MockServer::new());
136+
let config = ClientConfigBuilder::default()
137+
.controller_uri(mock_server.address)
138+
.mock(true)
139+
.build()
140+
.expect("creating config");
141+
let mut writer = rt.block_on(set_up(config));
142+
rt.spawn(async { MockServer::run(mock_server).await });
143+
144+
info!("start mock server(no block) performance testing");
145+
c.bench_function("mock server(no block)", |b| {
146+
b.iter(|| {
147+
rt.block_on(run_no_block(&mut writer));
148+
});
149+
});
150+
info!("mock server(no block) performance testing finished");
151+
}
152+
153+
// This benchmark test uses a mock connection that replies ok to any requests instantly. It does not
154+
// involve kernel latency.
155+
fn mock_connection(c: &mut Criterion) {
156+
let mut rt = tokio::runtime::Runtime::new().unwrap();
157+
let config = ClientConfigBuilder::default()
158+
.controller_uri("127.0.0.1:9090".parse::<SocketAddr>().unwrap())
159+
.mock(true)
160+
.connection_type(ConnectionType::Mock)
161+
.build()
162+
.expect("creating config");
163+
let mut writer = rt.block_on(set_up(config));
164+
165+
info!("start mock connection performance testing");
166+
c.bench_function("mock connection", |b| {
167+
b.iter(|| {
168+
rt.block_on(run(&mut writer));
169+
});
170+
});
171+
info!("mock server connection testing finished");
172+
}
173+
174+
// This benchmark test uses a mock connection that replies ok to any requests instantly. It does not
175+
// involve kernel latency. It does not wait for reply.
176+
fn mock_connection_no_block(c: &mut Criterion) {
177+
let mut rt = tokio::runtime::Runtime::new().unwrap();
178+
let config = ClientConfigBuilder::default()
179+
.controller_uri("127.0.0.1:9090".parse::<SocketAddr>().unwrap())
180+
.mock(true)
181+
.connection_type(ConnectionType::Mock)
182+
.build()
183+
.expect("creating config");
184+
let mut writer = rt.block_on(set_up(config));
185+
186+
info!("start mock connection(no block) performance testing");
187+
c.bench_function("mock connection(no block)", |b| {
188+
b.iter(|| {
189+
rt.block_on(run_no_block(&mut writer));
190+
});
191+
});
192+
info!("mock server connection(no block) testing finished");
193+
}
194+
195+
// helper functions
196+
async fn set_up(config: ClientConfig) -> EventStreamWriter {
197+
let scope_name = Scope::new("testWriterPerf".into());
198+
let stream_name = Stream::new("testWriterPerf".into());
199+
let client_factory = ClientFactory::new(config.clone());
200+
let controller_client = client_factory.get_controller_client();
201+
create_scope_stream(controller_client, &scope_name, &stream_name, 1).await;
202+
let scoped_stream = ScopedStream {
203+
scope: scope_name.clone(),
204+
stream: stream_name.clone(),
205+
};
206+
client_factory.create_event_stream_writer(scoped_stream, config.clone())
207+
}
208+
209+
async fn create_scope_stream(
210+
controller_client: &dyn ControllerClient,
211+
scope_name: &Scope,
212+
stream_name: &Stream,
213+
segment_number: i32,
214+
) {
215+
controller_client
216+
.create_scope(scope_name)
217+
.await
218+
.expect("create scope");
219+
info!("Scope created");
220+
let request = StreamConfiguration {
221+
scoped_stream: ScopedStream {
222+
scope: scope_name.clone(),
223+
stream: stream_name.clone(),
224+
},
225+
scaling: Scaling {
226+
scale_type: ScaleType::FixedNumSegments,
227+
target_rate: 0,
228+
scale_factor: 0,
229+
min_num_segments: segment_number,
230+
},
231+
retention: Retention {
232+
retention_type: RetentionType::None,
233+
retention_param: 0,
234+
},
235+
};
236+
controller_client
237+
.create_stream(&request)
238+
.await
239+
.expect("create stream");
240+
info!("Stream created");
241+
}
242+
243+
// run sends request to server and wait for the reply
244+
async fn run(writer: &mut EventStreamWriter) {
245+
let mut receivers = vec![];
246+
for _i in 0..EVENT_NUM {
247+
let rx = writer.write_event(vec![0; EVENT_SIZE]).await;
248+
receivers.push(rx);
249+
}
250+
assert_eq!(receivers.len(), EVENT_NUM);
251+
252+
for rx in receivers {
253+
let reply: Result<(), EventStreamWriterError> = rx.await.expect("wait for result from oneshot");
254+
assert_eq!(reply.is_ok(), true);
255+
}
256+
}
257+
258+
// run no block sends request to server and does not wait for the reply
259+
async fn run_no_block(writer: &mut EventStreamWriter) {
260+
let mut receivers = vec![];
261+
for _i in 0..EVENT_NUM {
262+
let rx = writer.write_event(vec![0; EVENT_SIZE]).await;
263+
receivers.push(rx);
264+
}
265+
assert_eq!(receivers.len(), EVENT_NUM);
266+
}
267+
268+
criterion_group!(
269+
performance,
270+
mock_server,
271+
mock_server_no_block,
272+
mock_connection,
273+
mock_connection_no_block
274+
);
275+
criterion_main!(performance);

controller-client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub mod controller {
6161
// this is the rs file name generated after compiling the proto file, located inside the target folder.
6262
}
6363

64-
mod mock_controller;
64+
pub mod mock_controller;
6565
mod model_helper;
6666
#[cfg(test)]
6767
mod test;

controller-client/src/mock_controller.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,45 @@ use super::ControllerClient;
1212
use super::ControllerError;
1313
use async_trait::async_trait;
1414
use ordered_float::OrderedFloat;
15+
use pravega_connection_pool::connection_pool::ConnectionPool;
1516
use pravega_rust_client_shared::*;
16-
use pravega_wire_protocol::client_connection::ClientConnection;
17+
use pravega_wire_protocol::client_connection::{ClientConnection, ClientConnectionImpl};
1718
use pravega_wire_protocol::commands::{CreateSegmentCommand, DeleteSegmentCommand, MergeSegmentsCommand};
19+
use pravega_wire_protocol::connection_factory::{
20+
ConnectionFactory, ConnectionType, SegmentConnectionManager,
21+
};
1822
use pravega_wire_protocol::error::ClientConnectionError;
1923
use pravega_wire_protocol::wire_commands::{Replies, Requests};
2024
use std::collections::HashSet;
2125
use std::collections::{BTreeMap, HashMap};
26+
use std::net::SocketAddr;
2227
use std::sync::atomic::{AtomicUsize, Ordering};
2328
use std::time::Duration;
24-
use tokio::sync::{Mutex, RwLock, RwLockReadGuard};
29+
use tokio::sync::{RwLock, RwLockReadGuard};
2530
use uuid::Uuid;
2631

2732
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(0);
2833

29-
struct MockController {
30-
endpoint: String,
31-
port: i32,
32-
connection: Mutex<Box<dyn ClientConnection>>, // a fake client connection to send wire command.
34+
pub struct MockController {
35+
endpoint: SocketAddr,
36+
pool: ConnectionPool<SegmentConnectionManager>,
3337
created_scopes: RwLock<HashMap<String, HashSet<ScopedStream>>>,
3438
created_streams: RwLock<HashMap<ScopedStream, StreamConfiguration>>,
3539
}
3640

41+
impl MockController {
42+
pub fn new(endpoint: SocketAddr) -> Self {
43+
let cf = ConnectionFactory::create(ConnectionType::Mock) as Box<dyn ConnectionFactory>;
44+
let manager = SegmentConnectionManager::new(cf, 10);
45+
let pool = ConnectionPool::new(manager);
46+
MockController {
47+
endpoint,
48+
pool,
49+
created_scopes: RwLock::new(HashMap::new()),
50+
created_streams: RwLock::new(HashMap::new()),
51+
}
52+
}
53+
}
3754
#[async_trait]
3855
impl ControllerClient for MockController {
3956
async fn create_scope(&self, scope: &Scope) -> Result<bool, ControllerError> {
@@ -251,8 +268,7 @@ impl ControllerClient for MockController {
251268
&self,
252269
_segment: &ScopedSegment,
253270
) -> Result<PravegaNodeUri, ControllerError> {
254-
let uri = self.endpoint.clone() + ":" + &self.port.to_string();
255-
Ok(PravegaNodeUri(uri))
271+
Ok(PravegaNodeUri(self.endpoint.to_string()))
256272
}
257273

258274
async fn get_or_refresh_delegation_token_for(
@@ -578,6 +594,14 @@ async fn send_request_over_connection(
578594
command: &Requests,
579595
controller: &MockController,
580596
) -> Result<Replies, ClientConnectionError> {
581-
controller.connection.lock().await.write(command).await?;
582-
controller.connection.lock().await.read().await
597+
let pooled_connection = controller
598+
.pool
599+
.get_connection(controller.endpoint)
600+
.await
601+
.expect("get connection from pool");
602+
let mut connection = ClientConnectionImpl {
603+
connection: pooled_connection,
604+
};
605+
connection.write(command).await?;
606+
connection.read().await
583607
}

0 commit comments

Comments
 (0)