Skip to content

Commit

Permalink
resource_metering naming issue on agent_address (tikv#10975)
Browse files Browse the repository at this point in the history
into
`resource_metering::Config::receiver_address`

refractor dependent `agent` into `receiver` excluding `kvproto`.

Signed-off-by: lemonhx <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
LemonHX and ti-chi-bot authored Oct 8, 2021
1 parent 05767ac commit 677ff9a
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 113 deletions.
22 changes: 11 additions & 11 deletions components/resource_metering/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,16 @@ impl TagInfos {
const MIN_PRECISION: ReadableDuration = ReadableDuration::secs(1);
const MAX_PRECISION: ReadableDuration = ReadableDuration::hours(1);
const MAX_MAX_RESOURCE_GROUPS: usize = 5_000;
const MIN_REPORT_AGENT_INTERVAL: ReadableDuration = ReadableDuration::secs(5);
const MIN_REPORT_RECEIVER_INTERVAL: ReadableDuration = ReadableDuration::secs(5);

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, OnlineConfig)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
pub enabled: bool,

pub agent_address: String,
pub report_agent_interval: ReadableDuration,
pub receiver_address: String,
pub report_receiver_interval: ReadableDuration,
pub max_resource_groups: usize,

pub precision: ReadableDuration,
Expand All @@ -78,8 +78,8 @@ impl Default for Config {
fn default() -> Config {
Config {
enabled: true,
agent_address: "".to_string(),
report_agent_interval: ReadableDuration::minutes(1),
receiver_address: "".to_string(),
report_receiver_interval: ReadableDuration::minutes(1),
max_resource_groups: 2000,
precision: ReadableDuration::secs(1),
}
Expand All @@ -88,8 +88,8 @@ impl Default for Config {

impl Config {
pub fn validate(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
if !self.agent_address.is_empty() {
tikv_util::config::check_addr(&self.agent_address)?;
if !self.receiver_address.is_empty() {
tikv_util::config::check_addr(&self.receiver_address)?;
}

if self.precision < MIN_PRECISION || self.precision > MAX_PRECISION {
Expand All @@ -108,12 +108,12 @@ impl Config {
.into());
}

if self.report_agent_interval < MIN_REPORT_AGENT_INTERVAL
|| self.report_agent_interval > self.precision * 500
if self.report_receiver_interval < MIN_REPORT_RECEIVER_INTERVAL
|| self.report_receiver_interval > self.precision * 500
{
return Err(format!(
"report interval seconds must between {} and {}",
MIN_REPORT_AGENT_INTERVAL,
MIN_REPORT_RECEIVER_INTERVAL,
self.precision * 500
)
.into());
Expand All @@ -123,7 +123,7 @@ impl Config {
}

fn should_report(&self) -> bool {
self.enabled && !self.agent_address.is_empty() && self.max_resource_groups != 0
self.enabled && !self.receiver_address.is_empty() && self.max_resource_groups != 0
}
}

Expand Down
10 changes: 5 additions & 5 deletions components/resource_metering/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl ResourceMeteringReporter {
let cb = ChannelBuilder::new(self.env.clone())
.keepalive_time(Duration::from_secs(10))
.keepalive_timeout(Duration::from_secs(3));
cb.connect(&self.config.agent_address)
cb.connect(&self.config.receiver_address)
};
self.client = Some(ResourceUsageAgentClient::new(channel));
if self.cpu_records_collector.is_none() {
Expand All @@ -112,12 +112,12 @@ impl Runnable for ResourceMeteringReporter {
match task {
Task::ConfigChange(new_config) => {
let old_config_enabled = self.config.enabled;
let old_config_agent_address = self.config.agent_address.clone();
let old_config_receiver_address = self.config.receiver_address.clone();
self.config = new_config;
if !self.config.should_report() {
self.client.take();
self.cpu_records_collector.take();
} else if self.config.agent_address != old_config_agent_address
} else if self.config.receiver_address != old_config_receiver_address
|| self.config.enabled != old_config_enabled
{
self.init_client();
Expand Down Expand Up @@ -235,13 +235,13 @@ impl RunnableWithTimer for ResourceMeteringReporter {
});
}
Err(err) => {
warn!("failed to connect resource usage agent"; "error" => ?err);
warn!("failed to connect resource usage receiver"; "error" => ?err);
}
}
}
}

fn get_interval(&self) -> Duration {
self.config.report_agent_interval.0
self.config.report_receiver_interval.0
}
}
10 changes: 5 additions & 5 deletions tests/integrations/resource_metering/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

pub mod test_suite;

pub mod test_agent;
pub mod test_dynamic_config;
pub mod test_receiver;

#[cfg(target_os = "linux")]
mod linux {
Expand All @@ -19,9 +19,9 @@ mod linux {
test_dynamic_config::case_max_resource_groups(&mut ts);
test_dynamic_config::case_precision(&mut ts);

// Agent
test_agent::case_alter_agent_addr(&mut ts);
test_agent::case_agent_blocking(&mut ts);
test_agent::case_agent_shutdown(&mut ts);
// Receiver
test_receiver::case_alter_receiver_addr(&mut ts);
test_receiver::case_receiver_blocking(&mut ts);
test_receiver::case_receiver_shutdown(&mut ts);
}
}
56 changes: 28 additions & 28 deletions tests/integrations/resource_metering/test_dynamic_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const ONE_SEC: Duration = Duration::from_secs(1);
pub fn case_enable(test_suite: &mut TestSuite) {
test_suite.reset();
let port = alloc_port();
test_suite.start_agent_at(port);
test_suite.start_receiver_at(port);

// Workload
// [req-1, req-2]
Expand All @@ -23,7 +23,7 @@ pub fn case_enable(test_suite: &mut TestSuite) {
// | Address | Enabled |
// | x | o |
test_suite.cfg_enabled(true);
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
assert!(test_suite.fetch_reported_cpu_time().is_empty());

// Workload
Expand All @@ -32,8 +32,8 @@ pub fn case_enable(test_suite: &mut TestSuite) {

// | Address | Enabled |
// | o | o |
test_suite.cfg_agent_address(format!("127.0.0.1:{}", port));
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port));
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
assert!(test_suite.fetch_reported_cpu_time().is_empty());

// Workload
Expand All @@ -42,42 +42,42 @@ pub fn case_enable(test_suite: &mut TestSuite) {

// | Address | Enabled |
// | o | o |
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
assert_eq!(res.len(), 2);
assert!(res.contains_key("req-1"));
assert!(res.contains_key("req-2"));

// | Address | Enabled |
// | x | o |
test_suite.cfg_agent_address("");
test_suite.flush_agent();
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
test_suite.cfg_receiver_address("");
test_suite.flush_receiver();
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
assert!(test_suite.fetch_reported_cpu_time().is_empty());

// | Address | Enabled |
// | o | x |
test_suite.cfg_enabled(false);
test_suite.cfg_agent_address(format!("127.0.0.1:{}", port));
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port));
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
assert!(test_suite.fetch_reported_cpu_time().is_empty());
}

pub fn case_report_interval(test_suite: &mut TestSuite) {
test_suite.reset();
let port = alloc_port();
test_suite.start_agent_at(port);
test_suite.start_receiver_at(port);
test_suite.cfg_enabled(true);
test_suite.cfg_agent_address(format!("127.0.0.1:{}", port));
test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port));

// Workload
// [req-1, req-2]
test_suite.setup_workload(vec!["req-1", "req-2"]);

// | Report Interval |
// | 15s |
test_suite.cfg_report_agent_interval("15s");
test_suite.flush_agent();
test_suite.cfg_report_receiver_interval("15s");
test_suite.flush_receiver();

sleep(Duration::from_secs(5));
assert!(test_suite.fetch_reported_cpu_time().is_empty());
Expand All @@ -91,11 +91,11 @@ pub fn case_report_interval(test_suite: &mut TestSuite) {

// | Report Interval |
// | 5s |
test_suite.cfg_report_agent_interval("5s");
test_suite.cfg_report_receiver_interval("5s");
sleep(Duration::from_secs(10));
test_suite.flush_agent();
test_suite.flush_receiver();

sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
assert_eq!(res.len(), 2);
assert!(res.contains_key("req-1"));
Expand All @@ -105,9 +105,9 @@ pub fn case_report_interval(test_suite: &mut TestSuite) {
pub fn case_max_resource_groups(test_suite: &mut TestSuite) {
test_suite.reset();
let port = alloc_port();
test_suite.start_agent_at(port);
test_suite.start_receiver_at(port);
test_suite.cfg_enabled(true);
test_suite.cfg_agent_address(format!("127.0.0.1:{}", port));
test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port));

// Workload
// [req-{1..3} * 10, req-{4..5} * 1]
Expand All @@ -122,7 +122,7 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) {

// | Max Resource Groups |
// | 5000 |
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
assert_eq!(res.len(), 5);
assert!(res.contains_key("req-1"));
Expand All @@ -134,8 +134,8 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) {
// | Max Resource Groups |
// | 3 |
test_suite.cfg_max_resource_groups(3);
test_suite.flush_agent();
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
test_suite.flush_receiver();
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
assert_eq!(res.len(), 4);
assert!(res.contains_key("req-1"));
Expand All @@ -147,18 +147,18 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) {
pub fn case_precision(test_suite: &mut TestSuite) {
test_suite.reset();
let port = alloc_port();
test_suite.start_agent_at(port);
test_suite.cfg_report_agent_interval("10s");
test_suite.start_receiver_at(port);
test_suite.cfg_report_receiver_interval("10s");
test_suite.cfg_enabled(true);
test_suite.cfg_agent_address(format!("127.0.0.1:{}", port));
test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port));

// Workload
// [req-1]
test_suite.setup_workload(vec!["req-1"]);

// | Precision |
// | 1s |
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
let (secs, _) = res.get("req-1").unwrap();
for (l, r) in secs.iter().zip({
Expand All @@ -173,8 +173,8 @@ pub fn case_precision(test_suite: &mut TestSuite) {
// | Precision |
// | 3s |
test_suite.cfg_precision("3s");
test_suite.flush_agent();
sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC);
test_suite.flush_receiver();
sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC);
let res = test_suite.fetch_reported_cpu_time();
let (secs, _) = res.get("req-1").unwrap();
for (l, r) in secs.iter().zip({
Expand Down
Loading

0 comments on commit 677ff9a

Please sign in to comment.