Skip to content

Commit

Permalink
Add RocksDB perf metrics for CDC and storage (tikv#10898)
Browse files Browse the repository at this point in the history
* init

Signed-off-by: qupeng <[email protected]>

* add perf context for cdc

Signed-off-by: qupeng <[email protected]>
  • Loading branch information
hicqu authored Sep 13, 2021
1 parent 32b314a commit b2117e2
Show file tree
Hide file tree
Showing 7 changed files with 311 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions components/cdc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ fail = "0.4"
lazy_static = "1.3"
log_wrappers = { path = "../log_wrappers" }
prometheus = { version = "0.12", default-features = false, features = ["nightly"] }
prometheus-static-metric = "0.5"
protobuf = "2.8"
prost = "0.7"
futures-timer = "3.0"
Expand Down
24 changes: 19 additions & 5 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use raftstore::store::msg::{Callback, ReadResponse, SignificantMsg};
use resolved_ts::Resolver;
use security::SecurityManager;
use tikv::config::CdcConfig;
use tikv::storage::kv::Snapshot;
use tikv::storage::kv::{PerfStatisticsInstant, Snapshot};
use tikv::storage::mvcc::{DeltaScanner, ScannerBuilder};
use tikv::storage::txn::TxnEntry;
use tikv::storage::txn::TxnEntryScanner;
Expand Down Expand Up @@ -1227,14 +1227,15 @@ impl Initializer {
Ok(())
}

async fn scan_batch<S: Snapshot>(
fn do_scan<S: Snapshot>(
&self,
scanner: &mut DeltaScanner<S>,
resolver: Option<&mut Resolver>,
) -> Result<Vec<Option<TxnEntry>>> {
let mut entries = Vec::with_capacity(self.max_scan_batch_size);
entries: &mut Vec<Option<TxnEntry>>,
) -> Result<usize> {
let mut total_bytes = 0;
let mut total_size = 0;

let perf_instant = PerfStatisticsInstant::new();
while total_bytes <= self.max_scan_batch_bytes && total_size < self.max_scan_batch_size {
total_size += 1;
match scanner.next_entry()? {
Expand All @@ -1248,6 +1249,19 @@ impl Initializer {
}
}
}
TLS_CDC_PERF_STATS.with(|x| *x.borrow_mut() += perf_instant.delta());
Ok(total_bytes)
}

async fn scan_batch<S: Snapshot>(
&self,
scanner: &mut DeltaScanner<S>,
resolver: Option<&mut Resolver>,
) -> Result<Vec<Option<TxnEntry>>> {
let mut entries = Vec::with_capacity(self.max_scan_batch_size);
let total_bytes = self.do_scan(scanner, &mut entries)?;
tls_flush_perf_stats();

if total_bytes > 0 {
self.speed_limiter.consume(total_bytes).await;
CDC_SCAN_BYTES.inc_by(total_bytes as _);
Expand Down
136 changes: 136 additions & 0 deletions components/cdc/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,66 @@

use lazy_static::*;
use prometheus::*;
use prometheus_static_metric::*;
use std::cell::RefCell;
use tikv::storage::kv::PerfStatisticsDelta;

// Static-metric definitions for the CDC RocksDB perf counter.
//
// `PerfMetric` lists the values of the "metric" label and `PerfCounter` is the
// static wrapper built over `CDC_ROCKSDB_PERF_COUNTER` via `auto_flush_from!`.
// Only plain `//` comments are used here because the body is macro input.
make_auto_flush_static_metric! {
// One label value per perf statistic. The same identifiers are used as field
// names by `tls_flush_perf_stat!`, both on the static counter and on the
// thread-local statistics delta, so the three lists must stay in sync.
pub label_enum PerfMetric {
user_key_comparison_count,
block_cache_hit_count,
block_read_count,
block_read_byte,
block_read_time,
block_cache_index_hit_count,
index_block_read_count,
block_cache_filter_hit_count,
filter_block_read_count,
block_checksum_time,
block_decompress_time,
get_read_bytes,
iter_read_bytes,
internal_key_skipped_count,
internal_delete_skipped_count,
internal_recent_skipped_count,
get_snapshot_time,
get_from_memtable_time,
get_from_memtable_count,
get_post_process_time,
get_from_output_files_time,
seek_on_memtable_time,
seek_on_memtable_count,
next_on_memtable_count,
prev_on_memtable_count,
seek_child_seek_time,
seek_child_seek_count,
seek_min_heap_time,
seek_max_heap_time,
seek_internal_seek_time,
db_mutex_lock_nanos,
db_condition_wait_nanos,
read_index_block_nanos,
read_filter_block_nanos,
new_table_block_iter_nanos,
new_table_iterator_nanos,
block_seek_nanos,
find_table_nanos,
bloom_memtable_hit_count,
bloom_memtable_miss_count,
bloom_sst_hit_count,
bloom_sst_miss_count,
get_cpu_nanos,
iter_next_cpu_nanos,
iter_prev_cpu_nanos,
iter_seek_cpu_nanos,
encrypt_data_nanos,
decrypt_data_nanos,
}

// Counter struct keyed by the "metric" label; callers in this file reach an
// individual counter as `CDC_ROCKSDB_PERF_COUNTER_STATIC.<metric>.inc_by(..)`.
pub struct PerfCounter: LocalIntCounter {
"metric" => PerfMetric,
}
}

lazy_static! {
pub static ref CDC_RESOLVED_TS_GAP_HISTOGRAM: Histogram = register_histogram!(
Expand Down Expand Up @@ -116,4 +176,80 @@ lazy_static! {
&["type"]
)
.unwrap();

pub static ref CDC_ROCKSDB_PERF_COUNTER: IntCounterVec = register_int_counter_vec!(
"tikv_cdc_rocksdb_perf",
"Total number of RocksDB internal operations from PerfContext",
&["metric"]
)
.unwrap();

pub static ref CDC_ROCKSDB_PERF_COUNTER_STATIC: PerfCounter =
auto_flush_from!(CDC_ROCKSDB_PERF_COUNTER, PerfCounter);
}

thread_local! {
pub static TLS_CDC_PERF_STATS: RefCell<PerfStatisticsDelta> = RefCell::new(PerfStatisticsDelta::default());
}

/// Adds a single statistic to the static CDC RocksDB perf counter.
///
/// `$delta` names a local whose `.0` holds the statistics, and `$field` is
/// both the field read from it and the counter field that gets incremented.
macro_rules! tls_flush_perf_stat {
    ($delta:ident, $field:ident) => {{
        // The counter API takes `u64`, so cast the raw statistic explicitly.
        let amount = $delta.0.$field as u64;
        CDC_ROCKSDB_PERF_COUNTER_STATIC.$field.inc_by(amount);
    }};
}

pub fn tls_flush_perf_stats() {
TLS_CDC_PERF_STATS.with(|x| {
let perf_stats = std::mem::take(&mut *x.borrow_mut());
tls_flush_perf_stat!(perf_stats, user_key_comparison_count);
tls_flush_perf_stat!(perf_stats, block_cache_hit_count);
tls_flush_perf_stat!(perf_stats, block_read_count);
tls_flush_perf_stat!(perf_stats, block_read_byte);
tls_flush_perf_stat!(perf_stats, block_read_time);
tls_flush_perf_stat!(perf_stats, block_cache_index_hit_count);
tls_flush_perf_stat!(perf_stats, index_block_read_count);
tls_flush_perf_stat!(perf_stats, block_cache_filter_hit_count);
tls_flush_perf_stat!(perf_stats, filter_block_read_count);
tls_flush_perf_stat!(perf_stats, block_checksum_time);
tls_flush_perf_stat!(perf_stats, block_decompress_time);
tls_flush_perf_stat!(perf_stats, get_read_bytes);
tls_flush_perf_stat!(perf_stats, iter_read_bytes);
tls_flush_perf_stat!(perf_stats, internal_key_skipped_count);
tls_flush_perf_stat!(perf_stats, internal_delete_skipped_count);
tls_flush_perf_stat!(perf_stats, internal_recent_skipped_count);
tls_flush_perf_stat!(perf_stats, get_snapshot_time);
tls_flush_perf_stat!(perf_stats, get_from_memtable_time);
tls_flush_perf_stat!(perf_stats, get_from_memtable_count);
tls_flush_perf_stat!(perf_stats, get_post_process_time);
tls_flush_perf_stat!(perf_stats, get_from_output_files_time);
tls_flush_perf_stat!(perf_stats, seek_on_memtable_time);
tls_flush_perf_stat!(perf_stats, seek_on_memtable_count);
tls_flush_perf_stat!(perf_stats, next_on_memtable_count);
tls_flush_perf_stat!(perf_stats, prev_on_memtable_count);
tls_flush_perf_stat!(perf_stats, seek_child_seek_time);
tls_flush_perf_stat!(perf_stats, seek_child_seek_count);
tls_flush_perf_stat!(perf_stats, seek_min_heap_time);
tls_flush_perf_stat!(perf_stats, seek_max_heap_time);
tls_flush_perf_stat!(perf_stats, seek_internal_seek_time);
tls_flush_perf_stat!(perf_stats, db_mutex_lock_nanos);
tls_flush_perf_stat!(perf_stats, db_condition_wait_nanos);
tls_flush_perf_stat!(perf_stats, read_index_block_nanos);
tls_flush_perf_stat!(perf_stats, read_filter_block_nanos);
tls_flush_perf_stat!(perf_stats, new_table_block_iter_nanos);
tls_flush_perf_stat!(perf_stats, new_table_iterator_nanos);
tls_flush_perf_stat!(perf_stats, block_seek_nanos);
tls_flush_perf_stat!(perf_stats, find_table_nanos);
tls_flush_perf_stat!(perf_stats, bloom_memtable_hit_count);
tls_flush_perf_stat!(perf_stats, bloom_memtable_miss_count);
tls_flush_perf_stat!(perf_stats, bloom_sst_hit_count);
tls_flush_perf_stat!(perf_stats, bloom_sst_miss_count);
tls_flush_perf_stat!(perf_stats, get_cpu_nanos);
tls_flush_perf_stat!(perf_stats, iter_next_cpu_nanos);
tls_flush_perf_stat!(perf_stats, iter_prev_cpu_nanos);
tls_flush_perf_stat!(perf_stats, iter_seek_cpu_nanos);
tls_flush_perf_stat!(perf_stats, encrypt_data_nanos);
tls_flush_perf_stat!(perf_stats, decrypt_data_nanos);
});
}
1 change: 1 addition & 0 deletions components/tikv_kv/src/perf_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl Default for PerfStatisticsInstant {
Self::new()
}
}

/// Stores the statistics we need. Data comes from RocksDB's `PerfContext`.
/// These statistics hold the delta values between two instant statistics.
#[derive(Default, Debug, Clone, Copy, Add, AddAssign, Sub, SubAssign)]
Expand Down
Loading

0 comments on commit b2117e2

Please sign in to comment.