Skip to content

Commit f841552

Browse files
committed
WIP generic
1 parent ac1a86e commit f841552

File tree

4 files changed

+214
-210
lines changed

4 files changed

+214
-210
lines changed

josh-proxy/src/bin/josh-proxy.rs

+48-43
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,17 @@ extern crate clap;
44

55
use bytes::Bytes;
66
use clap::Parser;
7+
use http_body_util::combinators::BoxBody;
78
use http_body_util::Full;
89
use hyper::body::Incoming;
910
use hyper::server::conn::http1;
1011
use hyper_util::rt::{tokio::TokioIo, tokio::TokioTimer};
1112
use josh_proxy::cli;
12-
use josh_proxy::{run_git_with_auth, FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate};
13+
use josh_proxy::juniper_hyper::graphql;
14+
use josh_proxy::{
15+
body::{empty, full},
16+
run_git_with_auth, FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate,
17+
};
1318
use opentelemetry::global;
1419
use opentelemetry::sdk::propagation::TraceContextPropagator;
1520
use tokio::pin;
@@ -297,7 +302,7 @@ async fn fetch_upstream(
297302
async fn static_paths(
298303
service: &JoshProxyService,
299304
path: &str,
300-
) -> josh::JoshResult<Option<Response<Full<Bytes>>>> {
305+
) -> josh::JoshResult<Option<Response<BoxBody<Bytes, hyper::Error>>>> {
301306
tracing::debug!("static_path {:?}", path);
302307
if path == "/version" {
303308
return Ok(Some(make_response(
@@ -356,7 +361,7 @@ async fn static_paths(
356361
async fn repo_update_fn(
357362
serv: Arc<JoshProxyService>,
358363
req: Request<Incoming>,
359-
) -> josh::JoshResult<Response<Full<Bytes>>> {
364+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
360365
let body = req.into_body().collect().await?.to_bytes();
361366

362367
let s = tracing::span!(tracing::Level::TRACE, "repo update worker");
@@ -378,10 +383,10 @@ async fn repo_update_fn(
378383
Ok(match result {
379384
Ok(stderr) => Response::builder()
380385
.status(hyper::StatusCode::OK)
381-
.body(Full::new(Bytes::from(stderr))),
386+
.body(full(stderr)),
382387
Err(josh::JoshError(stderr)) => Response::builder()
383388
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
384-
.body(Full::new(Bytes::from(stderr))),
389+
.body(full(stderr)),
385390
}?)
386391
}
387392

@@ -509,19 +514,19 @@ async fn do_filter(
509514
Ok(())
510515
}
511516

512-
fn make_response(body: &str, code: hyper::StatusCode) -> Response<Full<Bytes>> {
517+
fn make_response(body: &str, code: hyper::StatusCode) -> Response<BoxBody<Bytes, hyper::Error>> {
513518
let owned_body = body.to_owned();
514519
Response::builder()
515520
.status(code)
516521
.header(hyper::header::CONTENT_TYPE, "text/plain")
517-
.body(Full::new(Bytes::from(owned_body)))
522+
.body(full(owned_body))
518523
.expect("Can't build response")
519524
}
520525

521526
async fn handle_ui_request(
522527
req: Request<Incoming>,
523528
resource_path: &str,
524-
) -> josh::JoshResult<Response<Full<Bytes>>> {
529+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
525530
/*
526531
// Proxy: can be used for UI development or to serve a different UI
527532
if let Some(proxy) = &ARGS.static_resource_proxy_target {
@@ -530,7 +535,7 @@ async fn handle_ui_request(
530535
Ok(response) => Ok(response),
531536
Err(error) => Ok(Response::builder()
532537
.status(StatusCode::INTERNAL_SERVER_ERROR)
533-
.body(Full::new(Bytes::from(format!("Proxy error: {:?}", error))))
538+
.body(full(format!("Proxy error: {:?}", error)))
534539
.unwrap()),
535540
};
536541
}*/
@@ -553,7 +558,7 @@ async fn handle_ui_request(
553558
let resolver = hyper_staticfile::Resolver::new("josh/static");
554559
let request = hyper::http::Request::get(resolve_path).body(()).unwrap();
555560
let result = resolver.resolve_request(&request).await?;
556-
let response = hyper::Response::new(Full::new(
561+
let response = hyper::Response::new(full(
557562
hyper_staticfile::ResponseBuilder::new()
558563
.request(&req)
559564
.build(result)?
@@ -932,7 +937,7 @@ fn head_ref_or_default(head_ref: &str) -> HeadRef {
932937
async fn handle_serve_namespace_request(
933938
serv: Arc<JoshProxyService>,
934939
req: Request<Incoming>,
935-
) -> josh::JoshResult<Response<Full<Bytes>>> {
940+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
936941
let error_response = |status: StatusCode| Ok(make_response("", status));
937942

938943
if req.method() != hyper::Method::POST {
@@ -1126,7 +1131,7 @@ async fn handle_serve_namespace_request(
11261131
async fn call_service(
11271132
serv: Arc<JoshProxyService>,
11281133
req_auth: (josh_proxy::auth::Handle, Request<Incoming>),
1129-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1134+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
11301135
let (auth, req) = req_auth;
11311136

11321137
let path = {
@@ -1194,7 +1199,7 @@ async fn call_service(
11941199
return Ok(Response::builder()
11951200
.status(hyper::StatusCode::FOUND)
11961201
.header("Location", redirect_path)
1197-
.body(Full::new(Bytes::new()))?);
1202+
.body(empty())?);
11981203
}
11991204
};
12001205

@@ -1232,7 +1237,7 @@ async fn call_service(
12321237
return Ok(Response::builder()
12331238
.status(hyper::StatusCode::TEMPORARY_REDIRECT)
12341239
.header("Location", format!("{}{}", remote_url, parsed_url.pathinfo))
1235-
.body(Full::new(Bytes::new()))?);
1240+
.body(empty())?);
12361241
}
12371242

12381243
if is_repo_blocked(&meta) {
@@ -1260,7 +1265,7 @@ async fn call_service(
12601265
"Basic realm=User Visible Realm",
12611266
)
12621267
.status(hyper::StatusCode::UNAUTHORIZED);
1263-
return Ok(builder.body(Full::new(Bytes::new()))?);
1268+
return Ok(builder.body(empty())?);
12641269
}
12651270

12661271
if parsed_url.api == "/~/graphql" {
@@ -1269,11 +1274,13 @@ async fn call_service(
12691274

12701275
if parsed_url.api == "/~/graphiql" {
12711276
let addr = format!("/~/graphql{}", meta.config.repo);
1272-
return Ok(tokio::task::spawn_blocking(move || {
1273-
josh_proxy::juniper_hyper::graphiql(&addr, None)
1274-
})
1275-
.in_current_span()
1276-
.await??);
1277+
let resp =
1278+
tokio::task::spawn(
1279+
async move { josh_proxy::juniper_hyper::graphiql(&addr, None).await },
1280+
)
1281+
.in_current_span()
1282+
.await?;
1283+
return Ok(into_body(resp));
12771284
}
12781285

12791286
let headref = head_ref_or_default(&parsed_url.headref);
@@ -1297,11 +1304,11 @@ async fn call_service(
12971304
"Basic realm=User Visible Realm",
12981305
)
12991306
.status(hyper::StatusCode::UNAUTHORIZED);
1300-
return Ok(builder.body(Full::new(Bytes::new()))?);
1307+
return Ok(builder.body(empty())?);
13011308
}
13021309
Err(FetchError::Other(e)) => {
13031310
let builder = Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
1304-
return Ok(builder.body(Full::new(Bytes::from(e.0)))?);
1311+
return Ok(builder.body(full(e.0))?);
13051312
}
13061313
}
13071314

@@ -1382,7 +1389,9 @@ async fn call_service(
13821389
// it is executed in all cases.
13831390
std::mem::drop(temp_ns);
13841391

1385-
Ok(cgires.0)
1392+
Ok(cgires
1393+
.0
1394+
.map(|body| body.map_err(|never| match never {}).boxed()))
13861395
}
13871396

13881397
async fn serve_query(
@@ -1391,7 +1400,7 @@ async fn serve_query(
13911400
upstream_repo: String,
13921401
filter: josh::filter::Filter,
13931402
head_ref: &str,
1394-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1403+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
13951404
let tracing_span = tracing::span!(tracing::Level::TRACE, "render worker");
13961405
let head_ref = head_ref.to_string();
13971406
let res = tokio::task::spawn_blocking(move || -> josh::JoshResult<_> {
@@ -1434,15 +1443,15 @@ async fn serve_query(
14341443
.get("content-type")
14351444
.unwrap_or(&"text/plain".to_string()),
14361445
)
1437-
.body(Full::new(Bytes::from(res)))?,
1446+
.body(full(res))?,
14381447

14391448
Ok(None) => Response::builder()
14401449
.status(hyper::StatusCode::NOT_FOUND)
1441-
.body(Full::new(Bytes::from("File not found".to_string())))?,
1450+
.body(full("File not found".to_string()))?,
14421451

14431452
Err(res) => Response::builder()
14441453
.status(hyper::StatusCode::UNPROCESSABLE_ENTITY)
1445-
.body(Full::new(Bytes::from(res.to_string())))?,
1454+
.body(full(res.to_string()))?,
14461455
})
14471456
}
14481457

@@ -1756,16 +1765,8 @@ async fn serve_graphql(
17561765
upstream_repo: String,
17571766
upstream: String,
17581767
auth: josh_proxy::auth::Handle,
1759-
) -> josh::JoshResult<Response<Full<Bytes>>> {
1768+
) -> josh::JoshResult<Response<BoxBody<Bytes, hyper::Error>>> {
17601769
let remote_url = upstream.clone() + upstream_repo.as_str();
1761-
let parsed = match josh_proxy::juniper_hyper::parse_req(req).await {
1762-
Ok(r) => r,
1763-
Err(resp) => {
1764-
return Ok(hyper::Response::new(Full::new(Bytes::from(
1765-
resp.collect().await?.to_bytes(),
1766-
))))
1767-
}
1768-
};
17691770

17701771
let transaction_mirror = josh::cache::Transaction::open(
17711772
&serv.repo_path.join("mirror"),
@@ -1796,12 +1797,16 @@ async fn serve_graphql(
17961797
// First attempt to serve GraphQL query. If we can serve it
17971798
// that means all requested revisions were specified by SHA and we could find
17981799
// all of them locally, so no need to fetch.
1799-
let res = parsed.execute(&root_node, &context).await;
1800+
let res = graphql(root_node, context, req).await;
1801+
1802+
if !res.status().is_success() {
1803+
return Ok(res.map(|body| body.map_err(|never| match never {}).boxed()));
1804+
}
18001805

18011806
// The "allow_refs" flag will be set by the query handler if we need to do a fetch
18021807
// to complete the query.
18031808
if !*context.allow_refs.lock().unwrap() {
1804-
res
1809+
Ok(res)
18051810
} else {
18061811
match fetch_upstream(
18071812
serv.clone(),
@@ -1823,16 +1828,16 @@ async fn serve_graphql(
18231828
"Basic realm=User Visible Realm",
18241829
)
18251830
.status(hyper::StatusCode::UNAUTHORIZED);
1826-
return Ok(builder.body(Full::new(Bytes::new()))?);
1831+
return Ok(builder.body(empty())?);
18271832
}
18281833
Err(FetchError::Other(e)) => {
18291834
let builder =
18301835
Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR);
1831-
return Ok(builder.body(Full::new(Bytes::from(e.0)))?);
1836+
return Ok(builder.body(full(e.0))?);
18321837
}
18331838
};
18341839

1835-
parsed.execute(&root_node, &context).await
1840+
Ok(graphql(root_node, context, req).await)
18361841
}
18371842
};
18381843

@@ -1842,8 +1847,8 @@ async fn serve_graphql(
18421847
hyper::StatusCode::BAD_REQUEST
18431848
};
18441849

1845-
let body = Full::new(Bytes::from(serde_json::to_string_pretty(&res).unwrap()));
1846-
let mut resp = Response::new(Full::new(Bytes::new()));
1850+
let body = full(serde_json::to_string_pretty(&res).unwrap());
1851+
let mut resp = Response::new(empty());
18471852
*resp.status_mut() = code;
18481853
resp.headers_mut().insert(
18491854
hyper::header::CONTENT_TYPE,

josh-proxy/src/body.rs

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
2+
use hyper::{
3+
body::{Body, Bytes},
4+
Response,
5+
};
6+
7+
pub fn empty() -> BoxBody<Bytes, hyper::Error> {
8+
Empty::<Bytes>::new()
9+
.map_err(|never| match never {})
10+
.boxed()
11+
}
12+
pub fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
13+
Full::new(chunk.into())
14+
.map_err(|never| match never {})
15+
.boxed()
16+
}
17+
18+
pub fn into_box_body<T>(resp: &Response<T>) -> Response<BoxBody<Bytes, hyper::Error>>
19+
where
20+
T: Body,
21+
Bytes: From<T>,
22+
{
23+
resp.map(|body| {
24+
<Bytes as Into<Bytes>>::into(body)
25+
.map_err(|e| e.into())
26+
.boxed()
27+
})
28+
}

0 commit comments

Comments
 (0)