Skip to content

Commit

Permalink
fix: reduce cloning for dedupe (#3025)
Browse files Browse the repository at this point in the history
Co-authored-by: Sandipsinh Rathod <[email protected]>
Co-authored-by: Sandipsinh Dilipsinh Rathod <[email protected]>
Co-authored-by: Tushar Mathur <[email protected]>
Co-authored-by: Amit Singh <[email protected]>
  • Loading branch information
5 people authored Oct 24, 2024
1 parent d520ee4 commit 9fe3dc1
Show file tree
Hide file tree
Showing 23 changed files with 801 additions and 268 deletions.
7 changes: 4 additions & 3 deletions src/core/app_context.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::sync::Arc;

use async_graphql::dynamic::{self, DynamicRequest};
use async_graphql::Response;
use async_graphql_value::ConstValue;
use dashmap::DashMap;

use super::lift::Lift;
use super::jit::AnyResponse;
use crate::core::async_graphql_hyper::OperationId;
use crate::core::auth::context::GlobalAuthContext;
use crate::core::blueprint::{Blueprint, Definition, SchemaModifiers};
Expand All @@ -30,8 +29,9 @@ pub struct AppContext {
pub endpoints: EndpointSet<Checked>,
pub auth_ctx: Arc<GlobalAuthContext>,
pub dedupe_handler: Arc<DedupeResult<IoId, ConstValue, Error>>,
pub dedupe_operation_handler: DedupeResult<OperationId, Lift<Response>, Error>,
pub dedupe_operation_handler: DedupeResult<OperationId, AnyResponse<Vec<u8>>, Error>,
pub operation_plans: DashMap<OPHash, OperationPlan<async_graphql_value::Value>>,
pub const_execution_cache: DashMap<OPHash, AnyResponse<Vec<u8>>>,
}

impl AppContext {
Expand Down Expand Up @@ -153,6 +153,7 @@ impl AppContext {
dedupe_handler: Arc::new(DedupeResult::new(false)),
dedupe_operation_handler: DedupeResult::new(false),
operation_plans: DashMap::new(),
const_execution_cache: DashMap::default(),
}
}

Expand Down
190 changes: 181 additions & 9 deletions src/core/async_graphql_hyper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use tailcall_hasher::TailcallHasher;

use super::jit::{BatchResponse as JITBatchResponse, JITExecutor};

/// Identifier for a GraphQL operation, derived from the incoming request
/// (presumably by hashing it together with selected headers — see
/// `operation_id(&req.headers)` at the call site). Used as the key for
/// operation-level de-duplication and plan caching.
#[derive(PartialEq, Eq, Clone, Hash, Debug)]
pub struct OperationId(u64);

Expand All @@ -21,6 +23,8 @@ pub trait GraphQLRequestLike: Hash + Send {
where
E: Executor;

async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse;

fn parse_query(&mut self) -> Option<&ExecutableDocument>;

fn is_query(&mut self) -> bool {
Expand Down Expand Up @@ -73,6 +77,11 @@ impl GraphQLRequestLike for GraphQLBatchRequest {
}
self
}

/// Executes the whole batch on the JIT executor and wraps the result in an
/// arc-backed response that avoids re-serialization.
async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
    let batch = executor.execute_batch(self.0).await;
    GraphQLArcResponse::new(batch)
}

/// Shortcut method to execute the request on the executor.
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
Expand Down Expand Up @@ -107,6 +116,11 @@ impl GraphQLRequestLike for GraphQLRequest {
self.0.data.insert(data);
self
}
/// Executes a single request on the JIT executor; the result is wrapped as a
/// one-element batch so both request shapes share a response type.
async fn execute_with_jit(self, executor: JITExecutor) -> GraphQLArcResponse {
    GraphQLArcResponse::new(JITBatchResponse::Single(executor.execute(self.0).await))
}

/// Shortcut method to execute the request on the schema.
async fn execute<E>(self, executor: &E) -> GraphQLResponse
where
Expand Down Expand Up @@ -250,20 +264,148 @@ impl GraphQLResponse {
///
/// * A modified `GraphQLResponse` with updated `cache_control` `max_age`
/// and `public` flag.
pub fn set_cache_control(mut self, min_cache: i32, cache_public: bool) -> GraphQLResponse {
match self.0 {
BatchResponse::Single(ref mut res) => {
res.cache_control.max_age = min_cache;
res.cache_control.public = cache_public;
}
BatchResponse::Batch(ref mut list) => {
for res in list {
/// Applies cache directives to every response in this (possibly batched)
/// GraphQL response, but only when the cache header feature is enabled.
///
/// * `enable_cache_header` - when false, the response is returned untouched.
/// * `min_cache` - the `max_age` to stamp on each response.
/// * `cache_public` - the `public` flag to stamp on each response.
pub fn set_cache_control(
    mut self,
    enable_cache_header: bool,
    min_cache: i32,
    cache_public: bool,
) -> GraphQLResponse {
    if !enable_cache_header {
        return self;
    }
    match &mut self.0 {
        BatchResponse::Single(res) => {
            res.cache_control.max_age = min_cache;
            res.cache_control.public = cache_public;
        }
        BatchResponse::Batch(list) => {
            for res in list.iter_mut() {
                res.cache_control.max_age = min_cache;
                res.cache_control.public = cache_public;
            }
        }
    }
    self
}
}

/// Cache directives accumulated during execution, rendered into the HTTP
/// `Cache-Control` header.
///
/// `max_age` semantics: `0` means "no directive", `-1` means `no-cache`,
/// any positive value becomes `max-age=N`.
#[derive(Clone, Debug)]
pub struct CacheControl {
    pub max_age: i32,
    pub public: bool,
}

impl Default for CacheControl {
    // Default is "public, no max-age directive" — renders to no header at all.
    fn default() -> Self {
        CacheControl { max_age: 0, public: true }
    }
}

impl CacheControl {
    /// Renders the directives as a header value.
    ///
    /// Returns `None` when nothing needs to be sent (public with
    /// `max_age == 0`); otherwise the directives joined by `", "`.
    pub fn value(&self) -> Option<String> {
        let mut parts: Vec<String> = Vec::new();

        match self.max_age {
            -1 => parts.push("no-cache".to_string()),
            n if n > 0 => parts.push(format!("max-age={}", n)),
            _ => {}
        }

        if !self.public {
            parts.push("private".to_string());
        }

        if parts.is_empty() {
            None
        } else {
            Some(parts.join(", "))
        }
    }

    /// Combines two sets of directives, keeping the most restrictive one:
    /// `no-cache` (-1) wins outright, `0` (unset) defers to the other side,
    /// and otherwise the smaller `max_age` is kept. `public` is only true
    /// when both sides are public.
    pub fn merge(self, other: &CacheControl) -> CacheControl {
        let max_age = if self.max_age == -1 || other.max_age == -1 {
            -1
        } else if self.max_age == 0 {
            other.max_age
        } else if other.max_age == 0 {
            self.max_age
        } else {
            self.max_age.min(other.max_age)
        };

        CacheControl { public: self.public && other.public, max_age }
    }
}

/// A GraphQL response whose payload was already serialized to bytes by the
/// JIT executor, so it can be turned into an HTTP body without re-encoding.
pub struct GraphQLArcResponse {
    response: JITBatchResponse<Vec<u8>>,
    // Directives requested by the server config; `None` means "don't emit
    // a Cache-Control header from the server side".
    cache_control: Option<CacheControl>,
}

impl GraphQLArcResponse {
    /// Wraps a JIT response with no server-side cache directives attached.
    pub fn new(response: JITBatchResponse<Vec<u8>>) -> Self {
        GraphQLArcResponse { response, cache_control: None }
    }

    /// Attaches cache directives when `enable_cache_header` is set;
    /// otherwise clears any previously attached directives.
    pub fn set_cache_control(self, enable_cache_header: bool, max_age: i32, public: bool) -> Self {
        let cache_control = if enable_cache_header {
            Some(CacheControl { max_age, public })
        } else {
            None
        };
        GraphQLArcResponse { response: self.response, cache_control }
    }
}

impl GraphQLArcResponse {
    /// Builds an HTTP response with the given status and JSON body, attaching
    /// a `Cache-Control` header when execution succeeded and the merged
    /// directives render to a non-empty value.
    fn build_response(&self, status: StatusCode, body: Body) -> Result<Response<Body>> {
        let mut response = Response::builder()
            .status(status)
            .header(CONTENT_TYPE, APPLICATION_JSON.as_ref())
            .body(body)?;
        // Only advertise cacheability for successful executions.
        if self.response.is_ok() {
            if let Some(cache_control) = self
                .response
                .cache_control(self.cache_control.as_ref())
                .value()
            {
                response.headers_mut().insert(
                    CACHE_CONTROL,
                    HeaderValue::from_str(cache_control.as_str())?,
                );
            }
        }

        Ok(response)
    }

    /// Serializes the response into a JSON body. Each entry's body is already
    /// serialized bytes, so a batch is concatenated as `[b0,b1,...]` without
    /// re-encoding any payload.
    fn default_body(&self) -> Result<Body> {
        let str_repr: Vec<u8> = match &self.response {
            JITBatchResponse::Batch(resp) => {
                // Size the output buffer once: payload bytes + one comma per
                // separator + the two brackets. This avoids the per-element
                // temporary Vec allocations of a flat_map-based build.
                let payload: usize = resp.iter().map(|r| r.body.as_ref().len()).sum();
                let mut combined =
                    Vec::with_capacity(payload + resp.len().saturating_sub(1) + 2);
                combined.push(b'[');
                for (i, r) in resp.iter().enumerate() {
                    if i > 0 {
                        combined.push(b',');
                    }
                    combined.extend_from_slice(r.body.as_ref());
                }
                combined.push(b']');
                combined
            }
            JITBatchResponse::Single(resp) => resp.body.as_ref().to_owned(),
        };
        Ok(Body::from(str_repr))
    }

    /// Converts the response into a `200 OK` hyper response with the default
    /// JSON body.
    pub fn into_response(self) -> Result<Response<hyper::Body>> {
        self.build_response(StatusCode::OK, self.default_body()?)
    }
}

Expand Down Expand Up @@ -362,4 +504,34 @@ mod tests {
.to_vec()
);
}

#[test]
fn to_value() {
    // Table of (public, max_age) inputs and the expected rendered header.
    let cases: &[(bool, i32, Option<&str>)] = &[
        (true, 0, None),
        (false, 0, Some("private")),
        (false, 10, Some("max-age=10, private")),
        (true, 10, Some("max-age=10")),
        (true, -1, Some("no-cache")),
        (false, -1, Some("no-cache, private")),
    ];

    for &(public, max_age, expected) in cases {
        assert_eq!(
            CacheControl { public, max_age }.value(),
            expected.map(String::from),
            "public={}, max_age={}",
            public,
            max_age
        );
    }
}
}
2 changes: 1 addition & 1 deletion src/core/blueprint/into_schema.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::sync::Arc;

use async_graphql::dynamic::{self, FieldFuture, FieldValue, SchemaBuilder, TypeRef};
use async_graphql::ErrorExtensions;
use async_graphql_value::ConstValue;
use futures_util::TryFutureExt;
use tracing::Instrument;

use crate::core::blueprint::{Blueprint, Definition};
use crate::core::http::RequestContext;
use crate::core::ir::{EvalContext, ResolverContext, TypedValue};
use crate::core::jit::graphql_error::ErrorExtensions;
use crate::core::scalar::Scalar;

/// We set the default value for an `InputValue` by reading it from the
Expand Down
57 changes: 29 additions & 28 deletions src/core/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,6 @@ fn create_request_context(req: &Request<Body>, app_ctx: &AppContext) -> RequestC
RequestContext::from(app_ctx).allowed_headers(allowed_headers)
}

/// Stamps server-level cache directives onto a GraphQL response, but only
/// when the blueprint enables the cache-control header feature; otherwise
/// the response passes through untouched.
fn update_cache_control_header(
    response: GraphQLResponse,
    app_ctx: &AppContext,
    req_ctx: Arc<RequestContext>,
) -> GraphQLResponse {
    if app_ctx.blueprint.server.enable_cache_control_header {
        // Defaults when the request context carries no directives:
        // no max-age (0) and publicly cacheable. NOTE(review): presumably
        // `get_min_max_age` returns the minimum TTL across resolved fields —
        // confirm against RequestContext.
        let ttl = req_ctx.get_min_max_age().unwrap_or(0);
        let cache_public_flag = req_ctx.is_cache_public().unwrap_or(true);
        return response.set_cache_control(ttl, cache_public_flag);
    }
    response
}

pub fn update_response_headers(
resp: &mut Response<Body>,
req_ctx: &RequestContext,
Expand Down Expand Up @@ -134,21 +121,31 @@ async fn execute_query<T: DeserializeOwned + GraphQLRequestLike>(
) -> anyhow::Result<Response<Body>> {
let mut response = if app_ctx.blueprint.server.enable_jit {
let operation_id = request.operation_id(&req.headers);
let exec = JITExecutor::new(app_ctx.clone(), req_ctx.clone(), operation_id);
request
.execute(&JITExecutor::new(
app_ctx.clone(),
req_ctx.clone(),
operation_id,
))
.execute_with_jit(exec)
.await
.set_cache_control(
app_ctx.blueprint.server.enable_cache_control_header,
req_ctx.get_min_max_age().unwrap_or(0),
req_ctx.is_cache_public().unwrap_or(true),
)
.into_response()?
} else {
request.data(req_ctx.clone()).execute(&app_ctx.schema).await
request
.data(req_ctx.clone())
.execute(&app_ctx.schema)
.await
.set_cache_control(
app_ctx.blueprint.server.enable_cache_control_header,
req_ctx.get_min_max_age().unwrap_or(0),
req_ctx.is_cache_public().unwrap_or(true),
)
.into_response()?
};
response = update_cache_control_header(response, app_ctx, req_ctx.clone());

let mut resp = response.into_response()?;
update_response_headers(&mut resp, req_ctx, app_ctx);
Ok(resp)
update_response_headers(&mut response, req_ctx, app_ctx);
Ok(response)
}

fn create_allowed_headers(headers: &HeaderMap, allowed: &BTreeSet<String>) -> HeaderMap {
Expand Down Expand Up @@ -271,11 +268,15 @@ async fn handle_rest_apis(
let mut response = graphql_request
.data(req_ctx.clone())
.execute(&app_ctx.schema)
.await;
response = update_cache_control_header(response, app_ctx.as_ref(), req_ctx.clone());
let mut resp = response.into_rest_response()?;
update_response_headers(&mut resp, &req_ctx, &app_ctx);
Ok(resp)
.await
.set_cache_control(
app_ctx.blueprint.server.enable_cache_control_header,
req_ctx.get_min_max_age().unwrap_or(0),
req_ctx.is_cache_public().unwrap_or(true),
)
.into_rest_response()?;
update_response_headers(&mut response, &req_ctx, &app_ctx);
Ok(response)
}
.instrument(span)
.await;
Expand Down
7 changes: 4 additions & 3 deletions src/core/ir/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::fmt::Display;
use std::sync::Arc;

use async_graphql::{ErrorExtensions, Value as ConstValue};
use async_graphql::Value as ConstValue;
use derive_more::From;
use thiserror::Error;

use crate::core::jit::graphql_error::{Error as ExtensionError, ErrorExtensions};
use crate::core::{auth, cache, worker, Errata};

#[derive(From, Debug, Error, Clone)]
Expand Down Expand Up @@ -72,8 +73,8 @@ impl From<Error> for Errata {
}

impl ErrorExtensions for Error {
fn extend(&self) -> async_graphql::Error {
async_graphql::Error::new(format!("{}", self)).extend_with(|_err, e| {
fn extend(&self) -> ExtensionError {
ExtensionError::new(format!("{}", self)).extend_with(|_err, e| {
if let Error::GRPC {
grpc_code,
grpc_description,
Expand Down
Loading

1 comment on commit 9fe3dc1

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 8.45ms 3.56ms 105.23ms 91.60%
Req/Sec 3.01k 409.31 4.48k 76.67%

358942 requests in 30.02s, 1.80GB read

Requests/sec: 11956.02

Transfer/sec: 61.37MB

Please sign in to comment.