diff --git a/components/raftstore/src/store/snap.rs b/components/raftstore/src/store/snap.rs
index 2812392613f..a125b1898d2 100644
--- a/components/raftstore/src/store/snap.rs
+++ b/components/raftstore/src/store/snap.rs
@@ -1123,14 +1123,14 @@ struct SnapManagerCore {
 /// `SnapManagerCore` trace all current processing snapshots.
 pub struct SnapManager {
     core: SnapManagerCore,
-    max_total_size: u64,
+    max_total_size: AtomicU64,
 }
 
 impl Clone for SnapManager {
     fn clone(&self) -> Self {
         SnapManager {
             core: self.core.clone(),
-            max_total_size: self.max_total_size,
+            max_total_size: AtomicU64::new(self.max_total_size.load(Ordering::Acquire)),
         }
     }
 }
@@ -1368,7 +1368,19 @@ impl SnapManager {
     }
 
     pub fn max_total_snap_size(&self) -> u64 {
-        self.max_total_size
+        self.max_total_size.load(Ordering::Acquire)
+    }
+
+    pub fn set_max_total_snap_size(&self, max_total_size: u64) {
+        self.max_total_size.store(max_total_size, Ordering::Release);
+    }
+
+    pub fn set_speed_limit(&self, bytes_per_sec: f64) {
+        self.core.limiter.set_speed_limit(bytes_per_sec);
+    }
+
+    pub fn get_speed_limit(&self) -> f64 {
+        self.core.limiter.speed_limit()
     }
 
     pub fn register(&self, key: SnapKey, entry: SnapEntry) {
@@ -1568,7 +1580,7 @@ impl SnapManagerBuilder {
                 temp_sst_id: Arc::new(AtomicU64::new(0)),
                 encryption_key_manager: self.key_manager,
             },
-            max_total_size,
+            max_total_size: AtomicU64::new(max_total_size),
         }
     }
 }
diff --git a/components/server/src/server.rs b/components/server/src/server.rs
index 9651cdce32c..2526a74ae00 100644
--- a/components/server/src/server.rs
+++ b/components/server/src/server.rs
@@ -66,6 +66,7 @@ use tikv::{
     server::raftkv::ReplicaReadLockChecker,
     server::{
         config::Config as ServerConfig,
+        config::ServerConfigManager,
         create_raft_storage,
         gc_worker::{AutoGcConfig, GcWorker},
         lock_manager::LockManager,
@@ -500,7 +501,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
         gc_worker
     }
 
-    fn init_servers(&mut self) -> Arc<ServerConfig> {
+    fn init_servers(&mut self) -> Arc<VersionTrack<ServerConfig>> {
         let gc_worker = self.init_gc_worker();
         let mut ttl_checker = Box::new(LazyWorker::new("ttl-checker"));
         let ttl_scheduler = ttl_checker.scheduler();
@@ -624,7 +625,7 @@
         let resolved_ts_ob = resolved_ts::Observer::new(rts_scheduler.clone());
         resolved_ts_ob.register_to(self.coprocessor_host.as_mut().unwrap());
 
-        let server_config = Arc::new(self.config.server.clone());
+        let server_config = Arc::new(VersionTrack::new(self.config.server.clone()));
 
         self.config
             .raft_store
@@ -633,7 +634,7 @@
         let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone()));
         let mut node = Node::new(
             self.system.take().unwrap(),
-            &server_config,
+            &server_config.value().clone(),
             raft_store.clone(),
             self.pd_client.clone(),
             self.state.clone(),
@@ -649,7 +650,7 @@
             &self.security_mgr,
             storage,
             coprocessor::Endpoint::new(
-                &server_config,
+                &server_config.value(),
                 cop_read_pool_handle,
                 self.concurrency_manager.clone(),
                 engine_rocks::raw_util::to_raw_perf_level(self.config.coprocessor.perf_level),
@@ -664,6 +665,13 @@
             debug_thread_pool,
         )
         .unwrap_or_else(|e| fatal!("failed to create server: {}", e));
+        cfg_controller.register(
+            tikv::config::Module::Server,
+            Box::new(ServerConfigManager::new(
+                server.get_snap_worker_scheduler(),
+                server_config.clone(),
+            )),
+        );
 
         let import_path = self.store_path.join("import");
         let importer = Arc::new(
@@ -930,7 +938,7 @@
         });
     }
 
-    fn run_server(&mut self, server_config: Arc<ServerConfig>) {
+    fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
         let server = self.servers.as_mut().unwrap();
         server
             .server
diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs
index ba7c68c3389..30e409b4662 100644
--- a/components/test_raftstore/src/server.rs
+++ b/components/test_raftstore/src/server.rs
@@ -358,14 +358,14 @@ impl Simulator for ServerCluster {
             .encryption_key_manager(key_manager)
             .build(tmp_str);
         self.snap_mgrs.insert(node_id, snap_mgr.clone());
-        let server_cfg = Arc::new(cfg.server.clone());
+        let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
         let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
         let cop_read_pool = ReadPool::from(coprocessor::readpool_impl::build_read_pool_for_test(
             &tikv::config::CoprReadPoolConfig::default_for_test(),
             store.get_engine(),
         ));
         let cop = coprocessor::Endpoint::new(
-            &server_cfg,
+            &server_cfg.value().clone(),
             cop_read_pool.handle(),
             concurrency_manager.clone(),
             PerfLevel::EnableCount,
@@ -395,7 +395,7 @@ impl Simulator for ServerCluster {
         raft_store.validate().unwrap();
         let mut node = Node::new(
             system,
-            &cfg.server,
+            &server_cfg.value().clone(),
             Arc::new(VersionTrack::new(raft_store)),
             Arc::clone(&self.pd_client),
             state,
@@ -448,7 +448,7 @@ impl Simulator for ServerCluster {
         cfg.server.addr = format!("{}", addr);
         let trans = server.transport();
         let simulate_trans = SimulateTransport::new(trans);
-        let server_cfg = Arc::new(cfg.server.clone());
+        let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
 
         // Register the role change observer of the lock manager.
         lock_mgr.register_detector_role_change_observer(&mut coprocessor_host);
diff --git a/src/config.rs b/src/config.rs
index 1fe1d8557da..df9ee101c38 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -2256,7 +2256,7 @@ pub struct TiKvConfig {
     #[config(skip)]
     pub readpool: ReadPoolConfig,
 
-    #[config(skip)]
+    #[config(submodule)]
     pub server: ServerConfig,
 
     #[config(submodule)]
diff --git a/src/server/config.rs b/src/server/config.rs
index 339fad50498..935fe6dca6c 100644
--- a/src/server/config.rs
+++ b/src/server/config.rs
@@ -1,5 +1,6 @@
 // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
 
+use std::sync::Arc;
 use std::{cmp, i32, isize};
 
 use super::Result;
@@ -7,12 +8,16 @@ use grpcio::CompressionAlgorithms;
 use regex::Regex;
 
 use collections::HashMap;
-use tikv_util::config::{self, ReadableDuration, ReadableSize};
+use configuration::{ConfigChange, ConfigManager, Configuration};
+use tikv_util::config::{self, ReadableDuration, ReadableSize, VersionTrack};
 use tikv_util::sys::sys_quota::SysQuota;
+use tikv_util::worker::Scheduler;
 
 pub use crate::storage::config::Config as StorageConfig;
 pub use raftstore::store::Config as RaftStoreConfig;
 
+use super::snap::Task as SnapTask;
+
 pub const DEFAULT_CLUSTER_ID: u64 = 0;
 pub const DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20160";
 const DEFAULT_ADVERTISE_LISTENING_ADDR: &str = "";
@@ -50,84 +55,118 @@ pub enum GrpcCompressionType {
 }
 
 /// Configuration for the `server` module.
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Configuration)]
 #[serde(default)]
 #[serde(rename_all = "kebab-case")]
 pub struct Config {
     #[serde(skip)]
+    #[config(skip)]
     pub cluster_id: u64,
 
     // Server listening address.
+    #[config(skip)]
     pub addr: String,
 
     // Server advertise listening address for outer communication.
     // If not set, we will use listening address instead.
+    #[config(skip)]
     pub advertise_addr: String,
 
     // These are related to TiKV status.
+    #[config(skip)]
     pub status_addr: String,
 
     // Status server's advertise listening address for outer communication.
     // If not set, the status server's listening address will be used.
+    #[config(skip)]
     pub advertise_status_addr: String,
 
+    #[config(skip)]
     pub status_thread_pool_size: usize,
 
+    #[config(skip)]
     pub max_grpc_send_msg_len: i32,
 
     // TODO: use CompressionAlgorithms instead once it supports traits like Clone etc.
+    #[config(skip)]
     pub grpc_compression_type: GrpcCompressionType,
+    #[config(skip)]
     pub grpc_concurrency: usize,
+    #[config(skip)]
     pub grpc_concurrent_stream: i32,
+    #[config(skip)]
     pub grpc_raft_conn_num: usize,
+    #[config(skip)]
     pub grpc_memory_pool_quota: ReadableSize,
+    #[config(skip)]
     pub grpc_stream_initial_window_size: ReadableSize,
+    #[config(skip)]
     pub grpc_keepalive_time: ReadableDuration,
+    #[config(skip)]
     pub grpc_keepalive_timeout: ReadableDuration,
     /// How many snapshots can be sent concurrently.
     pub concurrent_send_snap_limit: usize,
     /// How many snapshots can be recv concurrently.
     pub concurrent_recv_snap_limit: usize,
+    #[config(skip)]
     pub end_point_recursion_limit: u32,
+    #[config(skip)]
     pub end_point_stream_channel_size: usize,
+    #[config(skip)]
     pub end_point_batch_row_limit: usize,
+    #[config(skip)]
     pub end_point_stream_batch_row_limit: usize,
+    #[config(skip)]
     pub end_point_enable_batch_if_possible: bool,
+    #[config(skip)]
     pub end_point_request_max_handle_duration: ReadableDuration,
+    #[config(skip)]
     pub end_point_max_concurrency: usize,
     pub snap_max_write_bytes_per_sec: ReadableSize,
     pub snap_max_total_size: ReadableSize,
+    #[config(skip)]
     pub stats_concurrency: usize,
+    #[config(skip)]
     pub heavy_load_threshold: usize,
+    #[config(skip)]
     pub heavy_load_wait_duration: ReadableDuration,
+    #[config(skip)]
     pub enable_request_batch: bool,
+    #[config(skip)]
     pub background_thread_count: usize,
     // If handle time is larger than the threshold, it will print slow log in end point.
+    #[config(skip)]
     pub end_point_slow_log_threshold: ReadableDuration,
     /// Max connections per address for forwarding request.
+    #[config(skip)]
     pub forward_max_connections_per_address: usize,
 
     // Test only.
     #[doc(hidden)]
     #[serde(skip_serializing)]
+    #[config(skip)]
     pub raft_client_backoff_step: ReadableDuration,
 
     // Server labels to specify some attributes about this server.
+    #[config(skip)]
     pub labels: HashMap<String, String>,
 
     // deprecated. use readpool.coprocessor.xx_concurrency.
     #[doc(hidden)]
     #[serde(skip_serializing)]
+    #[config(skip)]
     pub end_point_concurrency: Option<usize>,
 
     // deprecated. use readpool.coprocessor.stack_size.
     #[doc(hidden)]
     #[serde(skip_serializing)]
+    #[config(skip)]
     pub end_point_stack_size: Option<ReadableSize>,
 
     // deprecated. use readpool.coprocessor.max_tasks_per_worker_xx.
     #[doc(hidden)]
     #[serde(skip_serializing)]
+    #[config(skip)]
     pub end_point_max_tasks: Option<usize>,
 }
@@ -295,6 +334,32 @@ impl Config {
         }
     }
 }
 
+pub struct ServerConfigManager {
+    tx: Scheduler<SnapTask>,
+    config: Arc<VersionTrack<Config>>,
+}
+
+impl ServerConfigManager {
+    pub fn new(tx: Scheduler<SnapTask>, config: Arc<VersionTrack<Config>>) -> ServerConfigManager {
+        ServerConfigManager { tx, config }
+    }
+}
+
+impl ConfigManager for ServerConfigManager {
+    fn dispatch(&mut self, c: ConfigChange) -> std::result::Result<(), Box<dyn std::error::Error>> {
+        {
+            let change = c.clone();
+            self.config
+                .update(move |cfg: &mut Config| cfg.update(change));
+            if let Err(e) = self.tx.schedule(SnapTask::RefreshConfigEvent) {
+                error!("server configuration manager schedule refresh snapshot work task failed"; "err"=> ?e);
+            }
+        }
+        info!("server configuration changed"; "change" => ?c);
+        Ok(())
+    }
+}
+
 lazy_static! {
     static ref LABEL_KEY_FORMAT: Regex =
         Regex::new("^[$]?[A-Za-z0-9]([-A-Za-z0-9_./]*[A-Za-z0-9])?$").unwrap();
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 822d35a7e6f..435148bd958 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -20,7 +20,7 @@ pub mod status_server;
 pub mod transport;
 pub mod ttl;
 
-pub use self::config::{Config, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
+pub use self::config::{Config, ServerConfigManager, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
 pub use self::errors::{Error, Result};
 pub use self::metrics::CONFIG_ROCKSDB_GAUGE;
 pub use self::metrics::CPU_CORES_QUOTA_GAUGE;
diff --git a/src/server/proxy.rs b/src/server/proxy.rs
index add705e3a44..117bc41e4da 100644
--- a/src/server/proxy.rs
+++ b/src/server/proxy.rs
@@ -13,6 +13,7 @@ use collections::HashMap;
 use grpcio::{CallOption, Channel, ChannelBuilder, Environment, MetadataBuilder, RpcContext};
 use kvproto::tikvpb::TikvClient;
 use security::SecurityManager;
+
 use std::ffi::CString;
 use std::future::Future;
 use std::str;
diff --git a/src/server/server.rs b/src/server/server.rs
index 08203ecf86c..bbf0387a485 100644
--- a/src/server/server.rs
+++ b/src/server/server.rs
@@ -24,8 +24,9 @@ use engine_rocks::RocksEngine;
 use raftstore::router::RaftStoreRouter;
 use raftstore::store::SnapManager;
 use security::SecurityManager;
+use tikv_util::config::VersionTrack;
 use tikv_util::timer::GLOBAL_TIMER_HANDLE;
-use tikv_util::worker::{LazyWorker, Worker};
+use tikv_util::worker::{LazyWorker, Scheduler, Worker};
 use tikv_util::Either;
 
 use super::load_statistics::*;
@@ -75,7 +76,7 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
     #[allow(clippy::too_many_arguments)]
     pub fn new<E: Engine, L: LockManager>(
         store_id: u64,
-        cfg: &Arc<Config>,
+        cfg: &Arc<VersionTrack<Config>>,
         security_mgr: &Arc<SecurityManager>,
         storage: Storage<E, L>,
         cop: Endpoint<E>,
@@ -89,24 +90,25 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
         debug_thread_pool: Arc<Runtime>,
     ) -> Result<Self> {
         // A helper thread (or pool) for transport layer.
-        let stats_pool = if cfg.stats_concurrency > 0 {
+        let stats_pool = if cfg.value().stats_concurrency > 0 {
             Some(
                 RuntimeBuilder::new()
                     .threaded_scheduler()
                     .thread_name(STATS_THREAD_PREFIX)
-                    .core_threads(cfg.stats_concurrency)
+                    .core_threads(cfg.value().stats_concurrency)
                     .build()
                     .unwrap(),
             )
         } else {
             None
         };
-        let grpc_thread_load = Arc::new(ThreadLoad::with_threshold(cfg.heavy_load_threshold));
+        let grpc_thread_load =
+            Arc::new(ThreadLoad::with_threshold(cfg.value().heavy_load_threshold));
 
         let snap_worker = Worker::new("snap-handler");
         let lazy_worker = snap_worker.lazy_build("snap-handler");
 
-        let proxy = Proxy::new(security_mgr.clone(), &env, cfg.clone());
+        let proxy = Proxy::new(security_mgr.clone(), &env, Arc::new(cfg.value().clone()));
         let kv_service = KvService::new(
             store_id,
             storage,
@@ -116,23 +118,23 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
             raft_router.clone(),
             lazy_worker.scheduler(),
             Arc::clone(&grpc_thread_load),
-            cfg.enable_request_batch,
+            cfg.value().enable_request_batch,
             proxy,
         );
 
-        let addr = SocketAddr::from_str(&cfg.addr)?;
+        let addr = SocketAddr::from_str(&cfg.value().addr)?;
         let ip = format!("{}", addr.ip());
         let mem_quota = ResourceQuota::new(Some("ServerMemQuota"))
-            .resize_memory(cfg.grpc_memory_pool_quota.0 as usize);
+            .resize_memory(cfg.value().grpc_memory_pool_quota.0 as usize);
         let channel_args = ChannelBuilder::new(Arc::clone(&env))
-            .stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as i32)
-            .max_concurrent_stream(cfg.grpc_concurrent_stream)
+            .stream_initial_window_size(cfg.value().grpc_stream_initial_window_size.0 as i32)
+            .max_concurrent_stream(cfg.value().grpc_concurrent_stream)
             .max_receive_message_len(-1)
             .set_resource_quota(mem_quota)
             .max_send_message_len(-1)
             .http2_max_ping_strikes(i32::MAX) // For pings without data from clients.
-            .keepalive_time(cfg.grpc_keepalive_time.into())
-            .keepalive_timeout(cfg.grpc_keepalive_timeout.into())
+            .keepalive_time(cfg.value().grpc_keepalive_time.into())
+            .keepalive_timeout(cfg.value().grpc_keepalive_timeout.into())
             .build_args();
         let health_service = HealthService::default();
         let builder = {
@@ -146,7 +148,7 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
 
         let conn_builder = ConnectionBuilder::new(
             env.clone(),
-            cfg.clone(),
+            Arc::new(cfg.value().clone()),
             security_mgr.clone(),
             resolver,
             raft_router.clone(),
@@ -180,6 +182,10 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
         self.debug_thread_pool.handle()
     }
 
+    pub fn get_snap_worker_scheduler(&self) -> Scheduler<SnapTask> {
+        self.snap_worker.scheduler()
+    }
+
     pub fn transport(&self) -> ServerTransport<T, S> {
         self.trans.clone()
     }
@@ -218,7 +224,11 @@ impl<T: RaftStoreRouter<RocksEngine> + Unpin, S: StoreAddrResolver + 'static> Se
 
     /// Starts the TiKV server.
     /// Notice: Make sure call `build_and_bind` first.
-    pub fn start(&mut self, cfg: Arc<Config>, security_mgr: Arc<SecurityManager>) -> Result<()> {
+    pub fn start(
+        &mut self,
+        cfg: Arc<VersionTrack<Config>>,
+        security_mgr: Arc<SecurityManager>,
+    ) -> Result<()> {
         let snap_runner = SnapHandler::new(
             Arc::clone(&self.env),
             self.snap_mgr.clone(),
@@ -457,7 +467,7 @@ mod tests {
         gc_worker.start().unwrap();
 
         let quick_fail = Arc::new(AtomicBool::new(false));
-        let cfg = Arc::new(cfg);
+        let cfg = Arc::new(VersionTrack::new(cfg));
         let security_mgr = Arc::new(SecurityManager::new(&SecurityConfig::default()).unwrap());
 
         let cop_read_pool = ReadPool::from(readpool_impl::build_read_pool_for_test(
@@ -465,7 +475,7 @@ mod tests {
             &tikv::config::CoprReadPoolConfig::default_for_test(),
             storage.get_engine(),
         ));
         let cop = coprocessor::Endpoint::new(
-            &cfg,
+            &cfg.value().clone(),
             cop_read_pool.handle(),
             storage.get_concurrency_manager(),
             PerfLevel::EnableCount,
diff --git a/src/server/snap.rs b/src/server/snap.rs
index 6031574954e..437334f62b1 100644
--- a/src/server/snap.rs
+++ b/src/server/snap.rs
@@ -25,6 +25,7 @@
 use file_system::{IOType, WithIOType};
 use raftstore::router::RaftStoreRouter;
 use raftstore::store::{GenericSnapshot, SnapEntry, SnapKey, SnapManager};
 use security::SecurityManager;
+use tikv_util::config::{Tracker, VersionTrack};
 use tikv_util::worker::Runnable;
 use tikv_util::DeferContext;
@@ -46,6 +47,8 @@ pub enum Task {
         msg: RaftMessage,
         cb: Callback,
     },
+    RefreshConfigEvent,
+    Validate(Box<dyn FnOnce(&Config) + Send>),
 }
 
 impl Display for Task {
@@ -55,6 +58,8 @@ impl Display for Task {
             Task::Send {
                 ref addr, ref msg, ..
             } => write!(f, "Send Snap[to: {}, snap: {:?}]", addr, msg),
+            Task::RefreshConfigEvent => write!(f, "Refresh configuration"),
+            Task::Validate(_) => write!(f, "Validate snap worker config"),
         }
     }
 }
@@ -307,7 +312,8 @@ where
     pool: Runtime,
     raft_router: R,
     security_mgr: Arc<SecurityManager>,
-    cfg: Arc<Config>,
+    cfg_tracker: Tracker<Config>,
+    cfg: Config,
     sending_count: Arc<AtomicUsize>,
     recving_count: Arc<AtomicUsize>,
     engine: PhantomData<E>,
@@ -323,9 +329,10 @@ where
         snap_mgr: SnapManager,
         r: R,
         security_mgr: Arc<SecurityManager>,
-        cfg: Arc<Config>,
+        cfg: Arc<VersionTrack<Config>>,
     ) -> Runner<E, R> {
-        Runner {
+        let cfg_tracker = cfg.clone().tracker("snap-sender".to_owned());
+        let snap_worker = Runner {
             env,
             snap_mgr,
             pool: RuntimeBuilder::new()
@@ -338,10 +345,33 @@ where
                 .unwrap(),
             raft_router: r,
             security_mgr,
-            cfg,
+            cfg_tracker,
+            cfg: cfg.value().clone(),
             sending_count: Arc::new(AtomicUsize::new(0)),
             recving_count: Arc::new(AtomicUsize::new(0)),
             engine: PhantomData,
+        };
+        snap_worker
+    }
+
+    fn refresh_cfg(&mut self) {
+        if let Some(incoming) = self.cfg_tracker.any_new() {
+            let limit = if incoming.snap_max_write_bytes_per_sec.0 > 0 {
+                incoming.snap_max_write_bytes_per_sec.0 as f64
+            } else {
+                f64::INFINITY
+            };
+            let max_total_size = if incoming.snap_max_total_size.0 > 0 {
+                incoming.snap_max_total_size.0
+            } else {
+                u64::MAX
+            };
+            self.snap_mgr.set_speed_limit(limit);
+            self.snap_mgr.set_max_total_snap_size(max_total_size);
+            info!("refresh snapshot manager config";
+                "speed_limit"=> limit,
+                "max_total_snap_size"=> max_total_size);
+            self.cfg = incoming.clone();
         }
     }
 }
@@ -403,7 +433,7 @@ where
                 let sending_count = Arc::clone(&self.sending_count);
                 sending_count.fetch_add(1, Ordering::SeqCst);
 
-                let send_task = send_snap(env, mgr, security_mgr, &self.cfg, &addr, msg);
+                let send_task = send_snap(env, mgr, security_mgr, &self.cfg.clone(), &addr, msg);
                 let task = async move {
                     let res = match send_task {
                         Err(e) => Err(e),
@@ -430,6 +460,12 @@ where
 
                 self.pool.spawn(task);
             }
+            Task::RefreshConfigEvent => {
+                self.refresh_cfg();
+            }
+            Task::Validate(f) => {
+                f(&self.cfg);
+            }
         }
     }
 }
diff --git a/tests/integrations/config/dynamic/mod.rs b/tests/integrations/config/dynamic/mod.rs
index b0160634a6d..05a8131b551 100644
--- a/tests/integrations/config/dynamic/mod.rs
+++ b/tests/integrations/config/dynamic/mod.rs
@@ -3,4 +3,5 @@
 mod gc_worker;
 mod pessimistic_txn;
 mod raftstore;
+mod snap;
 mod split_check;
diff --git a/tests/integrations/config/dynamic/snap.rs b/tests/integrations/config/dynamic/snap.rs
new file mode 100644
index 00000000000..103cef699ef
--- /dev/null
+++ b/tests/integrations/config/dynamic/snap.rs
@@ -0,0 +1,109 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::mpsc::channel;
+use std::sync::Arc;
+use std::time::Duration;
+
+use grpcio::EnvBuilder;
+
+use raftstore::store::fsm::*;
+use raftstore::store::SnapManager;
+use tikv::server::config::{Config as ServerConfig, ServerConfigManager};
+use tikv::server::snap::{Runner as SnapHandler, Task as SnapTask};
+
+use tikv::config::{ConfigController, TiKvConfig};
+
+use engine_rocks::RocksEngine;
+use security::SecurityManager;
+use tempfile::TempDir;
+use tikv_util::config::{ReadableSize, VersionTrack};
+use tikv_util::worker::{LazyWorker, Scheduler, Worker};
+
+fn start_server(
+    cfg: TiKvConfig,
+    dir: &TempDir,
+) -> (ConfigController, LazyWorker<SnapTask>, SnapManager) {
+    let snap_mgr = {
+        let p = dir
+            .path()
+            .join("store-config-snp")
+            .as_path()
+            .display()
+            .to_string();
+        SnapManager::new(p)
+    };
+
+    let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
+    let env = Arc::new(
+        EnvBuilder::new()
+            .cq_count(2)
+            .name_prefix(thd_name!("test-server"))
+            .build(),
+    );
+    let (raft_router, _) = create_raft_batch_system::<RocksEngine, RocksEngine>(&cfg.raft_store);
+    let mut snap_worker = Worker::new("snap-handler").lazy_build("snap-handler");
+    let snap_worker_scheduler = snap_worker.scheduler();
+    let server_config = Arc::new(VersionTrack::new(cfg.server.clone()));
+    let cfg_controller = ConfigController::new(cfg);
+    cfg_controller.register(
+        tikv::config::Module::Server,
+        Box::new(ServerConfigManager::new(
+            snap_worker_scheduler,
+            server_config.clone(),
+        )),
+    );
+    let snap_runner = SnapHandler::new(
+        Arc::clone(&env),
+        snap_mgr.clone(),
+        raft_router.clone(),
+        security_mgr,
+        Arc::clone(&server_config),
+    );
+    snap_worker.start(snap_runner);
+
+    (cfg_controller, snap_worker, snap_mgr)
+}
+
+fn validate<F>(scheduler: &Scheduler<SnapTask>, f: F)
+where
+    F: FnOnce(&ServerConfig) + Send + 'static,
+{
+    let (tx, rx) = channel();
+    scheduler
+        .schedule(SnapTask::Validate(Box::new(move |cfg: &ServerConfig| {
+            f(cfg);
+            tx.send(()).unwrap();
+        })))
+        .unwrap();
+    rx.recv_timeout(Duration::from_secs(3)).unwrap();
+}
+
+#[test]
+fn test_update_server_config() {
+    let (mut config, _dir) = TiKvConfig::with_tmp().unwrap();
+    config.validate().unwrap();
+    let (cfg_controller, snap_worker, snap_mgr) = start_server(config.clone(), &_dir);
+    let mut svr_cfg = config.server.clone();
+    // dispatch updated config
+    let change = {
+        let mut m = std::collections::HashMap::new();
+        m.insert(
+            "server.snap-max-write-bytes-per-sec".to_owned(),
+            "512MB".to_owned(),
+        );
+        m.insert(
+            "server.concurrent-send-snap-limit".to_owned(),
+            "100".to_owned(),
+        );
+        m
+    };
+    cfg_controller.update(change).unwrap();
+
+    svr_cfg.snap_max_write_bytes_per_sec = ReadableSize::mb(512);
+    svr_cfg.concurrent_send_snap_limit = 100;
+    // config should be updated
+    assert_eq!(snap_mgr.get_speed_limit(), 536870912 as f64);
+    validate(&snap_worker.scheduler(), move |cfg: &ServerConfig| {
+        assert_eq!(cfg, &svr_cfg);
+    });
+}
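
The complete round trip this patch wires up: `ConfigController::update` diffs the incoming key/value map against `TiKvConfig`, `ServerConfigManager::dispatch` folds the change into the `VersionTrack`'d `server` section and schedules `SnapTask::RefreshConfigEvent`, and the snap-handler `Runner` reads the new values out of its `Tracker` and pushes them into `SnapManager`. A minimal sketch of driving that path programmatically, condensed from the integration test above (`raise_snap_limits` is a hypothetical helper; it assumes a `ConfigController` with a `ServerConfigManager` registered, as in `init_servers`):

```rust
use std::collections::HashMap;

use tikv::config::ConfigController;

// Hypothetical helper: raise both dynamic snapshot limits at runtime.
fn raise_snap_limits(cfg_controller: &ConfigController) {
    let mut change = HashMap::new();
    // Keys are the kebab-case field names of `server::Config`; values are
    // parsed as ReadableSize. A value of 0 means "unlimited" per refresh_cfg.
    change.insert(
        "server.snap-max-write-bytes-per-sec".to_owned(),
        "512MB".to_owned(),
    );
    change.insert("server.snap-max-total-size".to_owned(), "80GB".to_owned());
    // On success, ServerConfigManager has already scheduled the refresh task,
    // so the new limits become observable via SnapManager::get_speed_limit().
    cfg_controller.update(change).unwrap();
}
```

Keeping a plain `Config` clone plus a `Tracker` inside the `Runner`, rather than reading the shared `VersionTrack` on every task, keeps the hot send/recv path free of extra synchronization; only an explicit `RefreshConfigEvent` pays the cost of reconciling with the shared config.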