3
3
// SPDX-License-Identifier: Apache-2.0
4
4
//
5
5
6
+ use crate :: r#async:: utils;
6
7
use nix:: unistd;
7
- use protobuf:: { CodedInputStream , Message } ;
8
8
use std:: collections:: HashMap ;
9
9
use std:: os:: unix:: io:: RawFd ;
10
+ use std:: result:: Result as StdResult ;
10
11
use std:: sync:: Arc ;
11
12
12
13
use crate :: asynchronous:: stream:: { receive, respond, respond_with_status} ;
@@ -15,7 +16,7 @@ use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
15
16
use crate :: context;
16
17
use crate :: error:: { get_status, Error , Result } ;
17
18
use crate :: r#async:: { MethodHandler , TtrpcContext } ;
18
- use crate :: ttrpc:: { Code , Request } ;
19
+ use crate :: ttrpc:: { Code , Status } ;
19
20
use crate :: MessageHeader ;
20
21
use futures:: stream:: Stream ;
21
22
use futures:: StreamExt as _;
@@ -303,63 +304,53 @@ async fn spawn_connection_handler<S>(
303
304
} ) ;
304
305
}
305
306
307
+ async fn do_handle_request (
308
+ fd : RawFd ,
309
+ methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
310
+ header : MessageHeader ,
311
+ body : & [ u8 ] ,
312
+ ) -> StdResult < ( u32 , Vec < u8 > ) , Status > {
313
+ let req = utils:: body_to_request ( body) ?;
314
+ let path = utils:: get_path ( & req. service , & req. method ) ;
315
+ let method = methods
316
+ . get ( & path)
317
+ . ok_or_else ( || get_status ( Code :: INVALID_ARGUMENT , format ! ( "{} does not exist" , & path) ) ) ?;
318
+
319
+ let ctx = TtrpcContext {
320
+ fd,
321
+ mh : header,
322
+ metadata : context:: from_pb ( & req. metadata ) ,
323
+ } ;
324
+
325
+ method. handler ( ctx, req) . await . map_err ( |e| {
326
+ error ! ( "method handle {} got error {:?}" , path, & e) ;
327
+ get_status ( Code :: UNKNOWN , e)
328
+ } )
329
+ }
330
+
306
331
async fn handle_request (
307
332
tx : Sender < Vec < u8 > > ,
308
333
fd : RawFd ,
309
334
methods : Arc < HashMap < String , Box < dyn MethodHandler + Send + Sync > > > ,
310
335
message : ( MessageHeader , Vec < u8 > ) ,
311
336
) {
312
337
let ( header, body) = message;
313
- if header. type_ != MESSAGE_TYPE_REQUEST {
314
- return ;
315
- }
316
-
317
- let mut req = Request :: new ( ) ;
318
- let merge_result;
319
- {
320
- let mut s = CodedInputStream :: from_bytes ( & body) ;
321
- merge_result = req. merge_from ( & mut s) ;
322
- }
323
-
324
- if merge_result. is_err ( ) {
325
- let status = get_status ( Code :: INVALID_ARGUMENT , "" . to_string ( ) ) ;
326
-
327
- if let Err ( x) = respond_with_status ( tx. clone ( ) , header. stream_id , status) . await {
328
- error ! ( "respond get error {:?}" , x) ;
329
- }
338
+ let stream_id = header. stream_id ;
330
339
340
+ if header. type_ != MESSAGE_TYPE_REQUEST {
331
341
return ;
332
342
}
333
- trace ! ( "Got Message request {:?}" , req) ;
334
343
335
- let stream_id = header. stream_id ;
336
- let path = format ! ( "/{}/{}" , req. service, req. method) ;
337
- if let Some ( x) = methods. get ( & path) {
338
- let method = x;
339
- let ctx = TtrpcContext {
340
- fd,
341
- mh : header,
342
- metadata : context:: from_pb ( & req. metadata ) ,
343
- } ;
344
-
345
- match method. handler ( ctx, req) . await {
346
- Ok ( ( stream_id, body) ) => {
347
- if let Err ( x) = respond ( tx. clone ( ) , stream_id, body) . await {
348
- error ! ( "respond get error {:?}" , x) ;
349
- }
350
- }
351
- Err ( e) => {
352
- error ! ( "method handle {} get error {:?}" , path, e) ;
353
- let status = get_status ( Code :: UNKNOWN , e) ;
354
- if let Err ( e) = respond_with_status ( tx, stream_id, status) . await {
355
- error ! ( "respond get error {:?}" , e) ;
356
- }
344
+ match do_handle_request ( fd, methods, header, & body) . await {
345
+ Ok ( ( stream_id, resp_body) ) => {
346
+ if let Err ( x) = respond ( tx. clone ( ) , stream_id, resp_body) . await {
347
+ error ! ( "respond got error {:?}" , x) ;
357
348
}
358
349
}
359
- } else {
360
- let status = get_status ( Code :: INVALID_ARGUMENT , format ! ( "{} does not exist" , path ) ) ;
361
- if let Err ( e ) = respond_with_status ( tx , header . stream_id , status ) . await {
362
- error ! ( "respond get error {:?}" , e ) ;
350
+ Err ( status ) => {
351
+ if let Err ( x ) = respond_with_status ( tx . clone ( ) , stream_id , status ) . await {
352
+ error ! ( "respond got error {:?}" , x ) ;
353
+ }
363
354
}
364
355
}
365
356
}
0 commit comments