Skip to content

Commit 41f4173

Browse files
committed
refactor(http2): re-enable http2 client and server support
1 parent 4920f5e commit 41f4173

File tree

11 files changed

+321
-367
lines changed

11 files changed

+321
-367
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ futures-util-preview = { version = "0.3.0-alpha.17" }
2929
http = "0.1.15"
3030
http-body = "0.1"
3131
httparse = "1.0"
32-
h2 = "0.1.10"
32+
h2 = { git = "https://github.com/hyperium/h2" }
3333
iovec = "0.1"
3434
itoa = "0.4.1"
3535
log = "0.4"

benches/end_to_end.rs

-10
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,14 @@ fn http1_parallel_x10_req_10mb(b: &mut test::Bencher) {
6464
}
6565

6666
#[bench]
67-
#[ignore]
6867
fn http2_get(b: &mut test::Bencher) {
69-
// FIXME: re-implement tests when `h2` upgrades to `async/await`
7068
opts()
7169
.http2()
7270
.bench(b)
7371
}
7472

7573
#[bench]
76-
#[ignore]
7774
fn http2_post(b: &mut test::Bencher) {
78-
// FIXME: re-implement tests when `h2` upgrades to `async/await`
7975
opts()
8076
.http2()
8177
.method(Method::POST)
@@ -84,9 +80,7 @@ fn http2_post(b: &mut test::Bencher) {
8480
}
8581

8682
#[bench]
87-
#[ignore]
8883
fn http2_req_100kb(b: &mut test::Bencher) {
89-
// FIXME: re-implement tests when `h2` upgrades to `async/await`
9084
let body = &[b'x'; 1024 * 100];
9185
opts()
9286
.http2()
@@ -96,19 +90,15 @@ fn http2_req_100kb(b: &mut test::Bencher) {
9690
}
9791

9892
#[bench]
99-
#[ignore]
10093
fn http2_parallel_x10_empty(b: &mut test::Bencher) {
101-
// FIXME: re-implement tests when `h2` upgrades to `async/await`
10294
opts()
10395
.http2()
10496
.parallel(10)
10597
.bench(b)
10698
}
10799

108100
#[bench]
109-
#[ignore]
110101
fn http2_parallel_x10_req_10mb(b: &mut test::Bencher) {
111-
// FIXME: re-implement tests when `h2` upgrades to `async/await`
112102
let body = &[b'x'; 1024 * 1024 * 10];
113103
opts()
114104
.http2()

src/body/body.rs

+15-22
Original file line numberDiff line numberDiff line change
@@ -269,25 +269,17 @@ impl Body {
269269
}
270270
None => Poll::Ready(None),
271271
}
272-
}
272+
},
273273
Kind::H2 {
274-
/*recv: ref mut h2,*/ ..
275-
} => {
276-
unimplemented!("h2.poll_inner");
277-
/*
278-
h2
279-
.poll()
280-
.map(|r#async| {
281-
r#async.map(|opt| {
282-
opt.map(|bytes| {
283-
let _ = h2.release_capacity().release_capacity(bytes.len());
284-
Chunk::from(bytes)
285-
})
286-
})
287-
})
288-
.map_err(crate::Error::new_body)
289-
*/
290-
}
274+
recv: ref mut h2, ..
275+
} => match ready!(Pin::new(&mut *h2).poll_next(cx)) {
276+
Some(Ok(bytes)) => {
277+
let _ = h2.release_capacity().release_capacity(bytes.len());
278+
Poll::Ready(Some(Ok(Chunk::from(bytes))))
279+
},
280+
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
281+
None => Poll::Ready(None),
282+
},
291283
Kind::Wrapped(ref mut s) => {
292284
match ready!(s.as_mut().poll_next(cx)) {
293285
Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
@@ -314,11 +306,12 @@ impl Payload for Body {
314306
self.poll_eof(cx)
315307
}
316308

317-
fn poll_trailers(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> {
309+
fn poll_trailers(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Result<HeaderMap, Self::Error>>> {
318310
match self.kind {
319-
Kind::H2 { /*recv: ref mut h2,*/ .. } => {
320-
unimplemented!("h2.poll_trailers");
321-
//h2.poll_trailers().map_err(crate::Error::new_h2)
311+
Kind::H2 { recv: ref mut h2, .. } => match ready!(h2.poll_trailers(cx)) {
312+
Some(Ok(t)) => Poll::Ready(Some(Ok(t))),
313+
Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
314+
None => Poll::Ready(None),
322315
},
323316
_ => Poll::Ready(None),
324317
}

src/client/conn.rs

+22-62
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,19 @@ type Http1Dispatcher<T, B, R> = proto::dispatch::Dispatcher<
3232
>;
3333
type ConnEither<T, B> = Either<
3434
Http1Dispatcher<T, B, proto::h1::ClientTransaction>,
35-
proto::h2::Client<T, B>,
35+
proto::h2::ClientTask<B>,
3636
>;
3737

38-
/// Returns a `Handshake` future over some IO.
38+
/// Returns a handshake future over some IO.
3939
///
4040
/// This is a shortcut for `Builder::new().handshake(io)`.
41-
pub fn handshake<T>(io: T) -> Handshake<T, crate::Body>
41+
pub async fn handshake<T>(io: T) -> crate::Result<(SendRequest<crate::Body>, Connection<T, crate::Body>)>
4242
where
4343
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
4444
{
4545
Builder::new()
4646
.handshake(io)
47+
.await
4748
}
4849

4950
/// The sender side of an established connection.
@@ -68,7 +69,7 @@ where
6869

6970
/// A builder to configure an HTTP connection.
7071
///
71-
/// After setting options, the builder is used to create a `Handshake` future.
72+
/// After setting options, the builder is used to create a handshake future.
7273
#[derive(Clone, Debug)]
7374
pub struct Builder {
7475
pub(super) exec: Exec,
@@ -80,16 +81,6 @@ pub struct Builder {
8081
h2_builder: h2::client::Builder,
8182
}
8283

83-
/// A future setting up HTTP over an IO object.
84-
///
85-
/// If successful, yields a `(SendRequest, Connection)` pair.
86-
#[must_use = "futures do nothing unless polled"]
87-
pub struct Handshake<T, B> {
88-
builder: Builder,
89-
io: Option<T>,
90-
_marker: PhantomData<fn(B)>,
91-
}
92-
9384
/// A future returned by `SendRequest::send_request`.
9485
///
9586
/// Yields a `Response` if successful.
@@ -334,7 +325,8 @@ impl<B> Clone for Http2SendRequest<B> {
334325
impl<T, B> Connection<T, B>
335326
where
336327
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
337-
B: Payload + 'static,
328+
B: Payload + Unpin + 'static,
329+
B::Data: Unpin,
338330
{
339331
/// Return the inner IO object, and additional information.
340332
///
@@ -365,29 +357,20 @@ where
365357
/// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
366358
/// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
367359
/// to work with this function; or use the `without_shutdown` wrapper.
368-
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>
369-
where
370-
B: Unpin,
371-
{
360+
pub fn poll_without_shutdown(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
372361
match self.inner.as_mut().expect("already upgraded") {
373362
&mut Either::Left(ref mut h1) => {
374363
h1.poll_without_shutdown(cx)
375364
},
376365
&mut Either::Right(ref mut h2) => {
377-
unimplemented!("h2 poll_without_shutdown");
378-
/*
379-
h2.poll().map(|x| x.map(|_| ()))
380-
*/
366+
Pin::new(h2).poll(cx).map_ok(|_| ())
381367
}
382368
}
383369
}
384370

385371
/// Prevent shutdown of the underlying IO object at the end of service the request,
386372
/// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
387-
pub fn without_shutdown(self) -> impl Future<Output=crate::Result<Parts<T>>>
388-
where
389-
B: Unpin,
390-
{
373+
pub fn without_shutdown(self) -> impl Future<Output=crate::Result<Parts<T>>> {
391374
let mut conn = Some(self);
392375
future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
393376
ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
@@ -400,6 +383,7 @@ impl<T, B> Future for Connection<T, B>
400383
where
401384
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
402385
B: Payload + Unpin + 'static,
386+
B::Data: Unpin,
403387
{
404388
type Output = crate::Result<()>;
405389

@@ -522,70 +506,46 @@ impl Builder {
522506
}
523507

524508
/// Constructs a connection with the configured options and IO.
525-
#[inline]
526-
pub fn handshake<T, B>(&self, io: T) -> Handshake<T, B>
509+
pub async fn handshake<T, B>(self, io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
527510
where
528511
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
529512
B: Payload + 'static,
513+
B::Data: Unpin,
530514
{
531515
trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 });
532-
Handshake {
533-
builder: self.clone(),
534-
io: Some(io),
535-
_marker: PhantomData,
536-
}
537-
}
538-
}
539-
540-
// ===== impl Handshake
541-
542-
impl<T, B> Future for Handshake<T, B>
543-
where
544-
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
545-
B: Payload + 'static,
546-
{
547-
type Output = crate::Result<(SendRequest<B>, Connection<T, B>)>;
548516

549-
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
550-
let io = self.io.take().expect("polled more than once");
551517
let (tx, rx) = dispatch::channel();
552-
let either = if !self.builder.http2 {
518+
let either = if !self.http2 {
553519
let mut conn = proto::Conn::new(io);
554-
if !self.builder.h1_writev {
520+
if !self.h1_writev {
555521
conn.set_write_strategy_flatten();
556522
}
557-
if self.builder.h1_title_case_headers {
523+
if self.h1_title_case_headers {
558524
conn.set_title_case_headers();
559525
}
560-
if let Some(sz) = self.builder.h1_read_buf_exact_size {
526+
if let Some(sz) = self.h1_read_buf_exact_size {
561527
conn.set_read_buf_exact_size(sz);
562528
}
563-
if let Some(max) = self.builder.h1_max_buf_size {
529+
if let Some(max) = self.h1_max_buf_size {
564530
conn.set_max_buf_size(max);
565531
}
566532
let cd = proto::h1::dispatch::Client::new(rx);
567533
let dispatch = proto::h1::Dispatcher::new(cd, conn);
568534
Either::Left(dispatch)
569535
} else {
570-
let h2 = proto::h2::Client::new(io, rx, &self.builder.h2_builder, self.builder.exec.clone());
536+
let h2 = proto::h2::client::handshake(io, rx, &self.h2_builder, self.exec.clone())
537+
.await?;
571538
Either::Right(h2)
572539
};
573540

574-
Poll::Ready(Ok((
541+
Ok((
575542
SendRequest {
576543
dispatch: tx,
577544
},
578545
Connection {
579546
inner: Some(either),
580547
},
581-
)))
582-
}
583-
}
584-
585-
impl<T, B> fmt::Debug for Handshake<T, B> {
586-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
587-
f.debug_struct("Handshake")
588-
.finish()
548+
))
589549
}
590550
}
591551

src/client/mod.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ where C: Connect + Sync + 'static,
163163
C::Transport: 'static,
164164
C::Future: 'static,
165165
B: Payload + Unpin + Send + 'static,
166-
B::Data: Send,
166+
B::Data: Send + Unpin,
167167
{
168168
/// Send a `GET` request to the supplied `Uri`.
169169
///
@@ -512,8 +512,10 @@ where C: Connect + Sync + 'static,
512512
connecting
513513
};
514514
let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
515-
Either::Left(conn_builder
515+
Either::Left(Box::pin(conn_builder
516516
.http2_only(is_h2)
517+
// TODO: convert client::conn::Builder to be by-value?
518+
.clone()
517519
.handshake(io)
518520
.and_then(move |(tx, conn)| {
519521
trace!("handshake complete, spawning background dispatcher task");
@@ -541,7 +543,7 @@ where C: Connect + Sync + 'static,
541543
PoolTx::Http1(tx)
542544
},
543545
})
544-
}))
546+
})))
545547
}))
546548
})
547549
}

0 commit comments

Comments
 (0)