Skip to content

Commit 680c7f8

Browse files
committed
refactor(server): re-enable http2 support
1 parent 304e642 commit 680c7f8

File tree

4 files changed

+89
-74
lines changed

4 files changed

+89
-74
lines changed

src/proto/h2/server.rs

+75-74
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use std::error::Error as StdError;
2+
use std::marker::Unpin;
23

4+
use futures_core::Stream;
35
use h2::Reason;
46
use h2::server::{Builder, Connection, Handshake, SendResponse};
57
use tokio_io::{AsyncRead, AsyncWrite};
@@ -49,27 +51,23 @@ where
4951

5052
impl<T, S, B, E> Server<T, S, B, E>
5153
where
52-
T: AsyncRead + AsyncWrite,
54+
T: AsyncRead + AsyncWrite + Unpin,
5355
S: Service<ReqBody=Body, ResBody=B>,
5456
S::Error: Into<Box<dyn StdError + Send + Sync>>,
5557
B: Payload,
58+
B::Data: Unpin,
5659
E: H2Exec<S::Future, B>,
5760
{
5861
pub(crate) fn new(io: T, service: S, builder: &Builder, exec: E) -> Server<T, S, B, E> {
59-
unimplemented!("proto::h2::Server::new")
60-
/*
6162
let handshake = builder.handshake(io);
6263
Server {
6364
exec,
6465
state: State::Handshaking(handshake),
6566
service,
6667
}
67-
*/
6868
}
6969

7070
pub fn graceful_shutdown(&mut self) {
71-
unimplemented!("proto::h2::Server::graceful_shutdown")
72-
/*
7371
trace!("graceful_shutdown");
7472
match self.state {
7573
State::Handshaking(..) => {
@@ -86,54 +84,53 @@ where
8684
}
8785
}
8886
self.state = State::Closed;
89-
*/
9087
}
9188
}
9289

9390
impl<T, S, B, E> Future for Server<T, S, B, E>
9491
where
95-
T: AsyncRead + AsyncWrite,
92+
T: AsyncRead + AsyncWrite + Unpin,
9693
S: Service<ReqBody=Body, ResBody=B>,
9794
S::Error: Into<Box<dyn StdError + Send + Sync>>,
9895
B: Payload,
96+
B::Data: Unpin,
9997
E: H2Exec<S::Future, B>,
10098
{
10199
type Output = crate::Result<Dispatched>;
102100

103101
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
104-
unimplemented!("h2 server future")
105-
/*
102+
let me = &mut *self;
106103
loop {
107-
let next = match self.state {
104+
let next = match me.state {
108105
State::Handshaking(ref mut h) => {
109-
let conn = try_ready!(h.poll().map_err(crate::Error::new_h2));
106+
let conn = ready!(Pin::new(h).poll(cx).map_err(crate::Error::new_h2))?;
110107
State::Serving(Serving {
111108
conn,
112109
closing: None,
113110
})
114111
},
115112
State::Serving(ref mut srv) => {
116-
try_ready!(srv.poll_server(&mut self.service, &self.exec));
117-
return Ok(Async::Ready(Dispatched::Shutdown));
113+
ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
114+
return Poll::Ready(Ok(Dispatched::Shutdown));
118115
}
119116
State::Closed => {
120117
// graceful_shutdown was called before handshaking finished,
121118
// nothing to do here...
122-
return Ok(Async::Ready(Dispatched::Shutdown));
119+
return Poll::Ready(Ok(Dispatched::Shutdown));
123120
}
124121
};
125-
self.state = next;
122+
me.state = next;
126123
}
127-
*/
128124
}
129125
}
130126

131127
impl<T, B> Serving<T, B>
132128
where
133-
T: AsyncRead + AsyncWrite,
129+
T: AsyncRead + AsyncWrite + Unpin,
134130
B: Payload,
131+
B::Data: Unpin,
135132
{
136-
fn poll_server<S, E>(&mut self, service: &mut S, exec: &E) -> Poll<crate::Result<()>>
133+
fn poll_server<S, E>(&mut self, cx: &mut task::Context<'_>, service: &mut S, exec: &mut E) -> Poll<crate::Result<()>>
137134
where
138135
S: Service<
139136
ReqBody=Body,
@@ -142,19 +139,18 @@ where
142139
S::Error: Into<Box<dyn StdError + Send + Sync>>,
143140
E: H2Exec<S::Future, B>,
144141
{
145-
/*
146142
if self.closing.is_none() {
147143
loop {
148144
// At first, polls the readiness of supplied service.
149-
match service.poll_ready() {
150-
Ok(Async::Ready(())) => (),
151-
Ok(Async::NotReady) => {
145+
match service.poll_ready(cx) {
146+
Poll::Ready(Ok(())) => (),
147+
Poll::Pending => {
152148
// use `poll_close` instead of `poll`, in order to avoid accepting a request.
153-
try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2));
149+
ready!(self.conn.poll_close(cx).map_err(crate::Error::new_h2))?;
154150
trace!("incoming connection complete");
155-
return Ok(Async::Ready(()));
151+
return Poll::Ready(Ok(()));
156152
}
157-
Err(err) => {
153+
Poll::Ready(Err(err)) => {
158154
let err = crate::Error::new_user_service(err);
159155
debug!("service closed: {}", err);
160156

@@ -173,29 +169,33 @@ where
173169
}
174170

175171
// When the service is ready, accepts an incoming request.
176-
if let Some((req, respond)) = try_ready!(self.conn.poll().map_err(crate::Error::new_h2)) {
177-
trace!("incoming request");
178-
let content_length = content_length_parse_all(req.headers());
179-
let req = req.map(|stream| {
180-
crate::Body::h2(stream, content_length)
181-
});
182-
let fut = H2Stream::new(service.call(req), respond);
183-
exec.execute_h2stream(fut)?;
184-
} else {
185-
// no more incoming streams...
186-
trace!("incoming connection complete");
187-
return Ok(Async::Ready(()))
172+
match ready!(Pin::new(&mut self.conn).poll_next(cx)) {
173+
Some(Ok((req, respond))) => {
174+
trace!("incoming request");
175+
let content_length = content_length_parse_all(req.headers());
176+
let req = req.map(|stream| {
177+
crate::Body::h2(stream, content_length)
178+
});
179+
let fut = H2Stream::new(service.call(req), respond);
180+
exec.execute_h2stream(fut)?;
181+
},
182+
Some(Err(e)) => {
183+
return Poll::Ready(Err(crate::Error::new_h2(e)));
184+
},
185+
None => {
186+
// no more incoming streams...
187+
trace!("incoming connection complete");
188+
return Poll::Ready(Ok(()));
189+
},
188190
}
189191
}
190192
}
191193

192194
debug_assert!(self.closing.is_some(), "poll_server broke loop without closing");
193195

194-
try_ready!(self.conn.poll_close().map_err(crate::Error::new_h2));
196+
ready!(self.conn.poll_close(cx).map_err(crate::Error::new_h2))?;
195197

196-
Err(self.closing.take().expect("polled after error"))
197-
*/
198-
unimplemented!("h2 server poll_server")
198+
Poll::Ready(Err(self.closing.take().expect("polled after error")))
199199
}
200200
}
201201

@@ -230,38 +230,37 @@ where
230230
}
231231
}
232232

233-
impl<F, B> Future for H2Stream<F, B>
233+
impl<F, B, E> H2Stream<F, B>
234234
where
235-
//F: Future<Item=Response<B>>,
236-
//F::Error: Into<Box<dyn StdError + Send + Sync>>,
237-
B: Payload,
235+
F: Future<Output = Result<Response<B>, E>>,
236+
B: Payload + Unpin,
237+
B::Data: Unpin,
238+
E: Into<Box<dyn StdError + Send + Sync>>,
238239
{
239-
type Output = ();
240-
241-
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
242-
unimplemented!("impl Future for H2Stream");
243-
/*
240+
fn poll2(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
241+
// Safety: State::{Service, Body} futures are never moved
242+
let me = unsafe { self.get_unchecked_mut() };
244243
loop {
245-
let next = match self.state {
244+
let next = match me.state {
246245
H2StreamState::Service(ref mut h) => {
247-
let res = match h.poll() {
248-
Ok(Async::Ready(r)) => r,
249-
Ok(Async::NotReady) => {
250-
// Body is not yet ready, so we want to check if the client has sent a
246+
let res = match unsafe { Pin::new_unchecked(h) }.poll(cx) {
247+
Poll::Ready(Ok(r)) => r,
248+
Poll::Pending => {
249+
// Response is not yet ready, so we want to check if the client has sent a
251250
// RST_STREAM frame which would cancel the current request.
252-
if let Async::Ready(reason) =
253-
self.reply.poll_reset().map_err(|e| crate::Error::new_h2(e))?
251+
if let Poll::Ready(reason) =
252+
me.reply.poll_reset(cx).map_err(|e| crate::Error::new_h2(e))?
254253
{
255254
debug!("stream received RST_STREAM: {:?}", reason);
256-
return Err(crate::Error::new_h2(reason.into()));
255+
return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
257256
}
258-
return Ok(Async::NotReady);
257+
return Poll::Pending;
259258
}
260-
Err(e) => {
259+
Poll::Ready(Err(e)) => {
261260
let err = crate::Error::new_user_service(e);
262261
warn!("http2 service errored: {}", err);
263-
self.reply.send_reset(err.h2_reason());
264-
return Err(err);
262+
me.reply.send_reset(err.h2_reason());
263+
return Poll::Ready(Err(err));
265264
},
266265
};
267266

@@ -278,12 +277,12 @@ where
278277

279278
macro_rules! reply {
280279
($eos:expr) => ({
281-
match self.reply.send_response(res, $eos) {
280+
match me.reply.send_response(res, $eos) {
282281
Ok(tx) => tx,
283282
Err(e) => {
284283
debug!("send response error: {}", e);
285-
self.reply.send_reset(Reason::INTERNAL_ERROR);
286-
return Err(crate::Error::new_h2(e));
284+
me.reply.send_reset(Reason::INTERNAL_ERROR);
285+
return Poll::Ready(Err(crate::Error::new_h2(e)));
287286
}
288287
}
289288
})
@@ -300,33 +299,35 @@ where
300299
body_tx
301300
.send_data(buf, true)
302301
.map_err(crate::Error::new_body_write)?;
303-
return Ok(Async::Ready(()));
302+
return Poll::Ready(Ok(()));
304303
}
305304

306305
if !body.is_end_stream() {
307306
let body_tx = reply!(false);
308307
H2StreamState::Body(PipeToSendStream::new(body, body_tx))
309308
} else {
310309
reply!(true);
311-
return Ok(Async::Ready(()));
310+
return Poll::Ready(Ok(()));
312311
}
313312
},
314313
H2StreamState::Body(ref mut pipe) => {
315-
return pipe.poll();
314+
return Pin::new(pipe).poll(cx);
316315
}
317316
};
318-
self.state = next;
317+
me.state = next;
319318
}
320-
*/
321319
}
322320
}
323321
/*
324-
impl<F, B> Future for H2Stream<F, B>
322+
impl<F, B, E> Future for H2Stream<F, B>
325323
where
326-
F: Future<Item=Response<B>>,
327-
F::Error: Into<Box<dyn StdError + Send + Sync>>,
324+
F: Future<Output = Result<Response<B>, E>>,
328325
B: Payload,
326+
E: Into<Box<dyn StdError + Send + Sync>>,
329327
{
328+
type Output = ();
329+
330+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
330331
type Item = ();
331332
type Error = ();
332333

src/server/conn.rs

+8
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,7 @@ impl<E> Http<E> {
358358
S: Service<ReqBody=Body, ResBody=Bd>,
359359
S::Error: Into<Box<dyn StdError + Send + Sync>>,
360360
Bd: Payload,
361+
Bd::Data: Unpin,
361362
I: AsyncRead + AsyncWrite + Unpin,
362363
E: H2Exec<S::Future, Bd>,
363364
{
@@ -479,6 +480,7 @@ where
479480
S::Error: Into<Box<dyn StdError + Send + Sync>>,
480481
I: AsyncRead + AsyncWrite + Unpin,
481482
B: Payload + 'static,
483+
B::Data: Unpin,
482484
E: H2Exec<S::Future, B>,
483485
{
484486
/// Start a graceful shutdown process for this connection.
@@ -629,6 +631,7 @@ where
629631
S::Error: Into<Box<dyn StdError + Send + Sync>>,
630632
I: AsyncRead + AsyncWrite + Unpin + 'static,
631633
B: Payload + 'static,
634+
B::Data: Unpin,
632635
E: H2Exec<S::Future, B>,
633636
{
634637
type Output = crate::Result<()>;
@@ -744,6 +747,7 @@ where
744747
F: Future<Output=Result<S, FE>>,
745748
S: Service<ReqBody=Body, ResBody=B>,
746749
B: Payload,
750+
B::Data: Unpin,
747751
E: H2Exec<S::Future, B>,
748752
{
749753
type Output = Result<Connection<I, S, E>, FE>;
@@ -852,6 +856,7 @@ pub(crate) mod spawn_all {
852856
where
853857
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
854858
S: Service<ReqBody=Body> + 'static,
859+
<S::ResBody as Payload>::Data: Unpin,
855860
E: H2Exec<S::Future, S::ResBody>,
856861
{
857862
type Future = UpgradeableConnection<I, S, E>;
@@ -895,6 +900,7 @@ pub(crate) mod spawn_all {
895900
NE: Into<Box<dyn StdError + Send + Sync>>,
896901
S: Service<ReqBody=Body, ResBody=B>,
897902
B: Payload,
903+
B::Data: Unpin,
898904
E: H2Exec<S::Future, B>,
899905
W: Watcher<I, S, E>,
900906
{
@@ -960,6 +966,7 @@ mod upgrades {
960966
S::Error: Into<Box<dyn StdError + Send + Sync>>,
961967
I: AsyncRead + AsyncWrite + Unpin,
962968
B: Payload + 'static,
969+
B::Data: Unpin,
963970
E: H2Exec<S::Future, B>,
964971
{
965972
/// Start a graceful shutdown process for this connection.
@@ -977,6 +984,7 @@ mod upgrades {
977984
S::Error: Into<Box<dyn StdError + Send + Sync>>,
978985
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
979986
B: Payload + 'static,
987+
B::Data: Unpin,
980988
E: super::H2Exec<S::Future, B>,
981989
{
982990
type Output = crate::Result<()>;

src/server/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ where
154154
S::Error: Into<Box<dyn StdError + Send + Sync>>,
155155
S::Service: 'static,
156156
B: Payload,
157+
B::Data: Unpin,
157158
E: H2Exec<<S::Service as Service>::Future, B>,
158159
E: NewSvcExec<IO, S::Future, S::Service, E, GracefulWatcher>,
159160
{
@@ -211,6 +212,7 @@ where
211212
S::Error: Into<Box<dyn StdError + Send + Sync>>,
212213
S::Service: 'static,
213214
B: Payload,
215+
B::Data: Unpin,
214216
E: H2Exec<<S::Service as Service>::Future, B>,
215217
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
216218
{
@@ -409,6 +411,7 @@ impl<I, E> Builder<I, E> {
409411
S::Error: Into<Box<dyn StdError + Send + Sync>>,
410412
S::Service: 'static,
411413
B: Payload,
414+
B::Data: Unpin,
412415
E: NewSvcExec<IO, S::Future, S::Service, E, NoopWatcher>,
413416
E: H2Exec<<S::Service as Service>::Future, B>,
414417
{

0 commit comments

Comments
 (0)