Skip to content

Commit

Permalink
Update dependencies and enhance connection handling in Proxer CLI
Browse files Browse the repository at this point in the history
- Bump versions of `hyper` (1.5.2), `serde` (1.0.216), and `proxer-cli` (0.3.2) in `Cargo.toml` and `Cargo.lock`.
- Introduce `rlimit` dependency (0.10.2) for managing open connection limits.
- Refactor connection handling in `src/server/mod.rs`, `src/server/proxy.rs`, and `src/server/tunnel.rs` to improve error handling and connection management with timeouts.
- Replace direct usage of `TokioIo::new(stream)` with a consistent approach across the server modules.
  • Loading branch information
doroved committed Dec 18, 2024
1 parent f77664f commit 105eb17
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
24 changes: 17 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "proxer-cli"
version = "0.3.1"
version = "0.3.2"
edition = "2021"
authors = ["doroved"]
description = "Proxy TCP traffic on macOS with domain filtering."
Expand All @@ -13,10 +13,10 @@ categories = ["network-programming", "command-line-utilities"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = { version = "1.5.1", features = ["http1", "http2", "server", "client"] }
hyper = { version = "1.5.2", features = ["http1", "http2", "server", "client"] }
hyper-util = { version = "0.1.10", features = ["tokio"] }

serde = { version = "1.0.215", features = ["derive"] }
serde = { version = "1.0.216", features = ["derive"] }
serde_json = "1.0.133"

tokio = { version = "1.42.0", features = ["full"] }
Expand All @@ -34,6 +34,7 @@ http-body-util = "0.1.2"
tracing = "0.1.41"
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
json5 = "0.4.1"
rlimit = "0.10.2"


[profile.release]
Expand Down
22 changes: 20 additions & 2 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ pub struct ProxyConfig {
}

pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
tracing::info!(
"Default open connection limit: {:?}",
rlimit::Resource::NOFILE.get_soft()?
);

let connection_limit = match rlimit::Resource::NOFILE.get() {
Ok(limit) if limit.0 < 1024 * 10 => {
tracing::info!("Setting open connection limit to {}", limit.1);
limit.1
}
Ok(limit) => limit.0,
Err(err) => {
tracing::error!("Failed to get open connection limit: {}", err);
return Err(Box::new(err));
}
};

let _ = rlimit::setrlimit(rlimit::Resource::NOFILE, connection_limit, rlimit::INFINITY);

// Close all proxer-cli processes
terminate_proxer();

Expand Down Expand Up @@ -82,7 +101,6 @@ pub async fn run() -> Result<(), Box<dyn std::error::Error>> {

loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
let proxy_config = Arc::clone(&proxy_config);

let service = service_fn(move |req: Request<body::Incoming>| {
Expand All @@ -94,7 +112,7 @@ pub async fn run() -> Result<(), Box<dyn std::error::Error>> {
if let Err(err) = http1::Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.serve_connection(io, service)
.serve_connection(TokioIo::new(stream), service)
.with_upgrades()
.await
{
Expand Down
4 changes: 1 addition & 3 deletions src/server/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,10 @@ pub async fn handle_request(

match TcpStream::connect((host, port)).await {
Ok(stream) => {
let io = TokioIo::new(stream);

let (mut sender, conn) = Builder::new()
.preserve_header_case(true)
.title_case_headers(true)
.handshake(io)
.handshake(TokioIo::new(stream))
.await?;

tokio::spawn(async move {
Expand Down
25 changes: 17 additions & 8 deletions src/server/tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ pub async fn tunnel_direct(
) -> Result<(), Box<dyn std::error::Error>> {
tracing::info!("{addr} → DIRECT connection");

let mut server = TcpStream::connect(&addr).await?;
let _ = tokio::io::copy_bidirectional(&mut TokioIo::new(upgraded), &mut server).await?;
let mut server = timeout(Duration::from_secs(30), TcpStream::connect(&addr)).await??;

timeout(
Duration::from_secs(30),
tokio::io::copy_bidirectional(&mut TokioIo::new(upgraded), &mut server),
)
.await??;

Ok(())
}
Expand All @@ -52,14 +57,13 @@ pub async fn tunnel_via_proxy(
let proxy_addr = format!("{}:{}", &proxy.host, &proxy.port);
let proxy_host = proxy_addr.split(':').next().unwrap();

let tcp = TcpStream::connect(&proxy_addr).await?;
let mut upgraded = TokioIo::new(upgraded);
let tcp = timeout(Duration::from_secs(30), TcpStream::connect(&proxy_addr)).await??;

let mut stream: Pin<Box<dyn AsyncReadWrite>> = if proxy.scheme.eq_ignore_ascii_case("http") {
Box::pin(tcp)
} else {
let tls = TlsConnector::from(native_tls::TlsConnector::new().unwrap());
Box::pin(tls.connect(proxy_host, tcp).await?)
let tls = TlsConnector::from(native_tls::TlsConnector::new()?);
Box::pin(timeout(Duration::from_secs(30), tls.connect(proxy_host, tcp)).await??)
};

let mut connect_req = format!(
Expand Down Expand Up @@ -100,14 +104,19 @@ pub async fn tunnel_via_proxy(
return Err(format!("{:?}", String::from_utf8_lossy(&response[..n])).into());
}

let (from_client, from_server) =
tokio::io::copy_bidirectional(&mut upgraded, &mut stream).await?;
let (from_client, from_server) = timeout(
Duration::from_secs(30),
tokio::io::copy_bidirectional(&mut TokioIo::new(upgraded), &mut stream),
)
.await??;

tracing::info!(
"{addr} → Client wrote {:.2} KB and received {:.2} KB",
from_client as f64 / 1024.0,
from_server as f64 / 1024.0
);

stream.shutdown().await?;

Ok(())
}

0 comments on commit 105eb17

Please sign in to comment.