Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ futures-util = { version = "0.3.16", default-features = false, features = ["allo
http-body-util = "0.1.0"
tokio = { version = "1", features = ["macros", "test-util", "signal"] }
tokio-test = "0.4"
tower = { version = "0.5", features = ["util"] }
tower-test = "0.4"
pretty_env_logger = "0.5"

Expand Down Expand Up @@ -77,7 +78,7 @@ full = [

client = ["hyper/client", "tokio/net", "dep:tracing", "dep:futures-channel", "dep:tower-service"]
client-legacy = ["client", "dep:socket2", "tokio/sync", "dep:libc", "dep:futures-util"]
client-pool = ["dep:futures-util", "dep:tower-layer"]
client-pool = ["client", "dep:futures-util", "dep:tower-layer"]
client-proxy = ["client", "dep:base64", "dep:ipnet", "dep:percent-encoding"]
client-proxy-system = ["dep:system-configuration", "dep:windows-registry"]

Expand All @@ -99,6 +100,10 @@ __internal_happy_eyeballs_tests = []

[[example]]
name = "client"
required-features = ["client-legacy", "client-pool", "http1", "tokio"]

[[example]]
name = "client_legacy"
required-features = ["client-legacy", "http1", "tokio"]

[[example]]
Expand Down
172 changes: 146 additions & 26 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,157 @@
use std::env;
use tower::ServiceExt;
use tower_service::Service;

use http_body_util::Empty;
use hyper::Request;
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::client::pool;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = match env::args().nth(1) {
Some(url) => url,
None => {
eprintln!("Usage: client <url>");
return Ok(());
}
};

// HTTPS requires picking a TLS implementation, so give a better
// warning if the user tries to request an 'https' URL.
let url = url.parse::<hyper::Uri>()?;
if url.scheme_str() != Some("http") {
eprintln!("This example only works with 'http' URLs.");
return Ok(());
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
send_nego().await
}

async fn send_h1() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();

let http1 = tcp.and_then(|conn| {
Box::pin(async move {
let (mut tx, c) = hyper::client::conn::http1::handshake::<
_,
http_body_util::Empty<hyper::body::Bytes>,
>(conn)
.await?;
tokio::spawn(async move {
if let Err(e) = c.await {
eprintln!("connection error: {:?}", e);
}
});
let svc = tower::service_fn(move |req| tx.send_request(req));
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
})
});

let mut p = pool::Cache::new(http1).build();

let mut c = p.call(http::Uri::from_static("http://hyper.rs")).await?;
eprintln!("{:?}", c);

let req = http::Request::builder()
.header("host", "hyper.rs")
.body(http_body_util::Empty::new())
.unwrap();

c.ready().await?;
let resp = c.call(req).await?;
eprintln!("{:?}", resp);

Ok(())
}

async fn send_h2() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();

let http2 = tcp.and_then(|conn| {
Box::pin(async move {
let (mut tx, c) = hyper::client::conn::http2::handshake::<
_,
_,
http_body_util::Empty<hyper::body::Bytes>,
>(hyper_util::rt::TokioExecutor::new(), conn)
.await?;
println!("connected");
tokio::spawn(async move {
if let Err(e) = c.await {
eprintln!("connection error: {:?}", e);
}
});
let svc = tower::service_fn(move |req| tx.send_request(req));
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
})
});

let mut p = pool::Singleton::new(http2);

for _ in 0..5 {
let mut c = p
.call(http::Uri::from_static("http://localhost:3000"))
.await?;
eprintln!("{:?}", c);

let req = http::Request::builder()
.header("host", "hyper.rs")
.body(http_body_util::Empty::new())
.unwrap();

c.ready().await?;
let resp = c.call(req).await?;
eprintln!("{:?}", resp);
}

let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());
Ok(())
}

let req = Request::builder()
.uri(url)
.body(Empty::<bytes::Bytes>::new())?;
async fn send_nego() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let tcp = hyper_util::client::legacy::connect::HttpConnector::new();

let resp = client.request(req).await?;
let http1 = tower::layer::layer_fn(|tcp| {
tower::service_fn(move |dst| {
let inner = tcp.call(dst);
async move {
let conn = inner.await?;
let (mut tx, c) = hyper::client::conn::http1::handshake::<
_,
http_body_util::Empty<hyper::body::Bytes>,
>(conn)
.await?;
tokio::spawn(async move {
if let Err(e) = c.await {
eprintln!("connection error: {:?}", e);
}
});
let svc = tower::service_fn(move |req| tx.send_request(req));
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
}
})
});

eprintln!("{:?} {:?}", resp.version(), resp.status());
eprintln!("{:#?}", resp.headers());
let http2 = tower::layer::layer_fn(|tcp| {
tower::service_fn(move |dst| {
let inner = tcp.call(dst);
async move {
let conn = inner.await?;
let (mut tx, c) = hyper::client::conn::http2::handshake::<
_,
_,
http_body_util::Empty<hyper::body::Bytes>,
>(hyper_util::rt::TokioExecutor::new(), conn)
.await?;
println!("connected");
tokio::spawn(async move {
if let Err(e) = c.await {
eprintln!("connection error: {:?}", e);
}
});
let svc = tower::service_fn(move |req| tx.send_request(req));
Ok::<_, Box<dyn std::error::Error + Send + Sync>>(svc)
}
})
});

let mut svc = pool::negotiate(tcp, |_| false, http1, http2);

for _ in 0..5 {
let mut c = svc
.call(http::Uri::from_static("http://localhost:3000"))
.await?;
eprintln!("{:?}", c);

let req = http::Request::builder()
.header("host", "hyper.rs")
.body(http_body_util::Empty::new())
.unwrap();

c.ready().await?;
let resp = c.call(req).await?;
eprintln!("{:?}", resp);
}

Ok(())
}
37 changes: 37 additions & 0 deletions examples/client_legacy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::env;

use http_body_util::Empty;
use hyper::Request;
use hyper_util::client::legacy::{connect::HttpConnector, Client};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = match env::args().nth(1) {
Some(url) => url,
None => {
eprintln!("Usage: client <url>");
return Ok(());
}
};

// HTTPS requires picking a TLS implementation, so give a better
// warning if the user tries to request an 'https' URL.
let url = url.parse::<hyper::Uri>()?;
if url.scheme_str() != Some("http") {
eprintln!("This example only works with 'http' URLs.");
return Ok(());
}

let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new());

let req = Request::builder()
.uri(url)
.body(Empty::<bytes::Bytes>::new())?;

let resp = client.request(req).await?;

eprintln!("{:?} {:?}", resp.version(), resp.status());
eprintln!("{:#?}", resp.headers());

Ok(())
}
4 changes: 4 additions & 0 deletions src/client/pool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//! Composable pool services
//!
//! This module contains various concepts of a connection pool separated into
//! their own concerns. This allows for users to compose the layers, along with
//! any other layers, when constructing custom connection pools.

pub mod cache;
pub mod map;
Expand Down
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ mod common;
pub mod rt;
#[cfg(feature = "server")]
pub mod server;
#[cfg(any(feature = "service", feature = "client-legacy"))]
#[cfg(any(
feature = "service",
feature = "client-legacy",
feature = "client-pool",
))]
pub mod service;

mod error;
Loading