Skip to content

Commit

Permalink
use rama-http-core for client Http Proxy Connect
Browse files Browse the repository at this point in the history
Closes #379
  • Loading branch information
GlenDC committed Jan 20, 2025
1 parent e5c9b4b commit 1146452
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 106 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rama-http-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ boring = ["tls", "rama-net/boring", "rama-tls/boring"]
rustls-ring = ["rustls", "rama-tls/rustls-ring"]

[dependencies]
const_format = { workspace = true }
h2 = { workspace = true }
pin-project-lite = { workspace = true }
rama-core = { version = "0.2.0-alpha.7", path = "../rama-core" }
Expand Down
157 changes: 56 additions & 101 deletions rama-http-backend/src/client/proxy/layer/proxy_connector/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,94 @@
//!
//! As defined in <https://www.ietf.org/rfc/rfc2068.txt>.
use std::borrow::Cow;

use rama_core::error::{ErrorContext, OpaqueError};
use rama_http_core::{client::conn::http1, upgrade};
use rama_http_types::{
header::{HOST, USER_AGENT},
headers::{Header, HeaderMapExt},
HeaderMap, HeaderName, HeaderValue,
Body, HeaderName, HeaderValue, Method, Request, StatusCode, Version,
};
use rama_net::{address::Authority, stream::Stream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use super::HttpProxyError;

#[derive(Debug, Clone)]
#[derive(Debug)]
/// Connector for HTTP proxies.
///
/// Used to connect as a client to a HTTP proxy server.
pub(super) struct InnerHttpProxyConnector {
authority: Authority,
headers: Option<HeaderMap>,
req: Request,
}

impl InnerHttpProxyConnector {
/// Create a new [`InnerHttpProxyConnector`] with the given authority.
pub(super) fn new(authority: Authority) -> Self {
Self {
authority,
headers: None,
}
pub(super) fn new(authority: Authority) -> Result<Self, OpaqueError> {
let uri = authority.to_string();
let host_value: HeaderValue = uri.parse().context("parse authority as header value")?;

let req = Request::builder()
.method(Method::CONNECT)
.version(Version::HTTP_11)
.uri(uri)
.header(HOST, host_value)
.header(
USER_AGENT,
HeaderValue::from_static(const_format::formatcp!(
"{}/{}",
rama_utils::info::NAME,
rama_utils::info::VERSION,
)),
)
.body(Body::empty())
.context("build http request")?;

Ok(Self { req })
}

#[allow(unused)]
#[expect(unused)]
/// Add a header to the request.
pub(super) fn with_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self {
match self.headers {
Some(ref mut headers) => {
headers.insert(name, value);
}
None => {
let mut headers = HeaderMap::new();
headers.insert(name, value);
self.headers = Some(headers);
}
}
self.req.headers_mut().insert(name, value);
self
}

/// Add a typed header to the request.
pub(super) fn with_typed_header(&mut self, header: impl Header) -> &mut Self {
match self.headers {
Some(ref mut headers) => {
headers.typed_insert(header);
}
None => {
let mut headers = HeaderMap::new();
headers.typed_insert(header);
self.headers = Some(headers);
}
}
self.req.headers_mut().typed_insert(header);
self
}

/// Connect to the proxy server.
pub(super) async fn handshake<S: Stream + Unpin>(
&self,
mut stream: S,
) -> Result<S, HttpProxyError> {
// TODO: handle user-agent and host better
// TODO: use h1 protocol from embedded hyper directly here!
let mut request = format!(
"\
CONNECT {authority} HTTP/1.1\r\n\
Host: {authority}\r\n\
User-Agent: {ua_name}/{ua_version}\r\n\
",
authority = self.authority,
ua_name = rama_utils::info::NAME,
ua_version = rama_utils::info::VERSION,
)
.into_bytes();
if let Some(ref headers) = self.headers {
for (name, value) in headers.iter() {
request.extend_from_slice(name.as_str().as_bytes());
request.extend_from_slice(b": ");
request.extend_from_slice(value.as_bytes());
request.extend_from_slice(b"\r\n");
}
}
request.extend_from_slice(b"\r\n");

stream.write_all(&request).await?;
self,
stream: S,
) -> Result<upgrade::Upgraded, HttpProxyError> {
let (tx, conn) = http1::Builder::default()
.ignore_invalid_headers(true)
.handshake(stream)
.await
.map_err(|err| HttpProxyError::Transport(err.into()))?;

let mut buf = [0; 8192];
let mut pos = 0;

loop {
let n = stream.read(&mut buf[pos..]).await?;

if n == 0 {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"http conn handshake read incomplete",
)
.into());
tokio::spawn(async move {
if let Err(err) = conn.with_upgrades().await {
tracing::debug!(?err, "http upgrade proxy client conn failed");
}
pos += n;
});

let recvd = &buf[..pos];
if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") {
if recvd.ends_with(b"\r\n\r\n") {
return Ok(stream);
}
if pos == buf.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"http conn handshake response too large",
)
.into());
}
// else read more
} else if recvd.starts_with(b"HTTP/1.1 407") {
return Err(HttpProxyError::AuthRequired);
} else if recvd.starts_with(b"HTTP/1.1 503") {
return Err(HttpProxyError::Unavailable);
} else {
let input = String::from_utf8_lossy(recvd);
return Err(HttpProxyError::Other(format!(
"invalid http conn handshake start: [{}]",
if let Some((line, _)) = input.split_once("\r\n") {
Cow::Borrowed(line)
} else {
input
}
)));
}
let response = tx
.send_request(self.req)
.await
.map_err(|err| HttpProxyError::Transport(OpaqueError::from_std(err).into_boxed()))?;

match response.status() {
StatusCode::OK => upgrade::on(response)
.await
.map_err(|err| HttpProxyError::Transport(OpaqueError::from_std(err).into_boxed())),
StatusCode::PROXY_AUTHENTICATION_REQUIRED => Err(HttpProxyError::AuthRequired),
StatusCode::SERVICE_UNAVAILABLE => Err(HttpProxyError::Unavailable),
status => Err(HttpProxyError::Other(format!(
"invalid http proxy conn handshake: status={status}",
))),
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use crate::client::proxy::layer::HttpProxyError;

use super::InnerHttpProxyConnector;
use rama_core::{
combinators::Either,
error::{BoxError, ErrorExt, OpaqueError},
Context, Service,
};
use rama_http_core::upgrade;
use rama_http_types::headers::ProxyAuthorization;
use rama_net::{
address::ProxyAddress,
Expand Down Expand Up @@ -77,7 +79,8 @@ where
+ Send
+ 'static,
{
type Response = EstablishedClientConnection<S::Connection, State, Request>;
type Response =
EstablishedClientConnection<Either<S::Connection, upgrade::Upgraded>, State, Request>;
type Error = BoxError;

async fn serve(
Expand Down Expand Up @@ -142,7 +145,18 @@ where
Err("http proxy required but none is defined".into())
} else {
tracing::trace!("http proxy connector: no proxy required or set: proceed with direct connection");
Ok(established_conn)
let EstablishedClientConnection {
ctx,
req,
conn,
addr,
} = established_conn;
return Ok(EstablishedClientConnection {
ctx,
req,
conn: Either::A(conn),
addr,
});
};
}
};
Expand Down Expand Up @@ -173,12 +187,13 @@ where
return Ok(EstablishedClientConnection {
ctx,
req,
conn,
conn: Either::A(conn),
addr,
});
}

let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone());
let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone())?;

if let Some(credential) = address.credential.clone() {
match credential {
ProxyCredential::Basic(basic) => {
Expand All @@ -203,7 +218,7 @@ where
Ok(EstablishedClientConnection {
ctx,
req,
conn,
conn: Either::B(conn),
addr,
})
}
Expand Down

0 comments on commit 1146452

Please sign in to comment.