From 834ec1108d85623b2b19b00df2fa8e1359c726b6 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 12 Aug 2021 19:47:16 +0800 Subject: [PATCH] record grpc duration correctly (#10660) Signed-off-by: qupeng Co-authored-by: Ti Chi Robot --- components/test_storage/src/sync_storage.rs | 6 +- src/server/service/batch.rs | 51 ++++++---- src/server/service/kv.rs | 101 ++++++++++++++------ src/server/service/mod.rs | 5 +- src/storage/mod.rs | 34 +++++-- 5 files changed, 139 insertions(+), 58 deletions(-) diff --git a/components/test_storage/src/sync_storage.rs b/components/test_storage/src/sync_storage.rs index 2351e267ea2..697372f8b1b 100644 --- a/components/test_storage/src/sync_storage.rs +++ b/components/test_storage/src/sync_storage.rs @@ -14,6 +14,7 @@ use tikv::storage::{ test_util::GetConsumer, txn::commands, Engine, PerfStatisticsDelta, PrewriteResult, Result, Statistics, Storage, TestEngineBuilder, TestStorageBuilder, TxnStatus, }; +use tikv_util::time::Instant; use txn_types::{Key, KvPair, Mutation, TimeStamp, Value}; /// A builder to build a `SyncTestStorage`. @@ -142,7 +143,10 @@ impl SyncTestStorage { }) .collect(); let p = GetConsumer::new(); - block_on(self.store.batch_get_command(requests, ids, p.clone()))?; + block_on( + self.store + .batch_get_command(requests, ids, p.clone(), Instant::now()), + )?; let mut values = vec![]; for value in p.take_data().into_iter() { values.push(value?); diff --git a/src/server/service/batch.rs b/src/server/service/batch.rs index bea8c7778b7..804176386a0 100644 --- a/src/server/service/batch.rs +++ b/src/server/service/batch.rs @@ -1,8 +1,9 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. -use crate::server::metrics::GRPC_MSG_HISTOGRAM_STATIC; -use crate::server::metrics::REQUEST_BATCH_SIZE_HISTOGRAM_VEC; -use crate::server::service::kv::batch_commands_response; +use crate::server::metrics::{GrpcTypeKind, REQUEST_BATCH_SIZE_HISTOGRAM_VEC}; +use crate::server::service::kv::{ + batch_commands_response, GrpcRequestDuration, MeasuredSingleResponse, +}; use crate::storage::kv::{PerfStatisticsDelta, Statistics}; use crate::storage::{ errors::{extract_key_error, extract_region_error}, @@ -63,7 +64,7 @@ impl ReqBatcher { pub fn maybe_commit( &mut self, storage: &Storage, - tx: &Sender<(u64, batch_commands_response::Response)>, + tx: &Sender, ) { if self.gets.len() >= self.batch_size { let gets = std::mem::take(&mut self.gets); @@ -81,7 +82,7 @@ impl ReqBatcher { pub fn commit( self, storage: &Storage, - tx: &Sender<(u64, batch_commands_response::Response)>, + tx: &Sender, ) { if !self.gets.is_empty() { future_batch_get_command( @@ -135,13 +136,18 @@ impl BatcherBuilder { } pub struct GetCommandResponseConsumer { - tx: Sender<(u64, batch_commands_response::Response)>, + tx: Sender, } impl ResponseBatchConsumer<(Option>, Statistics, PerfStatisticsDelta)> for GetCommandResponseConsumer { - fn consume(&self, id: u64, res: Result<(Option>, Statistics, PerfStatisticsDelta)>) { + fn consume( + &self, + id: u64, + res: Result<(Option>, Statistics, PerfStatisticsDelta)>, + begin: Instant, + ) { let mut resp = GetResponse::default(); if let Some(err) = extract_region_error(&res) { resp.set_region_error(err); @@ -164,14 +170,16 @@ impl ResponseBatchConsumer<(Option>, Statistics, PerfStatisticsDelta)> cmd: Some(batch_commands_response::response::Cmd::Get(resp)), ..Default::default() }; - if self.tx.send_and_notify((id, res)).is_err() { + let mesure = GrpcRequestDuration::new(begin, GrpcTypeKind::kv_batch_get_command); + let task = MeasuredSingleResponse::new(id, res, mesure); + if self.tx.send_and_notify(task).is_err() { error!("KvService response batch commands fail"); } } } impl ResponseBatchConsumer>> for GetCommandResponseConsumer { - fn consume(&self, id: u64, res: Result>>) { + fn consume(&self, id: u64, res: Result>>, begin: Instant) { let mut resp = RawGetResponse::default(); if let Some(err) = extract_region_error(&res) { resp.set_region_error(err); @@ -186,7 +194,9 @@ impl ResponseBatchConsumer>> for GetCommandResponseConsumer { cmd: Some(batch_commands_response::response::Cmd::RawGet(resp)), ..Default::default() }; - if self.tx.send_and_notify((id, res)).is_err() { + let mesure = GrpcRequestDuration::new(begin, GrpcTypeKind::raw_batch_get_command); + let task = MeasuredSingleResponse::new(id, res, mesure); + if self.tx.send_and_notify(task).is_err() { error!("KvService response batch commands fail"); } } @@ -196,7 +206,7 @@ fn future_batch_get_command( storage: &Storage, requests: Vec, gets: Vec, - tx: Sender<(u64, batch_commands_response::Response)>, + tx: Sender, begin_instant: tikv_util::time::Instant, ) { REQUEST_BATCH_SIZE_HISTOGRAM_VEC @@ -207,6 +217,7 @@ fn future_batch_get_command( gets, requests, GetCommandResponseConsumer { tx: tx.clone() }, + begin_instant, ); let f = async move { // This error can only cause by readpool busy. @@ -219,14 +230,14 @@ fn future_batch_get_command( cmd: Some(batch_commands_response::response::Cmd::Get(resp.clone())), ..Default::default() }; - if tx.send_and_notify((id, res)).is_err() { + let measure = + GrpcRequestDuration::new(begin_instant, GrpcTypeKind::kv_batch_get_command); + let task = MeasuredSingleResponse::new(id, res, measure); + if tx.send_and_notify(task).is_err() { error!("KvService response batch commands fail"); } } } - GRPC_MSG_HISTOGRAM_STATIC - .kv_batch_get_command - .observe(begin_instant.saturating_elapsed_secs()); }; poll_future_notify(f); } @@ -235,7 +246,7 @@ fn future_batch_raw_get_command( storage: &Storage, requests: Vec, gets: Vec, - tx: Sender<(u64, batch_commands_response::Response)>, + tx: Sender, begin_instant: tikv_util::time::Instant, ) { REQUEST_BATCH_SIZE_HISTOGRAM_VEC @@ -258,14 +269,14 @@ fn future_batch_raw_get_command( cmd: Some(batch_commands_response::response::Cmd::RawGet(resp.clone())), ..Default::default() }; - if tx.send_and_notify((id, res)).is_err() { + let measure = + GrpcRequestDuration::new(begin_instant, GrpcTypeKind::raw_batch_get_command); + let task = MeasuredSingleResponse::new(id, res, measure); + if tx.send_and_notify(task).is_err() { error!("KvService response batch commands fail"); } } } - GRPC_MSG_HISTOGRAM_STATIC - .raw_batch_get_command - .observe(begin_instant.saturating_elapsed_secs()); }; poll_future_notify(f); } diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 781f5a453e5..d27ed5930f5 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -990,19 +990,26 @@ impl + 'static, E: Engine, L: LockManager> Tikv for let response_retriever = BatchReceiver::new( rx, GRPC_MSG_MAX_BATCH_SIZE, - BatchCommandsResponse::default, + MeasuredBatchResponse::default, BatchRespCollector, ); - let mut response_retriever = response_retriever - .inspect(|r| GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64)) - .map(move |mut r| { - r.set_transport_layer_load(thread_load.load() as u64); - GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok(( - r, - WriteFlags::default().buffer_hint(false), - )) - }); + let mut response_retriever = response_retriever.map(move |item| { + for measure in item.measures { + let GrpcRequestDuration { label, begin } = measure; + GRPC_MSG_HISTOGRAM_STATIC + .get(label) + .observe(begin.saturating_elapsed_secs()); + } + + let mut r = item.batch_resp; + GRPC_RESP_BATCH_COMMANDS_SIZE.observe(r.request_ids.len() as f64); + r.set_transport_layer_load(thread_load.load() as u64); + GrpcResult::<(BatchCommandsResponse, WriteFlags)>::Ok(( + r, + WriteFlags::default().buffer_hint(false), + )) + }); let send_task = async move { sink.send_all(&mut response_retriever).await?; @@ -1138,20 +1145,18 @@ impl + 'static, E: Engine, L: LockManager> Tikv for fn response_batch_commands_request( id: u64, resp: F, - tx: Sender<(u64, batch_commands_response::Response)>, - begin_instant: Instant, - label_enum: GrpcTypeKind, + tx: Sender, + begin: Instant, + label: GrpcTypeKind, ) where F: Future> + Send + 'static, { let task = async move { if let Ok(resp) = resp.await { - if let Err(e) = tx.send_and_notify((id, resp)) { + let measure = GrpcRequestDuration { begin, label }; + let task = MeasuredSingleResponse::new(id, resp, measure); + if let Err(e) = tx.send_and_notify(task) { error!("KvService response batch commands fail"; "err" => ?e); - } else { - GRPC_MSG_HISTOGRAM_STATIC - .get(label_enum) - .observe(begin_instant.saturating_elapsed_secs()); } } }; @@ -1166,7 +1171,7 @@ fn handle_batch_commands_request( peer: &str, id: u64, req: batch_commands_request::Request, - tx: &Sender<(u64, batch_commands_response::Response)>, + tx: &Sender, ) { // To simplify code and make the logic more clear. macro_rules! oneof { @@ -1993,17 +1998,59 @@ pub use kvproto::tikvpb::batch_commands_request; pub use kvproto::tikvpb::batch_commands_response; use protobuf::RepeatedField; +/// To measure execute time for a given request. +#[derive(Debug)] +pub struct GrpcRequestDuration { + pub begin: Instant, + pub label: GrpcTypeKind, +} +impl GrpcRequestDuration { + pub fn new(begin: Instant, label: GrpcTypeKind) -> Self { + GrpcRequestDuration { begin, label } + } +} + +/// A single response, will be collected into `MeasuredBatchResponse`. +#[derive(Debug)] +pub struct MeasuredSingleResponse { + pub id: u64, + pub resp: batch_commands_response::Response, + pub measure: GrpcRequestDuration, +} +impl MeasuredSingleResponse { + pub fn new( + id: u64, + resp: batch_commands_response::Response, + measure: GrpcRequestDuration, + ) -> Self { + MeasuredSingleResponse { id, resp, measure } + } +} + +/// A batch response. +pub struct MeasuredBatchResponse { + pub batch_resp: BatchCommandsResponse, + pub measures: Vec, +} +impl Default for MeasuredBatchResponse { + fn default() -> Self { + MeasuredBatchResponse { + batch_resp: Default::default(), + measures: Vec::with_capacity(GRPC_MSG_MAX_BATCH_SIZE), + } + } +} + struct BatchRespCollector; -impl BatchCollector - for BatchRespCollector -{ +impl BatchCollector for BatchRespCollector { fn collect( &mut self, - v: &mut BatchCommandsResponse, - e: (u64, batch_commands_response::Response), - ) -> Option<(u64, batch_commands_response::Response)> { - v.mut_request_ids().push(e.0); - v.mut_responses().push(e.1); + v: &mut MeasuredBatchResponse, + e: MeasuredSingleResponse, + ) -> Option { + v.batch_resp.mut_request_ids().push(e.id); + v.batch_resp.mut_responses().push(e.resp); + v.measures.push(e.measure); None } } diff --git a/src/server/service/mod.rs b/src/server/service/mod.rs index 65116fd6b36..8c719f6ed64 100644 --- a/src/server/service/mod.rs +++ b/src/server/service/mod.rs @@ -8,4 +8,7 @@ mod kv; pub use self::debug::Service as DebugService; pub use self::diagnostics::Service as DiagnosticsService; pub use self::kv::Service as KvService; -pub use self::kv::{batch_commands_request, batch_commands_response}; +pub use self::kv::{ + batch_commands_request, batch_commands_response, GrpcRequestDuration, MeasuredBatchResponse, + MeasuredSingleResponse, +}; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f75c6901387..ef5ad60dc32 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -413,6 +413,7 @@ impl Storage { requests: Vec, ids: Vec, consumer: P, + begin_instant: tikv_util::time::Instant, ) -> impl Future> { const CMD: CommandKind = CommandKind::batch_get_command; // all requests in a batch have the same region, epoch, term, replica_read @@ -465,7 +466,7 @@ impl Storage { snap_ctx } Err(e) => { - consumer.consume(id, Err(e)); + consumer.consume(id, Err(e), begin_instant); continue; } }; @@ -517,15 +518,20 @@ impl Storage { id, v.map_err(|e| Error::from(txn::Error::from(e))) .map(|v| (v, stat, perf_statistics.delta())), + begin_instant, ); } Err(e) => { - consumer.consume(id, Err(Error::from(txn::Error::from(e)))); + consumer.consume( + id, + Err(Error::from(txn::Error::from(e))), + begin_instant, + ); } } } Err(e) => { - consumer.consume(id, Err(e)); + consumer.consume(id, Err(e), begin_instant); } } } @@ -1117,16 +1123,17 @@ impl Storage { &Key::from_encoded(key), &mut stats, ), + begin_instant, ); tls_collect_read_flow(ctx.get_region_id(), &stats); } Err(e) => { - consumer.consume(id, Err(e)); + consumer.consume(id, Err(e), begin_instant); } } } Err(e) => { - consumer.consume(id, Err(e)); + consumer.consume(id, Err(e), begin_instant); } } } @@ -1945,7 +1952,7 @@ impl TestStorageBuilder { } pub trait ResponseBatchConsumer: Send { - fn consume(&self, id: u64, res: Result); + fn consume(&self, id: u64, res: Result, begin: Instant); } pub mod test_util { @@ -2117,6 +2124,7 @@ pub mod test_util { &self, id: u64, res: Result<(Option>, Statistics, PerfStatisticsDelta)>, + _: tikv_util::time::Instant, ) { self.data.lock().unwrap().push(GetResult { id, @@ -2126,7 +2134,7 @@ pub mod test_util { } impl ResponseBatchConsumer>> for GetConsumer { - fn consume(&self, id: u64, res: Result>>) { + fn consume(&self, id: u64, res: Result>>, _: tikv_util::time::Instant) { self.data.lock().unwrap().push(GetResult { id, res }); } } @@ -2343,6 +2351,7 @@ mod tests { vec![create_get_request(b"c", 1), create_get_request(b"d", 1)], vec![1, 2], consumer.clone(), + Instant::now(), )) .unwrap(); let data = consumer.take_data(); @@ -3031,6 +3040,7 @@ mod tests { vec![create_get_request(b"c", 2), create_get_request(b"d", 2)], vec![1, 2], consumer.clone(), + Instant::now(), )) .unwrap(); let mut x = consumer.take_data(); @@ -3070,6 +3080,7 @@ mod tests { ], vec![1, 2, 3, 4], consumer.clone(), + Instant::now(), )) .unwrap(); @@ -6270,8 +6281,13 @@ mod tests { req2.set_key(b"key".to_vec()); req2.set_version(100); let consumer = GetConsumer::new(); - block_on(storage.batch_get_command(vec![req1, req2], vec![1, 2], consumer.clone())) - .unwrap(); + block_on(storage.batch_get_command( + vec![req1, req2], + vec![1, 2], + consumer.clone(), + Instant::now(), + )) + .unwrap(); let res = consumer.take_data(); assert!(res[0].is_ok()); let key_error = extract_key_error(res[1].as_ref().unwrap_err());