From b2117e26df80e1d4d85fcedbc7c6249ffda789c2 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 13 Sep 2021 14:28:40 +0800 Subject: [PATCH] Add RocksDB perf metrics for CDC and storage (#10898) * init Signed-off-by: qupeng * add perf context for cdc Signed-off-by: qupeng --- Cargo.lock | 1 + components/cdc/Cargo.toml | 1 + components/cdc/src/endpoint.rs | 24 ++++- components/cdc/src/metrics.rs | 136 ++++++++++++++++++++++++ components/tikv_kv/src/perf_context.rs | 1 + src/storage/metrics.rs | 139 ++++++++++++++++++++++++- src/storage/mod.rs | 18 +++- 7 files changed, 311 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15283a5791d..ebe30c72ff1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -475,6 +475,7 @@ dependencies = [ "panic_hook", "pd_client", "prometheus", + "prometheus-static-metric", "prost", "protobuf", "raft", diff --git a/components/cdc/Cargo.toml b/components/cdc/Cargo.toml index e87ad797556..842d086adcf 100644 --- a/components/cdc/Cargo.toml +++ b/components/cdc/Cargo.toml @@ -77,6 +77,7 @@ fail = "0.4" lazy_static = "1.3" log_wrappers = { path = "../log_wrappers" } prometheus = { version = "0.12", default-features = false, features = ["nightly"] } +prometheus-static-metric = "0.5" protobuf = "2.8" prost = "0.7" futures-timer = "3.0" diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 255ebb32bf6..39ec5c30e12 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -36,7 +36,7 @@ use raftstore::store::msg::{Callback, ReadResponse, SignificantMsg}; use resolved_ts::Resolver; use security::SecurityManager; use tikv::config::CdcConfig; -use tikv::storage::kv::Snapshot; +use tikv::storage::kv::{PerfStatisticsInstant, Snapshot}; use tikv::storage::mvcc::{DeltaScanner, ScannerBuilder}; use tikv::storage::txn::TxnEntry; use tikv::storage::txn::TxnEntryScanner; @@ -1227,14 +1227,15 @@ impl Initializer { Ok(()) } - async fn scan_batch( + fn do_scan( &self, scanner: &mut DeltaScanner, - resolver: Option<&mut Resolver>, - ) -> Result>> { - let mut entries = Vec::with_capacity(self.max_scan_batch_size); + entries: &mut Vec>, + ) -> Result { let mut total_bytes = 0; let mut total_size = 0; + + let perf_instant = PerfStatisticsInstant::new(); while total_bytes <= self.max_scan_batch_bytes && total_size < self.max_scan_batch_size { total_size += 1; match scanner.next_entry()? { @@ -1248,6 +1249,19 @@ impl Initializer { } } } + TLS_CDC_PERF_STATS.with(|x| *x.borrow_mut() += perf_instant.delta()); + Ok(total_bytes) + } + + async fn scan_batch( + &self, + scanner: &mut DeltaScanner, + resolver: Option<&mut Resolver>, + ) -> Result>> { + let mut entries = Vec::with_capacity(self.max_scan_batch_size); + let total_bytes = self.do_scan(scanner, &mut entries)?; + tls_flush_perf_stats(); + if total_bytes > 0 { self.speed_limiter.consume(total_bytes).await; CDC_SCAN_BYTES.inc_by(total_bytes as _); diff --git a/components/cdc/src/metrics.rs b/components/cdc/src/metrics.rs index 27f1cf85fd0..7985b05559a 100644 --- a/components/cdc/src/metrics.rs +++ b/components/cdc/src/metrics.rs @@ -2,6 +2,66 @@ use lazy_static::*; use prometheus::*; +use prometheus_static_metric::*; +use std::cell::RefCell; +use tikv::storage::kv::PerfStatisticsDelta; + +make_auto_flush_static_metric! { + pub label_enum PerfMetric { + user_key_comparison_count, + block_cache_hit_count, + block_read_count, + block_read_byte, + block_read_time, + block_cache_index_hit_count, + index_block_read_count, + block_cache_filter_hit_count, + filter_block_read_count, + block_checksum_time, + block_decompress_time, + get_read_bytes, + iter_read_bytes, + internal_key_skipped_count, + internal_delete_skipped_count, + internal_recent_skipped_count, + get_snapshot_time, + get_from_memtable_time, + get_from_memtable_count, + get_post_process_time, + get_from_output_files_time, + seek_on_memtable_time, + seek_on_memtable_count, + next_on_memtable_count, + prev_on_memtable_count, + seek_child_seek_time, + seek_child_seek_count, + seek_min_heap_time, + seek_max_heap_time, + seek_internal_seek_time, + db_mutex_lock_nanos, + db_condition_wait_nanos, + read_index_block_nanos, + read_filter_block_nanos, + new_table_block_iter_nanos, + new_table_iterator_nanos, + block_seek_nanos, + find_table_nanos, + bloom_memtable_hit_count, + bloom_memtable_miss_count, + bloom_sst_hit_count, + bloom_sst_miss_count, + get_cpu_nanos, + iter_next_cpu_nanos, + iter_prev_cpu_nanos, + iter_seek_cpu_nanos, + encrypt_data_nanos, + decrypt_data_nanos, + } + + pub struct PerfCounter: LocalIntCounter { + "metric" => PerfMetric, + } +} lazy_static! { pub static ref CDC_RESOLVED_TS_GAP_HISTOGRAM: Histogram = register_histogram!( @@ -116,4 +176,80 @@ lazy_static! { &["type"] ) .unwrap(); + + pub static ref CDC_ROCKSDB_PERF_COUNTER: IntCounterVec = register_int_counter_vec!( + "tikv_cdc_rocksdb_perf", + "Total number of RocksDB internal operations from PerfContext", + &["metric"] + ) + .unwrap(); + + pub static ref CDC_ROCKSDB_PERF_COUNTER_STATIC: PerfCounter = + auto_flush_from!(CDC_ROCKSDB_PERF_COUNTER, PerfCounter); +} + +thread_local! { + pub static TLS_CDC_PERF_STATS: RefCell = RefCell::new(PerfStatisticsDelta::default()); +} + +macro_rules! tls_flush_perf_stat { + ($local_stats:ident, $stat:ident) => { + CDC_ROCKSDB_PERF_COUNTER_STATIC + .$stat + .inc_by($local_stats.0.$stat as u64); + }; +} + +pub fn tls_flush_perf_stats() { + TLS_CDC_PERF_STATS.with(|x| { + let perf_stats = std::mem::take(&mut *x.borrow_mut()); + tls_flush_perf_stat!(perf_stats, user_key_comparison_count); + tls_flush_perf_stat!(perf_stats, block_cache_hit_count); + tls_flush_perf_stat!(perf_stats, block_read_count); + tls_flush_perf_stat!(perf_stats, block_read_byte); + tls_flush_perf_stat!(perf_stats, block_read_time); + tls_flush_perf_stat!(perf_stats, block_cache_index_hit_count); + tls_flush_perf_stat!(perf_stats, index_block_read_count); + tls_flush_perf_stat!(perf_stats, block_cache_filter_hit_count); + tls_flush_perf_stat!(perf_stats, filter_block_read_count); + tls_flush_perf_stat!(perf_stats, block_checksum_time); + tls_flush_perf_stat!(perf_stats, block_decompress_time); + tls_flush_perf_stat!(perf_stats, get_read_bytes); + tls_flush_perf_stat!(perf_stats, iter_read_bytes); + tls_flush_perf_stat!(perf_stats, internal_key_skipped_count); + tls_flush_perf_stat!(perf_stats, internal_delete_skipped_count); + tls_flush_perf_stat!(perf_stats, internal_recent_skipped_count); + tls_flush_perf_stat!(perf_stats, get_snapshot_time); + tls_flush_perf_stat!(perf_stats, get_from_memtable_time); + tls_flush_perf_stat!(perf_stats, get_from_memtable_count); + tls_flush_perf_stat!(perf_stats, get_post_process_time); + tls_flush_perf_stat!(perf_stats, get_from_output_files_time); + tls_flush_perf_stat!(perf_stats, seek_on_memtable_time); + tls_flush_perf_stat!(perf_stats, seek_on_memtable_count); + tls_flush_perf_stat!(perf_stats, next_on_memtable_count); + tls_flush_perf_stat!(perf_stats, prev_on_memtable_count); + tls_flush_perf_stat!(perf_stats, seek_child_seek_time); + tls_flush_perf_stat!(perf_stats, seek_child_seek_count); + tls_flush_perf_stat!(perf_stats, seek_min_heap_time); + tls_flush_perf_stat!(perf_stats, seek_max_heap_time); + tls_flush_perf_stat!(perf_stats, seek_internal_seek_time); + tls_flush_perf_stat!(perf_stats, db_mutex_lock_nanos); + tls_flush_perf_stat!(perf_stats, db_condition_wait_nanos); + tls_flush_perf_stat!(perf_stats, read_index_block_nanos); + tls_flush_perf_stat!(perf_stats, read_filter_block_nanos); + tls_flush_perf_stat!(perf_stats, new_table_block_iter_nanos); + tls_flush_perf_stat!(perf_stats, new_table_iterator_nanos); + tls_flush_perf_stat!(perf_stats, block_seek_nanos); + tls_flush_perf_stat!(perf_stats, find_table_nanos); + tls_flush_perf_stat!(perf_stats, bloom_memtable_hit_count); + tls_flush_perf_stat!(perf_stats, bloom_memtable_miss_count); + tls_flush_perf_stat!(perf_stats, bloom_sst_hit_count); + tls_flush_perf_stat!(perf_stats, bloom_sst_miss_count); + tls_flush_perf_stat!(perf_stats, get_cpu_nanos); + tls_flush_perf_stat!(perf_stats, iter_next_cpu_nanos); + tls_flush_perf_stat!(perf_stats, iter_prev_cpu_nanos); + tls_flush_perf_stat!(perf_stats, iter_seek_cpu_nanos); + tls_flush_perf_stat!(perf_stats, encrypt_data_nanos); + tls_flush_perf_stat!(perf_stats, decrypt_data_nanos); + }); } diff --git a/components/tikv_kv/src/perf_context.rs b/components/tikv_kv/src/perf_context.rs index 212f036624f..49c2402262d 100644 --- a/components/tikv_kv/src/perf_context.rs +++ b/components/tikv_kv/src/perf_context.rs @@ -141,6 +141,7 @@ impl Default for PerfStatisticsInstant { Self::new() } } + /// Store statistics we need. Data comes from RocksDB's `PerfContext`. /// This statistics store delta values between two instant statistics. #[derive(Default, Debug, Clone, Copy, Add, AddAssign, Sub, SubAssign)] diff --git a/src/storage/metrics.rs b/src/storage/metrics.rs index 2753a5b4275..071b4154a30 100644 --- a/src/storage/metrics.rs +++ b/src/storage/metrics.rs @@ -9,7 +9,7 @@ use std::cell::RefCell; use std::mem; use crate::server::metrics::{GcKeysCF as ServerGcKeysCF, GcKeysDetail as ServerGcKeysDetail}; -use crate::storage::kv::{FlowStatsReporter, Statistics}; +use crate::storage::kv::{FlowStatsReporter, PerfStatisticsDelta, Statistics}; use collections::HashMap; use kvproto::kvrpcpb::KeyRange; use kvproto::metapb; @@ -20,6 +20,7 @@ use raftstore::store::ReadStats; struct StorageLocalMetrics { local_scan_details: HashMap, local_read_stats: ReadStats, + local_perf_stats: HashMap, } thread_local! { @@ -27,10 +28,20 @@ thread_local! { StorageLocalMetrics { local_scan_details: HashMap::default(), local_read_stats:ReadStats::default(), + local_perf_stats: HashMap::default(), } ); } +macro_rules! tls_flush_perf_stats { + ($tag:ident, $local_stats:ident, $stat:ident) => { + STORAGE_ROCKSDB_PERF_COUNTER_STATIC + .get($tag) + .$stat + .inc_by($local_stats.0.$stat as u64); + }; +} + pub fn tls_flush(reporter: &R) { TLS_STORAGE_METRICS.with(|m| { let mut m = m.borrow_mut(); @@ -53,6 +64,57 @@ pub fn tls_flush(reporter: &R) { mem::swap(&mut read_stats, &mut m.local_read_stats); reporter.report_read_stats(read_stats); } + + for (req_tag, perf_stats) in m.local_perf_stats.drain() { + tls_flush_perf_stats!(req_tag, perf_stats, user_key_comparison_count); + tls_flush_perf_stats!(req_tag, perf_stats, block_cache_hit_count); + tls_flush_perf_stats!(req_tag, perf_stats, block_read_count); + tls_flush_perf_stats!(req_tag, perf_stats, block_read_byte); + tls_flush_perf_stats!(req_tag, perf_stats, block_read_time); + tls_flush_perf_stats!(req_tag, perf_stats, block_cache_index_hit_count); + tls_flush_perf_stats!(req_tag, perf_stats, index_block_read_count); + tls_flush_perf_stats!(req_tag, perf_stats, block_cache_filter_hit_count); + tls_flush_perf_stats!(req_tag, perf_stats, filter_block_read_count); + tls_flush_perf_stats!(req_tag, perf_stats, block_checksum_time); + tls_flush_perf_stats!(req_tag, perf_stats, block_decompress_time); + tls_flush_perf_stats!(req_tag, perf_stats, get_read_bytes); + tls_flush_perf_stats!(req_tag, perf_stats, iter_read_bytes); + tls_flush_perf_stats!(req_tag, perf_stats, internal_key_skipped_count); + tls_flush_perf_stats!(req_tag, perf_stats, internal_delete_skipped_count); + tls_flush_perf_stats!(req_tag, perf_stats, internal_recent_skipped_count); + tls_flush_perf_stats!(req_tag, perf_stats, get_snapshot_time); + tls_flush_perf_stats!(req_tag, perf_stats, get_from_memtable_time); + tls_flush_perf_stats!(req_tag, perf_stats, get_from_memtable_count); + tls_flush_perf_stats!(req_tag, perf_stats, get_post_process_time); + tls_flush_perf_stats!(req_tag, perf_stats, get_from_output_files_time); + tls_flush_perf_stats!(req_tag, perf_stats, seek_on_memtable_time); + tls_flush_perf_stats!(req_tag, perf_stats, seek_on_memtable_count); + tls_flush_perf_stats!(req_tag, perf_stats, next_on_memtable_count); + tls_flush_perf_stats!(req_tag, perf_stats, prev_on_memtable_count); + tls_flush_perf_stats!(req_tag, perf_stats, seek_child_seek_time); + tls_flush_perf_stats!(req_tag, perf_stats, seek_child_seek_count); + tls_flush_perf_stats!(req_tag, perf_stats, seek_min_heap_time); + tls_flush_perf_stats!(req_tag, perf_stats, seek_max_heap_time); + tls_flush_perf_stats!(req_tag, perf_stats, seek_internal_seek_time); + tls_flush_perf_stats!(req_tag, perf_stats, db_mutex_lock_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, db_condition_wait_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, read_index_block_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, read_filter_block_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, new_table_block_iter_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, new_table_iterator_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, block_seek_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, find_table_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, bloom_memtable_hit_count); + tls_flush_perf_stats!(req_tag, perf_stats, bloom_memtable_miss_count); + tls_flush_perf_stats!(req_tag, perf_stats, bloom_sst_hit_count); + tls_flush_perf_stats!(req_tag, perf_stats, bloom_sst_miss_count); + tls_flush_perf_stats!(req_tag, perf_stats, get_cpu_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, iter_next_cpu_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, iter_prev_cpu_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, iter_seek_cpu_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, encrypt_data_nanos); + tls_flush_perf_stats!(req_tag, perf_stats, decrypt_data_nanos); + } }); } @@ -106,6 +168,15 @@ pub fn tls_collect_query_batch( }); } +pub fn tls_collect_perf_stats(cmd: CommandKind, perf_stats: &PerfStatisticsDelta) { + TLS_STORAGE_METRICS.with(|m| { + *(m.borrow_mut() + .local_perf_stats + .entry(cmd) + .or_insert_with(Default::default)) += *perf_stats; + }) +} + make_auto_flush_static_metric! { pub label_enum CommandKind { get, @@ -197,6 +268,57 @@ make_auto_flush_static_metric! { unlocked, } + pub label_enum PerfMetric { + user_key_comparison_count, + block_cache_hit_count, + block_read_count, + block_read_byte, + block_read_time, + block_cache_index_hit_count, + index_block_read_count, + block_cache_filter_hit_count, + filter_block_read_count, + block_checksum_time, + block_decompress_time, + get_read_bytes, + iter_read_bytes, + internal_key_skipped_count, + internal_delete_skipped_count, + internal_recent_skipped_count, + get_snapshot_time, + get_from_memtable_time, + get_from_memtable_count, + get_post_process_time, + get_from_output_files_time, + seek_on_memtable_time, + seek_on_memtable_count, + next_on_memtable_count, + prev_on_memtable_count, + seek_child_seek_time, + seek_child_seek_count, + seek_min_heap_time, + seek_max_heap_time, + seek_internal_seek_time, + db_mutex_lock_nanos, + db_condition_wait_nanos, + read_index_block_nanos, + read_filter_block_nanos, + new_table_block_iter_nanos, + new_table_iterator_nanos, + block_seek_nanos, + find_table_nanos, + bloom_memtable_hit_count, + bloom_memtable_miss_count, + bloom_sst_hit_count, + bloom_sst_miss_count, + get_cpu_nanos, + iter_next_cpu_nanos, + iter_prev_cpu_nanos, + iter_seek_cpu_nanos, + encrypt_data_nanos, + decrypt_data_nanos, + } + pub struct CommandScanDetails: LocalIntCounter { "req" => CommandKind, "cf" => GcKeysCF, @@ -244,6 +366,11 @@ make_auto_flush_static_metric! { "type" => CommandKind, "result" => CheckMemLockResult, } + + pub struct PerfCounter: LocalIntCounter { + "req" => CommandKind, + "metric" => PerfMetric, + } } impl From for GcKeysCF { @@ -492,4 +619,14 @@ lazy_static! { .unwrap(); pub static ref CHECK_MEM_LOCK_DURATION_HISTOGRAM_VEC: CheckMemLockHistogramVec = auto_flush_from!(CHECK_MEM_LOCK_DURATION_HISTOGRAM, CheckMemLockHistogramVec); + + pub static ref STORAGE_ROCKSDB_PERF_COUNTER: IntCounterVec = register_int_counter_vec!( + "tikv_storage_rocksdb_perf", + "Total number of RocksDB internal operations from PerfContext", + &["req", "metric"] + ) + .unwrap(); + + pub static ref STORAGE_ROCKSDB_PERF_COUNTER_STATIC: PerfCounter = + auto_flush_from!(STORAGE_ROCKSDB_PERF_COUNTER, PerfCounter); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 39842cafef7..641c548b675 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -384,8 +384,10 @@ impl Storage { r }); + let delta = perf_statistics.delta(); metrics::tls_collect_scan_details(CMD, &statistics); metrics::tls_collect_read_flow(ctx.get_region_id(), &statistics); + metrics::tls_collect_perf_stats(CMD, &delta); SCHED_PROCESSING_READ_HISTOGRAM_STATIC .get(CMD) .observe(begin_instant.saturating_elapsed_secs()); @@ -393,7 +395,7 @@ impl Storage { .get(CMD) .observe(command_duration.saturating_elapsed_secs()); - Ok((result?, statistics, perf_statistics.delta())) + Ok((result?, statistics, delta)) } } .in_resource_metering_tag(resource_tag), @@ -518,12 +520,14 @@ impl Storage { let perf_statistics = PerfStatisticsInstant::new(); let v = point_getter.get(&key); let stat = point_getter.take_statistics(); + let delta = perf_statistics.delta(); metrics::tls_collect_read_flow(region_id, &stat); + metrics::tls_collect_perf_stats(CMD, &delta); statistics.add(&stat); consumer.consume( id, v.map_err(|e| Error::from(txn::Error::from(e))) - .map(|v| (v, stat, perf_statistics.delta())), + .map(|v| (v, stat, delta)), begin_instant, ); } @@ -639,8 +643,10 @@ impl Storage { kv_pairs }); + let delta = perf_statistics.delta(); metrics::tls_collect_scan_details(CMD, &statistics); metrics::tls_collect_read_flow(ctx.get_region_id(), &statistics); + metrics::tls_collect_perf_stats(CMD, &delta); SCHED_PROCESSING_READ_HISTOGRAM_STATIC .get(CMD) .observe(begin_instant.saturating_elapsed_secs()); @@ -648,7 +654,7 @@ impl Storage { .get(CMD) .observe(command_duration.saturating_elapsed_secs()); - Ok((result?, statistics, perf_statistics.delta())) + Ok((result?, statistics, delta)) } } .in_resource_metering_tag(resource_tag), @@ -755,6 +761,7 @@ impl Storage { Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await?; { let begin_instant = Instant::now_coarse(); + let perf_statistics = PerfStatisticsInstant::new(); let snap_store = SnapshotStore::new( snapshot, @@ -776,8 +783,10 @@ impl Storage { let res = scanner.scan(limit, sample_step); let statistics = scanner.take_statistics(); + let delta = perf_statistics.delta(); metrics::tls_collect_scan_details(CMD, &statistics); metrics::tls_collect_read_flow(ctx.get_region_id(), &statistics); + metrics::tls_collect_perf_stats(CMD, &delta); SCHED_PROCESSING_READ_HISTOGRAM_STATIC .get(CMD) .observe(begin_instant.saturating_elapsed_secs()); @@ -886,6 +895,7 @@ impl Storage { { let begin_instant = Instant::now_coarse(); let mut statistics = Statistics::default(); + let perf_statistics = PerfStatisticsInstant::new(); let mut reader = MvccReader::new( snapshot, Some(ScanMode::Forward), @@ -908,8 +918,10 @@ impl Storage { locks.push(lock_info); } + let delta = perf_statistics.delta(); metrics::tls_collect_scan_details(CMD, &statistics); metrics::tls_collect_read_flow(ctx.get_region_id(), &statistics); + metrics::tls_collect_perf_stats(CMD, &delta); SCHED_PROCESSING_READ_HISTOGRAM_STATIC .get(CMD) .observe(begin_instant.saturating_elapsed_secs());