Skip to content

Commit

Permalink
statistics: fix resource report agent not supporting TLS (tikv#10315)
Browse files Browse the repository at this point in the history
* remove agent tls & seconds -> duration

Signed-off-by: Zhenchi <[email protected]>

* format

Signed-off-by: Zhenchi <[email protected]>

* add limits

Signed-off-by: Zhenchi <[email protected]>

* mutex -> atomic

Signed-off-by: Zhenchi <[email protected]>

* remain evicted records to others

Signed-off-by: Zhenchi <[email protected]>

* fix unstable test

Signed-off-by: Zhenchi <[email protected]>

* fix unstable test

Signed-off-by: Zhenchi <[email protected]>

* drain_filter instead of retain

Signed-off-by: Zhenchi <[email protected]>

* polish

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Jun 10, 2021
1 parent ba04dff commit a0a0ba0
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 92 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions components/resource_metering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,10 @@ default = ["protobuf-codec"]
protobuf-codec = [
"kvproto/protobuf-codec",
"grpcio/protobuf-codec",
"security/protobuf-codec",
]
prost-codec = [
"kvproto/prost-codec",
"grpcio/prost-codec",
"security/prost-codec",
]

[dependencies]
Expand All @@ -27,7 +25,6 @@ prometheus-static-metric = "0.4"
kvproto = { rev = "7a046020d1c091638e1e8aba623c8c1e8962219d", git = "https://github.com/pingcap/kvproto.git", default-features = false }
tikv_util = { path = "../tikv_util" }
grpcio = { version = "0.9", default-features = false, features = ["openssl-vendored"] }
security = { path = "../security", default-features = false }
configuration = { path = "../configuration" }
serde = "1.0"
serde_derive = "1.0"
Expand Down
54 changes: 31 additions & 23 deletions components/resource_metering/src/cpu/recorder/linux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{ResourceMeteringTag, TagInfos};
use std::cell::Cell;
use std::fs::read_dir;
use std::marker::PhantomData;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64};
use std::sync::Arc;
use std::thread;
Expand All @@ -33,13 +33,13 @@ pub fn init_recorder() -> RecorderHandle {

let pause = Arc::new(AtomicBool::new(config.enabled));
let pause0 = pause.clone();
let precision_seconds = Arc::new(AtomicU64::new(config.precision_seconds));
let precision_seconds0 = precision_seconds.clone();
let precision_ms = Arc::new(AtomicU64::new(config.precision.0.as_millis() as _));
let precision_ms0 = precision_ms.clone();

let join_handle = std::thread::Builder::new()
.name("req-cpu-recorder".to_owned())
.name("cpu-recorder".to_owned())
.spawn(move || {
let mut recorder = CpuRecorder::new(pause, precision_seconds);
let mut recorder = CpuRecorder::new(pause, precision_ms);

loop {
recorder.handle_pause();
Expand All @@ -55,7 +55,7 @@ pub fn init_recorder() -> RecorderHandle {
}
})
.expect("Failed to create recorder thread");
RecorderHandle::new(join_handle, pause0, precision_seconds0)
RecorderHandle::new(join_handle, pause0, precision_ms0)
};
}
HANDLE.clone()
Expand Down Expand Up @@ -146,7 +146,7 @@ struct ThreadRegistrationMsg {

struct CpuRecorder {
pause: Arc<AtomicBool>,
precision_seconds: Arc<AtomicU64>,
precision_ms: Arc<AtomicU64>,

thread_stats: HashMap<pid_t, ThreadStat>,
current_window_records: CpuRecords,
Expand All @@ -164,12 +164,12 @@ struct ThreadStat {
}

impl CpuRecorder {
pub fn new(pause: Arc<AtomicBool>, precision_seconds: Arc<AtomicU64>) -> Self {
pub fn new(pause: Arc<AtomicBool>, precision_ms: Arc<AtomicU64>) -> Self {
let now = Instant::now();

Self {
pause,
precision_seconds,
precision_ms,

last_collect_instant: now,
last_gc_instant: now,
Expand Down Expand Up @@ -314,12 +314,12 @@ impl CpuRecorder {
}

pub fn may_advance_window(&mut self) -> bool {
let duration_secs = self.last_collect_instant.elapsed().as_secs();
let need_advance = duration_secs >= self.precision_seconds.load(SeqCst);
let duration = self.last_collect_instant.elapsed();
let need_advance = duration.as_millis() >= self.precision_ms.load(Relaxed) as _;

if need_advance {
let mut records = std::mem::take(&mut self.current_window_records);
records.duration_secs = duration_secs;
records.duration = duration;

if !records.records.is_empty() {
let records = Arc::new(records);
Expand Down Expand Up @@ -379,8 +379,6 @@ mod tests {
use super::*;
use crate::cpu::collector::register_collector;

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Mutex;
use std::thread::JoinHandle;

Expand Down Expand Up @@ -445,6 +443,7 @@ mod tests {

let handle = std::thread::spawn(|| {
let mut guard = None;
let thread_id = unsafe { libc::syscall(libc::SYS_gettid) as libc::pid_t };

for op in ops {
match op {
Expand All @@ -462,15 +461,22 @@ mod tests {
guard.take();
}
Operation::CpuHeavy(ms) => {
let done = Arc::new(AtomicBool::new(false));
let done1 = done.clone();
std::thread::spawn(move || {
std::thread::sleep(Duration::from_millis(ms));
done.store(true, SeqCst);
});

while !done1.load(SeqCst) {
let begin_stat = procinfo::pid::stat_task(*PID, thread_id).unwrap();
let begin_ticks =
(begin_stat.utime as u64).wrapping_add(begin_stat.stime as u64);

loop {
Self::heavy_job();
let later_stat = procinfo::pid::stat_task(*PID, thread_id).unwrap();

let later_ticks =
(later_stat.utime as u64).wrapping_add(later_stat.stime as u64);
let delta_ms = later_ticks.wrapping_sub(begin_ticks) * 1_000
/ (*CLK_TCK as u64);

if delta_ms >= ms {
break;
}
}
}
Operation::Sleep(ms) => {
Expand Down Expand Up @@ -638,6 +644,8 @@ mod tests {

collector.check(merge(vec![expected0, expected1, expected2]));
}

handle.pause();
}

impl DummyCollector {
Expand All @@ -661,7 +669,7 @@ mod tests {
let r = value.saturating_add(MAX_DRIFT);
if !(l <= expected_value && expected_value <= r) {
panic!(
"tag {} cpu time expected {} got {}",
"tag {} cpu time expected {} but got {}",
k, expected_value, value
);
}
Expand Down
16 changes: 8 additions & 8 deletions components/resource_metering/src/cpu/recorder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use collections::HashMap;

Expand All @@ -28,20 +28,20 @@ pub struct RecorderHandle {
pub struct RecorderHandleInner {
join_handle: JoinHandle<()>,
pause: Arc<AtomicBool>,
precision_seconds: Arc<AtomicU64>,
precision_ms: Arc<AtomicU64>,
}

impl RecorderHandle {
pub fn new(
join_handle: JoinHandle<()>,
pause: Arc<AtomicBool>,
precision_seconds: Arc<AtomicU64>,
precision_ms: Arc<AtomicU64>,
) -> Self {
Self {
inner: Some(Arc::new(RecorderHandleInner {
join_handle,
pause,
precision_seconds,
precision_ms,
})),
}
}
Expand All @@ -59,17 +59,17 @@ impl RecorderHandle {
}
}

pub fn set_precision_seconds(&self, value: u64) {
pub fn set_precision(&self, value: Duration) {
if let Some(inner) = self.inner.as_ref() {
inner.precision_seconds.store(value, SeqCst);
inner.precision_ms.store(value.as_millis() as _, SeqCst);
}
}
}

#[derive(Debug)]
pub struct CpuRecords {
pub begin_unix_time_secs: u64,
pub duration_secs: u64,
pub duration: Duration,

// tag -> ms
pub records: HashMap<ResourceMeteringTag, u64>,
Expand All @@ -82,7 +82,7 @@ impl Default for CpuRecords {
.expect("Clock may have gone backwards");
Self {
begin_unix_time_secs: now_unix_time.as_secs(),
duration_secs: 0,
duration: Duration::default(),
records: HashMap::default(),
}
}
Expand Down
39 changes: 19 additions & 20 deletions components/resource_metering/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

#![feature(shrink_to)]
#![feature(hash_drain_filter)]

#[macro_use]
extern crate tikv_util;
Expand All @@ -12,6 +13,7 @@ use std::sync::Arc;

use configuration::{ConfigChange, Configuration};
use serde_derive::{Deserialize, Serialize};
use tikv_util::config::ReadableDuration;
use tikv_util::worker::Scheduler;

pub mod cpu;
Expand Down Expand Up @@ -54,10 +56,10 @@ impl TagInfos {
}
}

const MIN_PRECISION_SECONDS: u64 = 1;
const MAX_PRECISION_SECONDS: u64 = 60 * 60;
const MIN_PRECISION: ReadableDuration = ReadableDuration::secs(1);
const MAX_PRECISION: ReadableDuration = ReadableDuration::hours(1);
const MAX_MAX_RESOURCE_GROUPS: usize = 5_000;
const MIN_REPORT_AGENT_INTERVAL_SECONDS: u64 = 15;
const MIN_REPORT_AGENT_INTERVAL: ReadableDuration = ReadableDuration::secs(5);

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Configuration)]
#[serde(default)]
Expand All @@ -66,20 +68,20 @@ pub struct Config {
pub enabled: bool,

pub agent_address: String,
pub report_agent_interval_seconds: u64,
pub report_agent_interval: ReadableDuration,
pub max_resource_groups: usize,

pub precision_seconds: u64,
pub precision: ReadableDuration,
}

impl Default for Config {
fn default() -> Config {
Config {
enabled: false,
agent_address: "".to_string(),
precision_seconds: 1,
report_agent_interval_seconds: 60,
max_resource_groups: 200,
report_agent_interval: ReadableDuration::minutes(1),
max_resource_groups: 2000,
precision: ReadableDuration::secs(1),
}
}
}
Expand All @@ -90,12 +92,10 @@ impl Config {
tikv_util::config::check_addr(&self.agent_address)?;
}

if self.precision_seconds < MIN_PRECISION_SECONDS
|| self.precision_seconds > MAX_PRECISION_SECONDS
{
if self.precision < MIN_PRECISION || self.precision > MAX_PRECISION {
return Err(format!(
"precision seconds must between {} and {}",
MIN_PRECISION_SECONDS, MAX_PRECISION_SECONDS
"precision must between {} and {}",
MIN_PRECISION, MAX_PRECISION
)
.into());
}
Expand All @@ -108,13 +108,13 @@ impl Config {
.into());
}

if self.report_agent_interval_seconds < MIN_REPORT_AGENT_INTERVAL_SECONDS
|| self.report_agent_interval_seconds > self.precision_seconds * 500
if self.report_agent_interval < MIN_REPORT_AGENT_INTERVAL
|| self.report_agent_interval > self.precision * 500
{
return Err(format!(
"report interval seconds must between {} and {}",
MIN_REPORT_AGENT_INTERVAL_SECONDS,
self.precision_seconds * 500
MIN_REPORT_AGENT_INTERVAL,
self.precision * 500
)
.into());
}
Expand Down Expand Up @@ -164,9 +164,8 @@ impl configuration::ConfigManager for ConfigManager {
}
}

if self.current_config.precision_seconds != new_config.precision_seconds {
self.recorder
.set_precision_seconds(new_config.precision_seconds);
if self.current_config.precision != new_config.precision {
self.recorder.set_precision(new_config.precision.0);
}

self.scheduler
Expand Down
Loading

0 comments on commit a0a0ba0

Please sign in to comment.