Skip to content

Commit

Permalink
Convert more of tikv-server to generic kv engines (tikv#10785)
Browse files Browse the repository at this point in the history
* server: Add EK typaram to TiKVEngines type

Signed-off-by: Brian Anderson <[email protected]>

* server: Add EK typaram to EngineMetricsManager

Signed-off-by: Brian Anderson <[email protected]>

* server: Add EK typaram to ServerType/RaftKvType

Signed-off-by: Brian Anderson <[email protected]>

* server: Add EK typaram to Servers type

Signed-off-by: Brian Anderson <[email protected]>

* server: Replace RocksEngine with typaram in signal_handler

Signed-off-by: Brian Anderson <[email protected]>

* server: Fix signature of wait_for_signal on not(unix)

Signed-off-by: Brian Anderson <[email protected]>

* storage: Replace RocksEngine with typaram in StorageConfigManager

Signed-off-by: Brian Anderson <[email protected]>

* server: Add EK typaram to Node

Signed-off-by: Brian Anderson <[email protected]>

* rustfmt

Signed-off-by: Brian Anderson <[email protected]>

* server: Use impl trait instead of ER typaram in signal_handler

Signed-off-by: Brian Anderson <[email protected]>

* storage: Return Self

Signed-off-by: Brian Anderson <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
brson and ti-chi-bot authored Sep 13, 2021
1 parent d377cfd commit 32b314a
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 58 deletions.
43 changes: 22 additions & 21 deletions components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use concurrency_manager::ConcurrencyManager;
use encryption_export::{data_key_manager_from_config, DataKeyManager};
use engine_rocks::{from_rocks_compression_type, get_env, FlowInfo, RocksEngine};
use engine_traits::{
compaction_job::CompactionJobInfo, CFOptionsExt, ColumnFamilyOptions, Engines, MiscExt,
RaftEngine, CF_DEFAULT, CF_LOCK, CF_WRITE,
compaction_job::CompactionJobInfo, CFOptionsExt, ColumnFamilyOptions, Engines, KvEngine,
MiscExt, RaftEngine, CF_DEFAULT, CF_LOCK, CF_WRITE,
};
use error_code::ErrorCodeExt;
use file_system::{
Expand Down Expand Up @@ -180,8 +180,8 @@ struct TiKVServer<ER: RaftEngine> {
store_path: PathBuf,
snap_mgr: Option<SnapManager>, // Will be filled in `init_servers`.
encryption_key_manager: Option<Arc<DataKeyManager>>,
engines: Option<TiKVEngines<ER>>,
servers: Option<Servers<ER>>,
engines: Option<TiKVEngines<RocksEngine, ER>>,
servers: Option<Servers<RocksEngine, ER>>,
region_info_accessor: RegionInfoAccessor,
coprocessor_host: Option<CoprocessorHost<RocksEngine>>,
to_stop: Vec<Box<dyn Stop>>,
Expand All @@ -191,24 +191,24 @@ struct TiKVServer<ER: RaftEngine> {
background_worker: Worker,
}

struct TiKVEngines<ER: RaftEngine> {
engines: Engines<RocksEngine, ER>,
struct TiKVEngines<EK: KvEngine, ER: RaftEngine> {
engines: Engines<EK, ER>,
store_meta: Arc<Mutex<StoreMeta>>,
engine: RaftKv<RocksEngine, ServerRaftStoreRouter<RocksEngine, ER>>,
engine: RaftKv<EK, ServerRaftStoreRouter<EK, ER>>,
}

struct Servers<ER: RaftEngine> {
struct Servers<EK: KvEngine, ER: RaftEngine> {
lock_mgr: LockManager,
server: LocalServer<ER>,
node: Node<RpcClient, ER>,
server: LocalServer<EK, ER>,
node: Node<RpcClient, EK, ER>,
importer: Arc<SSTImporter>,
cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>,
cdc_memory_quota: MemoryQuota,
}

type LocalServer<ER> =
Server<RaftRouter<RocksEngine, ER>, resolve::PdStoreAddrResolver, LocalRaftKv<ER>>;
type LocalRaftKv<ER> = RaftKv<RocksEngine, ServerRaftStoreRouter<RocksEngine, ER>>;
type LocalServer<EK, ER> =
Server<RaftRouter<EK, ER>, resolve::PdStoreAddrResolver, LocalRaftKv<EK, ER>>;
type LocalRaftKv<EK, ER> = RaftKv<EK, ServerRaftStoreRouter<EK, ER>>;

impl<ER: RaftEngine> TiKVServer<ER> {
fn init(mut config: TiKvConfig) -> TiKVServer<ER> {
Expand Down Expand Up @@ -1054,8 +1054,9 @@ impl<ER: RaftEngine> TiKVServer<ER> {
fetcher: BytesFetcher,
engines_info: Arc<EnginesResourceInfo>,
) {
let mut engine_metrics =
EngineMetricsManager::new(self.engines.as_ref().unwrap().engines.clone());
let mut engine_metrics = EngineMetricsManager::<RocksEngine, ER>::new(
self.engines.as_ref().unwrap().engines.clone(),
);
let mut io_metrics = IOMetricsManager::new(fetcher);
let engines_info_clone = engines_info.clone();
self.background_worker
Expand Down Expand Up @@ -1473,24 +1474,24 @@ impl<T: fmt::Display + Send + 'static> Stop for LazyWorker<T> {
}
}

pub struct EngineMetricsManager<R: RaftEngine> {
engines: Engines<RocksEngine, R>,
pub struct EngineMetricsManager<EK: KvEngine, R: RaftEngine> {
engines: Engines<EK, R>,
last_reset: Instant,
}

impl<R: RaftEngine> EngineMetricsManager<R> {
pub fn new(engines: Engines<RocksEngine, R>) -> Self {
impl<EK: KvEngine, R: RaftEngine> EngineMetricsManager<EK, R> {
pub fn new(engines: Engines<EK, R>) -> Self {
EngineMetricsManager {
engines,
last_reset: Instant::now(),
}
}

pub fn flush(&mut self, now: Instant) {
self.engines.kv.flush_metrics("kv");
KvEngine::flush_metrics(&self.engines.kv, "kv");
self.engines.raft.flush_metrics("raft");
if now.saturating_duration_since(self.last_reset) >= DEFAULT_ENGINE_METRICS_RESET_INTERVAL {
self.engines.kv.reset_statistics();
KvEngine::reset_statistics(&self.engines.kv);
self.engines.raft.reset_statistics();
self.last_reset = now;
}
Expand Down
10 changes: 4 additions & 6 deletions components/server/src/signal_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ pub use self::imp::wait_for_signal;

#[cfg(unix)]
mod imp {
use engine_rocks::RocksEngine;
use engine_traits::{Engines, MiscExt, RaftEngine};
use engine_traits::{Engines, KvEngine, MiscExt, RaftEngine};
use libc::c_int;
use nix::sys::signal::{SIGHUP, SIGINT, SIGTERM, SIGUSR1, SIGUSR2};
use signal::trap::Trap;
use tikv_util::metrics;

#[allow(dead_code)]
pub fn wait_for_signal<ER: RaftEngine>(engines: Option<Engines<RocksEngine, ER>>) {
pub fn wait_for_signal(engines: Option<Engines<impl KvEngine, impl RaftEngine>>) {
let trap = Trap::trap(&[SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2]);
for sig in trap {
match sig {
Expand All @@ -37,8 +36,7 @@ mod imp {

#[cfg(not(unix))]
mod imp {
use engine_rocks::RocksEngine;
use engine_traits::Engines;
use engine_traits::{Engines, KvEngine, RaftEngine};

pub fn wait_for_signal(_: Option<Engines<RocksEngine, RocksEngine>>) {}
pub fn wait_for_signal(_: Option<Engines<impl KvEngine, impl RaftEngine>>) {}
}
7 changes: 5 additions & 2 deletions components/test_raftstore/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ type SimulateChannelTransport = SimulateTransport<ChannelTransport>;
pub struct NodeCluster {
trans: ChannelTransport,
pd_client: Arc<TestPdClient>,
nodes: HashMap<u64, Node<TestPdClient, RocksEngine>>,
nodes: HashMap<u64, Node<TestPdClient, RocksEngine, RocksEngine>>,
snap_mgrs: HashMap<u64, SnapManager>,
simulate_trans: HashMap<u64, SimulateChannelTransport>,
concurrency_managers: HashMap<u64, ConcurrencyManager>,
Expand Down Expand Up @@ -183,7 +183,10 @@ impl NodeCluster {
self.post_create_coprocessor_host = Some(op)
}

pub fn get_node(&mut self, node_id: u64) -> Option<&mut Node<TestPdClient, RocksEngine>> {
pub fn get_node(
&mut self,
node_id: u64,
) -> Option<&mut Node<TestPdClient, RocksEngine, RocksEngine>> {
self.nodes.get_mut(&node_id)
}

Expand Down
2 changes: 1 addition & 1 deletion components/test_raftstore/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl StoreAddrResolver for AddressMap {
}

struct ServerMeta {
node: Node<TestPdClient, RocksEngine>,
node: Node<TestPdClient, RocksEngine, RocksEngine>,
server: Server<SimulateStoreTransport, PdStoreAddrResolver, SimulateEngine>,
sim_router: SimulateStoreTransport,
sim_trans: SimulateServerTransport,
Expand Down
40 changes: 20 additions & 20 deletions src/server/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use crate::storage::kv::FlowStatsReporter;
use crate::storage::txn::flow_controller::FlowController;
use crate::storage::{config::Config as StorageConfig, Storage};
use concurrency_manager::ConcurrencyManager;
use engine_rocks::RocksEngine;
use engine_traits::{Engines, KvEngine, Peekable, RaftEngine};
use engine_traits::{Engines, KvEngine, RaftEngine};
use kvproto::metapb;
use kvproto::raft_serverpb::StoreIdent;
use kvproto::replication_modepb::ReplicationStatus;
Expand Down Expand Up @@ -64,32 +63,33 @@ where

/// A wrapper for the raftstore which runs Multi-Raft.
// TODO: we will rename another better name like RaftStore later.
pub struct Node<C: PdClient + 'static, ER: RaftEngine> {
pub struct Node<C: PdClient + 'static, EK: KvEngine, ER: RaftEngine> {
cluster_id: u64,
store: metapb::Store,
store_cfg: Arc<VersionTrack<StoreConfig>>,
system: RaftBatchSystem<RocksEngine, ER>,
system: RaftBatchSystem<EK, ER>,
has_started: bool,

pd_client: Arc<C>,
state: Arc<Mutex<GlobalReplicationState>>,
bg_worker: Worker,
}

impl<C, ER> Node<C, ER>
impl<C, EK, ER> Node<C, EK, ER>
where
C: PdClient,
EK: KvEngine,
ER: RaftEngine,
{
/// Creates a new Node.
pub fn new(
system: RaftBatchSystem<RocksEngine, ER>,
system: RaftBatchSystem<EK, ER>,
cfg: &ServerConfig,
store_cfg: Arc<VersionTrack<StoreConfig>>,
pd_client: Arc<C>,
state: Arc<Mutex<GlobalReplicationState>>,
bg_worker: Worker,
) -> Node<C, ER> {
) -> Node<C, EK, ER> {
let mut store = metapb::Store::default();
store.set_id(INVALID_ID);
if cfg.advertise_addr.is_empty() {
Expand Down Expand Up @@ -138,7 +138,7 @@ where
}
}

pub fn try_bootstrap_store(&mut self, engines: Engines<RocksEngine, ER>) -> Result<()> {
pub fn try_bootstrap_store(&mut self, engines: Engines<EK, ER>) -> Result<()> {
let mut store_id = self.check_store(&engines)?;
if store_id == INVALID_ID {
store_id = self.alloc_id()?;
Expand All @@ -158,12 +158,12 @@ where
#[allow(clippy::too_many_arguments)]
pub fn start<T>(
&mut self,
engines: Engines<RocksEngine, ER>,
engines: Engines<EK, ER>,
trans: T,
snap_mgr: SnapManager,
pd_worker: LazyWorker<PdTask<RocksEngine>>,
pd_worker: LazyWorker<PdTask<EK>>,
store_meta: Arc<Mutex<StoreMeta>>,
coprocessor_host: CoprocessorHost<RocksEngine>,
coprocessor_host: CoprocessorHost<EK>,
importer: Arc<SSTImporter>,
split_check_scheduler: Scheduler<SplitCheckTask>,
auto_split_controller: AutoSplitController,
Expand Down Expand Up @@ -215,17 +215,17 @@ where

/// Gets a transmission end of a channel which is used to send `Msg` to the
/// raftstore.
pub fn get_router(&self) -> RaftRouter<RocksEngine, ER> {
pub fn get_router(&self) -> RaftRouter<EK, ER> {
self.system.router()
}
/// Gets a transmission end of a channel which is used send messages to apply worker.
pub fn get_apply_router(&self) -> ApplyRouter<RocksEngine> {
pub fn get_apply_router(&self) -> ApplyRouter<EK> {
self.system.apply_router()
}

// check store, return store id for the engine.
// If the store is not bootstrapped, use INVALID_ID.
fn check_store(&self, engines: &Engines<RocksEngine, ER>) -> Result<u64> {
fn check_store(&self, engines: &Engines<EK, ER>) -> Result<u64> {
let res = engines.kv.get_msg::<StoreIdent>(keys::STORE_IDENT_KEY)?;
if res.is_none() {
return Ok(INVALID_ID);
Expand Down Expand Up @@ -274,7 +274,7 @@ where
#[doc(hidden)]
pub fn prepare_bootstrap_cluster(
&self,
engines: &Engines<RocksEngine, ER>,
engines: &Engines<EK, ER>,
store_id: u64,
) -> Result<metapb::Region> {
let region_id = self.alloc_id()?;
Expand All @@ -298,7 +298,7 @@ where

fn check_or_prepare_bootstrap_cluster(
&self,
engines: &Engines<RocksEngine, ER>,
engines: &Engines<EK, ER>,
store_id: u64,
) -> Result<Option<metapb::Region>> {
if let Some(first_region) = engines.kv.get_msg(keys::PREPARE_BOOTSTRAP_KEY)? {
Expand All @@ -312,7 +312,7 @@ where

fn bootstrap_cluster(
&mut self,
engines: &Engines<RocksEngine, ER>,
engines: &Engines<EK, ER>,
first_region: metapb::Region,
) -> Result<()> {
let region_id = first_region.get_id();
Expand Down Expand Up @@ -374,12 +374,12 @@ where
fn start_store<T>(
&mut self,
store_id: u64,
engines: Engines<RocksEngine, ER>,
engines: Engines<EK, ER>,
trans: T,
snap_mgr: SnapManager,
pd_worker: LazyWorker<PdTask<RocksEngine>>,
pd_worker: LazyWorker<PdTask<EK>>,
store_meta: Arc<Mutex<StoreMeta>>,
coprocessor_host: CoprocessorHost<RocksEngine>,
coprocessor_host: CoprocessorHost<EK>,
importer: Arc<SSTImporter>,
split_check_scheduler: Scheduler<SplitCheckTask>,
auto_split_controller: AutoSplitController,
Expand Down
15 changes: 7 additions & 8 deletions src/storage/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crate::server::ttl::TTLCheckerTask;
use crate::server::CONFIG_ROCKSDB_GAUGE;
use crate::storage::txn::flow_controller::FlowController;
use engine_rocks::raw::{Cache, LRUCacheOptions, MemoryAllocator};
use engine_rocks::RocksEngine;
use engine_traits::{CFNamesExt, CFOptionsExt, ColumnFamilyOptions, CF_DEFAULT};
use engine_traits::{ColumnFamilyOptions, KvEngine, CF_DEFAULT};
use file_system::{get_io_rate_limiter, IOPriority, IORateLimitMode, IORateLimiter, IOType};
use libc::c_int;
use online_config::{ConfigChange, ConfigManager, ConfigValue, OnlineConfig, Result as CfgResult};
Expand Down Expand Up @@ -105,20 +104,20 @@ impl Config {
}
}

pub struct StorageConfigManger {
kvdb: RocksEngine,
pub struct StorageConfigManger<EK: KvEngine> {
kvdb: EK,
shared_block_cache: bool,
ttl_checker_scheduler: Scheduler<TTLCheckerTask>,
flow_controller: Arc<FlowController>,
}

impl StorageConfigManger {
impl<EK: KvEngine> StorageConfigManger<EK> {
pub fn new(
kvdb: RocksEngine,
kvdb: EK,
shared_block_cache: bool,
ttl_checker_scheduler: Scheduler<TTLCheckerTask>,
flow_controller: Arc<FlowController>,
) -> StorageConfigManger {
) -> Self {
StorageConfigManger {
kvdb,
shared_block_cache,
Expand All @@ -128,7 +127,7 @@ impl StorageConfigManger {
}
}

impl ConfigManager for StorageConfigManger {
impl<EK: KvEngine> ConfigManager for StorageConfigManger<EK> {
fn dispatch(&mut self, mut change: ConfigChange) -> CfgResult<()> {
if let Some(ConfigValue::Module(mut block_cache)) = change.remove("block_cache") {
if !self.shared_block_cache {
Expand Down

0 comments on commit 32b314a

Please sign in to comment.