diff --git a/components/server/src/server.rs b/components/server/src/server.rs
index d49dbfdb650..bb810b8482e 100644
--- a/components/server/src/server.rs
+++ b/components/server/src/server.rs
@@ -31,8 +31,8 @@ use concurrency_manager::ConcurrencyManager;
 use encryption_export::{data_key_manager_from_config, DataKeyManager};
 use engine_rocks::{from_rocks_compression_type, get_env, FlowInfo, RocksEngine};
 use engine_traits::{
-    compaction_job::CompactionJobInfo, CFOptionsExt, ColumnFamilyOptions, Engines, MiscExt,
-    RaftEngine, CF_DEFAULT, CF_LOCK, CF_WRITE,
+    compaction_job::CompactionJobInfo, CFOptionsExt, ColumnFamilyOptions, Engines, KvEngine,
+    MiscExt, RaftEngine, CF_DEFAULT, CF_LOCK, CF_WRITE,
 };
 use error_code::ErrorCodeExt;
 use file_system::{
@@ -180,8 +180,8 @@ struct TiKVServer<ER: RaftEngine> {
     store_path: PathBuf,
     snap_mgr: Option<SnapManager>, // Will be filled in `init_servers`.
     encryption_key_manager: Option<Arc<DataKeyManager>>,
-    engines: Option<TiKVEngines<ER>>,
-    servers: Option<Servers<ER>>,
+    engines: Option<TiKVEngines<RocksEngine, ER>>,
+    servers: Option<Servers<RocksEngine, ER>>,
     region_info_accessor: RegionInfoAccessor,
     coprocessor_host: Option<CoprocessorHost<RocksEngine>>,
     to_stop: Vec<Box<dyn Stop>>,
@@ -191,24 +191,24 @@ struct TiKVServer<ER: RaftEngine> {
     background_worker: Worker,
 }
 
-struct TiKVEngines<ER: RaftEngine> {
-    engines: Engines<RocksEngine, ER>,
+struct TiKVEngines<EK: KvEngine, ER: RaftEngine> {
+    engines: Engines<EK, ER>,
     store_meta: Arc<Mutex<StoreMeta>>,
-    engine: RaftKv<RocksEngine, ServerRaftStoreRouter<RocksEngine, ER>>,
+    engine: RaftKv<EK, ServerRaftStoreRouter<EK, ER>>,
 }
 
-struct Servers<ER: RaftEngine> {
+struct Servers<EK: KvEngine, ER: RaftEngine> {
     lock_mgr: LockManager,
-    server: LocalServer<ER>,
-    node: Node<RpcClient, ER>,
+    server: LocalServer<EK, ER>,
+    node: Node<RpcClient, EK, ER>,
     importer: Arc<SSTImporter>,
     cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>,
     cdc_memory_quota: MemoryQuota,
 }
 
-type LocalServer<ER> =
-    Server<RaftRouter<RocksEngine, ER>, resolve::PdStoreAddrResolver, LocalRaftKv<ER>>;
-type LocalRaftKv<ER> = RaftKv<RocksEngine, ServerRaftStoreRouter<RocksEngine, ER>>;
+type LocalServer<EK, ER> =
+    Server<RaftRouter<EK, ER>, resolve::PdStoreAddrResolver, LocalRaftKv<EK, ER>>;
+type LocalRaftKv<EK, ER> = RaftKv<EK, ServerRaftStoreRouter<EK, ER>>;
 
 impl<ER: RaftEngine> TiKVServer<ER> {
     fn init(mut config: TiKvConfig) -> TiKVServer<ER> {
@@ -1054,8 +1054,9 @@ impl<ER: RaftEngine> TiKVServer<ER> {
         fetcher: BytesFetcher,
         engines_info: Arc<EnginesResourceInfo>,
     ) {
-        let mut engine_metrics =
-            EngineMetricsManager::new(self.engines.as_ref().unwrap().engines.clone());
+        let mut engine_metrics = EngineMetricsManager::<RocksEngine, ER>::new(
+            self.engines.as_ref().unwrap().engines.clone(),
+        );
         let mut io_metrics = IOMetricsManager::new(fetcher);
         let engines_info_clone = engines_info.clone();
         self.background_worker
@@ -1473,13 +1474,13 @@ impl<T: fmt::Display + Send + 'static> Stop for LazyWorker<T> {
     }
 }
 
-pub struct EngineMetricsManager<R: RaftEngine> {
-    engines: Engines<RocksEngine, R>,
+pub struct EngineMetricsManager<EK: KvEngine, R: RaftEngine> {
+    engines: Engines<EK, R>,
     last_reset: Instant,
 }
 
-impl<R: RaftEngine> EngineMetricsManager<R> {
-    pub fn new(engines: Engines<RocksEngine, R>) -> Self {
+impl<EK: KvEngine, R: RaftEngine> EngineMetricsManager<EK, R> {
+    pub fn new(engines: Engines<EK, R>) -> Self {
         EngineMetricsManager {
             engines,
             last_reset: Instant::now(),
@@ -1487,10 +1488,10 @@ impl<R: RaftEngine> EngineMetricsManager<R> {
     }
 
     pub fn flush(&mut self, now: Instant) {
-        self.engines.kv.flush_metrics("kv");
+        KvEngine::flush_metrics(&self.engines.kv, "kv");
         self.engines.raft.flush_metrics("raft");
         if now.saturating_duration_since(self.last_reset) >= DEFAULT_ENGINE_METRICS_RESET_INTERVAL {
-            self.engines.kv.reset_statistics();
+            KvEngine::reset_statistics(&self.engines.kv);
             self.engines.raft.reset_statistics();
             self.last_reset = now;
         }
diff --git a/components/server/src/signal_handler.rs b/components/server/src/signal_handler.rs
index 6c59cc38e55..75feaa4319b 100644
--- a/components/server/src/signal_handler.rs
+++ b/components/server/src/signal_handler.rs
@@ -4,15 +4,14 @@ pub use self::imp::wait_for_signal;
 
 #[cfg(unix)]
 mod imp {
-    use engine_rocks::RocksEngine;
-    use engine_traits::{Engines, MiscExt, RaftEngine};
+    use engine_traits::{Engines, KvEngine, MiscExt, RaftEngine};
     use libc::c_int;
     use nix::sys::signal::{SIGHUP, SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
     use signal::trap::Trap;
    use tikv_util::metrics;
 
     #[allow(dead_code)]
-    pub fn wait_for_signal<ER: RaftEngine>(engines: Option<Engines<RocksEngine, ER>>) {
+    pub fn wait_for_signal<EK: KvEngine, ER: RaftEngine>(engines: Option<Engines<EK, ER>>) {
         let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
         for sig in trap {
             match sig {
@@ -37,8 +36,7 @@ mod imp {
 
 #[cfg(not(unix))]
 mod imp {
-    use engine_rocks::RocksEngine;
-    use engine_traits::Engines;
+    use engine_traits::{Engines, KvEngine, RaftEngine};
 
-    pub fn wait_for_signal(_: Option<Engines<RocksEngine, RocksEngine>>) {}
+    pub fn wait_for_signal<EK: KvEngine, ER: RaftEngine>(_: Option<Engines<EK, ER>>) {}
 }
diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs
index c0e693ec9d1..aba4d1aa7fa 100644
--- a/components/test_raftstore/src/node.rs
+++ b/components/test_raftstore/src/node.rs
@@ -134,7 +134,7 @@ type SimulateChannelTransport = SimulateTransport<ChannelTransport>;
 pub struct NodeCluster {
     trans: ChannelTransport,
     pd_client: Arc<TestPdClient>,
-    nodes: HashMap<u64, Node<TestPdClient, RocksEngine>>,
+    nodes: HashMap<u64, Node<TestPdClient, RocksEngine, RocksEngine>>,
     snap_mgrs: HashMap<u64, SnapManager>,
     simulate_trans: HashMap<u64, SimulateChannelTransport>,
     concurrency_managers: HashMap<u64, ConcurrencyManager>,
@@ -183,7 +183,10 @@ impl NodeCluster {
         self.post_create_coprocessor_host = Some(op)
     }
 
-    pub fn get_node(&mut self, node_id: u64) -> Option<&mut Node<TestPdClient, RocksEngine>> {
+    pub fn get_node(
+        &mut self,
+        node_id: u64,
+    ) -> Option<&mut Node<TestPdClient, RocksEngine, RocksEngine>> {
         self.nodes.get_mut(&node_id)
     }
 
diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs
index 7a82a0abc1b..6089b5fd47d 100644
--- a/components/test_raftstore/src/server.rs
+++ b/components/test_raftstore/src/server.rs
@@ -108,7 +108,7 @@ impl StoreAddrResolver for AddressMap {
 }
 
 struct ServerMeta {
-    node: Node<TestPdClient, RocksEngine>,
+    node: Node<TestPdClient, RocksEngine, RocksEngine>,
     server: Server<SimulateStoreTransport, AddressMap, SimulateEngine>,
     sim_router: SimulateStoreTransport,
     sim_trans: SimulateServerTransport,
diff --git a/src/server/node.rs b/src/server/node.rs
index 4ecfde42430..b2e53173598 100644
--- a/src/server/node.rs
+++ b/src/server/node.rs
@@ -14,8 +14,7 @@ use crate::storage::kv::FlowStatsReporter;
 use crate::storage::txn::flow_controller::FlowController;
 use crate::storage::{config::Config as StorageConfig, Storage};
 use concurrency_manager::ConcurrencyManager;
-use engine_rocks::RocksEngine;
-use engine_traits::{Engines, KvEngine, Peekable, RaftEngine};
+use engine_traits::{Engines, KvEngine, RaftEngine};
 use kvproto::metapb;
 use kvproto::raft_serverpb::StoreIdent;
 use kvproto::replication_modepb::ReplicationStatus;
@@ -64,11 +63,11 @@ where
 
 /// A wrapper for the raftstore which runs Multi-Raft.
 // TODO: we will rename another better name like RaftStore later.
-pub struct Node<C: PdClient + 'static, ER: RaftEngine> {
+pub struct Node<C: PdClient + 'static, EK: KvEngine, ER: RaftEngine> {
     cluster_id: u64,
     store: metapb::Store,
     store_cfg: Arc<VersionTrack<StoreConfig>>,
-    system: RaftBatchSystem<RocksEngine, ER>,
+    system: RaftBatchSystem<EK, ER>,
     has_started: bool,
     pd_client: Arc<C>,
@@ -76,20 +75,21 @@ pub struct Node<C: PdClient + 'static, ER: RaftEngine> {
     state: Arc<Mutex<GlobalReplicationState>>,
     bg_worker: Worker,
 }
-impl<C, ER> Node<C, ER>
+impl<C, EK, ER> Node<C, EK, ER>
 where
     C: PdClient,
+    EK: KvEngine,
     ER: RaftEngine,
 {
     /// Creates a new Node.
     pub fn new(
-        system: RaftBatchSystem<RocksEngine, ER>,
+        system: RaftBatchSystem<EK, ER>,
         cfg: &ServerConfig,
         store_cfg: Arc<VersionTrack<StoreConfig>>,
         pd_client: Arc<C>,
         state: Arc<Mutex<GlobalReplicationState>>,
         bg_worker: Worker,
-    ) -> Node<C, ER> {
+    ) -> Node<C, EK, ER> {
         let mut store = metapb::Store::default();
         store.set_id(INVALID_ID);
         if cfg.advertise_addr.is_empty() {
@@ -138,7 +138,7 @@ where
         }
     }
 
-    pub fn try_bootstrap_store(&mut self, engines: Engines<RocksEngine, ER>) -> Result<()> {
+    pub fn try_bootstrap_store(&mut self, engines: Engines<EK, ER>) -> Result<()> {
         let mut store_id = self.check_store(&engines)?;
         if store_id == INVALID_ID {
             store_id = self.alloc_id()?;
@@ -158,12 +158,12 @@ where
     #[allow(clippy::too_many_arguments)]
     pub fn start<T>(
         &mut self,
-        engines: Engines<RocksEngine, ER>,
+        engines: Engines<EK, ER>,
         trans: T,
         snap_mgr: SnapManager,
-        pd_worker: LazyWorker<PdTask<RocksEngine>>,
+        pd_worker: LazyWorker<PdTask<EK>>,
         store_meta: Arc<Mutex<StoreMeta>>,
-        coprocessor_host: CoprocessorHost<RocksEngine>,
+        coprocessor_host: CoprocessorHost<EK>,
         importer: Arc<SSTImporter>,
         split_check_scheduler: Scheduler<SplitCheckTask>,
         auto_split_controller: AutoSplitController,
@@ -215,17 +215,17 @@ where
     /// Gets a transmission end of a channel which is used to send `Msg` to the
     /// raftstore.
-    pub fn get_router(&self) -> RaftRouter<RocksEngine, ER> {
+    pub fn get_router(&self) -> RaftRouter<EK, ER> {
         self.system.router()
     }
 
     /// Gets a transmission end of a channel which is used send messages to apply worker.
-    pub fn get_apply_router(&self) -> ApplyRouter<RocksEngine> {
+    pub fn get_apply_router(&self) -> ApplyRouter<EK> {
         self.system.apply_router()
     }
     // check store, return store id for the engine.
     // If the store is not bootstrapped, use INVALID_ID.
-    fn check_store(&self, engines: &Engines<RocksEngine, ER>) -> Result<u64> {
+    fn check_store(&self, engines: &Engines<EK, ER>) -> Result<u64> {
         let res = engines.kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)?;
         if res.is_none() {
             return Ok(INVALID_ID);
         }
@@ -274,7 +274,7 @@ where
     #[doc(hidden)]
     pub fn prepare_bootstrap_cluster(
         &self,
-        engines: &Engines<RocksEngine, ER>,
+        engines: &Engines<EK, ER>,
         store_id: u64,
     ) -> Result<metapb::Region> {
         let region_id = self.alloc_id()?;
@@ -298,7 +298,7 @@ where
 
     fn check_or_prepare_bootstrap_cluster(
         &self,
-        engines: &Engines<RocksEngine, ER>,
+        engines: &Engines<EK, ER>,
         store_id: u64,
     ) -> Result<Option<metapb::Region>> {
         if let Some(first_region) = engines.kv.get_msg(keys::PREPARE_BOOTSTRAP_KEY)? {
@@ -312,7 +312,7 @@ where
 
     fn bootstrap_cluster(
         &mut self,
-        engines: &Engines<RocksEngine, ER>,
+        engines: &Engines<EK, ER>,
         first_region: metapb::Region,
     ) -> Result<()> {
         let region_id = first_region.get_id();
@@ -374,12 +374,12 @@ where
     fn start_store<T>(
         &mut self,
         store_id: u64,
-        engines: Engines<RocksEngine, ER>,
+        engines: Engines<EK, ER>,
         trans: T,
         snap_mgr: SnapManager,
-        pd_worker: LazyWorker<PdTask<RocksEngine>>,
+        pd_worker: LazyWorker<PdTask<EK>>,
         store_meta: Arc<Mutex<StoreMeta>>,
-        coprocessor_host: CoprocessorHost<RocksEngine>,
+        coprocessor_host: CoprocessorHost<EK>,
         importer: Arc<SSTImporter>,
         split_check_scheduler: Scheduler<SplitCheckTask>,
         auto_split_controller: AutoSplitController,
diff --git a/src/storage/config.rs b/src/storage/config.rs
index d1c2be72bc8..9b1ac08dde5 100644
--- a/src/storage/config.rs
+++ b/src/storage/config.rs
@@ -7,8 +7,7 @@ use crate::server::ttl::TTLCheckerTask;
 use crate::server::CONFIG_ROCKSDB_GAUGE;
 use crate::storage::txn::flow_controller::FlowController;
 use engine_rocks::raw::{Cache, LRUCacheOptions, MemoryAllocator};
-use engine_rocks::RocksEngine;
-use engine_traits::{CFNamesExt, CFOptionsExt, ColumnFamilyOptions, CF_DEFAULT};
+use engine_traits::{ColumnFamilyOptions, KvEngine, CF_DEFAULT};
 use file_system::{get_io_rate_limiter, IOPriority, IORateLimitMode, IORateLimiter, IOType};
 use libc::c_int;
 use online_config::{ConfigChange, ConfigManager, ConfigValue, OnlineConfig, Result as CfgResult};
@@ -105,20 +104,20 @@ impl Config {
     }
 }
 
-pub struct StorageConfigManger {
-    kvdb: RocksEngine,
+pub struct StorageConfigManger<EK: KvEngine> {
+    kvdb: EK,
     shared_block_cache: bool,
     ttl_checker_scheduler: Scheduler<TTLCheckerTask>,
     flow_controller: Arc<FlowController>,
 }
 
-impl StorageConfigManger {
+impl<EK: KvEngine> StorageConfigManger<EK> {
     pub fn new(
-        kvdb: RocksEngine,
+        kvdb: EK,
         shared_block_cache: bool,
         ttl_checker_scheduler: Scheduler<TTLCheckerTask>,
         flow_controller: Arc<FlowController>,
-    ) -> StorageConfigManger {
+    ) -> Self {
         StorageConfigManger {
             kvdb,
             shared_block_cache,
@@ -128,7 +127,7 @@ impl StorageConfigManger {
     }
 }
 
-impl ConfigManager for StorageConfigManger {
+impl<EK: KvEngine> ConfigManager for StorageConfigManger<EK> {
     fn dispatch(&mut self, mut change: ConfigChange) -> CfgResult<()> {
         if let Some(ConfigValue::Module(mut block_cache)) = change.remove("block_cache") {
             if !self.shared_block_cache {
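The change set above applies one pattern over and over: a type that used to hard-code `RocksEngine` now takes a type parameter bounded by `KvEngine` (usually alongside a second one bounded by `RaftEngine`), as in `EngineMetricsManager`, `StorageConfigManger`, and `Node`. The sketch below shows that pattern in isolation. It is only an illustration under stated assumptions: `EngineLike`, `MetricsFlusher`, and `DemoEngine` are hypothetical stand-ins invented for this example, not TiKV's real `engine_traits::KvEngine` or `EngineMetricsManager` APIs.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the small slice of `engine_traits::KvEngine`
/// that this sketch needs.
trait EngineLike {
    fn flush_metrics(&self, instance: &str);
    fn reset_statistics(&self);
}

/// Generic over any `EngineLike` instead of a concrete engine type,
/// mirroring how `EngineMetricsManager` gains an `EK: KvEngine` parameter
/// in the diff above.
struct MetricsFlusher<E: EngineLike> {
    engine: E,
    last_reset: Instant,
    reset_interval: Duration,
}

impl<E: EngineLike> MetricsFlusher<E> {
    fn new(engine: E, reset_interval: Duration) -> Self {
        MetricsFlusher {
            engine,
            last_reset: Instant::now(),
            reset_interval,
        }
    }

    /// Flush metrics on every call and periodically reset statistics,
    /// the same shape as the updated `EngineMetricsManager::flush`.
    fn flush(&mut self, now: Instant) {
        self.engine.flush_metrics("kv");
        if now.saturating_duration_since(self.last_reset) >= self.reset_interval {
            self.engine.reset_statistics();
            self.last_reset = now;
        }
    }
}

/// Toy engine used only so the sketch compiles and runs.
struct DemoEngine;

impl EngineLike for DemoEngine {
    fn flush_metrics(&self, instance: &str) {
        println!("flushing metrics for {}", instance);
    }

    fn reset_statistics(&self) {
        println!("resetting statistics");
    }
}

fn main() {
    // The call site picks the concrete engine, just as `init_metrics_flusher`
    // in the diff instantiates `EngineMetricsManager::<RocksEngine, ER>`.
    let mut flusher = MetricsFlusher::new(DemoEngine, Duration::from_secs(60));
    flusher.flush(Instant::now());
}
```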