Skip to content

Commit c420ff5

Browse files
committed
async-server: rework handler_request
rework handler_request. Signed-off-by: Tim Zhang <[email protected]>
1 parent df2ca2f commit c420ff5

File tree

2 files changed

+62
-49
lines changed

2 files changed

+62
-49
lines changed

src/asynchronous/server.rs

Lines changed: 37 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
// SPDX-License-Identifier: Apache-2.0
44
//
55

6+
use crate::r#async::utils;
67
use nix::unistd;
7-
use protobuf::{CodedInputStream, Message};
88
use std::collections::HashMap;
99
use std::os::unix::io::RawFd;
10+
use std::result::Result as StdResult;
1011
use std::sync::Arc;
1112

1213
use crate::asynchronous::stream::{receive, respond, respond_with_status};
@@ -15,7 +16,7 @@ use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
1516
use crate::context;
1617
use crate::error::{get_status, Error, Result};
1718
use crate::r#async::{MethodHandler, TtrpcContext};
18-
use crate::ttrpc::{Code, Request};
19+
use crate::ttrpc::{Code, Status};
1920
use crate::MessageHeader;
2021
use futures::stream::Stream;
2122
use futures::StreamExt as _;
@@ -303,63 +304,53 @@ async fn spawn_connection_handler<S>(
303304
});
304305
}
305306

307+
async fn do_handle_request(
308+
fd: RawFd,
309+
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
310+
header: MessageHeader,
311+
body: &[u8],
312+
) -> StdResult<(u32, Vec<u8>), Status> {
313+
let req = utils::body_to_request(body)?;
314+
let path = utils::get_path(&req.service, &req.method);
315+
let method = methods
316+
.get(&path)
317+
.ok_or_else(|| get_status(Code::INVALID_ARGUMENT, format!("{} does not exist", &path)))?;
318+
319+
let ctx = TtrpcContext {
320+
fd,
321+
mh: header,
322+
metadata: context::from_pb(&req.metadata),
323+
};
324+
325+
method.handler(ctx, req).await.map_err(|e| {
326+
error!("method handle {} got error {:?}", path, &e);
327+
get_status(Code::UNKNOWN, e)
328+
})
329+
}
330+
306331
async fn handle_request(
307332
tx: Sender<Vec<u8>>,
308333
fd: RawFd,
309334
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
310335
message: (MessageHeader, Vec<u8>),
311336
) {
312337
let (header, body) = message;
313-
if header.type_ != MESSAGE_TYPE_REQUEST {
314-
return;
315-
}
316-
317-
let mut req = Request::new();
318-
let merge_result;
319-
{
320-
let mut s = CodedInputStream::from_bytes(&body);
321-
merge_result = req.merge_from(&mut s);
322-
}
323-
324-
if merge_result.is_err() {
325-
let status = get_status(Code::INVALID_ARGUMENT, "".to_string());
326-
327-
if let Err(x) = respond_with_status(tx.clone(), header.stream_id, status).await {
328-
error!("respond get error {:?}", x);
329-
}
338+
let stream_id = header.stream_id;
330339

340+
if header.type_ != MESSAGE_TYPE_REQUEST {
331341
return;
332342
}
333-
trace!("Got Message request {:?}", req);
334343

335-
let stream_id = header.stream_id;
336-
let path = format!("/{}/{}", req.service, req.method);
337-
if let Some(x) = methods.get(&path) {
338-
let method = x;
339-
let ctx = TtrpcContext {
340-
fd,
341-
mh: header,
342-
metadata: context::from_pb(&req.metadata),
343-
};
344-
345-
match method.handler(ctx, req).await {
346-
Ok((stream_id, body)) => {
347-
if let Err(x) = respond(tx.clone(), stream_id, body).await {
348-
error!("respond get error {:?}", x);
349-
}
350-
}
351-
Err(e) => {
352-
error!("method handle {} get error {:?}", path, e);
353-
let status = get_status(Code::UNKNOWN, e);
354-
if let Err(e) = respond_with_status(tx, stream_id, status).await {
355-
error!("respond get error {:?}", e);
356-
}
344+
match do_handle_request(fd, methods, header, &body).await {
345+
Ok((stream_id, resp_body)) => {
346+
if let Err(x) = respond(tx.clone(), stream_id, resp_body).await {
347+
error!("respond got error {:?}", x);
357348
}
358349
}
359-
} else {
360-
let status = get_status(Code::INVALID_ARGUMENT, format!("{} does not exist", path));
361-
if let Err(e) = respond_with_status(tx, header.stream_id, status).await {
362-
error!("respond get error {:?}", e);
350+
Err(status) => {
351+
if let Err(x) = respond_with_status(tx.clone(), stream_id, status).await {
352+
error!("respond got error {:?}", x);
353+
}
363354
}
364355
}
365356
}

src/asynchronous/utils.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
//
55

66
use crate::common::{MessageHeader, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE};
7-
use crate::error::{Error, Result};
8-
use crate::ttrpc::{Request, Response};
7+
use crate::error::{get_status, Error, Result};
8+
use crate::ttrpc::{Code, Request, Response, Status};
99
use async_trait::async_trait;
10-
use protobuf::Message;
10+
use protobuf::{CodedInputStream, Message};
1111
use std::collections::HashMap;
1212
use std::os::unix::io::{FromRawFd, RawFd};
13+
use std::result::Result as StdResult;
1314
use tokio::net::UnixStream;
1415

1516
/// Handle request in async mode.
@@ -130,3 +131,24 @@ pub fn new_unix_stream_from_raw_fd(fd: RawFd) -> UnixStream {
130131
std_stream.set_nonblocking(true).unwrap();
131132
UnixStream::from_std(std_stream).unwrap()
132133
}
134+
135+
pub fn body_to_request(body: &[u8]) -> StdResult<Request, Status> {
136+
let mut req = Request::new();
137+
let merge_result;
138+
{
139+
let mut s = CodedInputStream::from_bytes(body);
140+
merge_result = req.merge_from(&mut s);
141+
}
142+
143+
if merge_result.is_err() {
144+
return Err(get_status(Code::INVALID_ARGUMENT, "".to_string()));
145+
}
146+
147+
trace!("Got Message request {:?}", req);
148+
149+
Ok(req)
150+
}
151+
152+
pub fn get_path(service: &str, method: &str) -> String {
153+
format!("/{}/{}", service, method)
154+
}

0 commit comments

Comments
 (0)