diff --git a/components/resource_metering/src/lib.rs b/components/resource_metering/src/lib.rs index 8a52a06a1db..34be2844fd5 100644 --- a/components/resource_metering/src/lib.rs +++ b/components/resource_metering/src/lib.rs @@ -59,7 +59,7 @@ impl TagInfos { const MIN_PRECISION: ReadableDuration = ReadableDuration::secs(1); const MAX_PRECISION: ReadableDuration = ReadableDuration::hours(1); const MAX_MAX_RESOURCE_GROUPS: usize = 5_000; -const MIN_REPORT_AGENT_INTERVAL: ReadableDuration = ReadableDuration::secs(5); +const MIN_REPORT_RECEIVER_INTERVAL: ReadableDuration = ReadableDuration::secs(5); #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, OnlineConfig)] #[serde(default)] @@ -67,8 +67,8 @@ const MIN_REPORT_AGENT_INTERVAL: ReadableDuration = ReadableDuration::secs(5); pub struct Config { pub enabled: bool, - pub agent_address: String, - pub report_agent_interval: ReadableDuration, + pub receiver_address: String, + pub report_receiver_interval: ReadableDuration, pub max_resource_groups: usize, pub precision: ReadableDuration, @@ -78,8 +78,8 @@ impl Default for Config { fn default() -> Config { Config { enabled: true, - agent_address: "".to_string(), - report_agent_interval: ReadableDuration::minutes(1), + receiver_address: "".to_string(), + report_receiver_interval: ReadableDuration::minutes(1), max_resource_groups: 2000, precision: ReadableDuration::secs(1), } @@ -88,8 +88,8 @@ impl Default for Config { impl Config { pub fn validate(&self) -> std::result::Result<(), Box> { - if !self.agent_address.is_empty() { - tikv_util::config::check_addr(&self.agent_address)?; + if !self.receiver_address.is_empty() { + tikv_util::config::check_addr(&self.receiver_address)?; } if self.precision < MIN_PRECISION || self.precision > MAX_PRECISION { @@ -108,12 +108,12 @@ impl Config { .into()); } - if self.report_agent_interval < MIN_REPORT_AGENT_INTERVAL - || self.report_agent_interval > self.precision * 500 + if self.report_receiver_interval < MIN_REPORT_RECEIVER_INTERVAL + || self.report_receiver_interval > self.precision * 500 { return Err(format!( "report interval seconds must between {} and {}", - MIN_REPORT_AGENT_INTERVAL, + MIN_REPORT_RECEIVER_INTERVAL, self.precision * 500 ) .into()); @@ -123,7 +123,7 @@ impl Config { } fn should_report(&self) -> bool { - self.enabled && !self.agent_address.is_empty() && self.max_resource_groups != 0 + self.enabled && !self.receiver_address.is_empty() && self.max_resource_groups != 0 } } diff --git a/components/resource_metering/src/reporter.rs b/components/resource_metering/src/reporter.rs index 0d2b616dcc8..eaf3144002e 100644 --- a/components/resource_metering/src/reporter.rs +++ b/components/resource_metering/src/reporter.rs @@ -94,7 +94,7 @@ impl ResourceMeteringReporter { let cb = ChannelBuilder::new(self.env.clone()) .keepalive_time(Duration::from_secs(10)) .keepalive_timeout(Duration::from_secs(3)); - cb.connect(&self.config.agent_address) + cb.connect(&self.config.receiver_address) }; self.client = Some(ResourceUsageAgentClient::new(channel)); if self.cpu_records_collector.is_none() { @@ -112,12 +112,12 @@ impl Runnable for ResourceMeteringReporter { match task { Task::ConfigChange(new_config) => { let old_config_enabled = self.config.enabled; - let old_config_agent_address = self.config.agent_address.clone(); + let old_config_receiver_address = self.config.receiver_address.clone(); self.config = new_config; if !self.config.should_report() { self.client.take(); self.cpu_records_collector.take(); - } else if self.config.agent_address != old_config_agent_address + } else if self.config.receiver_address != old_config_receiver_address || self.config.enabled != old_config_enabled { self.init_client(); @@ -235,13 +235,13 @@ impl RunnableWithTimer for ResourceMeteringReporter { }); } Err(err) => { - warn!("failed to connect resource usage agent"; "error" => ?err); + warn!("failed to connect resource usage receiver"; "error" => ?err); } } } } fn get_interval(&self) -> Duration { - self.config.report_agent_interval.0 + self.config.report_receiver_interval.0 } } diff --git a/tests/integrations/resource_metering/mod.rs b/tests/integrations/resource_metering/mod.rs index cc0aa943aed..368ca7f67cf 100644 --- a/tests/integrations/resource_metering/mod.rs +++ b/tests/integrations/resource_metering/mod.rs @@ -2,8 +2,8 @@ pub mod test_suite; -pub mod test_agent; pub mod test_dynamic_config; +pub mod test_receiver; #[cfg(target_os = "linux")] mod linux { @@ -19,9 +19,9 @@ mod linux { test_dynamic_config::case_max_resource_groups(&mut ts); test_dynamic_config::case_precision(&mut ts); - // Agent - test_agent::case_alter_agent_addr(&mut ts); - test_agent::case_agent_blocking(&mut ts); - test_agent::case_agent_shutdown(&mut ts); + // Receiver + test_receiver::case_alter_receiver_addr(&mut ts); + test_receiver::case_receiver_blocking(&mut ts); + test_receiver::case_receiver_shutdown(&mut ts); } } diff --git a/tests/integrations/resource_metering/test_dynamic_config.rs b/tests/integrations/resource_metering/test_dynamic_config.rs index a91e2d6d516..97ab233a81b 100644 --- a/tests/integrations/resource_metering/test_dynamic_config.rs +++ b/tests/integrations/resource_metering/test_dynamic_config.rs @@ -14,7 +14,7 @@ const ONE_SEC: Duration = Duration::from_secs(1); pub fn case_enable(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); // Workload // [req-1, req-2] @@ -23,7 +23,7 @@ pub fn case_enable(test_suite: &mut TestSuite) { // | Address | Enabled | // | x | o | test_suite.cfg_enabled(true); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // Workload @@ -32,8 +32,8 @@ pub fn case_enable(test_suite: &mut TestSuite) { // | Address | Enabled | // | o | o | - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // Workload @@ -42,7 +42,7 @@ pub fn case_enable(test_suite: &mut TestSuite) { // | Address | Enabled | // | o | o | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 2); assert!(res.contains_key("req-1")); @@ -50,25 +50,25 @@ pub fn case_enable(test_suite: &mut TestSuite) { // | Address | Enabled | // | x | o | - test_suite.cfg_agent_address(""); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(""); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // | Address | Enabled | // | o | x | test_suite.cfg_enabled(false); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); } pub fn case_report_interval(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); test_suite.cfg_enabled(true); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); // Workload // [req-1, req-2] @@ -76,8 +76,8 @@ pub fn case_report_interval(test_suite: &mut TestSuite) { // | Report Interval | // | 15s | - test_suite.cfg_report_agent_interval("15s"); - test_suite.flush_agent(); + test_suite.cfg_report_receiver_interval("15s"); + test_suite.flush_receiver(); sleep(Duration::from_secs(5)); assert!(test_suite.fetch_reported_cpu_time().is_empty()); @@ -91,11 +91,11 @@ pub fn case_report_interval(test_suite: &mut TestSuite) { // | Report Interval | // | 5s | - test_suite.cfg_report_agent_interval("5s"); + test_suite.cfg_report_receiver_interval("5s"); sleep(Duration::from_secs(10)); - test_suite.flush_agent(); + test_suite.flush_receiver(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 2); assert!(res.contains_key("req-1")); @@ -105,9 +105,9 @@ pub fn case_report_interval(test_suite: &mut TestSuite) { pub fn case_max_resource_groups(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); test_suite.cfg_enabled(true); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); // Workload // [req-{1..3} * 10, req-{4..5} * 1] @@ -122,7 +122,7 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) { // | Max Resource Groups | // | 5000 | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 5); assert!(res.contains_key("req-1")); @@ -134,8 +134,8 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) { // | Max Resource Groups | // | 3 | test_suite.cfg_max_resource_groups(3); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 4); assert!(res.contains_key("req-1")); @@ -147,10 +147,10 @@ pub fn case_max_resource_groups(test_suite: &mut TestSuite) { pub fn case_precision(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); - test_suite.cfg_report_agent_interval("10s"); + test_suite.start_receiver_at(port); + test_suite.cfg_report_receiver_interval("10s"); test_suite.cfg_enabled(true); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); // Workload // [req-1] @@ -158,7 +158,7 @@ pub fn case_precision(test_suite: &mut TestSuite) { // | Precision | // | 1s | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); let (secs, _) = res.get("req-1").unwrap(); for (l, r) in secs.iter().zip({ @@ -173,8 +173,8 @@ pub fn case_precision(test_suite: &mut TestSuite) { // | Precision | // | 3s | test_suite.cfg_precision("3s"); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); let (secs, _) = res.get("req-1").unwrap(); for (l, r) in secs.iter().zip({ diff --git a/tests/integrations/resource_metering/test_agent.rs b/tests/integrations/resource_metering/test_receiver.rs similarity index 71% rename from tests/integrations/resource_metering/test_agent.rs rename to tests/integrations/resource_metering/test_receiver.rs index 2077cb35d62..0ebfc032a8c 100644 --- a/tests/integrations/resource_metering/test_agent.rs +++ b/tests/integrations/resource_metering/test_receiver.rs @@ -11,10 +11,10 @@ use test_util::alloc_port; const ONE_SEC: Duration = Duration::from_secs(1); -pub fn case_alter_agent_addr(test_suite: &mut TestSuite) { +pub fn case_alter_receiver_addr(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); test_suite.cfg_enabled(true); test_suite.cfg_max_resource_groups(5); @@ -31,13 +31,13 @@ pub fn case_alter_agent_addr(test_suite: &mut TestSuite) { // | Address | Enabled | // | x | o | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // | Address | Enabled | // | o | o | - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-1")); @@ -49,15 +49,15 @@ pub fn case_alter_agent_addr(test_suite: &mut TestSuite) { // | Address | Enabled | // | ! | o | - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port + 1)); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port + 1)); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // | Address | Enabled | // | o | o | - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-1")); @@ -68,13 +68,13 @@ pub fn case_alter_agent_addr(test_suite: &mut TestSuite) { assert!(res.contains_key("")); } -pub fn case_agent_blocking(test_suite: &mut TestSuite) { +pub fn case_receiver_blocking(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); test_suite.cfg_enabled(true); test_suite.cfg_max_resource_groups(5); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); // Workload // [req-{1..5} * 10, req-{6..10} * 1] @@ -87,9 +87,9 @@ pub fn case_agent_blocking(test_suite: &mut TestSuite) { wl.shuffle(&mut rand::thread_rng()); test_suite.setup_workload(wl); - // | Block Agent | + // | Block Receiver | // | x | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-1")); @@ -99,11 +99,11 @@ pub fn case_agent_blocking(test_suite: &mut TestSuite) { assert!(res.contains_key("req-5")); assert!(res.contains_key("")); - // | Block Agent | + // | Block Receiver | // | o | - fail::cfg("mock-agent", "sleep(5000)").unwrap(); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + fail::cfg("mock-receiver", "sleep(5000)").unwrap(); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // Workload @@ -118,11 +118,11 @@ pub fn case_agent_blocking(test_suite: &mut TestSuite) { wl.shuffle(&mut rand::thread_rng()); test_suite.setup_workload(wl); - // | Block Agent | + // | Block Receiver | // | x | - fail::remove("mock-agent"); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + fail::remove("mock-receiver"); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-6")); @@ -133,13 +133,13 @@ pub fn case_agent_blocking(test_suite: &mut TestSuite) { assert!(res.contains_key("")); } -pub fn case_agent_shutdown(test_suite: &mut TestSuite) { +pub fn case_receiver_shutdown(test_suite: &mut TestSuite) { test_suite.reset(); let port = alloc_port(); - test_suite.start_agent_at(port); + test_suite.start_receiver_at(port); test_suite.cfg_enabled(true); test_suite.cfg_max_resource_groups(5); - test_suite.cfg_agent_address(format!("127.0.0.1:{}", port)); + test_suite.cfg_receiver_address(format!("127.0.0.1:{}", port)); // Workload // [req-{1..5} * 10, req-{6..10} * 1] @@ -152,9 +152,9 @@ pub fn case_agent_shutdown(test_suite: &mut TestSuite) { wl.shuffle(&mut rand::thread_rng()); test_suite.setup_workload(wl); - // | Agent Alive | + // | Receiver Alive | // | o | - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-1")); @@ -164,11 +164,11 @@ pub fn case_agent_shutdown(test_suite: &mut TestSuite) { assert!(res.contains_key("req-5")); assert!(res.contains_key("")); - // | Agent Alive | + // | Receiver Alive | // | x | - test_suite.shutdown_agent(); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.shutdown_receiver(); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); assert!(test_suite.fetch_reported_cpu_time().is_empty()); // Workload @@ -183,11 +183,11 @@ pub fn case_agent_shutdown(test_suite: &mut TestSuite) { wl.shuffle(&mut rand::thread_rng()); test_suite.setup_workload(wl); - // | Agent Alive | + // | Receiver Alive | // | o | - test_suite.start_agent_at(port); - test_suite.flush_agent(); - sleep(test_suite.get_current_cfg().report_agent_interval.0 + ONE_SEC); + test_suite.start_receiver_at(port); + test_suite.flush_receiver(); + sleep(test_suite.get_current_cfg().report_receiver_interval.0 + ONE_SEC); let res = test_suite.fetch_reported_cpu_time(); assert_eq!(res.len(), 6); assert!(res.contains_key("req-6")); diff --git a/tests/integrations/resource_metering/test_suite/mock_agent_server.rs b/tests/integrations/resource_metering/test_suite/mock_receiver_server.rs similarity index 92% rename from tests/integrations/resource_metering/test_suite/mock_agent_server.rs rename to tests/integrations/resource_metering/test_suite/mock_receiver_server.rs index 5c253881ef7..08cfd0e9e87 100644 --- a/tests/integrations/resource_metering/test_suite/mock_agent_server.rs +++ b/tests/integrations/resource_metering/test_suite/mock_receiver_server.rs @@ -14,11 +14,11 @@ use kvproto::resource_usage_agent::{ }; #[derive(Clone)] -pub struct MockAgentServer { +pub struct MockReceiverServer { tx: Sender>, } -impl MockAgentServer { +impl MockReceiverServer { pub fn new(tx: Sender>) -> Self { Self { tx } } @@ -39,14 +39,14 @@ impl MockAgentServer { } } -impl ResourceUsageAgent for MockAgentServer { +impl ResourceUsageAgent for MockReceiverServer { fn report_cpu_time( &mut self, ctx: RpcContext, mut stream: RequestStream, sink: ClientStreamingSink, ) { - fail_point!("mock-agent"); + fail_point!("mock-receiver"); let tx = self.tx.clone(); let f = async move { let mut res = vec![]; diff --git a/tests/integrations/resource_metering/test_suite/mod.rs b/tests/integrations/resource_metering/test_suite/mod.rs index 85d66419bd8..041127c8676 100644 --- a/tests/integrations/resource_metering/test_suite/mod.rs +++ b/tests/integrations/resource_metering/test_suite/mod.rs @@ -1,6 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -mod mock_agent_server; +mod mock_receiver_server; use std::collections::HashMap; use std::sync::Arc; @@ -11,7 +11,7 @@ use futures::{select, FutureExt}; use grpcio::{Environment, Server}; use kvproto::kvrpcpb::Context; use kvproto::resource_usage_agent::CpuTimeRecord; -use mock_agent_server::MockAgentServer; +use mock_receiver_server::MockReceiverServer; use resource_metering::cpu::recorder::{init_recorder, TEST_TAG_PREFIX}; use resource_metering::reporter::{ResourceMeteringReporter, Task}; use resource_metering::{Config, ConfigManager}; @@ -25,7 +25,7 @@ use tokio::runtime::{self, Runtime}; use txn_types::{Key, TimeStamp}; pub struct TestSuite { - agent_server: Option, + receiver_server: Option, storage: Storage, reporter: Option>>, @@ -54,7 +54,7 @@ impl TestSuite { let scheduler = reporter.scheduler(); let (mut tikv_cfg, dir) = TiKvConfig::with_tmp().unwrap(); - tikv_cfg.resource_metering.report_agent_interval = ReadableDuration::secs(5); + tikv_cfg.resource_metering.report_receiver_interval = ReadableDuration::secs(5); let resource_metering_cfg = tikv_cfg.resource_metering.clone(); let cfg_controller = ConfigController::new(tikv_cfg); @@ -81,7 +81,7 @@ impl TestSuite { .unwrap(); Self { - agent_server: None, + receiver_server: None, storage, reporter: Some(reporter), cfg_controller, @@ -101,10 +101,10 @@ impl TestSuite { .unwrap(); } - pub fn cfg_agent_address(&self, addr: impl Into) { + pub fn cfg_receiver_address(&self, addr: impl Into) { let addr = addr.into(); self.cfg_controller - .update_config("resource-metering.agent-address", &addr) + .update_config("resource-metering.receiver-address", &addr) .unwrap(); } @@ -115,10 +115,10 @@ impl TestSuite { .unwrap(); } - pub fn cfg_report_agent_interval(&self, interval: impl Into) { + pub fn cfg_report_receiver_interval(&self, interval: impl Into) { let interval = interval.into(); self.cfg_controller - .update_config("resource-metering.report-agent-interval", &interval) + .update_config("resource-metering.report-receiver-interval", &interval) .unwrap(); } @@ -135,18 +135,18 @@ impl TestSuite { self.cfg_controller.get_current().resource_metering.clone() } - pub fn start_agent_at(&mut self, port: u16) { - assert!(self.agent_server.is_none()); + pub fn start_receiver_at(&mut self, port: u16) { + assert!(self.receiver_server.is_none()); - let mut agent_server = - MockAgentServer::new(self.tx.clone()).build_server(port, self.env.clone()); - agent_server.start(); - self.agent_server = Some(agent_server); + let mut receiver_server = + MockReceiverServer::new(self.tx.clone()).build_server(port, self.env.clone()); + receiver_server.start(); + self.receiver_server = Some(receiver_server); } - pub fn shutdown_agent(&mut self) { - if let Some(mut agent) = self.agent_server.take() { - self.rt.block_on(agent.shutdown()).unwrap(); + pub fn shutdown_receiver(&mut self) { + if let Some(mut receiver) = self.receiver_server.take() { + self.rt.block_on(receiver.shutdown()).unwrap(); } } @@ -211,21 +211,21 @@ impl TestSuite { res } - pub fn flush_agent(&self) { + pub fn flush_receiver(&self) { let _ = self .rx - .recv_timeout(self.get_current_cfg().report_agent_interval.0); + .recv_timeout(self.get_current_cfg().report_receiver_interval.0); } pub fn reset(&mut self) { self.cfg_enabled(false); - self.cfg_agent_address(""); + self.cfg_receiver_address(""); self.cancel_workload(); - self.flush_agent(); + self.flush_receiver(); self.cfg_precision("1s"); - self.cfg_report_agent_interval("5s"); + self.cfg_report_receiver_interval("5s"); self.cfg_max_resource_groups(5000); - self.shutdown_agent(); + self.shutdown_receiver(); } }