Skip to content

Commit

Permalink
refactor: reuse http status code (apache#226)
Browse files Browse the repository at this point in the history
* reuse http status code
  • Loading branch information
baojinri authored Sep 1, 2022
1 parent 295a25d commit 5590c95
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 72 deletions.
21 changes: 3 additions & 18 deletions server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,17 @@
use common_util::define_result;
use snafu::Snafu;

/// Server status code
#[derive(Debug, Clone, Copy)]
pub enum StatusCode {
Ok = 200,
InvalidArgument = 400,
NotFound = 404,
TooManyRequests = 429,
InternalError = 500,
}

impl StatusCode {
pub fn as_u32(&self) -> u32 {
*self as u32
}
}
pub use warp::http::StatusCode;

define_result!(ServerError);

#[derive(Snafu, Debug)]
#[snafu(visibility(pub(crate)))]
pub enum ServerError {
#[snafu(display("Rpc error, code:{}, message:{}", code.as_u32(), msg))]
#[snafu(display("Rpc error, code:{}, message:{}", code.as_u16(), msg))]
ErrNoCause { code: StatusCode, msg: String },

#[snafu(display("Rpc error, code:{}, message:{}, cause:{}", code.as_u32(), msg, source))]
#[snafu(display("Rpc error, code:{}, message:{}, cause:{}", code.as_u16(), msg, source))]
ErrWithCause {
code: StatusCode,
msg: String,
Expand Down
16 changes: 8 additions & 8 deletions server/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,15 +328,15 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {

fn build_err_header(err: ServerError) -> ResponseHeader {
let mut header = ResponseHeader::new();
header.set_code(err.code().as_u32());
header.set_code(err.code().as_u16().into());
header.set_error(err.error_message());

header
}

fn build_ok_header() -> ResponseHeader {
let mut header = ResponseHeader::new();
header.set_code(StatusCode::Ok.as_u32());
header.set_code(StatusCode::OK.as_u16().into());

header
}
Expand Down Expand Up @@ -385,7 +385,7 @@ macro_rules! handle_request {
HandlerContext::new(header, router, instance, &schema_config_provider)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Invalid header",
})?;
$mod_name::$handle_fn(&handler_ctx, req).await.map_err(|e| {
Expand Down Expand Up @@ -413,7 +413,7 @@ macro_rules! handle_request {
let resp_result = match rx.await {
Ok(resp_result) => resp_result,
Err(_e) => ErrNoCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Result channel disconnected",
}
.fail(),
Expand Down Expand Up @@ -483,7 +483,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Invalid header",
})?;
let mut total_success = 0;
Expand All @@ -493,7 +493,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
let write_result = write::handle_write(
&handler_ctx,
req.map_err(|e| Box::new(e) as _).context(ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to fetch request",
})?,
)
Expand Down Expand Up @@ -529,7 +529,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
let resp_result = match rx.await {
Ok(resp_result) => resp_result,
Err(_e) => ErrNoCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Result channel disconnected",
}
.fail(),
Expand Down Expand Up @@ -578,7 +578,7 @@ impl<Q: QueryExecutor + 'static> StorageService for StorageServiceImpl<Q> {
let handler_ctx = HandlerContext::new(header, router, instance, &schema_config_provider)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Invalid header",
})?;
let output = query::fetch_query_output(&handler_ctx, &req)
Expand Down
24 changes: 12 additions & 12 deletions server/src/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ where
.parse_promql(&mut sql_ctx, req)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Invalid request",
})?;

let (plan, column_name) = frontend
.promql_expr_to_plan(&mut sql_ctx, expr)
.map_err(|e| {
let code = if is_table_not_found_error(&e) {
StatusCode::NotFound
StatusCode::NOT_FOUND
} else {
StatusCode::InternalError
StatusCode::INTERNAL_SERVER_ERROR
};
ServerError::ErrWithCause {
code,
Expand All @@ -91,7 +91,7 @@ where

if ctx.instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TooManyRequests,
code: StatusCode::TOO_MANY_REQUESTS,
msg: "Query limited by reject list",
}
.fail()?;
Expand All @@ -114,14 +114,14 @@ where
.await
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to execute interpreter",
})?;

let resp = convert_output(output, column_name)
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to convert output",
})?;

Expand Down Expand Up @@ -191,7 +191,7 @@ fn convert_records(

fn empty_ok_resp() -> PrometheusQueryResponse {
let mut header = ResponseHeader::new();
header.code = StatusCode::Ok.as_u32();
header.code = StatusCode::OK.as_u16().into();

let mut resp = PrometheusQueryResponse::new();
resp.set_header(header);
Expand All @@ -212,33 +212,33 @@ impl RecordConverter {
let tsid_idx = record_schema
.index_of(TSID_COLUMN)
.with_context(|| ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Failed to find Tsid column".to_string(),
})?;
let timestamp_idx = record_schema
.index_of(&column_name.timestamp)
.with_context(|| ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Failed to find Timestamp column".to_string(),
})?;
ensure!(
record_schema.column(timestamp_idx).data_type == DatumKind::Timestamp,
ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Timestamp column should be timestamp type"
}
);
let field_idx = record_schema
.index_of(&column_name.field)
.with_context(|| ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to find {} column", column_name.field),
})?;
let field_type = record_schema.column(field_idx).data_type;
ensure!(
field_type.is_f64_castable(),
ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: format!(
"Field type must be f64-compatibile type, current:{}",
field_type
Expand Down
16 changes: 8 additions & 8 deletions server/src/grpc/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const RECORD_NAME: &str = "Result";

fn empty_ok_resp() -> QueryResponse {
let mut header = ResponseHeader::new();
header.code = StatusCode::Ok.as_u32();
header.code = StatusCode::OK.as_u16().into();

let mut resp = QueryResponse::new();
resp.set_header(header);
Expand All @@ -47,7 +47,7 @@ pub async fn handle_query<Q: QueryExecutor + 'static>(
convert_output(&output)
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to convert output, query:{}", &req.ql),
})
} else {
Expand Down Expand Up @@ -89,7 +89,7 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
.parse_sql(&mut sql_ctx, &req.ql)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: "Failed to parse sql",
})?;

Expand All @@ -102,7 +102,7 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
ensure!(
stmts.len() == 1,
ErrNoCause {
code: StatusCode::InvalidArgument,
code: StatusCode::BAD_REQUEST,
msg: format!(
"Only support execute one statement now, current num:{}, query:{}",
stmts.len(),
Expand All @@ -119,13 +119,13 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
.statement_to_plan(&mut sql_ctx, stmts.remove(0))
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to create plan, query:{}", req.ql),
})?;

if ctx.instance.limiter.should_limit(&plan) {
ErrNoCause {
code: StatusCode::TooManyRequests,
code: StatusCode::TOO_MANY_REQUESTS,
msg: "Query limited by reject list",
}
.fail()?;
Expand All @@ -148,7 +148,7 @@ pub async fn fetch_query_output<Q: QueryExecutor + 'static>(
.await
.map_err(|e| Box::new(e) as _)
.with_context(|| ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to execute interpreter, query:{}", req.ql),
})?;

Expand Down Expand Up @@ -217,7 +217,7 @@ pub fn convert_records(records: &[RecordBatch]) -> Result<QueryResponse> {
avro_util::record_batch_to_avro(record_batch, avro_schema, &mut rows)
.map_err(|e| Box::new(e) as _)
.context(ErrWithCause {
code: StatusCode::InternalError,
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to convert record batch",
})?;
}
Expand Down
Loading

0 comments on commit 5590c95

Please sign in to comment.