record grpc duration correctly (tikv#10660)
Signed-off-by: qupeng <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
hicqu and ti-chi-bot authored Aug 12, 2021
1 parent b806018 commit 834ec11
Showing 5 changed files with 139 additions and 58 deletions.
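Before this change, the gRPC duration histogram was observed as soon as a response was produced: at the end of the async blocks in `future_batch_get_command` / `future_batch_raw_get_command` and inside `response_batch_commands_request`. The time a response then spent queued in the batch channel and being assembled into the outgoing `BatchCommandsResponse` was never counted. The commit instead attaches a `GrpcRequestDuration { begin, label }` to every single response (`MeasuredSingleResponse`), lets the batch collector accumulate those measurements next to the protobuf batch (`MeasuredBatchResponse`), and observes the histogram only when the batch is about to be written back to the client. Below is a minimal, self-contained sketch of that data flow using `std::time::Instant` and plain strings in place of the protobuf responses; all names are illustrative, not the real TiKV API.

```rust
use std::time::Instant;

/// Mirrors `GrpcRequestDuration`: when the request entered the gRPC layer and
/// which histogram label (`GrpcTypeKind` in TiKV) it should be recorded under.
struct RequestDuration {
    begin: Instant,
    label: &'static str,
}

/// Mirrors `MeasuredSingleResponse`: one response plus its measurement.
struct SingleResponse {
    id: u64,
    resp: String,
    measure: RequestDuration,
}

/// Mirrors `MeasuredBatchResponse`: the batch body plus every measurement,
/// kept side by side until the batch is flushed to the client.
#[derive(Default)]
struct BatchResponse {
    request_ids: Vec<u64>,
    responses: Vec<String>,
    measures: Vec<RequestDuration>,
}

/// Mirrors `BatchRespCollector::collect`: absorb one measured response into
/// the pending batch without observing any duration yet.
fn collect(batch: &mut BatchResponse, single: SingleResponse) {
    batch.request_ids.push(single.id);
    batch.responses.push(single.resp);
    batch.measures.push(single.measure);
}

fn main() {
    let mut batch = BatchResponse::default();
    collect(
        &mut batch,
        SingleResponse {
            id: 1,
            resp: "get-response".to_string(),
            measure: RequestDuration {
                begin: Instant::now(),
                label: "kv_batch_get_command",
            },
        },
    );
    // Durations are still unobserved at this point; they are recorded only
    // when the batch is drained (see the stream sketch after the kv.rs diff).
    for m in &batch.measures {
        println!("{} pending for {:?}", m.label, m.begin.elapsed());
    }
    println!("collected {} response(s): {:?}", batch.request_ids.len(), batch.responses);
}
```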
6 changes: 5 additions & 1 deletion components/test_storage/src/sync_storage.rs
@@ -14,6 +14,7 @@ use tikv::storage::{
test_util::GetConsumer, txn::commands, Engine, PerfStatisticsDelta, PrewriteResult, Result,
Statistics, Storage, TestEngineBuilder, TestStorageBuilder, TxnStatus,
};
use tikv_util::time::Instant;
use txn_types::{Key, KvPair, Mutation, TimeStamp, Value};

/// A builder to build a `SyncTestStorage`.
@@ -142,7 +143,10 @@ impl<E: Engine> SyncTestStorage<E> {
})
.collect();
let p = GetConsumer::new();
block_on(self.store.batch_get_command(requests, ids, p.clone()))?;
block_on(
self.store
.batch_get_command(requests, ids, p.clone(), Instant::now()),
)?;
let mut values = vec![];
for value in p.take_data().into_iter() {
values.push(value?);
51 changes: 31 additions & 20 deletions src/server/service/batch.rs
@@ -1,8 +1,9 @@
// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.

use crate::server::metrics::GRPC_MSG_HISTOGRAM_STATIC;
use crate::server::metrics::REQUEST_BATCH_SIZE_HISTOGRAM_VEC;
use crate::server::service::kv::batch_commands_response;
use crate::server::metrics::{GrpcTypeKind, REQUEST_BATCH_SIZE_HISTOGRAM_VEC};
use crate::server::service::kv::{
batch_commands_response, GrpcRequestDuration, MeasuredSingleResponse,
};
use crate::storage::kv::{PerfStatisticsDelta, Statistics};
use crate::storage::{
errors::{extract_key_error, extract_region_error},
@@ -63,7 +64,7 @@ impl ReqBatcher {
pub fn maybe_commit<E: Engine, L: LockManager>(
&mut self,
storage: &Storage<E, L>,
tx: &Sender<(u64, batch_commands_response::Response)>,
tx: &Sender<MeasuredSingleResponse>,
) {
if self.gets.len() >= self.batch_size {
let gets = std::mem::take(&mut self.gets);
@@ -81,7 +82,7 @@ impl ReqBatcher {
pub fn commit<E: Engine, L: LockManager>(
self,
storage: &Storage<E, L>,
tx: &Sender<(u64, batch_commands_response::Response)>,
tx: &Sender<MeasuredSingleResponse>,
) {
if !self.gets.is_empty() {
future_batch_get_command(
@@ -135,13 +136,18 @@ impl BatcherBuilder {
}

pub struct GetCommandResponseConsumer {
tx: Sender<(u64, batch_commands_response::Response)>,
tx: Sender<MeasuredSingleResponse>,
}

impl ResponseBatchConsumer<(Option<Vec<u8>>, Statistics, PerfStatisticsDelta)>
for GetCommandResponseConsumer
{
fn consume(&self, id: u64, res: Result<(Option<Vec<u8>>, Statistics, PerfStatisticsDelta)>) {
fn consume(
&self,
id: u64,
res: Result<(Option<Vec<u8>>, Statistics, PerfStatisticsDelta)>,
begin: Instant,
) {
let mut resp = GetResponse::default();
if let Some(err) = extract_region_error(&res) {
resp.set_region_error(err);
@@ -164,14 +170,16 @@ impl ResponseBatchConsumer<(Option<Vec<u8>>, Statistics, PerfStatisticsDelta)>
cmd: Some(batch_commands_response::response::Cmd::Get(resp)),
..Default::default()
};
if self.tx.send_and_notify((id, res)).is_err() {
let measure = GrpcRequestDuration::new(begin, GrpcTypeKind::kv_batch_get_command);
let task = MeasuredSingleResponse::new(id, res, measure);
if self.tx.send_and_notify(task).is_err() {
error!("KvService response batch commands fail");
}
}
}

impl ResponseBatchConsumer<Option<Vec<u8>>> for GetCommandResponseConsumer {
fn consume(&self, id: u64, res: Result<Option<Vec<u8>>>) {
fn consume(&self, id: u64, res: Result<Option<Vec<u8>>>, begin: Instant) {
let mut resp = RawGetResponse::default();
if let Some(err) = extract_region_error(&res) {
resp.set_region_error(err);
@@ -186,7 +194,9 @@ impl ResponseBatchConsumer<Option<Vec<u8>>> for GetCommandResponseConsumer {
cmd: Some(batch_commands_response::response::Cmd::RawGet(resp)),
..Default::default()
};
if self.tx.send_and_notify((id, res)).is_err() {
let measure = GrpcRequestDuration::new(begin, GrpcTypeKind::raw_batch_get_command);
let task = MeasuredSingleResponse::new(id, res, measure);
if self.tx.send_and_notify(task).is_err() {
error!("KvService response batch commands fail");
}
}
@@ -196,7 +206,7 @@ fn future_batch_get_command<E: Engine, L: LockManager>(
storage: &Storage<E, L>,
requests: Vec<u64>,
gets: Vec<GetRequest>,
tx: Sender<(u64, batch_commands_response::Response)>,
tx: Sender<MeasuredSingleResponse>,
begin_instant: tikv_util::time::Instant,
) {
REQUEST_BATCH_SIZE_HISTOGRAM_VEC
@@ -207,6 +217,7 @@ fn future_batch_get_command<E: Engine, L: LockManager>(
gets,
requests,
GetCommandResponseConsumer { tx: tx.clone() },
begin_instant,
);
let f = async move {
// This error can only be caused by the readpool being busy.
@@ -219,14 +230,14 @@ fn future_batch_get_command<E: Engine, L: LockManager>(
cmd: Some(batch_commands_response::response::Cmd::Get(resp.clone())),
..Default::default()
};
if tx.send_and_notify((id, res)).is_err() {
let measure =
GrpcRequestDuration::new(begin_instant, GrpcTypeKind::kv_batch_get_command);
let task = MeasuredSingleResponse::new(id, res, measure);
if tx.send_and_notify(task).is_err() {
error!("KvService response batch commands fail");
}
}
}
GRPC_MSG_HISTOGRAM_STATIC
.kv_batch_get_command
.observe(begin_instant.saturating_elapsed_secs());
};
poll_future_notify(f);
}
@@ -235,7 +246,7 @@ fn future_batch_raw_get_command<E: Engine, L: LockManager>(
storage: &Storage<E, L>,
requests: Vec<u64>,
gets: Vec<RawGetRequest>,
tx: Sender<(u64, batch_commands_response::Response)>,
tx: Sender<MeasuredSingleResponse>,
begin_instant: tikv_util::time::Instant,
) {
REQUEST_BATCH_SIZE_HISTOGRAM_VEC
@@ -258,14 +269,14 @@ fn future_batch_raw_get_command<E: Engine, L: LockManager>(
cmd: Some(batch_commands_response::response::Cmd::RawGet(resp.clone())),
..Default::default()
};
if tx.send_and_notify((id, res)).is_err() {
let measure =
GrpcRequestDuration::new(begin_instant, GrpcTypeKind::raw_batch_get_command);
let task = MeasuredSingleResponse::new(id, res, measure);
if tx.send_and_notify(task).is_err() {
error!("KvService response batch commands fail");
}
}
}
GRPC_MSG_HISTOGRAM_STATIC
.raw_batch_get_command
.observe(begin_instant.saturating_elapsed_secs());
};
poll_future_notify(f);
}
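With the observation moved out of batch.rs, `ResponseBatchConsumer::consume` now receives the request's begin `Instant` as an extra argument and forwards a `MeasuredSingleResponse` down the channel instead of calling `GRPC_MSG_HISTOGRAM_STATIC ... observe(...)` itself; the trailing histogram calls in `future_batch_get_command` and `future_batch_raw_get_command` are removed for the same reason. A rough sketch of that consumer shape follows, with a simplified trait, error type, and channel payload standing in for the real TiKV types.

```rust
use std::sync::mpsc;
use std::time::Instant;

// Trimmed-down stand-in for TiKV's `ResponseBatchConsumer`; the trait name,
// error type, and channel payload here are illustrative, not the real API.
trait BatchConsumer<T> {
    // The consumer receives the instant at which the request was accepted by
    // the gRPC layer, instead of observing a duration histogram on its own.
    fn consume(&self, id: u64, res: Result<T, String>, begin: Instant);
}

struct ChannelConsumer {
    tx: mpsc::Sender<(u64, Result<String, String>, Instant)>,
}

impl BatchConsumer<String> for ChannelConsumer {
    fn consume(&self, id: u64, res: Result<String, String>, begin: Instant) {
        // Forward the begin instant together with the response; whoever drains
        // the channel decides when to turn it into an observed duration.
        if self.tx.send((id, res, begin)).is_err() {
            eprintln!("response channel closed");
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let consumer = ChannelConsumer { tx };
    consumer.consume(7, Ok("value".to_string()), Instant::now());

    let (id, res, begin) = rx.recv().unwrap();
    println!("id={} ok={} waited {:?}", id, res.is_ok(), begin.elapsed());
}
```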
101 changes: 74 additions & 27 deletions src/server/service/kv.rs
@@ -990,19 +990,26 @@ impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager> Tikv for
let response_retriever = BatchReceiver::new(
rx,
GRPC_MSG_MAX_BATCH_SIZE,
BatchCommandsResponse::default,
MeasuredBatchResponse::default,
BatchRespCollector,
);

let mut response_retriever = response_retriever
.inspect(|r| GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64))
.map(move |mut r| {
r.set_transport_layer_load(thread_load.load() as u64);
GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok((
r,
WriteFlags::default().buffer_hint(false),
))
});
let mut response_retriever = response_retriever.map(move |item| {
for measure in item.measures {
let GrpcRequestDuration { label, begin } = measure;
GRPC_MSG_HISTOGRAM_STATIC
.get(label)
.observe(begin.saturating_elapsed_secs());
}

let mut r = item.batch_resp;
GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64);
r.set_transport_layer_load(thread_load.load() as u64);
GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok((
r,
WriteFlags::default().buffer_hint(false),
))
});

let send_task = async move {
sink.send_all(&mut response_retriever).await?;
@@ -1138,20 +1145,18 @@ impl<T: RaftStoreRouter<E::Local> + 'static, E: Engine, L: LockManager> Tikv for
fn response_batch_commands_request<F>(
id: u64,
resp: F,
tx: Sender<(u64, batch_commands_response::Response)>,
begin_instant: Instant,
label_enum: GrpcTypeKind,
tx: Sender<MeasuredSingleResponse>,
begin: Instant,
label: GrpcTypeKind,
) where
F: Future<Output = Result<batch_commands_response::Response, ()>> + Send + 'static,
{
let task = async move {
if let Ok(resp) = resp.await {
if let Err(e) = tx.send_and_notify((id, resp)) {
let measure = GrpcRequestDuration { begin, label };
let task = MeasuredSingleResponse::new(id, resp, measure);
if let Err(e) = tx.send_and_notify(task) {
error!("KvService response batch commands fail"; "err" => ?e);
} else {
GRPC_MSG_HISTOGRAM_STATIC
.get(label_enum)
.observe(begin_instant.saturating_elapsed_secs());
}
}
};
@@ -1166,7 +1171,7 @@ fn handle_batch_commands_request<E: Engine, L: LockManager>(
peer: &str,
id: u64,
req: batch_commands_request::Request,
tx: &Sender<(u64, batch_commands_response::Response)>,
tx: &Sender<MeasuredSingleResponse>,
) {
// To simplify the code and make the logic clearer.
macro_rules! oneof {
@@ -1993,17 +1998,59 @@ pub use kvproto::tikvpb::batch_commands_request;
pub use kvproto::tikvpb::batch_commands_response;
use protobuf::RepeatedField;

/// Measures the execution time of a given request.
#[derive(Debug)]
pub struct GrpcRequestDuration {
pub begin: Instant,
pub label: GrpcTypeKind,
}
impl GrpcRequestDuration {
pub fn new(begin: Instant, label: GrpcTypeKind) -> Self {
GrpcRequestDuration { begin, label }
}
}

/// A single response that will be collected into a `MeasuredBatchResponse`.
#[derive(Debug)]
pub struct MeasuredSingleResponse {
pub id: u64,
pub resp: batch_commands_response::Response,
pub measure: GrpcRequestDuration,
}
impl MeasuredSingleResponse {
pub fn new(
id: u64,
resp: batch_commands_response::Response,
measure: GrpcRequestDuration,
) -> Self {
MeasuredSingleResponse { id, resp, measure }
}
}

/// A batch of responses, together with the duration measurement of each request.
pub struct MeasuredBatchResponse {
pub batch_resp: BatchCommandsResponse,
pub measures: Vec<GrpcRequestDuration>,
}
impl Default for MeasuredBatchResponse {
fn default() -> Self {
MeasuredBatchResponse {
batch_resp: Default::default(),
measures: Vec::with_capacity(GRPC_MSG_MAX_BATCH_SIZE),
}
}
}

struct BatchRespCollector;
impl BatchCollector<BatchCommandsResponse, (u64, batch_commands_response::Response)>
for BatchRespCollector
{
impl BatchCollector<MeasuredBatchResponse, MeasuredSingleResponse> for BatchRespCollector {
fn collect(
&mut self,
v: &mut BatchCommandsResponse,
e: (u64, batch_commands_response::Response),
) -> Option<(u64, batch_commands_response::Response)> {
v.mut_request_ids().push(e.0);
v.mut_responses().push(e.1);
v: &mut MeasuredBatchResponse,
e: MeasuredSingleResponse,
) -> Option<MeasuredSingleResponse> {
v.batch_resp.mut_request_ids().push(e.id);
v.batch_resp.mut_responses().push(e.resp);
v.measures.push(e.measure);
None
}
}
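On the kv.rs side, `batch_commands` now builds its response stream out of `MeasuredBatchResponse`, and the `map` over that stream is the single place where `GRPC_MSG_HISTOGRAM_STATIC.get(label).observe(...)` runs, immediately before a batch is handed to the gRPC sink. The sketch below mirrors that drain-time observation over a stream of measured batches; it assumes the `futures` crate and uses strings in place of the protobuf payload and `println!` in place of the histogram.

```rust
use std::time::Instant;

use futures::stream::{self, StreamExt};

// Stand-ins for `GrpcRequestDuration` and `MeasuredBatchResponse`; the real
// batch carries a `BatchCommandsResponse` protobuf rather than plain strings.
struct Measure {
    begin: Instant,
    label: &'static str,
}

struct MeasuredBatch {
    responses: Vec<String>,
    measures: Vec<Measure>,
}

fn main() {
    let batches = vec![MeasuredBatch {
        responses: vec!["get-response".to_string()],
        measures: vec![Measure {
            begin: Instant::now(),
            label: "kv_batch_get_command",
        }],
    }];

    // Mirror of the new `response_retriever.map(...)`: durations are observed
    // only here, right before each batch would be written to the gRPC sink, so
    // time spent queued in the batch channel is part of the recorded latency.
    let responses = stream::iter(batches).map(|batch| {
        for m in &batch.measures {
            println!("{} took {:.6}s", m.label, m.begin.elapsed().as_secs_f64());
        }
        batch.responses
    });

    let all: Vec<Vec<String>> = futures::executor::block_on(responses.collect());
    assert_eq!(all.len(), 1);
}
```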
5 changes: 4 additions & 1 deletion src/server/service/mod.rs
@@ -8,4 +8,7 @@ mod kv;
pub use self::debug::Service as DebugService;
pub use self::diagnostics::Service as DiagnosticsService;
pub use self::kv::Service as KvService;
pub use self::kv::{batch_commands_request, batch_commands_response};
pub use self::kv::{
batch_commands_request, batch_commands_response, GrpcRequestDuration, MeasuredBatchResponse,
MeasuredSingleResponse,
};