Skip to content

Commit

Permalink
*: make configuration of snap to be dynamically modified (tikv#10121)
Browse files Browse the repository at this point in the history
* *: make configuration of snap to be dynamically modified

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

* address comment

Signed-off-by: nolouch <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
nolouch and ti-chi-bot authored May 21, 2021
1 parent c2f2be4 commit 0447554
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 39 deletions.
20 changes: 16 additions & 4 deletions components/raftstore/src/store/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1123,14 +1123,14 @@ struct SnapManagerCore {
/// `SnapManagerCore` traces all snapshots that are currently being processed.
pub struct SnapManager {
core: SnapManagerCore,
// Value returned by `max_total_snap_size()`; the atomic form allows it to be replaced
// through `&self` by `set_max_total_snap_size()`.
max_total_size: u64,
max_total_size: AtomicU64,
}

// Cloning copies the core and seeds a brand-new `AtomicU64` with the limit read at clone time.
// NOTE(review): because the atomic itself is not shared, a later `set_max_total_snap_size`
// on one clone is not visible through the other clones — confirm this is intended.
impl Clone for SnapManager {
fn clone(&self) -> Self {
SnapManager {
core: self.core.clone(),
max_total_size: self.max_total_size,
max_total_size: AtomicU64::new(self.max_total_size.load(Ordering::Acquire)),
}
}
}
Expand Down Expand Up @@ -1368,7 +1368,19 @@ impl SnapManager {
}

/// Returns the limit currently stored in `max_total_size`.
pub fn max_total_snap_size(&self) -> u64 {
self.max_total_size
self.max_total_size.load(Ordering::Acquire)
}

/// Atomically replaces the value that `max_total_snap_size` returns for this instance.
pub fn set_max_total_snap_size(&self, max_total_size: u64) {
self.max_total_size.store(max_total_size, Ordering::Release);
}

/// Forwards `bytes_per_sec` to the limiter held by the core.
pub fn set_speed_limit(&self, bytes_per_sec: f64) {
self.core.limiter.set_speed_limit(bytes_per_sec);
}

/// Returns the speed limit currently reported by the core's limiter.
pub fn get_speed_limit(&self) -> f64 {
self.core.limiter.speed_limit()
}

pub fn register(&self, key: SnapKey, entry: SnapEntry) {
Expand Down Expand Up @@ -1568,7 +1580,7 @@ impl SnapManagerBuilder {
temp_sst_id: Arc::new(AtomicU64::new(0)),
encryption_key_manager: self.key_manager,
},
max_total_size,
max_total_size: AtomicU64::new(max_total_size),
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use tikv::{
server::raftkv::ReplicaReadLockChecker,
server::{
config::Config as ServerConfig,
config::ServerConfigManager,
create_raft_storage,
gc_worker::{AutoGcConfig, GcWorker},
lock_manager::LockManager,
Expand Down Expand Up @@ -500,7 +501,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
gc_worker
}

fn init_servers(&mut self) -> Arc<ServerConfig> {
fn init_servers(&mut self) -> Arc<VersionTrack<ServerConfig>> {
let gc_worker = self.init_gc_worker();
let mut ttl_checker = Box::new(LazyWorker::new("ttl-checker"));
let ttl_scheduler = ttl_checker.scheduler();
Expand Down Expand Up @@ -624,7 +625,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
let resolved_ts_ob = resolved_ts::Observer::new(rts_scheduler.clone());
resolved_ts_ob.register_to(self.coprocessor_host.as_mut().unwrap());

let server_config = Arc::new(self.config.server.clone());
let server_config = Arc::new(VersionTrack::new(self.config.server.clone()));

self.config
.raft_store
Expand All @@ -633,7 +634,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
let raft_store = Arc::new(VersionTrack::new(self.config.raft_store.clone()));
let mut node = Node::new(
self.system.take().unwrap(),
&server_config,
&server_config.value().clone(),
raft_store.clone(),
self.pd_client.clone(),
self.state.clone(),
Expand All @@ -649,7 +650,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
&self.security_mgr,
storage,
coprocessor::Endpoint::new(
&server_config,
&server_config.value(),
cop_read_pool_handle,
self.concurrency_manager.clone(),
engine_rocks::raw_util::to_raw_perf_level(self.config.coprocessor.perf_level),
Expand All @@ -664,6 +665,13 @@ impl<ER: RaftEngine> TiKVServer<ER> {
debug_thread_pool,
)
.unwrap_or_else(|e| fatal!("failed to create server: {}", e));
cfg_controller.register(
tikv::config::Module::Server,
Box::new(ServerConfigManager::new(
server.get_snap_worker_scheduler(),
server_config.clone(),
)),
);

let import_path = self.store_path.join("import");
let importer = Arc::new(
Expand Down Expand Up @@ -930,7 +938,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
});
}

fn run_server(&mut self, server_config: Arc<ServerConfig>) {
fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
let server = self.servers.as_mut().unwrap();
server
.server
Expand Down
8 changes: 4 additions & 4 deletions components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,14 +358,14 @@ impl Simulator for ServerCluster {
.encryption_key_manager(key_manager)
.build(tmp_str);
self.snap_mgrs.insert(node_id, snap_mgr.clone());
let server_cfg = Arc::new(cfg.server.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
let security_mgr = Arc::new(SecurityManager::new(&cfg.security).unwrap());
let cop_read_pool = ReadPool::from(coprocessor::readpool_impl::build_read_pool_for_test(
&tikv::config::CoprReadPoolConfig::default_for_test(),
store.get_engine(),
));
let cop = coprocessor::Endpoint::new(
&server_cfg,
&server_cfg.value().clone(),
cop_read_pool.handle(),
concurrency_manager.clone(),
PerfLevel::EnableCount,
Expand Down Expand Up @@ -395,7 +395,7 @@ impl Simulator for ServerCluster {
raft_store.validate().unwrap();
let mut node = Node::new(
system,
&cfg.server,
&server_cfg.value().clone(),
Arc::new(VersionTrack::new(raft_store)),
Arc::clone(&self.pd_client),
state,
Expand Down Expand Up @@ -448,7 +448,7 @@ impl Simulator for ServerCluster {
cfg.server.addr = format!("{}", addr);
let trans = server.transport();
let simulate_trans = SimulateTransport::new(trans);
let server_cfg = Arc::new(cfg.server.clone());
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));

// Register the role change observer of the lock manager.
lock_mgr.register_detector_role_change_observer(&mut coprocessor_host);
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2256,7 +2256,7 @@ pub struct TiKvConfig {
#[config(skip)]
pub readpool: ReadPoolConfig,

#[config(skip)]
#[config(submodule)]
pub server: ServerConfig,

#[config(submodule)]
Expand Down
69 changes: 67 additions & 2 deletions src/server/config.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;
use std::{cmp, i32, isize};

use super::Result;
use grpcio::CompressionAlgorithms;
use regex::Regex;

use collections::HashMap;
use tikv_util::config::{self, ReadableDuration, ReadableSize};
use configuration::{ConfigChange, ConfigManager, Configuration};
use tikv_util::config::{self, ReadableDuration, ReadableSize, VersionTrack};
use tikv_util::sys::sys_quota::SysQuota;
use tikv_util::worker::Scheduler;

pub use crate::storage::config::Config as StorageConfig;
pub use raftstore::store::Config as RaftStoreConfig;

use super::snap::Task as SnapTask;

pub const DEFAULT_CLUSTER_ID: u64 = 0;
pub const DEFAULT_LISTENING_ADDR: &str = "127.0.0.1:20160";
const DEFAULT_ADVERTISE_LISTENING_ADDR: &str = "";
Expand Down Expand Up @@ -50,84 +55,118 @@ pub enum GrpcCompressionType {
}

/// Configuration for the `server` module.
///
/// Most fields are tagged `#[config(skip)]`; the untagged ones are
/// `concurrent_send_snap_limit`, `concurrent_recv_snap_limit`,
/// `snap_max_write_bytes_per_sec` and `snap_max_total_size`.
/// NOTE(review): presumably the untagged fields are the ones that can be changed online
/// through the derived `Configuration` impl — confirm against the `configuration` crate.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Configuration)]
#[serde(default)]
#[serde(rename_all = "kebab-case")]
pub struct Config {
#[serde(skip)]
#[config(skip)]
pub cluster_id: u64,

// Server listening address.
#[config(skip)]
pub addr: String,

// Server advertise listening address for outer communication.
// If not set, we will use listening address instead.
#[config(skip)]
pub advertise_addr: String,

// These are related to TiKV status.
#[config(skip)]
pub status_addr: String,

// Status server's advertise listening address for outer communication.
// If not set, the status server's listening address will be used.
#[config(skip)]
pub advertise_status_addr: String,

#[config(skip)]
pub status_thread_pool_size: usize,

#[config(skip)]
pub max_grpc_send_msg_len: i32,

// TODO: use CompressionAlgorithms instead once it supports traits like Clone etc.
#[config(skip)]
pub grpc_compression_type: GrpcCompressionType,
#[config(skip)]
pub grpc_concurrency: usize,
#[config(skip)]
pub grpc_concurrent_stream: i32,
#[config(skip)]
pub grpc_raft_conn_num: usize,
#[config(skip)]
pub grpc_memory_pool_quota: ReadableSize,
#[config(skip)]
pub grpc_stream_initial_window_size: ReadableSize,
#[config(skip)]
pub grpc_keepalive_time: ReadableDuration,
#[config(skip)]
pub grpc_keepalive_timeout: ReadableDuration,
/// How many snapshots can be sent concurrently.
pub concurrent_send_snap_limit: usize,
/// How many snapshots can be recv concurrently.
pub concurrent_recv_snap_limit: usize,
#[config(skip)]
pub end_point_recursion_limit: u32,
#[config(skip)]
pub end_point_stream_channel_size: usize,
#[config(skip)]
pub end_point_batch_row_limit: usize,
#[config(skip)]
pub end_point_stream_batch_row_limit: usize,
#[config(skip)]
pub end_point_enable_batch_if_possible: bool,
#[config(skip)]
pub end_point_request_max_handle_duration: ReadableDuration,
#[config(skip)]
pub end_point_max_concurrency: usize,
pub snap_max_write_bytes_per_sec: ReadableSize,
pub snap_max_total_size: ReadableSize,
#[config(skip)]
pub stats_concurrency: usize,
#[config(skip)]
pub heavy_load_threshold: usize,
#[config(skip)]
pub heavy_load_wait_duration: ReadableDuration,
#[config(skip)]
pub enable_request_batch: bool,
#[config(skip)]
pub background_thread_count: usize,
// If handle time is larger than the threshold, it will print slow log in end point.
#[config(skip)]
pub end_point_slow_log_threshold: ReadableDuration,
/// Max connections per address for forwarding request.
#[config(skip)]
pub forward_max_connections_per_address: usize,

// Test only.
#[doc(hidden)]
#[serde(skip_serializing)]
#[config(skip)]
pub raft_client_backoff_step: ReadableDuration,

// Server labels to specify some attributes about this server.
#[config(skip)]
pub labels: HashMap<String, String>,

// deprecated. use readpool.coprocessor.xx_concurrency.
#[doc(hidden)]
#[serde(skip_serializing)]
#[config(skip)]
pub end_point_concurrency: Option<usize>,

// deprecated. use readpool.coprocessor.stack_size.
#[doc(hidden)]
#[serde(skip_serializing)]
#[config(skip)]
pub end_point_stack_size: Option<ReadableSize>,

// deprecated. use readpool.coprocessor.max_tasks_per_worker_xx.
#[doc(hidden)]
#[serde(skip_serializing)]
#[config(skip)]
pub end_point_max_tasks: Option<usize>,
}

Expand Down Expand Up @@ -295,6 +334,32 @@ impl Config {
}
}

/// Applies configuration changes to the shared server `Config` and then notifies the
/// snapshot task scheduler so the change can be picked up.
pub struct ServerConfigManager {
// Scheduler used to send `SnapTask::RefreshConfigEvent` after a change has been applied.
tx: Scheduler<SnapTask>,
// Shared, version-tracked server configuration that `dispatch` updates in place.
config: Arc<VersionTrack<Config>>,
}

impl ServerConfigManager {
pub fn new(tx: Scheduler<SnapTask>, config: Arc<VersionTrack<Config>>) -> ServerConfigManager {
ServerConfigManager { tx, config }
}
}

impl ConfigManager for ServerConfigManager {
    /// Applies `c` to the shared configuration, then schedules a `RefreshConfigEvent` task.
    ///
    /// A failure to schedule the refresh task is only logged; the call still returns `Ok(())`.
    fn dispatch(&mut self, c: ConfigChange) -> std::result::Result<(), Box<dyn std::error::Error>> {
        // The closure consumes its own copy so that `c` stays available for the final log line.
        let pending = c.clone();
        self.config
            .update(move |cfg: &mut Config| cfg.update(pending));

        // Best effort: the configuration is already updated, so a scheduling error is reported
        // in the log instead of being returned to the caller.
        let scheduled = self.tx.schedule(SnapTask::RefreshConfigEvent);
        if let Err(e) = scheduled {
            error!("server configuration manager schedule refresh snapshot work task failed"; "err"=> ?e);
        }

        info!("server configuration changed"; "change" => ?c);
        Ok(())
    }
}

lazy_static! {
static ref LABEL_KEY_FORMAT: Regex =
Regex::new("^[$]?[A-Za-z0-9]([-A-Za-z0-9_./]*[A-Za-z0-9])?$").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod status_server;
pub mod transport;
pub mod ttl;

pub use self::config::{Config, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
pub use self::config::{Config, ServerConfigManager, DEFAULT_CLUSTER_ID, DEFAULT_LISTENING_ADDR};
pub use self::errors::{Error, Result};
pub use self::metrics::CONFIG_ROCKSDB_GAUGE;
pub use self::metrics::CPU_CORES_QUOTA_GAUGE;
Expand Down
1 change: 1 addition & 0 deletions src/server/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use collections::HashMap;
use grpcio::{CallOption, Channel, ChannelBuilder, Environment, MetadataBuilder, RpcContext};
use kvproto::tikvpb::TikvClient;
use security::SecurityManager;

use std::ffi::CString;
use std::future::Future;
use std::str;
Expand Down
Loading

0 comments on commit 0447554

Please sign in to comment.