From 1146452ce7fb02f7c2f6ce7d199276af7d39c55e Mon Sep 17 00:00:00 2001 From: Glen De Cauwsemaecker Date: Mon, 20 Jan 2025 11:45:53 +0100 Subject: [PATCH] use rama-http-core for client Http Proxy Connect Closes #379 --- Cargo.lock | 1 + rama-http-backend/Cargo.toml | 1 + .../proxy/layer/proxy_connector/connector.rs | 157 +++++++----------- .../proxy/layer/proxy_connector/service.rs | 25 ++- 4 files changed, 78 insertions(+), 106 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ebec3fd..493f1252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2250,6 +2250,7 @@ dependencies = [ name = "rama-http-backend" version = "0.2.0-alpha.7" dependencies = [ + "const_format", "h2", "pin-project-lite", "rama-core", diff --git a/rama-http-backend/Cargo.toml b/rama-http-backend/Cargo.toml index 53337913..c31b0ace 100644 --- a/rama-http-backend/Cargo.toml +++ b/rama-http-backend/Cargo.toml @@ -21,6 +21,7 @@ boring = ["tls", "rama-net/boring", "rama-tls/boring"] rustls-ring = ["rustls", "rama-tls/rustls-ring"] [dependencies] +const_format = { workspace = true } h2 = { workspace = true } pin-project-lite = { workspace = true } rama-core = { version = "0.2.0-alpha.7", path = "../rama-core" } diff --git a/rama-http-backend/src/client/proxy/layer/proxy_connector/connector.rs b/rama-http-backend/src/client/proxy/layer/proxy_connector/connector.rs index 436d24df..d3e53c5b 100644 --- a/rama-http-backend/src/client/proxy/layer/proxy_connector/connector.rs +++ b/rama-http-backend/src/client/proxy/layer/proxy_connector/connector.rs @@ -2,139 +2,94 @@ //! //! As defined in . -use std::borrow::Cow; - +use rama_core::error::{ErrorContext, OpaqueError}; +use rama_http_core::{client::conn::http1, upgrade}; use rama_http_types::{ + header::{HOST, USER_AGENT}, headers::{Header, HeaderMapExt}, - HeaderMap, HeaderName, HeaderValue, + Body, HeaderName, HeaderValue, Method, Request, StatusCode, Version, }; use rama_net::{address::Authority, stream::Stream}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; use super::HttpProxyError; -#[derive(Debug, Clone)] +#[derive(Debug)] /// Connector for HTTP proxies. /// /// Used to connect as a client to a HTTP proxy server. pub(super) struct InnerHttpProxyConnector { - authority: Authority, - headers: Option, + req: Request, } impl InnerHttpProxyConnector { /// Create a new [`InnerHttpProxyConnector`] with the given authority. - pub(super) fn new(authority: Authority) -> Self { - Self { - authority, - headers: None, - } + pub(super) fn new(authority: Authority) -> Result { + let uri = authority.to_string(); + let host_value: HeaderValue = uri.parse().context("parse authority as header value")?; + + let req = Request::builder() + .method(Method::CONNECT) + .version(Version::HTTP_11) + .uri(uri) + .header(HOST, host_value) + .header( + USER_AGENT, + HeaderValue::from_static(const_format::formatcp!( + "{}/{}", + rama_utils::info::NAME, + rama_utils::info::VERSION, + )), + ) + .body(Body::empty()) + .context("build http request")?; + + Ok(Self { req }) } - #[allow(unused)] + #[expect(unused)] /// Add a header to the request. pub(super) fn with_header(&mut self, name: HeaderName, value: HeaderValue) -> &mut Self { - match self.headers { - Some(ref mut headers) => { - headers.insert(name, value); - } - None => { - let mut headers = HeaderMap::new(); - headers.insert(name, value); - self.headers = Some(headers); - } - } + self.req.headers_mut().insert(name, value); self } /// Add a typed header to the request. pub(super) fn with_typed_header(&mut self, header: impl Header) -> &mut Self { - match self.headers { - Some(ref mut headers) => { - headers.typed_insert(header); - } - None => { - let mut headers = HeaderMap::new(); - headers.typed_insert(header); - self.headers = Some(headers); - } - } + self.req.headers_mut().typed_insert(header); self } /// Connect to the proxy server. pub(super) async fn handshake( - &self, - mut stream: S, - ) -> Result { - // TODO: handle user-agent and host better - // TODO: use h1 protocol from embedded hyper directly here! - let mut request = format!( - "\ - CONNECT {authority} HTTP/1.1\r\n\ - Host: {authority}\r\n\ - User-Agent: {ua_name}/{ua_version}\r\n\ - ", - authority = self.authority, - ua_name = rama_utils::info::NAME, - ua_version = rama_utils::info::VERSION, - ) - .into_bytes(); - if let Some(ref headers) = self.headers { - for (name, value) in headers.iter() { - request.extend_from_slice(name.as_str().as_bytes()); - request.extend_from_slice(b": "); - request.extend_from_slice(value.as_bytes()); - request.extend_from_slice(b"\r\n"); - } - } - request.extend_from_slice(b"\r\n"); - - stream.write_all(&request).await?; + self, + stream: S, + ) -> Result { + let (tx, conn) = http1::Builder::default() + .ignore_invalid_headers(true) + .handshake(stream) + .await + .map_err(|err| HttpProxyError::Transport(err.into()))?; - let mut buf = [0; 8192]; - let mut pos = 0; - - loop { - let n = stream.read(&mut buf[pos..]).await?; - - if n == 0 { - return Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "http conn handshake read incomplete", - ) - .into()); + tokio::spawn(async move { + if let Err(err) = conn.with_upgrades().await { + tracing::debug!(?err, "http upgrade proxy client conn failed"); } - pos += n; + }); - let recvd = &buf[..pos]; - if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") { - if recvd.ends_with(b"\r\n\r\n") { - return Ok(stream); - } - if pos == buf.len() { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "http conn handshake response too large", - ) - .into()); - } - // else read more - } else if recvd.starts_with(b"HTTP/1.1 407") { - return Err(HttpProxyError::AuthRequired); - } else if recvd.starts_with(b"HTTP/1.1 503") { - return Err(HttpProxyError::Unavailable); - } else { - let input = String::from_utf8_lossy(recvd); - return Err(HttpProxyError::Other(format!( - "invalid http conn handshake start: [{}]", - if let Some((line, _)) = input.split_once("\r\n") { - Cow::Borrowed(line) - } else { - input - } - ))); - } + let response = tx + .send_request(self.req) + .await + .map_err(|err| HttpProxyError::Transport(OpaqueError::from_std(err).into_boxed()))?; + + match response.status() { + StatusCode::OK => upgrade::on(response) + .await + .map_err(|err| HttpProxyError::Transport(OpaqueError::from_std(err).into_boxed())), + StatusCode::PROXY_AUTHENTICATION_REQUIRED => Err(HttpProxyError::AuthRequired), + StatusCode::SERVICE_UNAVAILABLE => Err(HttpProxyError::Unavailable), + status => Err(HttpProxyError::Other(format!( + "invalid http proxy conn handshake: status={status}", + ))), } } } diff --git a/rama-http-backend/src/client/proxy/layer/proxy_connector/service.rs b/rama-http-backend/src/client/proxy/layer/proxy_connector/service.rs index 854ff371..8174fbb0 100644 --- a/rama-http-backend/src/client/proxy/layer/proxy_connector/service.rs +++ b/rama-http-backend/src/client/proxy/layer/proxy_connector/service.rs @@ -2,9 +2,11 @@ use crate::client::proxy::layer::HttpProxyError; use super::InnerHttpProxyConnector; use rama_core::{ + combinators::Either, error::{BoxError, ErrorExt, OpaqueError}, Context, Service, }; +use rama_http_core::upgrade; use rama_http_types::headers::ProxyAuthorization; use rama_net::{ address::ProxyAddress, @@ -77,7 +79,8 @@ where + Send + 'static, { - type Response = EstablishedClientConnection; + type Response = + EstablishedClientConnection, State, Request>; type Error = BoxError; async fn serve( @@ -142,7 +145,18 @@ where Err("http proxy required but none is defined".into()) } else { tracing::trace!("http proxy connector: no proxy required or set: proceed with direct connection"); - Ok(established_conn) + let EstablishedClientConnection { + ctx, + req, + conn, + addr, + } = established_conn; + return Ok(EstablishedClientConnection { + ctx, + req, + conn: Either::A(conn), + addr, + }); }; } }; @@ -173,12 +187,13 @@ where return Ok(EstablishedClientConnection { ctx, req, - conn, + conn: Either::A(conn), addr, }); } - let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone()); + let mut connector = InnerHttpProxyConnector::new(transport_ctx.authority.clone())?; + if let Some(credential) = address.credential.clone() { match credential { ProxyCredential::Basic(basic) => { @@ -203,7 +218,7 @@ where Ok(EstablishedClientConnection { ctx, req, - conn, + conn: Either::B(conn), addr, }) }