From 12c9d659a28f97265768709b54833659d4127465 Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 9 Sep 2021 16:40:39 +0800 Subject: [PATCH] trace memory usage for analyze (#10860) * init Signed-off-by: qupeng * remove stale fixme Signed-off-by: qupeng * fix metrics Signed-off-by: qupeng * updte the metrics panel Signed-off-by: qupeng * revert changes about yatp time slice Signed-off-by: qupeng * address comments Signed-off-by: qupeng * remove useless MemoryTraceGuard::new Signed-off-by: qupeng * restructure Signed-off-by: Andy Lok * fix clippy Signed-off-by: Andy Lok Co-authored-by: Andy Lok --- components/raftstore/src/store/memory.rs | 22 ++-- components/server/src/memory.rs | 6 +- components/server/src/server.rs | 8 +- components/test_coprocessor/src/util.rs | 2 +- components/tikv_alloc/src/lib.rs | 1 + components/tikv_alloc/src/trace.rs | 126 +++++++++++++++++------ metrics/grafana/tikv_details.json | 2 +- src/coprocessor/cache.rs | 5 +- src/coprocessor/checksum.rs | 5 +- src/coprocessor/dag/mod.rs | 5 +- src/coprocessor/endpoint.rs | 26 +++-- src/coprocessor/mod.rs | 11 +- src/coprocessor/statistics/analyze.rs | 87 ++++++++++++---- src/server/service/kv.rs | 36 ++++--- 14 files changed, 240 insertions(+), 102 deletions(-) diff --git a/components/raftstore/src/store/memory.rs b/components/raftstore/src/store/memory.rs index bb6fa2c66b4..393987c71d0 100644 --- a/components/raftstore/src/store/memory.rs +++ b/components/raftstore/src/store/memory.rs @@ -5,12 +5,12 @@ use lazy_static::lazy_static; use std::sync::Arc; use tikv_alloc::{ mem_trace, - trace::{Id, MemoryTrace, MemoryTraceNode}, + trace::{Id, MemoryTrace}, }; use tikv_util::sys::memory_usage_reaches_high_water; lazy_static! { - pub static ref MEMTRACE_ROOT: Arc = mem_trace!( + pub static ref MEMTRACE_ROOT: Arc = mem_trace!( raftstore, [ peers, @@ -23,35 +23,35 @@ lazy_static! { ] ); /// Memory usage for raft peers fsms. - pub static ref MEMTRACE_PEERS: Arc = + pub static ref MEMTRACE_PEERS: Arc = MEMTRACE_ROOT.sub_trace(Id::Name("peers")); /// Memory usage for apply fsms. - pub static ref MEMTRACE_APPLYS: Arc = + pub static ref MEMTRACE_APPLYS: Arc = MEMTRACE_ROOT.sub_trace(Id::Name("applys")); - pub static ref MEMTRACE_ENTRY_CACHE: Arc = + pub static ref MEMTRACE_ENTRY_CACHE: Arc = MEMTRACE_ROOT.sub_trace(Id::Name("entry_cache")); - pub static ref MEMTRACE_RAFT_ROUTER_ALIVE: Arc = MEMTRACE_ROOT + pub static ref MEMTRACE_RAFT_ROUTER_ALIVE: Arc = MEMTRACE_ROOT .sub_trace(Id::Name("raft_router")) .sub_trace(Id::Name("alive")); - pub static ref MEMTRACE_RAFT_ROUTER_LEAK: Arc = MEMTRACE_ROOT + pub static ref MEMTRACE_RAFT_ROUTER_LEAK: Arc = MEMTRACE_ROOT .sub_trace(Id::Name("raft_router")) .sub_trace(Id::Name("leak")); - pub static ref MEMTRACE_APPLY_ROUTER_ALIVE: Arc = MEMTRACE_ROOT + pub static ref MEMTRACE_APPLY_ROUTER_ALIVE: Arc = MEMTRACE_ROOT .sub_trace(Id::Name("apply_router")) .sub_trace(Id::Name("alive")); - pub static ref MEMTRACE_APPLY_ROUTER_LEAK: Arc = MEMTRACE_ROOT + pub static ref MEMTRACE_APPLY_ROUTER_LEAK: Arc = MEMTRACE_ROOT .sub_trace(Id::Name("apply_router")) .sub_trace(Id::Name("leak")); /// Heap size trace for received raft messages. - pub static ref MEMTRACE_RAFT_MESSAGES: Arc = + pub static ref MEMTRACE_RAFT_MESSAGES: Arc = MEMTRACE_ROOT.sub_trace(Id::Name("raft_messages")); /// Heap size trace for appended raft entries. 
- pub static ref MEMTRACE_RAFT_ENTRIES: Arc = + pub static ref MEMTRACE_RAFT_ENTRIES: Arc = MEMTRACE_ROOT.sub_trace(Id::Name("raft_entries")); } diff --git a/components/server/src/memory.rs b/components/server/src/memory.rs index 373a005407f..303ff257a78 100644 --- a/components/server/src/memory.rs +++ b/components/server/src/memory.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use tikv::server::MEM_TRACE_SUM_GAUGE; -use tikv_alloc::trace::{MemoryTrace, MemoryTraceNode}; +use tikv_alloc::trace::MemoryTrace; use tikv_util::time::Instant; #[derive(Default)] pub struct MemoryTraceManager { - providers: Vec>, + providers: Vec>, } impl MemoryTraceManager { @@ -30,7 +30,7 @@ impl MemoryTraceManager { } } - pub fn register_provider(&mut self, provider: Arc) { + pub fn register_provider(&mut self, provider: Arc) { let p = &mut self.providers; p.push(provider); } diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 285d47a7168..d49dbfdb650 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -58,7 +58,7 @@ use raftstore::{ config::RaftstoreConfigManager, fsm, fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_MSG_CAP}, - memory::MEMTRACE_ROOT, + memory::MEMTRACE_ROOT as MEMTRACE_RAFTSTORE, AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, SnapManager, SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMsg, }, @@ -66,7 +66,8 @@ use raftstore::{ use security::SecurityManager; use tikv::{ config::{ConfigController, DBConfigManger, DBType, TiKvConfig, DEFAULT_ROCKSDB_SUB_DIR}, - coprocessor, coprocessor_v2, + coprocessor::{self, MEMTRACE_ROOT as MEMTRACE_COPROCESSOR}, + coprocessor_v2, import::{ImportSSTService, SSTImporter}, read_pool::{build_yatp_read_pool, ReadPool}, server::raftkv::ReplicaReadLockChecker, @@ -1069,7 +1070,8 @@ impl TiKVServer { } let mut mem_trace_metrics = MemoryTraceManager::default(); - mem_trace_metrics.register_provider((&*MEMTRACE_ROOT).to_owned()); + mem_trace_metrics.register_provider(MEMTRACE_RAFTSTORE.clone()); + mem_trace_metrics.register_provider(MEMTRACE_COPROCESSOR.clone()); self.background_worker .spawn_interval_task(DEFAULT_MEMTRACE_FLUSH_INTERVAL, move || { let now = Instant::now(); diff --git a/components/test_coprocessor/src/util.rs b/components/test_coprocessor/src/util.rs index 619568a2f3b..7570db89069 100644 --- a/components/test_coprocessor/src/util.rs +++ b/components/test_coprocessor/src/util.rs @@ -23,7 +23,7 @@ pub fn handle_request(copr: &Endpoint, req: Request) -> Response where E: Engine, { - block_on(copr.parse_and_handle_unary_request(req, None)) + block_on(copr.parse_and_handle_unary_request(req, None)).consume() } pub fn handle_select(copr: &Endpoint, req: Request) -> SelectResponse diff --git a/components/tikv_alloc/src/lib.rs b/components/tikv_alloc/src/lib.rs index 03bd7a8ca48..817901db665 100644 --- a/components/tikv_alloc/src/lib.rs +++ b/components/tikv_alloc/src/lib.rs @@ -125,6 +125,7 @@ mod imp; mod imp; pub use crate::imp::*; +pub use crate::trace::*; #[global_allocator] static ALLOC: imp::Allocator = imp::allocator(); diff --git a/components/tikv_alloc/src/trace.rs b/components/tikv_alloc/src/trace.rs index fccc8d073fb..52b2eeda062 100644 --- a/components/tikv_alloc/src/trace.rs +++ b/components/tikv_alloc/src/trace.rs @@ -13,12 +13,12 @@ //! enumerates instead. //! //! To define a memory trace tree, we can use the `mem_trace` macro. The `mem_trace` -//! 
macro constructs every node as a `MemoryTraceNode` which implements `MemoryTrace` trait. +//! macro constructs every node as a `MemoryTrace` which implements `MemoryTrace` trait. //! We can also define a specified tree node by implementing `MemoryTrace` trait. -use std::fmt::{self, Display}; +use std::fmt::{self, Debug, Display, Formatter}; use std::num::NonZeroU64; -use std::ops::Add; +use std::ops::{Add, Deref, DerefMut}; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -118,34 +118,22 @@ impl Add for TraceEvent { } } -pub trait MemoryTrace { - fn trace(&self, event: TraceEvent); - fn snapshot(&self) -> MemoryTraceSnapshot; - fn sub_trace(&self, id: Id) -> Arc; - fn add_sub_trace(&mut self, id: Id, trace: Arc); - fn sum(&self) -> usize; - fn name(&self) -> String; - fn get_children_ids(&self) -> Vec; -} - -pub struct MemoryTraceNode { +pub struct MemoryTrace { pub id: Id, trace: AtomicUsize, - children: HashMap>, + children: HashMap>, } -impl MemoryTraceNode { - pub fn new(id: impl Into) -> MemoryTraceNode { - MemoryTraceNode { +impl MemoryTrace { + pub fn new(id: impl Into) -> MemoryTrace { + MemoryTrace { id: id.into(), trace: std::sync::atomic::AtomicUsize::default(), children: HashMap::default(), } } -} -impl MemoryTrace for MemoryTraceNode { - fn trace(&self, event: TraceEvent) { + pub fn trace(&self, event: TraceEvent) { match event { TraceEvent::Add(val) => { self.trace.fetch_add(val, Ordering::Relaxed); @@ -159,7 +147,17 @@ impl MemoryTrace for MemoryTraceNode { } } - fn snapshot(&self) -> MemoryTraceSnapshot { + pub fn trace_guard( + self: &Arc, + item: T, + size: usize, + ) -> MemoryTraceGuard { + self.trace(TraceEvent::Add(size)); + let node = Some(self.clone()); + MemoryTraceGuard { item, size, node } + } + + pub fn snapshot(&self) -> MemoryTraceSnapshot { MemoryTraceSnapshot { id: self.id, trace: self.trace.load(Ordering::Relaxed), @@ -167,25 +165,25 @@ impl MemoryTrace for MemoryTraceNode { } } - fn sub_trace(&self, id: Id) -> Arc { + pub fn sub_trace(&self, id: Id) -> Arc { self.children.get(&id).cloned().unwrap() } - fn add_sub_trace(&mut self, id: Id, trace: Arc) { + pub fn add_sub_trace(&mut self, id: Id, trace: Arc) { self.children.insert(id, trace); } // TODO: Maybe need a cache to reduce read cost. - fn sum(&self) -> usize { + pub fn sum(&self) -> usize { let sum: usize = self.children.values().map(|c| c.sum()).sum(); sum + self.trace.load(Ordering::Relaxed) } - fn name(&self) -> String { + pub fn name(&self) -> String { self.id.name() } - fn get_children_ids(&self) -> Vec { + pub fn get_children_ids(&self) -> Vec { let mut ids = vec![]; for id in self.children.keys() { ids.push(*id); @@ -215,15 +213,13 @@ pub struct MemoryTraceSnapshot { macro_rules! mem_trace { ($name: ident) => { { - use tikv_alloc::trace::MemoryTraceNode; + use tikv_alloc::trace::MemoryTrace; - std::sync::Arc::new(MemoryTraceNode::new(stringify!($name))) + std::sync::Arc::new(MemoryTrace::new(stringify!($name))) } }; ($name: ident, [$($child:tt),+]) => { { - use tikv_alloc::trace::MemoryTrace; - let mut node = mem_trace!($name); $( let child = mem_trace!($child); @@ -237,11 +233,77 @@ macro_rules! 
mem_trace { } } +pub struct MemoryTraceGuard { + item: T, + size: usize, + node: Option>, +} + +impl MemoryTraceGuard { + pub fn map(mut self, f: F) -> MemoryTraceGuard + where + F: FnOnce(T) -> U, + { + let item = std::mem::take(&mut self.item); + MemoryTraceGuard { + item: f(item), + size: self.size, + node: self.node.take(), + } + } + + pub fn consume(&mut self) -> T { + if let Some(node) = self.node.take() { + node.trace(TraceEvent::Sub(self.size)); + } + std::mem::take(&mut self.item) + } +} + +impl Drop for MemoryTraceGuard { + fn drop(&mut self) { + if let Some(node) = self.node.take() { + node.trace(TraceEvent::Sub(self.size)); + } + } +} + +impl From for MemoryTraceGuard { + fn from(item: T) -> Self { + MemoryTraceGuard { + item, + size: 0, + node: None, + } + } +} + +impl Deref for MemoryTraceGuard { + type Target = T; + fn deref(&self) -> &T { + &self.item + } +} + +impl DerefMut for MemoryTraceGuard { + fn deref_mut(&mut self) -> &mut T { + &mut self.item + } +} + +impl Debug for MemoryTraceGuard { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("MemoryTraceGuard") + .field("size", &self.size) + .finish() + } +} + #[cfg(test)] mod tests { use crate::{ self as tikv_alloc, - trace::{Id, MemoryTrace, TraceEvent}, + trace::{Id, TraceEvent}, }; #[test] diff --git a/metrics/grafana/tikv_details.json b/metrics/grafana/tikv_details.json index 5c56d77167e..7d28183c313 100644 --- a/metrics/grafana/tikv_details.json +++ b/metrics/grafana/tikv_details.json @@ -3686,7 +3686,7 @@ "steppedLine": false, "targets": [ { - "expr": "tikv_server_mem_trace_sum{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\", name=~\"raftstore-.*\"}", + "expr": "tikv_server_mem_trace_sum{tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}", "format": "time_series", "intervalFactor": 1, "legendFormat": "{{instance}}-{{name}}", diff --git a/src/coprocessor/cache.rs b/src/coprocessor/cache.rs index c6d333aef8d..2bb807fdbed 100644 --- a/src/coprocessor/cache.rs +++ b/src/coprocessor/cache.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use kvproto::coprocessor::Response; +use tikv_alloc::trace::MemoryTraceGuard; use crate::coprocessor::RequestHandler; use crate::coprocessor::*; @@ -25,12 +26,12 @@ impl CachedRequestHandler { #[async_trait] impl RequestHandler for CachedRequestHandler { - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { let mut resp = Response::default(); resp.set_is_cache_hit(true); if let Some(v) = self.data_version { resp.set_cache_last_version(v); } - Ok(resp) + Ok(resp.into()) } } diff --git a/src/coprocessor/checksum.rs b/src/coprocessor/checksum.rs index 8049c75353f..06f684c4c73 100644 --- a/src/coprocessor/checksum.rs +++ b/src/coprocessor/checksum.rs @@ -7,6 +7,7 @@ use tidb_query_common::storage::scanner::{RangesScanner, RangesScannerOptions}; use tidb_query_common::storage::Range; use tidb_query_executors::runner::MAX_TIME_SLICE; use tidb_query_expr::BATCH_MAX_SIZE; +use tikv_alloc::trace::MemoryTraceGuard; use tikv_util::time::Instant; use tipb::{ChecksumAlgorithm, ChecksumRequest, ChecksumResponse}; use yatp::task::future::reschedule; @@ -53,7 +54,7 @@ impl ChecksumContext { #[async_trait] impl RequestHandler for ChecksumContext { - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { let algorithm = self.req.get_algorithm(); if algorithm != ChecksumAlgorithm::Crc64Xor { return Err(box_err!("unknown checksum algorithm {:?}", algorithm)); @@ -101,7 +102,7 @@ impl 
RequestHandler for ChecksumContext { let mut resp = Response::default(); resp.set_data(data); - Ok(resp) + Ok(resp.into()) } fn collect_scan_statistics(&mut self, dest: &mut Statistics) { diff --git a/src/coprocessor/dag/mod.rs b/src/coprocessor/dag/mod.rs index 823a102fd23..d1432f301b0 100644 --- a/src/coprocessor/dag/mod.rs +++ b/src/coprocessor/dag/mod.rs @@ -8,6 +8,7 @@ use async_trait::async_trait; use kvproto::coprocessor::{KeyRange, Response}; use protobuf::Message; use tidb_query_common::{execute_stats::ExecSummary, storage::IntervalRange}; +use tikv_alloc::trace::MemoryTraceGuard; use tipb::{DagRequest, SelectResponse, StreamResponse}; use crate::coprocessor::metrics::*; @@ -100,9 +101,9 @@ impl BatchDAGHandler { #[async_trait] impl RequestHandler for BatchDAGHandler { - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { let result = self.runner.handle_request().await; - handle_qe_response(result, self.runner.can_be_cached(), self.data_version) + handle_qe_response(result, self.runner.can_be_cached(), self.data_version).map(|x| x.into()) } fn handle_streaming_request(&mut self) -> Result<(Option, bool)> { diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index 421788fcd8c..b9f3c6c4e79 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -34,6 +34,7 @@ use concurrency_manager::ConcurrencyManager; use engine_rocks::PerfLevel; use resource_metering::cpu::{FutureExt, StreamExt}; use resource_metering::ResourceMeteringTag; +use tikv_alloc::trace::MemoryTraceGuard; use tikv_util::time::Instant; use txn_types::Lock; @@ -394,7 +395,7 @@ impl Endpoint { semaphore: Option>, mut tracker: Box, handler_builder: RequestHandlerBuilder, - ) -> Result { + ) -> Result> { // When this function is being executed, it may be queued for a long time, so that // deadline may exceed. tracker.on_scheduled(); @@ -447,7 +448,7 @@ impl Endpoint { COPR_RESP_SIZE.inc_by(resp.data.len() as u64); resp } - Err(e) => make_error_response(e), + Err(e) => make_error_response(e).into(), }; resp.set_exec_details(exec_details); resp.set_exec_details_v2(exec_details_v2); @@ -462,7 +463,7 @@ impl Endpoint { &self, req_ctx: ReqContext, handler_builder: RequestHandlerBuilder, - ) -> impl Future> { + ) -> impl Future>> { let priority = req_ctx.context.get_priority(); let task_id = req_ctx.build_task_id(); let resource_tag = ResourceMeteringTag::from_rpc_context(&req_ctx.context); @@ -489,15 +490,17 @@ impl Endpoint { &self, req: coppb::Request, peer: Option, - ) -> impl Future { + ) -> impl Future> { let result_of_future = self .parse_request_and_check_memory_locks(req, peer, false) .map(|(handler_builder, req_ctx)| self.handle_unary_request(req_ctx, handler_builder)); async move { match result_of_future { - Err(e) => make_error_response(e), - Ok(handle_fut) => handle_fut.await.unwrap_or_else(make_error_response), + Err(e) => make_error_response(e).into(), + Ok(handle_fut) => handle_fut + .await + .unwrap_or_else(|e| make_error_response(e).into()), } } } @@ -733,7 +736,7 @@ mod tests { #[async_trait] impl RequestHandler for UnaryFixture { - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { if self.yieldable { // We split the task into small executions of 100 milliseconds. 
for _ in 0..self.handle_duration_millis / 100 { @@ -745,7 +748,7 @@ mod tests { thread::sleep(Duration::from_millis(self.handle_duration_millis)); } - self.result.take().unwrap() + self.result.take().unwrap().map(|x| x.into()) } } @@ -910,7 +913,7 @@ mod tests { req }; - let resp: coppb::Response = block_on(copr.parse_and_handle_unary_request(req, None)); + let resp = block_on(copr.parse_and_handle_unary_request(req, None)); assert!(!resp.get_other_error().is_empty()); } @@ -932,7 +935,7 @@ mod tests { let mut req = coppb::Request::default(); req.set_tp(9999); - let resp: coppb::Response = block_on(copr.parse_and_handle_unary_request(req, None)); + let resp = block_on(copr.parse_and_handle_unary_request(req, None)); assert!(!resp.get_other_error().is_empty()); } @@ -1514,7 +1517,8 @@ mod tests { }); let resp_future_3 = copr .handle_stream_request(req_with_exec_detail, handler_builder) - .unwrap(); + .unwrap() + .map(|x| x.map(|x| x.into())); thread::spawn(move || { tx.send( block_on_stream(resp_future_3) diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs index 45d0e5d46fb..cc00ae1b8c7 100644 --- a/src/coprocessor/mod.rs +++ b/src/coprocessor/mod.rs @@ -39,9 +39,12 @@ use crate::storage::Statistics; use async_trait::async_trait; use engine_rocks::PerfLevel; use kvproto::{coprocessor as coppb, kvrpcpb}; +use lazy_static::lazy_static; use metrics::ReqTag; use rand::prelude::*; +use std::sync::Arc; use tidb_query_common::execute_stats::ExecSummary; +use tikv_alloc::{mem_trace, Id, MemoryTrace, MemoryTraceGuard}; use tikv_util::deadline::Deadline; use tikv_util::time::Duration; use txn_types::TsSet; @@ -56,7 +59,7 @@ type HandlerStreamStepResult = Result<(Option, bool)>; #[async_trait] pub trait RequestHandler: Send { /// Processes current request and produces a response. - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { panic!("unary request is not supported for this handler"); } @@ -204,6 +207,12 @@ impl ReqContext { } } +lazy_static! 
{ + pub static ref MEMTRACE_ROOT: Arc = mem_trace!(coprocessor, [analyze]); + pub static ref MEMTRACE_ANALYZE: Arc = + MEMTRACE_ROOT.sub_trace(Id::Name("analyze")); +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/coprocessor/statistics/analyze.rs b/src/coprocessor/statistics/analyze.rs index d30e44fedde..0234e8b0386 100644 --- a/src/coprocessor/statistics/analyze.rs +++ b/src/coprocessor/statistics/analyze.rs @@ -2,7 +2,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; - +use std::mem; use std::sync::Arc; use async_trait::async_trait; @@ -23,6 +23,7 @@ use tidb_query_executors::{ interface::BatchExecutor, runner::MAX_TIME_SLICE, BatchTableScanExecutor, }; use tidb_query_expr::BATCH_MAX_SIZE; +use tikv_alloc::trace::{MemoryTraceGuard, TraceEvent}; use tikv_util::time::Instant; use tipb::{self, AnalyzeColumnsReq, AnalyzeIndexReq, AnalyzeReq, AnalyzeType}; use yatp::task::future::reschedule; @@ -31,6 +32,7 @@ use super::cmsketch::CmSketch; use super::fmsketch::FmSketch; use super::histogram::Histogram; use crate::coprocessor::dag::TiKVStorage; +use crate::coprocessor::MEMTRACE_ANALYZE; use crate::coprocessor::*; use crate::storage::{Snapshot, SnapshotStore, Statistics}; @@ -209,7 +211,7 @@ impl AnalyzeContext { #[async_trait] impl RequestHandler for AnalyzeContext { - async fn handle_request(&mut self) -> Result { + async fn handle_request(&mut self) -> Result> { let ret = match self.req.get_tp() { AnalyzeType::TypeIndex | AnalyzeType::TypeCommonHandle => { let req = self.req.take_idx_req(); @@ -273,14 +275,15 @@ impl RequestHandler for AnalyzeContext { }; match ret { Ok(data) => { + let memory_size = data.capacity(); let mut resp = Response::default(); resp.set_data(data); - Ok(resp) + Ok(MEMTRACE_ANALYZE.trace_guard(resp, memory_size)) } Err(Error::Other(e)) => { let mut resp = Response::default(); resp.set_other_error(e); - Ok(resp) + Ok(resp.into()) } Err(e) => Err(e), } @@ -410,6 +413,8 @@ struct RowSampleCollector { rng: StdRng, total_sizes: Vec, row_buf: Vec, + memory_usage: usize, + reported_memory_usage: usize, } impl Default for RowSampleCollector { @@ -423,6 +428,8 @@ impl Default for RowSampleCollector { rng: StdRng::from_entropy(), total_sizes: vec![], row_buf: Vec::new(), + memory_usage: 0, + reported_memory_usage: 0, } } } @@ -442,6 +449,8 @@ impl RowSampleCollector { rng: StdRng::from_entropy(), total_sizes: vec![0; col_and_group_len], row_buf: Vec::new(), + memory_usage: 0, + reported_memory_usage: 0, } } @@ -504,22 +513,29 @@ impl RowSampleCollector { } pub fn sampling(&mut self, data: Vec>) { + let mut need_push = false; let cur_rng = self.rng.gen_range(0, i64::MAX); if self.samples.len() < self.max_sample_size { - self.samples.push(Reverse((cur_rng, data))); - return; + need_push = true; + } else if self.samples.peek().unwrap().0.0 < cur_rng { + need_push = true; + let (_, evicted) = self.samples.pop().unwrap().0; + self.memory_usage -= evicted.iter().map(|x| x.capacity()).sum::(); } - if self.samples.len() == self.max_sample_size && self.samples.peek().unwrap().0.0 < cur_rng - { - self.samples.pop(); + + if need_push { + self.memory_usage += data.iter().map(|x| x.capacity()).sum::(); self.samples.push(Reverse((cur_rng, data))); + self.report_memory_usage(false); } } - pub fn into_proto(self) -> tipb::RowSampleCollector { + pub fn to_proto(&mut self) -> tipb::RowSampleCollector { + self.memory_usage = 0; + self.report_memory_usage(true); + let mut s = tipb::RowSampleCollector::default(); - let samples = self - .samples + let samples = 
mem::take(&mut self.samples) .into_iter() .map(|r_tuple| { let mut pb_sample = tipb::RowSample::default(); @@ -529,17 +545,36 @@ impl RowSampleCollector { }) .collect(); s.set_samples(samples); - s.set_null_counts(self.null_count); + s.set_null_counts(mem::take(&mut self.null_count)); s.set_count(self.count as i64); - let pb_fm_sketches = self - .fm_sketches + let pb_fm_sketches = mem::take(&mut self.fm_sketches) .into_iter() .map(|fm_sketch| fm_sketch.into_proto()) .collect(); s.set_fm_sketch(pb_fm_sketches); - s.set_total_size(self.total_sizes); + s.set_total_size(mem::take(&mut self.total_sizes)); s } + + fn report_memory_usage(&mut self, on_finish: bool) { + let diff = self.memory_usage as isize - self.reported_memory_usage as isize; + if on_finish || diff.abs() > 1024 * 1024 { + let event = if diff >= 0 { + TraceEvent::Add(diff as usize) + } else { + TraceEvent::Sub(-diff as usize) + }; + MEMTRACE_ANALYZE.trace(event); + self.reported_memory_usage = self.memory_usage; + } + } +} + +impl Drop for RowSampleCollector { + fn drop(&mut self) { + self.memory_usage = 0; + self.report_memory_usage(true); + } } struct SampleBuilder { @@ -866,8 +901,8 @@ impl AnalyzeSamplingResult { } } - fn into_proto(self) -> tipb::AnalyzeColumnsResp { - let pb_collector = self.row_sample_collector.into_proto(); + fn into_proto(mut self) -> tipb::AnalyzeColumnsResp { + let pb_collector = self.row_sample_collector.to_proto(); let mut res = tipb::AnalyzeColumnsResp::default(); res.set_row_collector(pb_collector); res @@ -995,16 +1030,28 @@ mod tests { datum::encode_value(&mut EvalContext::default(), &[Datum::I64(i as i64)]).unwrap(), ); } - for _loop_i in 0..loop_cnt { + for loop_i in 0..loop_cnt { let mut collector = RowSampleCollector::new(sample_num, 1000, 1); for row in &nums { collector.sampling([row.clone()].to_vec()); } assert_eq!(collector.samples.len(), sample_num); - for sample in collector.samples.into_vec() { + for sample in &collector.samples { *item_cnt.entry(sample.0.1[0].clone()).or_insert(0) += 1; } + + // Test memory usage tracing is correct. 
+ collector.report_memory_usage(true); + assert_eq!(collector.reported_memory_usage, collector.memory_usage); + if loop_i % 2 == 0 { + collector.to_proto(); + assert_eq!(collector.memory_usage, 0); + assert_eq!(MEMTRACE_ANALYZE.sum(), 0); + } + drop(collector); + assert_eq!(MEMTRACE_ANALYZE.sum(), 0); } + let exp_freq = sample_num as f64 * loop_cnt as f64 / row_num as f64; let delta = 0.5; for (_, v) in item_cnt.into_iter() { diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index d27ed5930f5..71c62ce2222 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -45,6 +45,7 @@ use raftstore::store::memory::{MEMTRACE_RAFT_ENTRIES, MEMTRACE_RAFT_MESSAGES}; use raftstore::store::CheckLeaderTask; use raftstore::store::{Callback, CasualMessage, RaftCmdExtraOpts}; use raftstore::{DiscardReason, Error as RaftStoreError}; +use tikv_alloc::trace::MemoryTraceGuard; use tikv_util::future::{paired_future_callback, poll_future_notify}; use tikv_util::mpsc::batch::{unbounded, BatchCollector, BatchReceiver, Sender}; use tikv_util::sys::memory_usage_reaches_high_water; @@ -353,7 +354,7 @@ impl + 'static, E: Engine, L: LockManager> Tikv for let begin_instant = Instant::now_coarse(); let future = future_copr(&self.copr, Some(ctx.peer()), req); let task = async move { - let resp = future.await?; + let resp = future.await?.consume(); sink.success(resp).await?; GRPC_MSG_HISTOGRAM_STATIC .coprocessor @@ -1142,14 +1143,15 @@ impl + 'static, E: Engine, L: LockManager> Tikv for } } -fn response_batch_commands_request( +fn response_batch_commands_request( id: u64, resp: F, tx: Sender, begin: Instant, label: GrpcTypeKind, ) where - F: Future> + Send + 'static, + MemoryTraceGuard: From, + F: Future> + Send + 'static, { let task = async move { if let Ok(resp) = resp.await { @@ -1218,6 +1220,15 @@ fn handle_batch_commands_request( response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::raw_get); } }, + Some(batch_commands_request::request::Cmd::Coprocessor(req)) => { + let begin_instant = Instant::now(); + let resp = future_copr(copr, Some(peer.to_string()), req) + .map_ok(|resp| { + resp.map(oneof!(batch_commands_response::response::Cmd::Coprocessor)) + }) + .map_err(|_| GRPC_MSG_FAIL_COUNTER.coprocessor.inc()); + response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::coprocessor); + }, $(Some(batch_commands_request::request::Cmd::$cmd(req)) => { let begin_instant = Instant::now(); let resp = $future_fn($($arg,)* req) @@ -1252,7 +1263,6 @@ fn handle_batch_commands_request( RawScan, future_raw_scan(storage), raw_scan; RawDeleteRange, future_raw_delete_range(storage), raw_delete_range; RawBatchScan, future_raw_batch_scan(storage), raw_batch_scan; - Coprocessor, future_copr(copr, Some(peer.to_string())), coprocessor; RawCoprocessor, future_raw_coprocessor(copr_v2, storage), coprocessor; PessimisticLock, future_acquire_pessimistic_lock(storage), kv_pessimistic_lock; PessimisticRollback, future_pessimistic_rollback(storage), kv_pessimistic_rollback; @@ -1815,7 +1825,7 @@ fn future_copr( copr: &Endpoint, peer: Option, req: Request, -) -> impl Future> { +) -> impl Future>> { let ret = copr.parse_and_handle_unary_request(req, peer); async move { Ok(ret.await) } } @@ -2014,15 +2024,15 @@ impl GrpcRequestDuration { #[derive(Debug)] pub struct MeasuredSingleResponse { pub id: u64, - pub resp: batch_commands_response::Response, + pub resp: MemoryTraceGuard, pub measure: GrpcRequestDuration, } impl MeasuredSingleResponse { - pub fn new( - id: u64, 
-        resp: batch_commands_response::Response,
-        measure: GrpcRequestDuration,
-    ) -> Self {
+    pub fn new<T>(id: u64, resp: T, measure: GrpcRequestDuration) -> Self
+    where
+        MemoryTraceGuard<batch_commands_response::Response>: From<T>,
+    {
+        let resp = resp.into();
         MeasuredSingleResponse { id, resp, measure }
     }
 }
@@ -2046,10 +2056,10 @@ impl BatchCollector<MeasuredBatchResponse, MeasuredSingleResponse> for BatchResp
     fn collect(
         &mut self,
         v: &mut MeasuredBatchResponse,
-        e: MeasuredSingleResponse,
+        mut e: MeasuredSingleResponse,
     ) -> Option<MeasuredSingleResponse> {
         v.batch_resp.mut_request_ids().push(e.id);
-        v.batch_resp.mut_responses().push(e.resp);
+        v.batch_resp.mut_responses().push(e.resp.consume());
         v.measures.push(e.measure);
         None
     }
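
The sketch below is not part of the patch; it is a minimal illustration of how the pieces introduced here fit together, assuming the tikv_alloc::trace API as defined above, with MemoryTraceGuard<T> generic over the wrapped item and trace nodes shared as Arc<MemoryTrace>.

// Illustrative only -- a sketch, not code from the patch.
use std::sync::Arc;

use tikv_alloc::mem_trace;
use tikv_alloc::trace::{Id, MemoryTrace, MemoryTraceGuard, TraceEvent};

fn demo() {
    // Build a two-level trace tree, mirroring MEMTRACE_ROOT / MEMTRACE_ANALYZE
    // in src/coprocessor/mod.rs.
    let root: Arc<MemoryTrace> = mem_trace!(coprocessor, [analyze]);
    let analyze: Arc<MemoryTrace> = root.sub_trace(Id::Name("analyze"));

    // Long-lived collectors (e.g. RowSampleCollector) report explicit deltas.
    analyze.trace(TraceEvent::Add(4096));
    assert_eq!(root.sum(), 4096);
    analyze.trace(TraceEvent::Sub(4096));
    assert_eq!(root.sum(), 0);

    // A finished response is wrapped in a guard so its bytes stay accounted
    // while it waits in gRPC queues; consume() (or Drop) releases them.
    let data = vec![0u8; 1024];
    let size = data.capacity();
    let guard: MemoryTraceGuard<Vec<u8>> = analyze.trace_guard(data, size);
    assert_eq!(root.sum(), size);

    // map() re-wraps the payload (as kv.rs does when folding a coprocessor
    // response into a batched response) without dropping the accounting.
    let mut wrapped: MemoryTraceGuard<String> =
        guard.map(|bytes| String::from_utf8(bytes).unwrap());
    assert_eq!(root.sum(), size);

    let _payload = wrapped.consume();
    assert_eq!(root.sum(), 0);
}

The server paths above follow the same lifecycle: the unary handler calls future.await?.consume() right before sink.success(resp), and BatchRespCollector::collect calls e.resp.consume() when moving a batched response into the outgoing BatchCommandsResponse, so response memory stays counted in tikv_server_mem_trace_sum until the response is actually handed to gRPC.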