From 085ee2256e01c29fd84edb9eadd1e69233ab6302 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 11 Oct 2021 14:31:25 +0800 Subject: [PATCH] refactor the profile module and add more functions (#11005) * refactor the profile module and add more functions Signed-off-by: qupeng * address comments Signed-off-by: qupeng * address comments Signed-off-by: qupeng --- Cargo.lock | 5 +- Cargo.toml | 2 +- components/tikv_alloc/Cargo.toml | 3 +- components/tikv_alloc/src/jemalloc.rs | 19 +- components/tikv_alloc/src/lib.rs | 4 - src/server/config.rs | 3 +- src/server/status_server/mod.rs | 454 ++++++++++---------------- src/server/status_server/profile.rs | 385 ++++++++++++++++++++++ 8 files changed, 573 insertions(+), 302 deletions(-) create mode 100644 src/server/status_server/profile.rs diff --git a/Cargo.lock b/Cargo.lock index 4216f9371a8..ca44c5825da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5365,7 +5365,6 @@ dependencies = [ "fxhash", "lazy_static", "libc 0.2.100", - "log", "mimalloc", "snmalloc-rs", "tcmalloc", @@ -5511,9 +5510,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01cf844b23c6131f624accf65ce0e4e9956a8bb329400ea5bcc26ae3a5c20b0b" +checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" dependencies = [ "autocfg", "bytes 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index 744a90073f1..0a2b8a5ec22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -218,7 +218,7 @@ collections = { path = "components/collections" } coprocessor_plugin_api = { path = "components/coprocessor_plugin_api" } time = "0.1" tipb = { git = "https://github.com/pingcap/tipb.git", default-features = false } -tokio = { version = "1.5", features = ["full"] } +tokio = { version = "1.12", features = ["full"] } tokio-timer = "0.2" tokio-openssl = "0.6" toml = "0.5" diff --git a/components/tikv_alloc/Cargo.toml b/components/tikv_alloc/Cargo.toml index ff24d2b5dcf..11f4f5509fe 100644 --- a/components/tikv_alloc/Cargo.toml +++ b/components/tikv_alloc/Cargo.toml @@ -10,13 +10,12 @@ jemalloc = ["tikv-jemallocator", "tikv-jemalloc-ctl", "tikv-jemalloc-sys"] # Build jemalloc's profiling features. Without this # certain profile functions will return nothing. -mem-profiling = ["tikv-jemallocator/profiling", "log"] +mem-profiling = ["tikv-jemallocator/profiling"] snmalloc = ["snmalloc-rs"] [dependencies] fxhash = "0.2.1" libc = "0.2" -log = { version = "0.4", optional = true } lazy_static = "1.3" [dev-dependencies] diff --git a/components/tikv_alloc/src/jemalloc.rs b/components/tikv_alloc/src/jemalloc.rs index f2859c50a9b..f219d5cd017 100644 --- a/components/tikv_alloc/src/jemalloc.rs +++ b/components/tikv_alloc/src/jemalloc.rs @@ -124,10 +124,8 @@ mod profiling { const PROF_DUMP: &[u8] = b"prof.dump\0"; pub fn activate_prof() -> ProfResult<()> { - info!("start profiler"); unsafe { if let Err(e) = tikv_jemalloc_ctl::raw::update(PROF_ACTIVE, true) { - error!("failed to activate profiling: {}", e); return Err(ProfError::JemallocError(format!( "failed to activate profiling: {}", e @@ -138,10 +136,8 @@ mod profiling { } pub fn deactivate_prof() -> ProfResult<()> { - info!("stop profiler"); unsafe { if let Err(e) = tikv_jemalloc_ctl::raw::update(PROF_ACTIVE, false) { - error!("failed to deactivate profiling: {}", e); return Err(ProfError::JemallocError(format!( "failed to deactivate profiling: {}", e @@ -155,20 +151,15 @@ mod profiling { pub fn dump_prof(path: &str) -> ProfResult<()> { let mut bytes = CString::new(path)?.into_bytes_with_nul(); let ptr = bytes.as_mut_ptr() as *mut c_char; - let res = unsafe { tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr) }; - match res { - Err(e) => { - error!("failed to dump the profile to {:?}: {}", path, e); - Err(ProfError::JemallocError(format!( + unsafe { + if let Err(e) = tikv_jemalloc_ctl::raw::write(PROF_DUMP, ptr) { + return Err(ProfError::JemallocError(format!( "failed to dump the profile to {:?}: {}", path, e - ))) - } - Ok(_) => { - info!("dump profile to {}", path); - Ok(()) + ))); } } + Ok(()) } #[cfg(test)] diff --git a/components/tikv_alloc/src/lib.rs b/components/tikv_alloc/src/lib.rs index 817901db665..00f4d8c0c02 100644 --- a/components/tikv_alloc/src/lib.rs +++ b/components/tikv_alloc/src/lib.rs @@ -82,10 +82,6 @@ //! `--features=mem-profiling` to cargo for eather `tikv_alloc` or //! `tikv`. -#[cfg(feature = "mem-profiling")] -#[macro_use] -extern crate log; - #[cfg(feature = "jemalloc")] #[macro_use] extern crate lazy_static; diff --git a/src/server/config.rs b/src/server/config.rs index ec2be627c97..c5650708304 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -380,8 +380,7 @@ impl ConfigManager for ServerConfigManager { fn dispatch(&mut self, c: ConfigChange) -> std::result::Result<(), Box> { { let change = c.clone(); - self.config - .update(move |cfg: &mut Config| cfg.update(change)); + self.config.update(move |cfg| cfg.update(change)); if let Err(e) = self.tx.schedule(SnapTask::RefreshConfigEvent) { error!("server configuration manager schedule refresh snapshot work task failed"; "err"=> ?e); } diff --git a/src/server/status_server/mod.rs b/src/server/status_server/mod.rs index 8d08013a90f..aad9a1d6e9d 100644 --- a/src/server/status_server/mod.rs +++ b/src/server/status_server/mod.rs @@ -1,18 +1,26 @@ // Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0. +mod profile; +pub mod region_meta; +use self::profile::{ + activate_heap_profile, deactivate_heap_profile, jeprof_heap_profile, list_heap_profiles, + read_file, start_one_cpu_profile, start_one_heap_profile, +}; + use std::error::Error as StdError; use std::marker::PhantomData; use std::net::SocketAddr; use std::pin::Pin; -use std::str::FromStr; +use std::str::{self, FromStr}; use std::sync::Arc; use std::task::{Context, Poll}; use std::thread; use std::time::{Duration, Instant}; use async_stream::stream; +use collections::HashMap; use engine_traits::KvEngine; -use futures::compat::Compat01As03; +use futures::compat::{Compat01As03, Stream01CompatExt}; use futures::executor::block_on; use futures::future::{ok, poll_fn}; use futures::prelude::*; @@ -23,66 +31,31 @@ use hyper::server::Builder as HyperBuilder; use hyper::service::{make_service_fn, service_fn}; use hyper::{self, header, Body, Client, Method, Request, Response, Server, StatusCode, Uri}; use hyper_openssl::HttpsConnector; +use online_config::OnlineConfig; use openssl::ssl::{ Ssl, SslAcceptor, SslConnector, SslConnectorBuilder, SslFiletype, SslMethod, SslVerifyMode, }; use openssl::x509::X509; +use pd_client::{RpcClient, REQUEST_RECONNECT_INTERVAL}; use pin_project::pin_project; -use pprof::protos::Message; use raftstore::store::{transport::CasualRouter, CasualMessage}; use regex::Regex; -use serde_json::Value; -use tempfile::TempDir; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; -use tokio::runtime::{Builder, Runtime}; -use tokio::sync::oneshot::{self, Receiver, Sender}; -use tokio_openssl::SslStream; - -use collections::HashMap; -use online_config::OnlineConfig; -use pd_client::{RpcClient, REQUEST_RECONNECT_INTERVAL}; use security::{self, SecurityConfig}; -use tikv_alloc::error::ProfError; +use serde_json::Value; use tikv_util::logger::set_log_level; use tikv_util::metrics::dump; use tikv_util::timer::GLOBAL_TIMER_HANDLE; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::runtime::{Builder, Handle, Runtime}; +use tokio::sync::oneshot::{self, Receiver, Sender}; +use tokio_openssl::SslStream; -use super::Result; use crate::config::{log_level_serde, ConfigController}; - -pub mod region_meta; - -mod profiler_guard { - use tikv_alloc::error::ProfResult; - use tikv_alloc::{activate_prof, deactivate_prof}; - - use tokio::sync::{Mutex, MutexGuard}; - - lazy_static! { - static ref PROFILER_MUTEX: Mutex = Mutex::new(0); - } - - pub struct ProfGuard(MutexGuard<'static, u32>); - - pub async fn new_prof() -> ProfResult { - let guard = PROFILER_MUTEX.lock().await; - match activate_prof() { - Ok(_) => Ok(ProfGuard(guard)), - Err(e) => Err(e), - } - } - - impl Drop for ProfGuard { - fn drop(&mut self) { - // TODO: handle error here - let _ = deactivate_prof(); - } - } -} +use crate::server::Result; const COMPONENT_REQUEST_RETRY: usize = 5; - static COMPONENT: &str = "tikv"; +static TIMER_CANCELED: &str = "tokio timer canceled"; #[cfg(feature = "failpoints")] static MISSING_NAME: &[u8] = b"Missing param name"; @@ -93,7 +66,7 @@ static FAIL_POINTS_REQUEST_PATH: &str = "/fail"; #[derive(Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] -pub struct LogLevelRequest { +struct LogLevelRequest { #[serde(with = "log_level_serde")] pub log_level: slog::Level, } @@ -111,44 +84,6 @@ pub struct StatusServer { _snap: PhantomData, } -impl StatusServer<(), ()> { - fn extract_thread_name(thread_name: &str) -> String { - lazy_static! { - static ref THREAD_NAME_RE: Regex = - Regex::new(r"^(?P[a-z-_ :]+?)(-?\d)*$").unwrap(); - static ref THREAD_NAME_REPLACE_SEPERATOR_RE: Regex = Regex::new(r"[_ ]").unwrap(); - } - - THREAD_NAME_RE - .captures(thread_name) - .and_then(|cap| { - cap.name("thread_name").map(|thread_name| { - THREAD_NAME_REPLACE_SEPERATOR_RE - .replace_all(thread_name.as_str(), "-") - .into_owned() - }) - }) - .unwrap_or_else(|| thread_name.to_owned()) - } - - fn frames_post_processor() -> impl Fn(&mut pprof::Frames) { - move |frames| { - let name = Self::extract_thread_name(&frames.thread_name); - frames.thread_name = name; - } - } - - fn err_response(status_code: StatusCode, message: T) -> Response - where - T: Into, - { - Response::builder() - .status(status_code) - .body(message.into()) - .unwrap() - } -} - impl StatusServer where E: 'static, @@ -165,14 +100,11 @@ where .enable_all() .worker_threads(status_thread_pool_size) .thread_name("status-server") - .on_thread_start(|| { - debug!("Status server started"); - }) - .on_thread_stop(|| { - debug!("stopping status server"); - }) + .on_thread_start(|| debug!("Status server started")) + .on_thread_stop(|| debug!("stopping status server")) .build()?; - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + let (tx, rx) = oneshot::channel::<()>(); Ok(StatusServer { thread_pool, tx, @@ -187,64 +119,115 @@ where }) } - pub async fn dump_prof(seconds: u64) -> std::result::Result, ProfError> { - let guard = profiler_guard::new_prof().await?; - info!("start memory profiling {} seconds", seconds); - - let timer = GLOBAL_TIMER_HANDLE.clone(); - let _ = Compat01As03::new(timer.delay(Instant::now() + Duration::from_secs(seconds))).await; - let tmp_dir = TempDir::new()?; - let os_path = tmp_dir.path().join("tikv_dump_profile").into_os_string(); - let path = os_path - .into_string() - .map_err(ProfError::PathEncodingError)?; - tikv_alloc::dump_prof(&path)?; - drop(guard); - let mut file = tokio::fs::File::open(path).await?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).await?; - Ok(buf) + fn list_heap_prof(_req: Request) -> hyper::Result> { + let profiles = match list_heap_profiles() { + Ok(s) => s, + Err(e) => return Ok(make_response(StatusCode::INTERNAL_SERVER_ERROR, e)), + }; + + let text = profiles + .into_iter() + .map(|(f, ct)| format!("{}\t\t{}", f, ct)) + .collect::>() + .join("\n") + .into_bytes(); + + let response = Response::builder() + .header("Content-Type", mime::TEXT_PLAIN.to_string()) + .header("Content-Length", text.len()) + .body(text.into()) + .unwrap(); + Ok(response) } - pub async fn dump_prof_to_resp(req: Request) -> hyper::Result> { - let query = match req.uri().query() { - Some(query) => query, - None => { - return Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - "request should have the query part", - )); - } - }; + async fn activate_heap_prof(req: Request) -> hyper::Result> { + let query = req.uri().query().unwrap_or(""); let query_pairs: HashMap<_, _> = url::form_urlencoded::parse(query.as_bytes()).collect(); - let seconds: u64 = match query_pairs.get("seconds") { + + let interval: u64 = match query_pairs.get("interval") { Some(val) => match val.parse() { Ok(val) => val, - Err(_) => { - return Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - "request should have seconds argument", - )); - } + Err(err) => return Ok(make_response(StatusCode::BAD_REQUEST, err.to_string())), }, - None => 10, + None => 60, + }; + + let interval = Duration::from_secs(interval); + let period = GLOBAL_TIMER_HANDLE + .interval(Instant::now() + interval, interval) + .compat() + .map_ok(|_| ()) + .map_err(|_| TIMER_CANCELED.to_owned()) + .into_stream(); + let (tx, rx) = oneshot::channel(); + let callback = move || tx.send(()).unwrap_or_default(); + let res = Handle::current().spawn(activate_heap_profile(period, callback)); + if rx.await.is_ok() { + let msg = "activate heap profile success"; + Ok(make_response(StatusCode::OK, msg)) + } else { + let errmsg = format!("{:?}", res.await); + Ok(make_response(StatusCode::INTERNAL_SERVER_ERROR, errmsg)) + } + } + + fn deactivate_heap_prof(_req: Request) -> hyper::Result> { + let body = if deactivate_heap_profile() { + "deactivate heap profile success" + } else { + "no heap profile is running" }; + Ok(make_response(StatusCode::OK, body)) + } + + async fn dump_heap_prof_to_resp(req: Request) -> hyper::Result> { + let query = req.uri().query().unwrap_or(""); + let query_pairs: HashMap<_, _> = url::form_urlencoded::parse(query.as_bytes()).collect(); - match Self::dump_prof(seconds).await { - Ok(buf) => { - let response = Response::builder() + let use_jeprof = query_pairs.get("jeprof").map(|x| x.as_ref()) == Some("true"); + + let result = if let Some(name) = query_pairs.get("name") { + if use_jeprof { + jeprof_heap_profile(name) + } else { + read_file(name) + } + } else { + let mut seconds = 10; + if let Some(s) = query_pairs.get("seconds") { + match s.parse() { + Ok(val) => seconds = val, + Err(_) => { + let errmsg = "request should have seconds argument".to_owned(); + return Ok(make_response(StatusCode::BAD_REQUEST, errmsg)); + } + } + } + let timer = GLOBAL_TIMER_HANDLE.delay(Instant::now() + Duration::from_secs(seconds)); + let end = Compat01As03::new(timer) + .map_err(|_| TIMER_CANCELED.to_owned()) + .into_future(); + start_one_heap_profile(end, use_jeprof).await + }; + + match result { + Ok(body) => { + info!("dump or get heap profile successfully"); + let mut response = Response::builder() .header("X-Content-Type-Options", "nosniff") .header("Content-Disposition", "attachment; filename=\"profile\"") - .header("Content-Type", mime::APPLICATION_OCTET_STREAM.to_string()) - .header("Content-Length", buf.len()) - .body(buf.into()) - .unwrap(); - Ok(response) + .header("Content-Length", body.len()); + response = if use_jeprof { + response.header("Content-Type", mime::IMAGE_SVG.to_string()) + } else { + response.header("Content-Type", mime::APPLICATION_OCTET_STREAM.to_string()) + }; + Ok(response.body(body.into()).unwrap()) + } + Err(e) => { + info!("dump or get heap profile fail: {}", e); + Ok(make_response(StatusCode::INTERNAL_SERVER_ERROR, e)) } - Err(err) => Ok(StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - err.to_string(), - )), } } @@ -259,12 +242,7 @@ where full = match query_pairs.get("full") { Some(val) => match val.parse() { Ok(val) => val, - Err(err) => { - return Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - err.to_string(), - )); - } + Err(err) => return Ok(make_response(StatusCode::BAD_REQUEST, err.to_string())), }, None => false, }; @@ -281,10 +259,7 @@ where .header(header::CONTENT_TYPE, "application/json") .body(Body::from(json)) .unwrap(), - Err(_) => StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - "Internal Server Error", - ), + Err(_) => make_response(StatusCode::INTERNAL_SERVER_ERROR, "Internal Server Error"), }) } @@ -301,7 +276,7 @@ where .await?; Ok(match decode_json(&body) { Ok(change) => match cfg_controller.update(change) { - Err(e) => StatusServer::err_response( + Err(e) => make_response( StatusCode::INTERNAL_SERVER_ERROR, format!("failed to update, error: {:?}", e), ), @@ -311,44 +286,21 @@ where resp } }, - Err(e) => StatusServer::err_response( + Err(e) => make_response( StatusCode::INTERNAL_SERVER_ERROR, format!("failed to decode, error: {:?}", e), ), }) } - pub async fn dump_rsprof(seconds: u64, frequency: i32) -> pprof::Result { - let guard = pprof::ProfilerGuard::new(frequency)?; - info!( - "start profiling {} seconds with frequency {} /s", - seconds, frequency - ); - let timer = GLOBAL_TIMER_HANDLE.clone(); - let _ = Compat01As03::new(timer.delay(Instant::now() + Duration::from_secs(seconds))).await; - guard - .report() - .frames_post_processor(StatusServer::frames_post_processor()) - .build() - } - - pub async fn dump_rsperf_to_resp(req: Request) -> hyper::Result> { - let query = match req.uri().query() { - Some(query) => query, - None => { - return Ok(StatusServer::err_response(StatusCode::BAD_REQUEST, "")); - } - }; + pub async fn dump_cpu_prof_to_resp(req: Request) -> hyper::Result> { + let query = req.uri().query().unwrap_or(""); let query_pairs: HashMap<_, _> = url::form_urlencoded::parse(query.as_bytes()).collect(); + let seconds: u64 = match query_pairs.get("seconds") { Some(val) => match val.parse() { Ok(val) => val, - Err(err) => { - return Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - err.to_string(), - )); - } + Err(err) => return Ok(make_response(StatusCode::BAD_REQUEST, err.to_string())), }, None => 10, }; @@ -356,56 +308,29 @@ where let frequency: i32 = match query_pairs.get("frequency") { Some(val) => match val.parse() { Ok(val) => val, - Err(err) => { - return Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - err.to_string(), - )); - } + Err(err) => return Ok(make_response(StatusCode::BAD_REQUEST, err.to_string())), }, None => 99, // Default frequency of sampling. 99Hz to avoid coincide with special periods }; let prototype_content_type: hyper::http::HeaderValue = hyper::http::HeaderValue::from_str("application/protobuf").unwrap(); - let report = match Self::dump_rsprof(seconds, frequency).await { - Ok(report) => report, - Err(err) => { - return Ok(StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - err.to_string(), - )); - } - }; + let output_protobuf = req.headers().get("Content-Type") == Some(&prototype_content_type); - let mut body: Vec = Vec::new(); - if req.headers().get("Content-Type") == Some(&prototype_content_type) { - match report.pprof() { - Ok(profile) => match profile.encode(&mut body) { - Ok(()) => { - info!("write report successfully"); - Ok(StatusServer::err_response(StatusCode::OK, body)) - } - Err(err) => Ok(StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - err.to_string(), - )), - }, - Err(err) => Ok(StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - err.to_string(), - )), + let timer = GLOBAL_TIMER_HANDLE.delay(Instant::now() + Duration::from_secs(seconds)); + let end = async move { + Compat01As03::new(timer) + .await + .map_err(|_| TIMER_CANCELED.to_owned()) + }; + match start_one_cpu_profile(end, frequency, output_protobuf).await { + Ok(body) => { + info!("dump cpu profile successfully"); + Ok(make_response(StatusCode::OK, body)) } - } else { - match report.flamegraph(&mut body) { - Ok(_) => { - info!("write report successfully"); - Ok(StatusServer::err_response(StatusCode::OK, body)) - } - Err(err) => Ok(StatusServer::err_response( - StatusCode::INTERNAL_SERVER_ERROR, - err.to_string(), - )), + Err(e) => { + info!("dump cpu profile fail: {}", e); + Ok(make_response(StatusCode::INTERNAL_SERVER_ERROR, e)) } } } @@ -427,10 +352,7 @@ where set_log_level(req.log_level); Ok(Response::new(Body::empty())) } - Err(err) => Ok(StatusServer::err_response( - StatusCode::BAD_REQUEST, - err.to_string(), - )), + Err(err) => Ok(make_response(StatusCode::BAD_REQUEST, err.to_string())), } } @@ -438,7 +360,7 @@ where // unregister the status address to pd self.unregister_addr(); let _ = self.tx.send(()); - self.thread_pool.shutdown_timeout(Duration::from_secs(10)); + self.thread_pool.shutdown_timeout(Duration::from_secs(3)); } // Return listening address, this may only be used for outer test @@ -606,15 +528,8 @@ where static ref REGION: Regex = Regex::new(r"/region/(?P\d+)").unwrap(); } - fn err_resp( - status_code: StatusCode, - msg: impl Into, - ) -> hyper::Result> { - Ok(StatusServer::err_response(status_code, msg)) - } - fn not_found(msg: impl Into) -> hyper::Result> { - err_resp(StatusCode::NOT_FOUND, msg) + Ok(make_response(StatusCode::NOT_FOUND, msg)) } let cap = match REGION.captures(req.uri().path()) { @@ -625,10 +540,10 @@ where let id: u64 = match cap["id"].parse() { Ok(id) => id, Err(err) => { - return err_resp( + return Ok(make_response( StatusCode::BAD_REQUEST, format!("invalid region id: {}", err), - ); + )); } }; let (tx, rx) = oneshot::channel(); @@ -645,17 +560,17 @@ where return not_found(format!("region({}) not found", id)); } Err(err) => { - return err_resp( + return Ok(make_response( StatusCode::INTERNAL_SERVER_ERROR, format!("channel pending or disconnect: {}", err), - ); + )); } } let meta = match rx.await { Ok(meta) => meta, Err(_) => { - return Ok(StatusServer::err_response( + return Ok(make_response( StatusCode::INTERNAL_SERVER_ERROR, "query cancelled", )); @@ -665,7 +580,7 @@ where let body = match serde_json::to_vec(&meta) { Ok(body) => body, Err(err) => { - return Ok(StatusServer::err_response( + return Ok(make_response( StatusCode::INTERNAL_SERVER_ERROR, format!("fails to json: {}", err), )); @@ -676,7 +591,7 @@ where .body(hyper::Body::from(body)) { Ok(resp) => Ok(resp), - Err(err) => Ok(StatusServer::err_response( + Err(err) => Ok(make_response( StatusCode::INTERNAL_SERVER_ERROR, format!("fails to build response: {}", err), )), @@ -729,7 +644,7 @@ where ); if should_check_cert && !check_cert(security_config, x509) { - return Ok(StatusServer::err_response( + return Ok(make_response( StatusCode::FORBIDDEN, "certificate role error", )); @@ -738,8 +653,15 @@ where match (method, path.as_ref()) { (Method::GET, "/metrics") => Ok(Response::new(dump().into())), (Method::GET, "/status") => Ok(Response::default()), + (Method::GET, "/debug/pprof/heap_list") => Self::list_heap_prof(req), + (Method::GET, "/debug/pprof/heap_activate") => { + Self::activate_heap_prof(req).await + } + (Method::GET, "/debug/pprof/heap_deactivate") => { + Self::deactivate_heap_prof(req) + } (Method::GET, "/debug/pprof/heap") => { - Self::dump_prof_to_resp(req).await + Self::dump_heap_prof_to_resp(req).await } (Method::GET, "/config") => { Self::get_config(req, &cfg_controller).await @@ -748,7 +670,7 @@ where Self::update_config(cfg_controller.clone(), req).await } (Method::GET, "/debug/pprof/profile") => { - Self::dump_rsperf_to_resp(req).await + Self::dump_cpu_prof_to_resp(req).await } (Method::GET, "/debug/fail_point") => { info!("debug fail point API start"); @@ -762,10 +684,7 @@ where (Method::PUT, path) if path.starts_with("/log-level") => { Self::change_log_level(req).await } - _ => Ok(StatusServer::err_response( - StatusCode::NOT_FOUND, - "path not found", - )), + _ => Ok(make_response(StatusCode::NOT_FOUND, "path not found")), } } })) @@ -1021,6 +940,16 @@ fn decode_json( } } +fn make_response(status_code: StatusCode, message: T) -> Response +where + T: Into, +{ + Response::builder() + .status(status_code) + .body(message.into()) + .unwrap() +} + #[cfg(test)] mod tests { use futures::executor::block_on; @@ -1037,6 +966,7 @@ mod tests { use std::sync::Arc; use crate::config::{ConfigController, TiKvConfig}; + use crate::server::status_server::profile::TEST_PROFILE_MUTEX; use crate::server::status_server::{LogLevelRequest, StatusServer}; use collections::HashSet; use engine_test::kv::KvTestEngine; @@ -1341,34 +1271,6 @@ mod tests { status_server.stop(); } - #[test] - fn test_extract_thread_name() { - assert_eq!( - &StatusServer::extract_thread_name("test-name-1"), - "test-name" - ); - assert_eq!( - &StatusServer::extract_thread_name("grpc-server-5"), - "grpc-server" - ); - assert_eq!( - &StatusServer::extract_thread_name("rocksdb:bg1000"), - "rocksdb:bg" - ); - assert_eq!( - &StatusServer::extract_thread_name("raftstore-1-100"), - "raftstore" - ); - assert_eq!( - &StatusServer::extract_thread_name("snap sender1000"), - "snap-sender" - ); - assert_eq!( - &StatusServer::extract_thread_name("snap_sender1000"), - "snap-sender" - ); - } - fn do_test_security_status_service(allowed_cn: HashSet, expected: bool) { let mut status_server = StatusServer::new( 1, @@ -1470,6 +1372,7 @@ mod tests { #[test] fn test_pprof_profile_service() { + let _test_guard = TEST_PROFILE_MUTEX.lock().unwrap(); let mut status_server = StatusServer::new( 1, None, @@ -1491,7 +1394,6 @@ mod tests { .thread_pool .spawn(async move { client.get(uri).await.unwrap() }); let resp = block_on(handle).unwrap(); - assert_eq!(resp.status(), StatusCode::OK); status_server.stop(); } diff --git a/src/server/status_server/profile.rs b/src/server/status_server/profile.rs new file mode 100644 index 00000000000..be88c006981 --- /dev/null +++ b/src/server/status_server/profile.rs @@ -0,0 +1,385 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +#[cfg(test)] +pub use self::test_utils::TEST_PROFILE_MUTEX; + +use std::fs::File; +use std::io::Read; +use std::os::linux::fs::MetadataExt; +use std::pin::Pin; +use std::process::Command; +use std::sync::Mutex as StdMutex; +use std::time::{Duration, UNIX_EPOCH}; + +use chrono::{offset::Local, DateTime}; +use futures::channel::oneshot::{self, Sender}; +use futures::future::BoxFuture; +use futures::task::{Context, Poll}; +use futures::{select, Future, FutureExt, Stream, StreamExt}; +use lazy_static::lazy_static; +use pprof::protos::Message; +use regex::Regex; +use tempfile::{NamedTempFile, TempDir}; +use tokio::sync::{Mutex, MutexGuard}; + +#[cfg(test)] +use self::test_utils::{activate_prof, deactivate_prof, dump_prof}; +#[cfg(not(test))] +use tikv_alloc::{activate_prof, deactivate_prof, dump_prof}; + +// File name suffix for periodically dumped heap profiles. +const HEAP_PROFILE_SUFFIX: &str = ".heap"; + +lazy_static! { + // If it's locked it means there are already a heap or CPU profiling. + static ref PROFILE_MUTEX: Mutex<()> = Mutex::new(()); + // The channel is used to deactivate a profiling. + static ref PROFILE_ACTIVE: StdMutex, TempDir)>> = StdMutex::new(None); + + // To normalize thread names. + static ref THREAD_NAME_RE: Regex = + Regex::new(r"^(?P[a-z-_ :]+?)(-?\d)*$").unwrap(); + static ref THREAD_NAME_REPLACE_SEPERATOR_RE: Regex = Regex::new(r"[_ ]").unwrap(); +} + +type OnEndFn = Box Result + Send + 'static>; + +struct ProfileGuard<'a, I, T> { + _guard: MutexGuard<'a, ()>, + item: Option, + on_end: Option>, + end: BoxFuture<'static, Result<(), String>>, +} + +impl<'a, I, T> Unpin for ProfileGuard<'a, I, T> {} + +impl<'a, I, T> ProfileGuard<'a, I, T> { + fn new( + on_start: F1, + on_end: F2, + end: BoxFuture<'static, Result<(), String>>, + ) -> Result, String> + where + F1: FnOnce() -> Result, + F2: FnOnce(I) -> Result + Send + 'static, + { + let _guard = match PROFILE_MUTEX.try_lock() { + Ok(guard) => guard, + _ => return Err("Already in Profiling".to_owned()), + }; + let item = on_start()?; + Ok(ProfileGuard { + _guard, + item: Some(item), + on_end: Some(Box::new(on_end) as OnEndFn), + end, + }) + } +} + +impl<'a, I, T> Future for ProfileGuard<'a, I, T> { + type Output = Result; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.end.as_mut().poll(cx) { + Poll::Ready(Ok(_)) => { + let item = self.item.take().unwrap(); + let on_end = self.on_end.take().unwrap(); + Poll::Ready(on_end(item)) + } + Poll::Ready(Err(errmsg)) => Poll::Ready(Err(errmsg)), + Poll::Pending => Poll::Pending, + } + } +} + +/// Trigger a heap profie and return the content. +pub async fn start_one_heap_profile(end: F, use_jeprof: bool) -> Result, String> +where + F: Future> + Send + 'static, +{ + let on_start = || activate_prof().map_err(|e| format!("activate_prof: {}", e)); + + let on_end = move |_| { + deactivate_prof().map_err(|e| format!("deactivate_prof: {}", e))?; + let f = NamedTempFile::new().map_err(|e| format!("create tmp file fail: {}", e))?; + let path = f.path().to_str().unwrap(); + dump_prof(path).map_err(|e| format!("dump_prof: {}", e))?; + if use_jeprof { + jeprof_heap_profile(path) + } else { + read_file(path) + } + }; + + ProfileGuard::new(on_start, on_end, end.boxed())?.await +} + +/// Activate heap profile and call `callback` if successfully. +/// `deactivate_heap_profile` can only be called after it's notified from `callback`. +pub async fn activate_heap_profile(dump_period: S, callback: F) -> Result<(), String> +where + S: Stream> + Send + Unpin + 'static, + F: FnOnce() + Send + 'static, +{ + let (tx, rx) = oneshot::channel(); + let dir = TempDir::new().map_err(|e| format!("create temp directory: {}", e))?; + let dir_path = dir.path().to_str().unwrap().to_owned(); + + let on_start = move || { + let mut activate = PROFILE_ACTIVE.lock().unwrap(); + assert!(activate.is_none()); + *activate = Some((tx, dir)); + activate_prof().map_err(|e| format!("activate_prof: {}", e))?; + callback(); + info!("periodical heap profiling is started"); + Ok(()) + }; + + let on_end = |_| { + deactivate_heap_profile(); + deactivate_prof().map_err(|e| format!("deactivate_prof: {}", e)) + }; + + let end = async move { + select! { + _ = rx.fuse() => { + info!("periodical heap profiling is canceled"); + Ok(()) + }, + res = dump_heap_profile_periodically(dump_period, dir_path).fuse() => { + warn!("the heap profiling dump loop shouldn't break"); + res + } + } + }; + + ProfileGuard::new(on_start, on_end, end.boxed())?.await +} + +/// Deactivate heap profile. Return `false` if it hasn't been activated. +pub fn deactivate_heap_profile() -> bool { + let mut activate = PROFILE_ACTIVE.lock().unwrap(); + activate.take().is_some() +} + +/// Trigger one cpu profile. +pub async fn start_one_cpu_profile( + end: F, + frequency: i32, + protobuf: bool, +) -> Result, String> +where + F: Future> + Send + 'static, +{ + let on_start = || { + let guard = pprof::ProfilerGuard::new(frequency) + .map_err(|e| format!("pprof::ProfileGuard::new fail: {}", e))?; + Ok(guard) + }; + + let on_end = move |guard: pprof::ProfilerGuard<'static>| { + let report = guard + .report() + .frames_post_processor(move |frames| { + let name = extract_thread_name(&frames.thread_name); + frames.thread_name = name; + }) + .build() + .map_err(|e| format!("create cpu profiling report fail: {}", e))?; + let mut body = Vec::new(); + if protobuf { + let profile = report + .pprof() + .map_err(|e| format!("generate pprof from report fail: {}", e))?; + profile + .encode(&mut body) + .map_err(|e| format!("encode pprof into bytes fail: {}", e))?; + } else { + report + .flamegraph(&mut body) + .map_err(|e| format!("generate flamegraph from report fail: {}", e))?; + } + Ok(body) + }; + + ProfileGuard::new(on_start, on_end, end.boxed())?.await +} + +pub fn read_file(path: &str) -> Result, String> { + let mut f = File::open(path).map_err(|e| format!("open {} fail: {}", path, e))?; + let mut buf = Vec::new(); + f.read_to_end(&mut buf) + .map_err(|e| format!("read {} fail: {}", path, e))?; + Ok(buf) +} + +pub fn jeprof_heap_profile(path: &str) -> Result, String> { + info!("using jeprof to process {}", path); + let output = Command::new("./jeprof") + .args(&["--show_bytes", "./bin/tikv-server", path, "--svg"]) + .output() + .map_err(|e| format!("jeprof: {}", e))?; + if !output.status.success() { + let stderr = std::str::from_utf8(&output.stderr).unwrap_or("invalid utf8"); + return Err(format!("jeprof stderr: {:?}", stderr)); + } + Ok(output.stdout) +} + +pub fn list_heap_profiles() -> Result, String> { + let path = match &*PROFILE_ACTIVE.lock().unwrap() { + Some((_, ref dir)) => dir.path().to_str().unwrap().to_owned(), + None => return Ok(vec![]), + }; + + let dir = std::fs::read_dir(&path).map_err(|e| format!("read dir fail: {}", e))?; + let mut profiles = Vec::new(); + for item in dir { + let item = match item { + Ok(x) => x, + _ => continue, + }; + let f = item.path().to_str().unwrap().to_owned(); + if !f.ends_with(HEAP_PROFILE_SUFFIX) { + continue; + } + let ct = item.metadata().map(|x| x.st_ctime() as u64).unwrap(); + let dt = DateTime::::from(UNIX_EPOCH + Duration::from_secs(ct)); + profiles.push((f, dt.format("%Y-%m-%d %H:%M:%S").to_string())); + } + + // Reverse sort them. + profiles.sort_by(|x, y| y.1.cmp(&x.1)); + info!("list_heap_profiles gets {} items", profiles.len()); + Ok(profiles) +} + +async fn dump_heap_profile_periodically(mut period: S, dir: String) -> Result<(), String> +where + S: Stream> + Send + Unpin + 'static, +{ + let mut id = 0; + while let Some(res) = period.next().await { + let _ = res?; + id += 1; + let path = format!("{}/{:0>6}{}", dir, id, HEAP_PROFILE_SUFFIX); + dump_prof(&path).map_err(|e| format!("dump_prof: {}", e))?; + info!("a heap profile is dumped to {}", path); + } + Ok(()) +} + +fn extract_thread_name(thread_name: &str) -> String { + THREAD_NAME_RE + .captures(thread_name) + .and_then(|cap| { + cap.name("thread_name").map(|thread_name| { + THREAD_NAME_REPLACE_SEPERATOR_RE + .replace_all(thread_name.as_str(), "-") + .into_owned() + }) + }) + .unwrap_or_else(|| thread_name.to_owned()) +} + +// Re-define some heap profiling functions because heap-profiling is not enabled for tests. +#[cfg(test)] +mod test_utils { + use std::sync::Mutex; + use tikv_alloc::error::ProfResult; + + lazy_static! { + pub static ref TEST_PROFILE_MUTEX: Mutex<()> = Mutex::new(()); + } + + pub fn activate_prof() -> ProfResult<()> { + Ok(()) + } + pub fn deactivate_prof() -> ProfResult<()> { + Ok(()) + } + pub fn dump_prof(_: &str) -> ProfResult<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::channel::{mpsc, oneshot}; + use futures::executor::block_on; + use futures::TryFutureExt; + use std::sync::mpsc::sync_channel; + use std::thread; + use std::time::Duration; + use tokio::runtime; + + #[test] + fn test_extract_thread_name() { + assert_eq!(&extract_thread_name("test-name-1"), "test-name"); + assert_eq!(&extract_thread_name("grpc-server-5"), "grpc-server"); + assert_eq!(&extract_thread_name("rocksdb:bg1000"), "rocksdb:bg"); + assert_eq!(&extract_thread_name("raftstore-1-100"), "raftstore"); + assert_eq!(&extract_thread_name("snap sender1000"), "snap-sender"); + assert_eq!(&extract_thread_name("snap_sender1000"), "snap-sender"); + } + + // Test there is at most 1 concurrent profiling. + #[test] + fn test_profile_guard_concurrency() { + let _test_guard = TEST_PROFILE_MUTEX.lock().unwrap(); + let rt = runtime::Builder::new_multi_thread() + .worker_threads(4) + .build() + .unwrap(); + + let expected = "Already in Profiling"; + + let (tx1, rx1) = oneshot::channel(); + let rx1 = rx1.map_err(|_| "channel canceled".to_owned()); + let res1 = rt.spawn(start_one_cpu_profile(rx1, 99, false)); + thread::sleep(Duration::from_millis(100)); + + let (_tx2, rx2) = oneshot::channel(); + let rx2 = rx2.map_err(|_| "channel canceled".to_owned()); + let res2 = rt.spawn(start_one_cpu_profile(rx2, 99, false)); + assert_eq!(block_on(res2).unwrap().unwrap_err(), expected); + + let (_tx2, rx2) = oneshot::channel(); + let rx2 = rx2.map_err(|_| "channel canceled".to_owned()); + let res2 = rt.spawn(start_one_heap_profile(rx2, false)); + assert_eq!(block_on(res2).unwrap().unwrap_err(), expected); + + let (_tx2, rx2) = mpsc::channel(1); + let res2 = rt.spawn(activate_heap_profile(rx2, || {})); + assert_eq!(block_on(res2).unwrap().unwrap_err(), expected); + + drop(tx1); + assert!(block_on(res1).unwrap().is_err()); + } + + #[test] + fn test_profile_guard_toggle() { + let _test_guard = TEST_PROFILE_MUTEX.lock().unwrap(); + let rt = runtime::Builder::new_multi_thread() + .worker_threads(4) + .build() + .unwrap(); + + // Test activated profiling can be stopped by canceling the period stream. + let (tx, rx) = mpsc::channel(1); + let res = rt.spawn(activate_heap_profile(rx, || {})); + drop(tx); + assert!(block_on(res).unwrap().is_ok()); + + // Test activated profiling can be stopped by the handle. + let (tx, rx) = sync_channel::(1); + let on_activated = move || drop(tx); + let check_activated = move || rx.recv().is_err(); + + let (_tx, _rx) = mpsc::channel(1); + let res = rt.spawn(activate_heap_profile(_rx, on_activated)); + assert!(check_activated()); + assert!(deactivate_heap_profile()); + assert!(block_on(res).unwrap().is_ok()); + } +}