Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ lazy_static::lazy_static! {
pub(crate) static ref CLIENT: Client = Client::new();
}

fn split_header(line: &str) -> (&str, &str) {
let mut iter = line.splitn(2, ": ");
(
iter.next().unwrap_or(""),
iter.next().unwrap_or(""),
)
}

async fn close_session(target: &Url, uid: Uuid) {
let resp = CLIENT
.get(join_url(target, ["close/", &uid.to_string()]))
Expand All @@ -28,8 +36,21 @@ async fn close_session(target: &Url, uid: Uuid) {
.unwrap();
assert_ok(resp).await;
}
async fn init_http_session(target: &Url) -> Trace<Uuid> {
let resp = CLIENT.get(join_url(target, ["open"])).send().await.unwrap();
async fn init_http_session(target: &Url, req_headers: &Vec<String>) -> Trace<Uuid> {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert("key", "value".try_into().unwrap());

let mut req = CLIENT.get(join_url(target, ["open"]));

println!("req_headers: {:?}", req_headers);
for req_header in req_headers {
let (u_name, u_val) = split_header(req_header);
req = req.header(u_name, u_val);
}

let resp = req.send()
.await
.unwrap();
let resp = assert_ok(resp).await;
return Ok(Uuid::from_bytes(
match identity::<&[u8]>(&resp.bytes().await.unwrap()).try_into() {
Expand Down Expand Up @@ -72,8 +93,8 @@ async fn download_req(
return resp.bytes_stream();
}

async fn process_socket(target_url: Arc<Url>, socket: tokio::net::TcpStream) -> Trace<Uuid> {
let uid = init_http_session(&target_url).await?;
async fn process_socket(target_url: Arc<Url>, req_headers: Arc<Vec<String>>, socket: tokio::net::TcpStream) -> Trace<Uuid> {
let uid = init_http_session(&target_url, &req_headers).await?;
println!("HTTP Server copies. Established session {uid:#x?}");

let (s_read, mut s_write) = socket.into_split();
Expand Down Expand Up @@ -151,6 +172,7 @@ async fn process_socket(target_url: Arc<Url>, socket: tokio::net::TcpStream) ->
pub async fn main(
bind_addr: &[SocketAddr],
target_url: Url,
req_headers: Vec<String>,
) -> (SocketAddr, impl Future<Output = Infallible>) {
//console_subscriber::init();
let listener_result = TcpListener::bind(bind_addr).await;
Expand Down Expand Up @@ -181,14 +203,16 @@ pub async fn main(
let bound = listener.local_addr().unwrap();
println!("Listening on {bound}");
let target_url = Arc::new(target_url);
let req_headers = Arc::new(req_headers);
return (bound, async move {
loop {
let (socket, _) = listener.accept().await.unwrap();
let target_url = target_url.clone();
let req_headers = req_headers.clone();
let _join_handle = tokio::spawn(async move {
#[cfg(test)]
AC.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
drop(dbg!(process_socket(target_url, socket).await));
drop(dbg!(process_socket(target_url, req_headers, socket).await));
#[cfg(test)]
AC.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
});
Expand Down
7 changes: 6 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ enum CommandMode {
/// URL of the exit node.
#[clap(short, long, value_parser)]
target_url: Url,

/// Additional headers to include in HTTP request to the exit node.
#[clap(short, long, value_parser)]
req_headers: Vec<String>,
},
/// Spin up exit node. Receives incoming HTTP and forwards TCP.
Exit {
Expand Down Expand Up @@ -112,8 +116,9 @@ async fn main() {
CommandMode::Entry {
bind_addr,
target_url,
req_headers,
} => {
entry::main(&bind_addr.resolve().await, target_url)
entry::main(&bind_addr.resolve().await, target_url, req_headers)
.await
.1
.await;
Expand Down
1 change: 1 addition & 0 deletions tmp/tcp-over-http
Submodule tcp-over-http added at a67424