diff --git a/cmd/src/server.rs b/cmd/src/server.rs index 012c5a72899..32800d601fb 100644 --- a/cmd/src/server.rs +++ b/cmd/src/server.rs @@ -123,7 +123,7 @@ struct TiKVServer { engines: Option, servers: Option, region_info_accessor: RegionInfoAccessor, - coprocessor_host: Option, + coprocessor_host: Option>, to_stop: Vec>, lock_files: Vec, } @@ -131,14 +131,14 @@ struct TiKVServer { struct Engines { engines: engine::Engines, store_meta: Arc>, - engine: RaftKv, - raft_router: ServerRaftStoreRouter, + engine: RaftKv>, + raft_router: ServerRaftStoreRouter, } struct Servers { - pd_sender: FutureScheduler, + pd_sender: FutureScheduler>, lock_mgr: Option, - server: Server, + server: Server, resolve::PdStoreAddrResolver>, node: Node, importer: Arc, cdc_scheduler: tikv_util::worker::Scheduler, @@ -436,7 +436,7 @@ impl TiKVServer { }); } - fn init_gc_worker(&mut self) -> GcWorker> { + fn init_gc_worker(&mut self) -> GcWorker>> { let engines = self.engines.as_ref().unwrap(); let mut gc_worker = GcWorker::new( engines.engine.clone(), @@ -457,7 +457,7 @@ impl TiKVServer { fn init_servers( &mut self, - gc_worker: &GcWorker>, + gc_worker: &GcWorker>>, ) -> Arc { let mut cfg_controller = self.cfg_controller.take().unwrap(); cfg_controller.register( @@ -656,7 +656,10 @@ impl TiKVServer { server_config } - fn register_services(&mut self, gc_worker: GcWorker>) { + fn register_services( + &mut self, + gc_worker: GcWorker>>, + ) { let servers = self.servers.as_mut().unwrap(); let engines = self.engines.as_ref().unwrap(); diff --git a/components/cdc/src/endpoint.rs b/components/cdc/src/endpoint.rs index 8b2118fd980..dc33c9d3acc 100644 --- a/components/cdc/src/endpoint.rs +++ b/components/cdc/src/endpoint.rs @@ -205,7 +205,7 @@ pub struct Endpoint { min_ts_region_id: u64, } -impl Endpoint { +impl> Endpoint { pub fn new( pd_client: Arc, scheduler: Scheduler, @@ -774,7 +774,7 @@ impl Initializer { } } -impl Runnable for Endpoint { +impl> Runnable for Endpoint { fn run(&mut self, task: Task) { debug!("run cdc task"; "task" => %task); match task { @@ -808,7 +808,7 @@ impl Runnable for Endpoint { } } -impl RunnableWithTimer for Endpoint { +impl> RunnableWithTimer for Endpoint { fn on_timeout(&mut self, timer: &mut Timer<()>, _: ()) { CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64); if self.min_resolved_ts != TimeStamp::max() { diff --git a/components/cdc/src/observer.rs b/components/cdc/src/observer.rs index a0a9597b916..5fb3ff65013 100644 --- a/components/cdc/src/observer.rs +++ b/components/cdc/src/observer.rs @@ -3,6 +3,7 @@ use std::cell::RefCell; use std::sync::{Arc, RwLock}; +use engine_rocks::RocksEngine; use raft::StateRole; use raftstore::coprocessor::*; use raftstore::store::fsm::ObserveID; @@ -40,7 +41,7 @@ impl CdcObserver { } } - pub fn register_to(&self, coprocessor_host: &mut CoprocessorHost) { + pub fn register_to(&self, coprocessor_host: &mut CoprocessorHost) { // 100 is the priority of the observer. CDC should have a high priority. 
coprocessor_host .registry diff --git a/components/cdc/tests/mod.rs b/components/cdc/tests/mod.rs index c329d852c5a..c4cd5f37daa 100644 --- a/components/cdc/tests/mod.rs +++ b/components/cdc/tests/mod.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::sync::*; use std::time::Duration; +use engine_rocks::RocksEngine; use futures::{Future, Stream}; use grpcio::{ChannelBuilder, Environment}; use grpcio::{ClientDuplexReceiver, ClientDuplexSender, ClientUnaryReceiver}; @@ -113,7 +114,7 @@ impl TestSuite { let cdc_ob = cdc::CdcObserver::new(scheduler.clone()); obs.insert(id, cdc_ob.clone()); sim.coprocessor_hooks.entry(id).or_default().push(Box::new( - move |host: &mut CoprocessorHost| { + move |host: &mut CoprocessorHost| { cdc_ob.register_to(host); }, )); diff --git a/components/raftstore/src/coprocessor/dispatcher.rs b/components/raftstore/src/coprocessor/dispatcher.rs index 809d0c604bf..0a6aeae6970 100644 --- a/components/raftstore/src/coprocessor/dispatcher.rs +++ b/components/raftstore/src/coprocessor/dispatcher.rs @@ -1,6 +1,5 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. -use engine_rocks::RocksEngine; use engine_traits::{CfName, KvEngine}; use kvproto::metapb::Region; use kvproto::pdpb::CheckPolicy; @@ -151,7 +150,7 @@ impl_box_observer!(BoxCmdObserver, CmdObserver, WrappedCmdObserver); #[derive(Clone)] pub struct Registry where - E: KvEngine, + E: 'static, { admin_observers: Vec>, query_observers: Vec>, @@ -163,10 +162,7 @@ where // TODO: add endpoint } -impl Default for Registry -where - E: KvEngine, -{ +impl Default for Registry { fn default() -> Registry { Registry { admin_observers: Default::default(), @@ -193,10 +189,7 @@ macro_rules! push { }; } -impl Registry -where - E: KvEngine, -{ +impl Registry { pub fn register_admin_observer(&mut self, priority: u32, ao: BoxAdminObserver) { push!(priority, ao, self.admin_observers); } @@ -275,13 +268,30 @@ macro_rules! loop_ob { } /// Admin and invoke all coprocessors. 
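Note on the hunk above: the registry only stores boxed observers, so its bound is relaxed from `E: KvEngine` to `E: 'static`; the full `KvEngine` bound is asked for only on the impl blocks that actually call into the engine (for example the split-checker host construction, which takes `engine: &E`). A stand-alone sketch of that split, using local stand-in traits rather than the real engine_traits definitions:

use std::marker::PhantomData;

// Stand-ins, not the real TiKV traits.
trait KvEngine: 'static {
    fn approximate_size(&self) -> u64;
}

struct BoxSplitCheckObserver<E: 'static>(PhantomData<E>);

// Storing observers needs nothing from the engine beyond 'static.
struct Registry<E: 'static> {
    split_check_observers: Vec<(u32, BoxSplitCheckObserver<E>)>,
}

impl<E: 'static> Registry<E> {
    fn register_split_check_observer(&mut self, priority: u32, ob: BoxSplitCheckObserver<E>) {
        self.split_check_observers.push((priority, ob));
    }
}

struct Host<E: 'static> {
    registry: Registry<E>,
}

// Only code that actually touches the engine asks for the full KvEngine bound.
impl<E: KvEngine> Host<E> {
    fn new_split_checker_host(&self, engine: &E) -> u64 {
        // Illustrative only: consult the engine while walking the registered observers.
        engine.approximate_size() + self.registry.split_check_observers.len() as u64
    }
}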
-#[derive(Default, Clone)] -pub struct CoprocessorHost { - pub registry: Registry, +#[derive(Clone)] +pub struct CoprocessorHost +where + E: 'static, +{ + pub registry: Registry, } -impl CoprocessorHost { - pub fn new + Clone + Send + 'static>(ch: C) -> CoprocessorHost { +impl Default for CoprocessorHost +where + E: 'static, +{ + fn default() -> Self { + CoprocessorHost { + registry: Default::default(), + } + } +} + +impl CoprocessorHost +where + E: KvEngine, +{ + pub fn new + Clone + Send + 'static>(ch: C) -> CoprocessorHost { let mut registry = Registry::default(); registry.register_split_check_observer( 200, @@ -396,10 +406,10 @@ impl CoprocessorHost { &self, cfg: &'a Config, region: &Region, - engine: &RocksEngine, + engine: &E, auto_split: bool, policy: CheckPolicy, - ) -> SplitCheckerHost<'a, RocksEngine> { + ) -> SplitCheckerHost<'a, E> { let mut host = SplitCheckerHost::new(auto_split, cfg); loop_ob!( region, @@ -486,6 +496,7 @@ mod tests { use std::sync::atomic::*; use std::sync::Arc; + use engine_rocks::RocksEngine; use kvproto::metapb::Region; use kvproto::raft_cmdpb::{ AdminRequest, AdminResponse, RaftCmdRequest, RaftCmdResponse, Request, Response, @@ -616,7 +627,7 @@ mod tests { #[test] fn test_trigger_right_hook() { - let mut host = CoprocessorHost::default(); + let mut host = CoprocessorHost::::default(); let ob = TestCoprocessor::default(); host.registry .register_admin_observer(1, BoxAdminObserver::new(ob.clone())); @@ -678,7 +689,7 @@ mod tests { #[test] fn test_order() { - let mut host = CoprocessorHost::default(); + let mut host = CoprocessorHost::::default(); let ob1 = TestCoprocessor::default(); host.registry diff --git a/components/raftstore/src/coprocessor/region_info_accessor.rs b/components/raftstore/src/coprocessor/region_info_accessor.rs index f0da0322b63..975141ec6b1 100644 --- a/components/raftstore/src/coprocessor/region_info_accessor.rs +++ b/components/raftstore/src/coprocessor/region_info_accessor.rs @@ -11,6 +11,7 @@ use super::{ BoxRegionChangeObserver, BoxRoleObserver, Coprocessor, CoprocessorHost, ObserverContext, RegionChangeEvent, RegionChangeObserver, Result, RoleObserver, }; +use engine_rocks::RocksEngine; use keys::{data_end_key, data_key}; use kvproto::metapb::Region; use raft::StateRole; @@ -142,7 +143,7 @@ impl RoleObserver for RegionEventListener { /// Creates an `RegionEventListener` and register it to given coprocessor host. fn register_region_event_listener( - host: &mut CoprocessorHost, + host: &mut CoprocessorHost, scheduler: Scheduler, ) { let listener = RegionEventListener { scheduler }; @@ -457,7 +458,7 @@ impl RegionInfoAccessor { /// Creates a new `RegionInfoAccessor` and register to `host`. /// `RegionInfoAccessor` doesn't need, and should not be created more than once. If it's needed /// in different places, just clone it, and their contents are shared. 
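Two consequences of making the host generic show up in the hunks above: `#[derive(Default)]` is replaced by a hand-written impl (a derive would also demand `E: Default`, which an engine type has no reason to satisfy), and concrete call sites such as the dispatcher tests now pin the engine explicitly, presumably with a turbofish like `CoprocessorHost::<RocksEngine>::default()`. A minimal sketch of both pieces, with invented stand-in types:

use std::marker::PhantomData;

trait KvEngine: 'static {}

// Stand-in for a concrete engine; not the real engine_rocks::RocksEngine.
struct MockRocksEngine;
impl KvEngine for MockRocksEngine {}

struct Registry<E: 'static> {
    observers: Vec<&'static str>,
    _marker: PhantomData<E>,
}

struct CoprocessorHost<E: 'static> {
    registry: Registry<E>,
}

// Hand-written so that no `E: Default` bound leaks into the public API.
impl<E: 'static> Default for CoprocessorHost<E> {
    fn default() -> Self {
        CoprocessorHost {
            registry: Registry { observers: Vec::new(), _marker: PhantomData },
        }
    }
}

fn main() {
    // Leaf call sites (tests, the server crate) name the engine with a turbofish.
    let host = CoprocessorHost::<MockRocksEngine>::default();
    assert!(host.registry.observers.is_empty());
}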
- pub fn new(host: &mut CoprocessorHost) -> Self { + pub fn new(host: &mut CoprocessorHost) -> Self { let worker = WorkerBuilder::new("region-collector-worker").create(); let scheduler = worker.scheduler(); diff --git a/components/raftstore/src/router.rs b/components/raftstore/src/router.rs index 28e0caa6bfa..71488f1ef1b 100644 --- a/components/raftstore/src/router.rs +++ b/components/raftstore/src/router.rs @@ -9,16 +9,19 @@ use crate::store::{ Callback, CasualMessage, LocalReader, PeerMsg, RaftCommand, SignificantMsg, StoreMsg, }; use crate::{DiscardReason, Error as RaftStoreError, Result as RaftStoreResult}; -use engine_rocks::RocksEngine; +use engine_traits::KvEngine; use raft::SnapshotStatus; /// Routes messages to the raftstore. -pub trait RaftStoreRouter: Send + Clone { +pub trait RaftStoreRouter: Send + Clone +where + E: KvEngine, +{ /// Sends RaftMessage to local store. fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()>; /// Sends RaftCmdRequest to local store. - fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> RaftStoreResult<()>; + fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> RaftStoreResult<()>; /// Sends a significant message. We should guarantee that the message can't be dropped. fn significant_send(&self, region_id: u64, msg: SignificantMsg) -> RaftStoreResult<()>; @@ -53,20 +56,23 @@ pub trait RaftStoreRouter: Send + Clone { ) } - fn casual_send(&self, region_id: u64, msg: CasualMessage) -> RaftStoreResult<()>; + fn casual_send(&self, region_id: u64, msg: CasualMessage) -> RaftStoreResult<()>; } #[derive(Clone)] pub struct RaftStoreBlackHole; -impl RaftStoreRouter for RaftStoreBlackHole { +impl RaftStoreRouter for RaftStoreBlackHole +where + E: KvEngine, +{ /// Sends RaftMessage to local store. fn send_raft_msg(&self, _: RaftMessage) -> RaftStoreResult<()> { Ok(()) } /// Sends RaftCmdRequest to local store. - fn send_command(&self, _: RaftCmdRequest, _: Callback) -> RaftStoreResult<()> { + fn send_command(&self, _: RaftCmdRequest, _: Callback) -> RaftStoreResult<()> { Ok(()) } @@ -77,24 +83,30 @@ impl RaftStoreRouter for RaftStoreBlackHole { fn broadcast_unreachable(&self, _: u64) {} - fn casual_send(&self, _: u64, _: CasualMessage) -> RaftStoreResult<()> { + fn casual_send(&self, _: u64, _: CasualMessage) -> RaftStoreResult<()> { Ok(()) } } /// A router that routes messages to the raftstore #[derive(Clone)] -pub struct ServerRaftStoreRouter { - router: RaftRouter, - local_reader: LocalReader, RocksEngine>, +pub struct ServerRaftStoreRouter +where + E: KvEngine, +{ + router: RaftRouter, + local_reader: LocalReader, E>, } -impl ServerRaftStoreRouter { +impl ServerRaftStoreRouter +where + E: KvEngine, +{ /// Creates a new router. 
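The router change above follows the same pattern at the trait level: `RaftStoreRouter` takes the engine as a type parameter so its message payloads can mention engine-specific types, and a test double like `RaftStoreBlackHole` implements it for every engine. The exact payload types are not shown here, so this sketch simply keys the callback on the engine's snapshot to illustrate the shape; all types are simplified stand-ins that borrow the names:

trait KvEngine: 'static {
    type Snapshot;
}

// A callback whose payload depends on the engine's snapshot type.
struct Callback<S>(Option<Box<dyn FnOnce(S) + Send>>);

trait RaftStoreRouter<E: KvEngine>: Send + Clone {
    fn send_command(&self, cb: Callback<E::Snapshot>) -> Result<(), String>;
}

// A black-hole router usable with any engine, handy for tests.
#[derive(Clone)]
struct RaftStoreBlackHole;

impl<E: KvEngine> RaftStoreRouter<E> for RaftStoreBlackHole {
    fn send_command(&self, _cb: Callback<E::Snapshot>) -> Result<(), String> {
        Ok(())
    }
}

// Generic users name the router and the engine together.
fn propose<E: KvEngine, R: RaftStoreRouter<E>>(
    router: &R,
    cb: Callback<E::Snapshot>,
) -> Result<(), String> {
    router.send_command(cb)
}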
pub fn new( - router: RaftRouter, - local_reader: LocalReader, RocksEngine>, - ) -> ServerRaftStoreRouter { + router: RaftRouter, + local_reader: LocalReader, E>, + ) -> ServerRaftStoreRouter { ServerRaftStoreRouter { router, local_reader, @@ -119,7 +131,10 @@ pub fn handle_send_error(region_id: u64, e: TrySendError) -> RaftStoreErro } } -impl RaftStoreRouter for ServerRaftStoreRouter { +impl RaftStoreRouter for ServerRaftStoreRouter +where + E: KvEngine, +{ fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()> { let region_id = msg.get_region_id(); self.router @@ -127,9 +142,9 @@ impl RaftStoreRouter for ServerRaftStoreRouter { .map_err(|e| handle_send_error(region_id, e)) } - fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> RaftStoreResult<()> { + fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> RaftStoreResult<()> { let cmd = RaftCommand::new(req, cb); - if LocalReader::, RocksEngine>::acceptable(&cmd.request) { + if LocalReader::, E>::acceptable(&cmd.request) { self.local_reader.execute_raft_command(cmd); Ok(()) } else { @@ -153,7 +168,7 @@ impl RaftStoreRouter for ServerRaftStoreRouter { Ok(()) } - fn casual_send(&self, region_id: u64, msg: CasualMessage) -> RaftStoreResult<()> { + fn casual_send(&self, region_id: u64, msg: CasualMessage) -> RaftStoreResult<()> { self.router .send(region_id, PeerMsg::CasualMessage(msg)) .map_err(|e| handle_send_error(region_id, e)) diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs index b682f988718..e7f00f8dd12 100644 --- a/components/raftstore/src/store/fsm/apply.rs +++ b/components/raftstore/src/store/fsm/apply.rs @@ -15,8 +15,8 @@ use std::{cmp, usize}; use batch_system::{BasicMailbox, BatchRouter, BatchSystem, Fsm, HandlerBuilder, PollHandler}; use crossbeam::channel::{TryRecvError, TrySendError}; -use engine_rocks::{RocksEngine, RocksSnapshot}; -use engine_traits::{KvEngine, MiscExt, Peekable, Snapshot, WriteBatch, WriteBatchVecExt}; +use engine_rocks::RocksEngine; +use engine_traits::{KvEngine, Snapshot, WriteBatch, WriteBatchVecExt}; use engine_traits::{ALL_CFS, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; use kvproto::import_sstpb::SstMeta; use kvproto::metapb::{Peer as PeerMeta, Region, RegionEpoch}; @@ -58,14 +58,20 @@ const WRITE_BATCH_LIMIT: usize = 16; const APPLY_WB_SHRINK_SIZE: usize = 1024 * 1024; const SHRINK_PENDING_CMD_QUEUE_CAP: usize = 64; -pub struct PendingCmd { +pub struct PendingCmd +where + E: KvEngine, +{ pub index: u64, pub term: u64, - pub cb: Option>, + pub cb: Option>, } -impl PendingCmd { - fn new(index: u64, term: u64, cb: Callback) -> PendingCmd { +impl PendingCmd +where + E: KvEngine, +{ + fn new(index: u64, term: u64, cb: Callback) -> PendingCmd { PendingCmd { index, term, @@ -74,7 +80,10 @@ impl PendingCmd { } } -impl Drop for PendingCmd { +impl Drop for PendingCmd +where + E: KvEngine, +{ fn drop(&mut self) { if self.cb.is_some() { safe_panic!( @@ -86,7 +95,10 @@ impl Drop for PendingCmd { } } -impl Debug for PendingCmd { +impl Debug for PendingCmd +where + E: KvEngine, +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!( f, @@ -99,14 +111,27 @@ impl Debug for PendingCmd { } /// Commands waiting to be committed and applied. 
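`PendingCmd` keeps its drop-time guard that the callback was consumed (the real code raises it with `safe_panic!`); once the struct is generic, `Drop` and `Debug` become explicitly bounded impls rather than derives. A reduced sketch of the guard with stand-in types:

use std::marker::PhantomData;

trait KvEngine: 'static {}

struct Callback<E: KvEngine>(PhantomData<E>);

struct PendingCmd<E: KvEngine> {
    index: u64,
    term: u64,
    cb: Option<Callback<E>>,
}

impl<E: KvEngine> PendingCmd<E> {
    fn new(index: u64, term: u64, cb: Callback<E>) -> Self {
        PendingCmd { index, term, cb: Some(cb) }
    }
}

// The callback must be taken (invoked) before the command is dropped;
// dropping it silently would leave the proposer waiting forever.
impl<E: KvEngine> Drop for PendingCmd<E> {
    fn drop(&mut self) {
        if self.cb.is_some() {
            panic!(
                "callback of pending command at [index: {}, term: {}] was leaked",
                self.index, self.term
            );
        }
    }
}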
-#[derive(Default, Debug)] -pub struct PendingCmdQueue { - normals: VecDeque, - conf_change: Option, +#[derive(Debug)] +pub struct PendingCmdQueue +where + E: KvEngine, +{ + normals: VecDeque>, + conf_change: Option>, } -impl PendingCmdQueue { - fn pop_normal(&mut self, index: u64, term: u64) -> Option { +impl PendingCmdQueue +where + E: KvEngine, +{ + fn new() -> PendingCmdQueue { + PendingCmdQueue { + normals: VecDeque::new(), + conf_change: None, + } + } + + fn pop_normal(&mut self, index: u64, term: u64) -> Option> { self.normals.pop_front().and_then(|cmd| { if self.normals.capacity() > SHRINK_PENDING_CMD_QUEUE_CAP && self.normals.len() < SHRINK_PENDING_CMD_QUEUE_CAP @@ -121,18 +146,18 @@ impl PendingCmdQueue { }) } - fn append_normal(&mut self, cmd: PendingCmd) { + fn append_normal(&mut self, cmd: PendingCmd) { self.normals.push_back(cmd); } - fn take_conf_change(&mut self) -> Option { + fn take_conf_change(&mut self) -> Option> { // conf change will not be affected when changing between follower and leader, // so there is no need to check term. self.conf_change.take() } // TODO: seems we don't need to separate conf change from normal entries. - fn set_conf_change(&mut self, cmd: PendingCmd) { + fn set_conf_change(&mut self, cmd: PendingCmd) { self.conf_change = Some(cmd); } } @@ -163,7 +188,10 @@ impl Range { } #[derive(Debug)] -pub enum ExecResult { +pub enum ExecResult +where + E: KvEngine, +{ ChangePeer(ChangePeer), CompactLog { state: RaftTruncatedState, @@ -188,7 +216,8 @@ pub enum ExecResult { ComputeHash { region: Region, index: u64, - snap: RocksSnapshot, + snap: E::Snapshot, + _phantom: PhantomData, }, VerifyHash { index: u64, @@ -203,11 +232,14 @@ pub enum ExecResult { } /// The possible returned value when applying logs. -pub enum ApplyResult { +pub enum ApplyResult +where + E: KvEngine, +{ None, Yield, /// Additional result that needs to be sent back to raftstore. - Res(ExecResult), + Res(ExecResult), /// It is unable to apply the `CommitMerge` until the source peer /// has applied to the required position and sets the atomic boolean /// to true. 
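Two patterns from this hunk, shown with stand-ins: an explicit `new()` constructor replaces `#[derive(Default)]` on the now-generic queue (the derive would impose `E: Default`), and the `ComputeHash` result carries the engine's associated snapshot type instead of naming `RocksSnapshot` (the diff additionally threads a `PhantomData` marker through the variant, omitted here). The `compute_hash` helper is hypothetical and only demonstrates construction:

use std::collections::VecDeque;
use std::marker::PhantomData;

trait KvEngine: 'static {
    type Snapshot;
    fn snapshot(&self) -> Self::Snapshot;
}

struct PendingCmd<E: KvEngine>(PhantomData<E>);

struct PendingCmdQueue<E: KvEngine> {
    normals: VecDeque<PendingCmd<E>>,
    conf_change: Option<PendingCmd<E>>,
}

impl<E: KvEngine> PendingCmdQueue<E> {
    // Explicit constructor instead of #[derive(Default)], which would require E: Default.
    fn new() -> Self {
        PendingCmdQueue { normals: VecDeque::new(), conf_change: None }
    }
}

enum ExecResult<E: KvEngine> {
    // The snapshot type follows the engine; no concrete RocksSnapshot is named.
    ComputeHash { index: u64, snap: E::Snapshot },
    VerifyHash { index: u64, hash: Vec<u8> },
}

fn compute_hash<E: KvEngine>(engine: &E, index: u64) -> ExecResult<E> {
    ExecResult::ComputeHash { index, snap: engine.snapshot() }
}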
@@ -230,18 +262,24 @@ impl ExecContext { } } -struct ApplyCallback { +struct ApplyCallback +where + E: KvEngine, +{ region: Region, - cbs: Vec<(Option>, RaftCmdResponse)>, + cbs: Vec<(Option>, RaftCmdResponse)>, } -impl ApplyCallback { - fn new(region: Region) -> ApplyCallback { +impl ApplyCallback +where + E: KvEngine, +{ + fn new(region: Region) -> ApplyCallback { let cbs = vec![]; ApplyCallback { region, cbs } } - fn invoke_all(self, host: &CoprocessorHost) { + fn invoke_all(self, host: &CoprocessorHost) { for (cb, mut resp) in self.cbs { host.post_apply(&self.region, &mut resp); if let Some(cb) = cb { @@ -250,20 +288,26 @@ impl ApplyCallback { } } - fn push(&mut self, cb: Option>, resp: RaftCmdResponse) { + fn push(&mut self, cb: Option>, resp: RaftCmdResponse) { self.cbs.push((cb, resp)); } } #[derive(Clone)] -pub enum Notifier { - Router(RaftRouter), +pub enum Notifier +where + E: KvEngine, +{ + Router(RaftRouter), #[cfg(test)] - Sender(Sender>), + Sender(Sender>), } -impl Notifier { - fn notify(&self, region_id: u64, msg: PeerMsg) { +impl Notifier +where + E: KvEngine, +{ + fn notify(&self, region_id: u64, msg: PeerMsg) { match *self { Notifier::Router(ref r) => { r.force_send(region_id, msg).unwrap(); @@ -274,17 +318,21 @@ impl Notifier { } } -struct ApplyContext> { +struct ApplyContext +where + E: KvEngine, + W: WriteBatch + WriteBatchVecExt, +{ tag: String, timer: Option, - host: CoprocessorHost, + host: CoprocessorHost, importer: Arc, - region_scheduler: Scheduler, + region_scheduler: Scheduler>, router: ApplyRouter, - notifier: Notifier, - engine: RocksEngine, - cbs: MustConsumeVec, - apply_res: Vec, + notifier: Notifier, + engine: E, + cbs: MustConsumeVec>, + apply_res: Vec>, exec_ctx: Option, kv_wb: Option, @@ -302,18 +350,22 @@ struct ApplyContext> { use_delete_range: bool, } -impl> ApplyContext { +impl ApplyContext +where + E: KvEngine, + W: WriteBatch + WriteBatchVecExt, +{ pub fn new( tag: String, - host: CoprocessorHost, + host: CoprocessorHost, importer: Arc, - region_scheduler: Scheduler, - engine: RocksEngine, + region_scheduler: Scheduler>, + engine: E, router: ApplyRouter, - notifier: Notifier, + notifier: Notifier, cfg: &Config, - ) -> ApplyContext { - ApplyContext:: { + ) -> ApplyContext { + ApplyContext:: { tag, timer: None, host, @@ -341,7 +393,7 @@ impl> ApplyContext { /// A general apply progress for a delegate is: /// `prepare_for` -> `commit` [-> `commit` ...] -> `finish_for`. /// After all delegates are handled, `write_to_db` method should be called. - pub fn prepare_for(&mut self, delegate: &mut ApplyDelegate) { + pub fn prepare_for(&mut self, delegate: &mut ApplyDelegate) { self.prepare_write_batch(); self.cbs.push(ApplyCallback::new(delegate.region.clone())); self.last_applied_index = delegate.apply_state.get_applied_index(); @@ -375,7 +427,7 @@ impl> ApplyContext { /// write the changes into rocksdb. /// /// This call is valid only when it's between a `prepare_for` and `finish_for`. 
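The apply context above now carries two type parameters: the engine it flushes into and the write-batch type it accumulates with (bounded in the diff by `WriteBatch + WriteBatchVecExt`). A simplified sketch of that relationship; `EngineWriteBatch` here is a local stand-in that only plays the role of the real bound, not a claim about its API:

trait KvEngine: 'static {}

trait WriteBatch: Default {
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn is_empty(&self) -> bool;
}

trait EngineWriteBatch<E: KvEngine>: WriteBatch {
    fn write_to(self, engine: &E);
}

struct ApplyContext<E, W>
where
    E: KvEngine,
    W: EngineWriteBatch<E>,
{
    engine: E,
    kv_wb: Option<W>,
}

impl<E, W> ApplyContext<E, W>
where
    E: KvEngine,
    W: EngineWriteBatch<E>,
{
    fn new(engine: E) -> Self {
        ApplyContext { engine, kv_wb: Some(W::default()) }
    }

    fn kv_wb_mut(&mut self) -> &mut W {
        self.kv_wb.get_or_insert_with(W::default)
    }

    // Flush the accumulated batch into the engine and start a fresh one.
    fn write_to_db(&mut self) {
        if let Some(wb) = self.kv_wb.take() {
            if !wb.is_empty() {
                wb.write_to(&self.engine);
            }
        }
        self.kv_wb = Some(W::default());
    }
}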
- pub fn commit(&mut self, delegate: &mut ApplyDelegate) { + pub fn commit(&mut self, delegate: &mut ApplyDelegate) { if self.last_applied_index < delegate.apply_state.get_applied_index() { delegate.write_apply_state(self.kv_wb.as_mut().unwrap()); } @@ -384,7 +436,7 @@ impl> ApplyContext { self.commit_opt(delegate, true); } - fn commit_opt(&mut self, delegate: &mut ApplyDelegate, persistent: bool) { + fn commit_opt(&mut self, delegate: &mut ApplyDelegate, persistent: bool) { delegate.update_metrics(self); if persistent { self.write_to_db(); @@ -430,7 +482,11 @@ impl> ApplyContext { } /// Finishes `Apply`s for the delegate. - pub fn finish_for(&mut self, delegate: &mut ApplyDelegate, results: VecDeque) { + pub fn finish_for( + &mut self, + delegate: &mut ApplyDelegate, + results: VecDeque>, + ) { if !delegate.pending_remove { delegate.write_apply_state(self.kv_wb.as_mut().unwrap()); } @@ -504,7 +560,7 @@ impl> ApplyContext { } /// Calls the callback of `cmd` when the Region is removed. -fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) { +fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) { debug!( "region is removed, notify commands"; "region_id" => region_id, @@ -515,14 +571,19 @@ fn notify_region_removed(region_id: u64, peer_id: u64, mut cmd: PendingCmd) { notify_req_region_removed(region_id, cmd.cb.take().unwrap()); } -pub fn notify_req_region_removed(region_id: u64, cb: Callback) { +pub fn notify_req_region_removed(region_id: u64, cb: Callback) { let region_not_found = Error::RegionNotFound(region_id); let resp = cmd_resp::new_error(region_not_found); cb.invoke_with_response(resp); } /// Calls the callback of `cmd` when it can not be processed further. -fn notify_stale_command(region_id: u64, peer_id: u64, term: u64, mut cmd: PendingCmd) { +fn notify_stale_command( + region_id: u64, + peer_id: u64, + term: u64, + mut cmd: PendingCmd, +) { info!( "command is stale, skip"; "region_id" => region_id, @@ -533,7 +594,7 @@ fn notify_stale_command(region_id: u64, peer_id: u64, term: u64, mut cmd: Pendin notify_stale_req(term, cmd.cb.take().unwrap()); } -pub fn notify_stale_req(term: u64, cb: Callback) { +pub fn notify_stale_req(term: u64, cb: Callback) { let resp = cmd_resp::err_resp(Error::StaleCommand, term); cb.invoke_with_response(resp); } @@ -599,17 +660,23 @@ struct WaitSourceMergeState { logs_up_to_date: Arc, } -struct YieldState { +struct YieldState +where + E: KvEngine, +{ /// All of the entries that need to continue to be applied after /// the source peer has applied its logs. pending_entries: Vec, /// All of messages that need to continue to be handled after /// the source peer has applied its logs and pending entries /// are all handled. - pending_msgs: Vec, + pending_msgs: Vec>, } -impl Debug for YieldState { +impl Debug for YieldState +where + E: KvEngine, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("YieldState") .field("pending_entries", &self.pending_entries.len()) @@ -640,7 +707,10 @@ impl Debug for WaitSourceMergeState { /// located at this store, and it will get the corresponding apply delegate to /// handle the apply task to make the code logic more clear. #[derive(Debug)] -pub struct ApplyDelegate { +pub struct ApplyDelegate +where + E: KvEngine, +{ /// The ID of the peer. id: u64, /// The term of the Region. 
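The free helpers in this hunk (`notify_req_region_removed`, `notify_stale_req`, and friends) gain the same type parameter even though their bodies are unchanged: they only build an error response and hand it to the callback. A sketch of the shape, with the response and callback types reduced to local stand-ins:

use std::marker::PhantomData;

trait KvEngine: 'static {}

struct RaftCmdResponse {
    error: Option<String>,
}

struct Callback<E: KvEngine> {
    f: Box<dyn FnOnce(RaftCmdResponse) + Send>,
    _marker: PhantomData<E>,
}

impl<E: KvEngine> Callback<E> {
    fn invoke_with_response(self, resp: RaftCmdResponse) {
        (self.f)(resp);
    }
}

// Same body as before the refactoring; only the signature is now generic.
fn notify_req_region_removed<E: KvEngine>(region_id: u64, cb: Callback<E>) {
    let resp = RaftCmdResponse {
        error: Some(format!("region {} not found", region_id)),
    };
    cb.invoke_with_response(resp);
}

fn notify_stale_req<E: KvEngine>(term: u64, cb: Callback<E>) {
    let resp = RaftCmdResponse {
        error: Some(format!("stale command at term {}", term)),
    };
    cb.invoke_with_response(resp);
}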
@@ -659,7 +729,7 @@ pub struct ApplyDelegate { pending_remove: bool, /// The commands waiting to be committed and applied - pending_cmds: PendingCmdQueue, + pending_cmds: PendingCmdQueue, /// The counter of pending request snapshots. See more in `Peer`. pending_request_snapshot_count: Arc, @@ -667,7 +737,7 @@ pub struct ApplyDelegate { is_merging: bool, /// Records the epoch version after the last merge. last_merge_version: u64, - yield_state: Option, + yield_state: Option>, /// A temporary state that keeps track of the progress of the source peer state when /// CommitMerge is unable to be executed. wait_merge_state: Option, @@ -692,8 +762,11 @@ pub struct ApplyDelegate { metrics: ApplyMetrics, } -impl ApplyDelegate { - fn from_registration(reg: Registration) -> ApplyDelegate { +impl ApplyDelegate +where + E: KvEngine, +{ + fn from_registration(reg: Registration) -> ApplyDelegate { ApplyDelegate { id: reg.id, tag: format!("[region {}] {}", reg.region.get_id(), reg.id), @@ -709,7 +782,7 @@ impl ApplyDelegate { yield_state: None, wait_merge_state: None, is_merging: reg.is_merging, - pending_cmds: Default::default(), + pending_cmds: PendingCmdQueue::new(), metrics: Default::default(), last_merge_version: 0, pending_request_snapshot_count: reg.pending_request_snapshot_count, @@ -726,9 +799,9 @@ impl ApplyDelegate { } /// Handles all the committed_entries, namely, applies the committed entries. - fn handle_raft_committed_entries>( + fn handle_raft_committed_entries>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, mut committed_entries: Vec, ) { if committed_entries.is_empty() { @@ -803,15 +876,15 @@ impl ApplyDelegate { } } - fn update_metrics>( + fn update_metrics>( &mut self, - apply_ctx: &ApplyContext, + apply_ctx: &ApplyContext, ) { self.metrics.written_bytes += apply_ctx.delta_bytes(); self.metrics.written_keys += apply_ctx.delta_keys(); } - fn write_apply_state>(&self, wb: &mut W) { + fn write_apply_state>(&self, wb: &mut W) { wb.put_msg_cf( CF_RAFT, &keys::apply_state_key(self.region.get_id()), @@ -825,11 +898,11 @@ impl ApplyDelegate { }); } - fn handle_raft_entry_normal>( + fn handle_raft_entry_normal>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, entry: &Entry, - ) -> ApplyResult { + ) -> ApplyResult { fail_point!("yield_apply_1000", self.region_id() == 1000, |_| { ApplyResult::Yield }); @@ -871,11 +944,11 @@ impl ApplyDelegate { ApplyResult::None } - fn handle_raft_entry_conf_change>( + fn handle_raft_entry_conf_change>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, entry: &Entry, - ) -> ApplyResult { + ) -> ApplyResult { // Although conf change can't yield in normal case, it is convenient to // simulate yield before applying a conf change log. 
fail_point!("yield_apply_conf_change_3", self.id() == 3, |_| { @@ -905,12 +978,7 @@ impl ApplyDelegate { } } - fn find_cb( - &mut self, - index: u64, - term: u64, - is_conf_change: bool, - ) -> Option> { + fn find_cb(&mut self, index: u64, term: u64, is_conf_change: bool) -> Option> { let (region_id, peer_id) = (self.region_id(), self.id()); if is_conf_change { if let Some(mut cmd) = self.pending_cmds.take_conf_change() { @@ -941,13 +1009,13 @@ impl ApplyDelegate { None } - fn process_raft_cmd>( + fn process_raft_cmd>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, index: u64, term: u64, cmd: RaftCmdRequest, - ) -> ApplyResult { + ) -> ApplyResult { if index == 0 { panic!( "{} processing raft command needs a none zero index", @@ -996,13 +1064,13 @@ impl ApplyDelegate { /// 2. it encounters an error that may not occur on all stores, in this case /// we should try to apply the entry again or panic. Considering that this /// usually due to disk operation fail, which is rare, so just panic is ok. - fn apply_raft_cmd>( + fn apply_raft_cmd>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, index: u64, term: u64, req: &RaftCmdRequest, - ) -> (RaftCmdResponse, ApplyResult) { + ) -> (RaftCmdResponse, ApplyResult) { // if pending remove, apply should be aborted already. assert!(!self.pending_remove); @@ -1076,10 +1144,7 @@ impl ApplyDelegate { (resp, exec_result) } - fn destroy>( - &mut self, - apply_ctx: &mut ApplyContext, - ) { + fn destroy>(&mut self, apply_ctx: &mut ApplyContext) { self.stopped = true; apply_ctx.router.close(self.region_id()); for cmd in self.pending_cmds.normals.drain(..) { @@ -1105,13 +1170,16 @@ impl ApplyDelegate { } } -impl ApplyDelegate { +impl ApplyDelegate +where + E: KvEngine, +{ // Only errors that will also occur on all other stores should be returned. - fn exec_raft_cmd>( + fn exec_raft_cmd>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &RaftCmdRequest, - ) -> Result<(RaftCmdResponse, ApplyResult)> { + ) -> Result<(RaftCmdResponse, ApplyResult)> { // Include region for epoch not match after merge may cause key not in range. let include_region = req.get_header().get_region_epoch().get_version() >= self.last_merge_version; @@ -1123,11 +1191,11 @@ impl ApplyDelegate { } } - fn exec_admin_cmd>( + fn exec_admin_cmd>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &RaftCmdRequest, - ) -> Result<(RaftCmdResponse, ApplyResult)> { + ) -> Result<(RaftCmdResponse, ApplyResult)> { let request = req.get_admin_request(); let cmd_type = request.get_cmd_type(); if cmd_type != AdminCmdType::CompactLog && cmd_type != AdminCmdType::CommitMerge { @@ -1166,11 +1234,11 @@ impl ApplyDelegate { Ok((resp, exec_result)) } - fn exec_write_cmd>( + fn exec_write_cmd>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &RaftCmdRequest, - ) -> Result<(RaftCmdResponse, ApplyResult)> { + ) -> Result<(RaftCmdResponse, ApplyResult)> { fail_point!( "on_apply_write_cmd", cfg!(release) || self.id() == 3, @@ -1239,7 +1307,10 @@ impl ApplyDelegate { } // Write commands related. -impl ApplyDelegate { +impl ApplyDelegate +where + E: KvEngine, +{ fn handle_put(&mut self, wb: &mut W, req: &Request) -> Result { let (key, value) = (req.get_put().get_key(), req.get_put().get_value()); // region key range has no data prefix, so we must use origin key to check. 
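The write-command handlers above (`handle_put`, `handle_delete_range`, `handle_ingest_sst`, ...) keep the write batch as a parameter of the method rather than of the struct, so one delegate can be driven by whichever batch type its `ApplyContext` was built with. A minimal sketch of that choice, with stand-in traits and illustrative methods only:

trait KvEngine: 'static {}

trait WriteBatch {
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn delete(&mut self, key: &[u8]);
}

struct ApplyDelegate<E: KvEngine> {
    region_id: u64,
    _engine: std::marker::PhantomData<E>,
}

impl<E: KvEngine> ApplyDelegate<E> {
    // The batch type is a method-level parameter, not part of ApplyDelegate<E>.
    fn handle_put<W: WriteBatch>(&mut self, wb: &mut W, key: &[u8], value: &[u8]) {
        wb.put(key, value);
    }

    fn handle_delete<W: WriteBatch>(&mut self, wb: &mut W, key: &[u8]) {
        wb.delete(key);
    }
}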
@@ -1325,7 +1396,7 @@ impl ApplyDelegate { fn handle_delete_range( &mut self, - engine: &RocksEngine, + engine: &E, req: &Request, ranges: &mut Vec, use_delete_range: bool, @@ -1397,7 +1468,7 @@ impl ApplyDelegate { fn handle_ingest_sst( &mut self, importer: &Arc, - engine: &RocksEngine, + engine: &E, req: &Request, ssts: &mut Vec, ) -> Result { @@ -1429,12 +1500,15 @@ impl ApplyDelegate { } // Admin commands related. -impl ApplyDelegate { - fn exec_change_peer>( +impl ApplyDelegate +where + E: KvEngine, +{ + fn exec_change_peer>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, request: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { let request = request.get_change_peer(); let peer = request.get_peer(); let store_id = peer.get_store_id(); @@ -1629,11 +1703,11 @@ impl ApplyDelegate { )) } - fn exec_split>( + fn exec_split>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { info!( "split is deprecated, redirect to use batch split"; "region_id" => self.region_id(), @@ -1651,11 +1725,11 @@ impl ApplyDelegate { self.exec_batch_split(ctx, &admin_req) } - fn exec_batch_split>( + fn exec_batch_split>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { fail_point!( "apply_before_split_1_3", { self.id == 3 && self.region_id() == 1 }, @@ -1760,11 +1834,11 @@ impl ApplyDelegate { )) } - fn exec_prepare_merge>( + fn exec_prepare_merge>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { fail_point!("apply_before_prepare_merge"); PEER_ADMIN_CMD_COUNTER.prepare_merge.all.inc(); @@ -1831,11 +1905,11 @@ impl ApplyDelegate { // 7. resume `exec_commit_merge` in target apply fsm // 8. `on_ready_commit_merge` in target peer fsm and send `MergeResult` to source peer fsm // 9. 
`on_merge_result` in source peer fsm (destroy itself) - fn exec_commit_merge>( + fn exec_commit_merge>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { { let apply_before_commit_merge = || { fail_point!( @@ -1964,11 +2038,11 @@ impl ApplyDelegate { )) } - fn exec_rollback_merge>( + fn exec_rollback_merge>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { PEER_ADMIN_CMD_COUNTER.rollback_merge.all.inc(); let region_state_key = keys::region_state_key(self.region_id()); let state: RegionLocalState = match ctx.engine.get_msg_cf(CF_RAFT, ®ion_state_key) { @@ -2006,11 +2080,11 @@ impl ApplyDelegate { )) } - fn exec_compact_log>( + fn exec_compact_log>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { PEER_ADMIN_CMD_COUNTER.compact.all.inc(); let compact_index = req.get_compact_log().get_compact_index(); @@ -2066,11 +2140,11 @@ impl ApplyDelegate { )) } - fn exec_compute_hash>( + fn exec_compute_hash>( &self, - ctx: &ApplyContext, + ctx: &ApplyContext, _: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { let resp = AdminResponse::default(); Ok(( resp, @@ -2082,15 +2156,16 @@ impl ApplyDelegate { // TODO: figure out another way to do consistency check without snapshot // or short life snapshot. snap: ctx.engine.snapshot(), + _phantom: PhantomData, }), )) } - fn exec_verify_hash>( + fn exec_verify_hash>( &self, - _: &ApplyContext, + _: &ApplyContext, req: &AdminRequest, - ) -> Result<(AdminResponse, ApplyResult)> { + ) -> Result<(AdminResponse, ApplyResult)> { let verify_req = req.get_verify_hash(); let index = verify_req.get_index(); let hash = verify_req.get_hash().to_vec(); @@ -2229,15 +2304,21 @@ impl Registration { } } -pub struct Proposal { +pub struct Proposal +where + E: KvEngine, +{ is_conf_change: bool, index: u64, term: u64, - pub cb: Callback, + pub cb: Callback, } -impl Proposal { - pub fn new(is_conf_change: bool, index: u64, term: u64, cb: Callback) -> Proposal { +impl Proposal +where + E: KvEngine, +{ + pub fn new(is_conf_change: bool, index: u64, term: u64, cb: Callback) -> Proposal { Proposal { is_conf_change, index, @@ -2247,14 +2328,20 @@ impl Proposal { } } -pub struct RegionProposal { +pub struct RegionProposal +where + E: KvEngine, +{ pub id: u64, pub region_id: u64, - pub props: Vec, + pub props: Vec>, } -impl RegionProposal { - pub fn new(id: u64, region_id: u64, props: Vec) -> RegionProposal { +impl RegionProposal +where + E: KvEngine, +{ + pub fn new(id: u64, region_id: u64, props: Vec>) -> RegionProposal { RegionProposal { id, region_id, @@ -2308,13 +2395,16 @@ impl GenSnapTask { self.commit_index } - pub fn generate_and_schedule_snapshot( + pub fn generate_and_schedule_snapshot( self, - kv_snap: RocksSnapshot, + kv_snap: E::Snapshot, last_applied_index_term: u64, last_applied_state: RaftApplyState, - region_sched: &Scheduler, - ) -> Result<()> { + region_sched: &Scheduler>, + ) -> Result<()> + where + E: KvEngine, + { let snapshot = RegionTask::Gen { region_id: self.region_id, notifier: self.snap_notifier, @@ -2374,13 +2464,16 @@ pub enum ChangeCmd { }, } -pub enum Msg { +pub enum Msg +where + E: KvEngine, +{ Apply { start: Instant, apply: 
Apply, }, Registration(Registration), - Proposal(RegionProposal), + Proposal(RegionProposal), LogsUpToDate(CatchUpLogs), Noop, Destroy(Destroy), @@ -2388,25 +2481,29 @@ pub enum Msg { Change { cmd: ChangeCmd, region_epoch: RegionEpoch, - cb: Callback, + cb: Callback, }, #[cfg(any(test, feature = "testexport"))] - Validate(u64, Box), + #[allow(clippy::type_complexity)] + Validate(u64, Box, bool)) + Send>), } -impl Msg { - pub fn apply(apply: Apply) -> Msg { +impl Msg +where + E: KvEngine, +{ + pub fn apply(apply: Apply) -> Msg { Msg::Apply { start: Instant::now(), apply, } } - pub fn register(peer: &Peer) -> Msg { + pub fn register(peer: &Peer) -> Msg { Msg::Registration(Registration::new(peer)) } - pub fn destroy(region_id: u64, async_remove: bool) -> Msg { + pub fn destroy(region_id: u64, async_remove: bool) -> Msg { Msg::Destroy(Destroy { region_id, async_remove, @@ -2414,7 +2511,10 @@ impl Msg { } } -impl Debug for Msg { +impl Debug for Msg +where + E: KvEngine, +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { Msg::Apply { apply, .. } => write!(f, "[region {}] async apply", apply.region_id), @@ -2455,17 +2555,23 @@ pub struct ApplyMetrics { } #[derive(Debug)] -pub struct ApplyRes { +pub struct ApplyRes +where + E: KvEngine, +{ pub region_id: u64, pub apply_state: RaftApplyState, pub applied_index_term: u64, - pub exec_res: VecDeque, + pub exec_res: VecDeque>, pub metrics: ApplyMetrics, } #[derive(Debug)] -pub enum TaskRes { - Apply(ApplyRes), +pub enum TaskRes +where + E: KvEngine, +{ + Apply(ApplyRes), Destroy { // ID of region that has been destroyed. region_id: u64, @@ -2474,19 +2580,25 @@ pub enum TaskRes { }, } -pub struct ApplyFsm { - delegate: ApplyDelegate, - receiver: Receiver, - mailbox: Option>, +pub struct ApplyFsm +where + E: KvEngine, +{ + delegate: ApplyDelegate, + receiver: Receiver>, + mailbox: Option>>, } -impl ApplyFsm { - fn from_peer(peer: &Peer) -> (LooseBoundedSender, Box) { +impl ApplyFsm +where + E: KvEngine, +{ + fn from_peer(peer: &Peer) -> (LooseBoundedSender>, Box>) { let reg = Registration::new(peer); ApplyFsm::from_registration(reg) } - fn from_registration(reg: Registration) -> (LooseBoundedSender, Box) { + fn from_registration(reg: Registration) -> (LooseBoundedSender>, Box>) { let (tx, rx) = loose_bounded(usize::MAX); let delegate = ApplyDelegate::from_registration(reg); ( @@ -2514,9 +2626,9 @@ impl ApplyFsm { } /// Handles apply tasks, and uses the apply delegate to handle the committed entries. - fn handle_apply>( + fn handle_apply>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, apply: Apply, ) { if apply_ctx.timer.is_none() { @@ -2562,7 +2674,7 @@ impl ApplyFsm { } /// Handles proposals, and appends the commands to the apply delegate. - fn handle_proposal(&mut self, region_proposal: RegionProposal) { + fn handle_proposal(&mut self, region_proposal: RegionProposal) { let (region_id, peer_id) = (self.delegate.region_id(), self.delegate.id()); let propose_num = region_proposal.props.len(); assert_eq!(self.delegate.id, region_proposal.id); @@ -2592,10 +2704,7 @@ impl ApplyFsm { APPLY_PROPOSAL.observe(propose_num as f64); } - fn destroy>( - &mut self, - ctx: &mut ApplyContext, - ) { + fn destroy>(&mut self, ctx: &mut ApplyContext) { let region_id = self.delegate.region_id(); if ctx.apply_res.iter().any(|res| res.region_id == region_id) { // Flush before destroying to avoid reordering messages. @@ -2615,9 +2724,9 @@ impl ApplyFsm { } /// Handles peer destroy. 
When a peer is destroyed, the corresponding apply delegate should be removed too. - fn handle_destroy>( + fn handle_destroy>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, d: Destroy, ) { assert_eq!(d.region_id, self.delegate.region_id()); @@ -2637,9 +2746,9 @@ impl ApplyFsm { } } - fn resume_pending>( + fn resume_pending>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, ) { if let Some(ref state) = self.delegate.wait_merge_state { let source_region_id = state.logs_up_to_date.load(Ordering::SeqCst); @@ -2672,9 +2781,9 @@ impl ApplyFsm { } } - fn logs_up_to_date_for_merge>( + fn logs_up_to_date_for_merge>( &mut self, - ctx: &mut ApplyContext, + ctx: &mut ApplyContext, catch_up_logs: CatchUpLogs, ) { fail_point!("after_handle_catch_up_logs_for_merge"); @@ -2709,9 +2818,9 @@ impl ApplyFsm { } #[allow(unused_mut)] - fn handle_snapshot>( + fn handle_snapshot>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, snap_task: GenSnapTask, ) { if self.delegate.pending_remove || self.delegate.stopped { @@ -2766,12 +2875,12 @@ impl ApplyFsm { ); } - fn handle_change>( + fn handle_change>( &mut self, - apply_ctx: &mut ApplyContext, + apply_ctx: &mut ApplyContext, cmd: ChangeCmd, region_epoch: RegionEpoch, - cb: Callback, + cb: Callback, ) { let (observe_id, region_id, enabled) = match cmd { ChangeCmd::RegisterObserver { @@ -2813,7 +2922,7 @@ impl ApplyFsm { } ReadResponse { response: Default::default(), - snapshot: Some(RegionSnapshot::::from_snapshot( + snapshot: Some(RegionSnapshot::::from_snapshot( apply_ctx.engine.snapshot().into_sync(), self.delegate.region.clone(), )), @@ -2837,10 +2946,10 @@ impl ApplyFsm { cb.invoke_read(resp); } - fn handle_tasks>( + fn handle_tasks>( &mut self, - apply_ctx: &mut ApplyContext, - msgs: &mut Vec, + apply_ctx: &mut ApplyContext, + msgs: &mut Vec>, ) { let mut channel_timer = None; let mut drainer = msgs.drain(..); @@ -2879,8 +2988,11 @@ impl ApplyFsm { } } -impl Fsm for ApplyFsm { - type Message = Msg; +impl Fsm for ApplyFsm +where + E: KvEngine, +{ + type Message = Msg; #[inline] fn is_stopped(&self) -> bool { @@ -2904,7 +3016,10 @@ impl Fsm for ApplyFsm { } } -impl Drop for ApplyFsm { +impl Drop for ApplyFsm +where + E: KvEngine, +{ fn drop(&mut self) { self.delegate.clear_all_commands_as_stale(); } @@ -2923,15 +3038,21 @@ impl Fsm for ControlFsm { } } -pub struct ApplyPoller> { - msg_buf: Vec, - apply_ctx: ApplyContext, +pub struct ApplyPoller +where + E: KvEngine, + W: WriteBatch + WriteBatchVecExt, +{ + msg_buf: Vec>, + apply_ctx: ApplyContext, messages_per_tick: usize, cfg_tracker: Tracker, } -impl> PollHandler - for ApplyPoller +impl PollHandler, ControlFsm> for ApplyPoller +where + E: KvEngine, + W: WriteBatch + WriteBatchVecExt, { fn begin(&mut self, _batch_size: usize) { if let Some(incoming) = self.cfg_tracker.any_new() { @@ -2955,7 +3076,7 @@ impl> PollHandler Option { + fn handle_normal(&mut self, normal: &mut ApplyFsm) -> Option { let mut expected_msg_count = None; normal.delegate.written = false; if normal.delegate.yield_state.is_some() { @@ -3004,7 +3125,7 @@ impl> PollHandler]) { + fn end(&mut self, fsms: &mut [Box>]) { let is_synced = self.apply_ctx.flush(); if is_synced { for fsm in fsms { @@ -3017,11 +3138,11 @@ impl> PollHandler> { tag: String, cfg: Arc>, - coprocessor_host: CoprocessorHost, + coprocessor_host: CoprocessorHost, importer: Arc, - region_scheduler: Scheduler, + region_scheduler: Scheduler>, engine: RocksEngine, - sender: Notifier, + sender: Notifier, router: ApplyRouter, 
_phantom: PhantomData, } @@ -3029,7 +3150,7 @@ pub struct Builder> { impl> Builder { pub fn new( builder: &RaftPollerBuilder, - sender: Notifier, + sender: Notifier, router: ApplyRouter, ) -> Builder { Builder:: { @@ -3046,14 +3167,14 @@ impl> Builder { } } -impl> HandlerBuilder - for Builder +impl> + HandlerBuilder, ControlFsm> for Builder { - type Handler = ApplyPoller; + type Handler = ApplyPoller; - fn build(&mut self) -> ApplyPoller { + fn build(&mut self) -> ApplyPoller { let cfg = self.cfg.value(); - ApplyPoller:: { + ApplyPoller:: { msg_buf: Vec::with_capacity(cfg.messages_per_tick), apply_ctx: ApplyContext::new( self.tag.clone(), @@ -3073,25 +3194,25 @@ impl> HandlerBuilder, + pub router: BatchRouter, ControlFsm>, } impl Deref for ApplyRouter { - type Target = BatchRouter; + type Target = BatchRouter, ControlFsm>; - fn deref(&self) -> &BatchRouter { + fn deref(&self) -> &BatchRouter, ControlFsm> { &self.router } } impl DerefMut for ApplyRouter { - fn deref_mut(&mut self) -> &mut BatchRouter { + fn deref_mut(&mut self) -> &mut BatchRouter, ControlFsm> { &mut self.router } } impl ApplyRouter { - pub fn schedule_task(&self, region_id: u64, msg: Msg) { + pub fn schedule_task(&self, region_id: u64, msg: Msg) { let reg = match self.try_send(region_id, msg) { Either::Left(Ok(())) => return, Either::Left(Err(TrySendError::Disconnected(msg))) | Either::Right(msg) => match msg { @@ -3165,19 +3286,19 @@ impl ApplyRouter { } pub struct ApplyBatchSystem { - system: BatchSystem, + system: BatchSystem, ControlFsm>, } impl Deref for ApplyBatchSystem { - type Target = BatchSystem; + type Target = BatchSystem, ControlFsm>; - fn deref(&self) -> &BatchSystem { + fn deref(&self) -> &BatchSystem, ControlFsm> { &self.system } } impl DerefMut for ApplyBatchSystem { - fn deref_mut(&mut self) -> &mut BatchSystem { + fn deref_mut(&mut self) -> &mut BatchSystem, ControlFsm> { &mut self.system } } @@ -3300,14 +3421,14 @@ mod tests { fn validate(router: &ApplyRouter, region_id: u64, validate: F) where - F: FnOnce(&ApplyDelegate) + Send + 'static, + F: FnOnce(&ApplyDelegate) + Send + 'static, { let (validate_tx, validate_rx) = mpsc::channel(); router.schedule_task( region_id, Msg::Validate( region_id, - Box::new(move |(delegate, _): (&ApplyDelegate, _)| { + Box::new(move |(delegate, _): (&ApplyDelegate, _)| { validate(delegate); validate_tx.send(()).unwrap(); }), @@ -3317,7 +3438,7 @@ mod tests { } // Make sure msgs are handled in the same batch. - fn batch_messages(router: &ApplyRouter, region_id: u64, msgs: Vec) { + fn batch_messages(router: &ApplyRouter, region_id: u64, msgs: Vec>) { let (notify1, wait1) = mpsc::channel(); let (notify2, wait2) = mpsc::channel(); router.schedule_task( @@ -3339,7 +3460,9 @@ mod tests { notify2.send(()).unwrap(); } - fn fetch_apply_res(receiver: &::std::sync::mpsc::Receiver>) -> ApplyRes { + fn fetch_apply_res( + receiver: &::std::sync::mpsc::Receiver>, + ) -> ApplyRes { match receiver.recv_timeout(Duration::from_secs(3)) { Ok(PeerMsg::ApplyRes { res, .. 
}) => match res { TaskRes::Apply(res) => res, @@ -3361,7 +3484,7 @@ mod tests { let builder = super::Builder:: { tag: "test-store".to_owned(), cfg, - coprocessor_host: CoprocessorHost::default(), + coprocessor_host: CoprocessorHost::::default(), importer, region_scheduler, sender, @@ -3726,7 +3849,7 @@ mod tests { let (_path, engine) = create_tmp_engine("test-delegate"); let (import_dir, importer) = create_tmp_importer("test-delegate"); let obs = ApplyObserver::default(); - let mut host = CoprocessorHost::default(); + let mut host = CoprocessorHost::::default(); host.registry .register_query_observer(1, BoxQueryObserver::new(obs.clone())); @@ -3981,7 +4104,7 @@ mod tests { fn test_cmd_observer() { let (_path, engine) = create_tmp_engine("test-delegate"); let (_import_dir, importer) = create_tmp_importer("test-delegate"); - let mut host = CoprocessorHost::default(); + let mut host = CoprocessorHost::::default(); let mut obs = ApplyObserver::default(); let (sink, cmdbatch_rx) = mpsc::channel(); obs.cmd_sink = Some(Arc::new(Mutex::new(sink))); @@ -4250,7 +4373,7 @@ mod tests { reg.region.set_peers(peers.clone().into()); let (tx, _rx) = mpsc::channel(); let sender = Notifier::Sender(tx); - let mut host = CoprocessorHost::default(); + let mut host = CoprocessorHost::::default(); let mut obs = ApplyObserver::default(); let (sink, cmdbatch_rx) = mpsc::channel(); obs.cmd_sink = Some(Arc::new(Mutex::new(sink))); @@ -4457,7 +4580,7 @@ mod tests { #[test] fn pending_cmd_leak() { let res = panic_hook::recover_safe(|| { - let _cmd = PendingCmd::new(1, 1, Callback::None); + let _cmd = PendingCmd::::new(1, 1, Callback::None); }); res.unwrap_err(); } @@ -4465,7 +4588,7 @@ mod tests { #[test] fn pending_cmd_leak_dtor_not_abort() { let res = panic_hook::recover_safe(|| { - let _cmd = PendingCmd::new(1, 1, Callback::None); + let _cmd = PendingCmd::::new(1, 1, Callback::None); panic!("Don't abort"); // It would abort and fail if there was a double-panic in PendingCmd dtor. 
}); diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 73da6daa404..0068e54bdae 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -133,7 +133,7 @@ impl PeerFsm { pub fn create( store_id: u64, cfg: &Config, - sched: Scheduler, + sched: Scheduler>, engines: KvEngines, region: &metapb::Region, ) -> Result> { @@ -177,7 +177,7 @@ impl PeerFsm { pub fn replicate( store_id: u64, cfg: &Config, - sched: Scheduler, + sched: Scheduler>, engines: KvEngines, region_id: u64, peer: metapb::Peer, @@ -620,7 +620,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } } - pub fn collect_ready(&mut self, proposals: &mut Vec) { + pub fn collect_ready(&mut self, proposals: &mut Vec>) { let has_ready = self.fsm.has_ready; self.fsm.has_ready = false; if !has_ready || self.fsm.stopped { @@ -835,7 +835,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { } } - fn on_apply_res(&mut self, res: ApplyTaskRes) { + fn on_apply_res(&mut self, res: ApplyTaskRes) { fail_point!("on_apply_res", |_| {}); match res { ApplyTaskRes::Apply(mut res) => { @@ -2182,7 +2182,11 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { assert_eq!(prev, Some(prev_region)); } - fn on_ready_result(&mut self, exec_results: &mut VecDeque, metrics: &ApplyMetrics) { + fn on_ready_result( + &mut self, + exec_results: &mut VecDeque>, + metrics: &ApplyMetrics, + ) { // handle executing committed log results while let Some(result) = exec_results.pop_front() { match result { @@ -2206,6 +2210,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> { region, index, snap, + _phantom, } => self.on_ready_compute_hash(region, index, snap), ExecResult::VerifyHash { index, hash } => self.on_ready_verify_hash(index, hash), ExecResult::DeleteRange { .. 
} => { diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index e9493e1c600..f23ad8c23e8 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -76,8 +76,8 @@ const RAFT_WB_SHRINK_SIZE: usize = 1024 * 1024; pub const PENDING_VOTES_CAP: usize = 20; const UNREACHABLE_BACKOFF: Duration = Duration::from_secs(10); -pub struct StoreInfo { - pub engine: RocksEngine, +pub struct StoreInfo { + pub engine: E, pub capacity: u64, } @@ -120,7 +120,7 @@ impl StoreMeta { #[inline] pub fn set_region( &mut self, - host: &CoprocessorHost, + host: &CoprocessorHost, region: Region, peer: &mut crate::store::Peer, ) { @@ -200,22 +200,22 @@ impl RaftRouter { pub struct PollContext { pub cfg: Config, pub store: metapb::Store, - pub pd_scheduler: FutureScheduler, + pub pd_scheduler: FutureScheduler>, pub consistency_check_scheduler: Scheduler>, pub split_check_scheduler: Scheduler, // handle Compact, CleanupSST task pub cleanup_scheduler: Scheduler, pub raftlog_gc_scheduler: Scheduler>, - pub region_scheduler: Scheduler, + pub region_scheduler: Scheduler>, pub apply_router: ApplyRouter, pub router: RaftRouter, pub importer: Arc, pub store_meta: Arc>, pub future_poller: ThreadPoolSender, pub raft_metrics: RaftMetrics, - pub snap_mgr: SnapManager, + pub snap_mgr: SnapManager, pub applying_snap_count: Arc, - pub coprocessor_host: CoprocessorHost, + pub coprocessor_host: CoprocessorHost, pub timer: SteadyTimer, pub trans: T, pub pd_client: Arc, @@ -233,7 +233,7 @@ pub struct PollContext { pub current_time: Option, } -impl HandleRaftReadyContext for PollContext { +impl HandleRaftReadyContext for PollContext { fn wb_mut(&mut self) -> (&mut RocksWriteBatch, &mut RocksWriteBatch) { (&mut self.kv_wb, &mut self.raft_wb) } @@ -467,7 +467,7 @@ pub struct RaftPoller { previous_metrics: RaftMetrics, timer: TiInstant, poll_ctx: PollContext, - pending_proposals: Vec, + pending_proposals: Vec>, messages_per_tick: usize, cfg_tracker: Tracker, } @@ -724,19 +724,19 @@ impl PollHandler, StoreFsm> for pub struct RaftPollerBuilder { pub cfg: Arc>, pub store: metapb::Store, - pd_scheduler: FutureScheduler, + pd_scheduler: FutureScheduler>, consistency_check_scheduler: Scheduler>, split_check_scheduler: Scheduler, cleanup_scheduler: Scheduler, raftlog_gc_scheduler: Scheduler>, - pub region_scheduler: Scheduler, + pub region_scheduler: Scheduler>, apply_router: ApplyRouter, pub router: RaftRouter, pub importer: Arc, store_meta: Arc>, future_poller: ThreadPoolSender, - snap_mgr: SnapManager, - pub coprocessor_host: CoprocessorHost, + snap_mgr: SnapManager, + pub coprocessor_host: CoprocessorHost, trans: T, pd_client: Arc, global_stat: GlobalStoreStat, @@ -967,14 +967,14 @@ where } struct Workers { - pd_worker: FutureWorker, + pd_worker: FutureWorker>, consistency_check_worker: Worker>, split_check_worker: Worker, // handle Compact, CleanupSST task cleanup_worker: Worker, raftlog_gc_worker: Worker>, - region_worker: Worker, - coprocessor_host: CoprocessorHost, + region_worker: Worker>, + coprocessor_host: CoprocessorHost, future_poller: ThreadPool, } @@ -1003,10 +1003,10 @@ impl RaftBatchSystem { engines: KvEngines, trans: T, pd_client: Arc, - mgr: SnapManager, - pd_worker: FutureWorker, + mgr: SnapManager, + pd_worker: FutureWorker>, store_meta: Arc>, - mut coprocessor_host: CoprocessorHost, + mut coprocessor_host: CoprocessorHost, importer: Arc, split_check_worker: Worker, dyn_cfg: Box, diff --git 
a/components/raftstore/src/store/msg.rs b/components/raftstore/src/store/msg.rs index 16858bbc104..9601a8f2cb9 100644 --- a/components/raftstore/src/store/msg.rs +++ b/components/raftstore/src/store/msg.rs @@ -313,7 +313,7 @@ pub enum PeerMsg { /// that the raft node will not work anymore. Tick(PeerTicks), /// Result of applying committed entries. The message can't be lost. - ApplyRes { res: ApplyTaskRes }, + ApplyRes { res: ApplyTaskRes }, /// Message that can't be lost but rarely created. If they are lost, real bad /// things happen like some peers will be considered dead in the group. SignificantMsg(SignificantMsg), diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 4cb8c6b35f3..2446ef6b5bf 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -170,14 +170,14 @@ pub struct Peer { pub peer: metapb::Peer, /// The Raft state machine of this Peer. - pub raft_group: RawNode, + pub raft_group: RawNode>, /// The cache of meta information for Region's other Peers. peer_cache: RefCell>, /// Record the last instant of each peer's heartbeat response. pub peer_heartbeats: HashMap, proposals: ProposalQueue, - apply_proposals: Vec, + apply_proposals: Vec>, leader_missing_time: Option, leader_lease: Lease, @@ -249,7 +249,7 @@ impl Peer { pub fn new( store_id: u64, cfg: &Config, - sched: Scheduler, + sched: Scheduler>, engines: KvEngines, region: &metapb::Region, peer: metapb::Peer, @@ -579,7 +579,7 @@ impl Peer { /// has been preserved in a durable device. pub fn set_region( &mut self, - host: &CoprocessorHost, + host: &CoprocessorHost, reader: &mut ReadDelegate, region: metapb::Region, ) { @@ -620,12 +620,12 @@ impl Peer { } #[inline] - pub fn get_store(&self) -> &PeerStorage { + pub fn get_store(&self) -> &PeerStorage { self.raft_group.store() } #[inline] - pub fn mut_store(&mut self) -> &mut PeerStorage { + pub fn mut_store(&mut self) -> &mut PeerStorage { self.raft_group.mut_store() } @@ -1074,7 +1074,7 @@ impl Peer { self.get_store().last_index() >= index && self.get_store().last_term() >= term } - pub fn take_apply_proposals(&mut self) -> Option { + pub fn take_apply_proposals(&mut self) -> Option> { if self.apply_proposals.is_empty() { return None; } diff --git a/components/raftstore/src/store/peer_storage.rs b/components/raftstore/src/store/peer_storage.rs index 97e152f7329..21a777c84e4 100644 --- a/components/raftstore/src/store/peer_storage.rs +++ b/components/raftstore/src/store/peer_storage.rs @@ -8,9 +8,8 @@ use std::sync::Arc; use std::time::Instant; use std::{cmp, error, u64}; -use engine_rocks::{RocksEngine, RocksSnapshot, RocksWriteBatch}; use engine_traits::CF_RAFT; -use engine_traits::{Iterable, KvEngine, KvEngines, MiscExt, Mutable, Peekable, SyncMutable}; +use engine_traits::{KvEngine, KvEngines, Mutable, Peekable, WriteBatch}; use keys::{self, enc_end_key, enc_start_key}; use kvproto::metapb::{self, Region}; use kvproto::raft_serverpb::{ @@ -231,11 +230,15 @@ impl CacheQueryStats { } } -pub trait HandleRaftReadyContext { +pub trait HandleRaftReadyContext +where + WK: WriteBatch, + WR: WriteBatch, +{ /// Returns the mutable references of WriteBatch for both KvDB and RaftDB in one interface. 
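The ready context now abstracts over two write-batch types, one for the KV DB and one for the Raft DB, instead of hard-coding `RocksWriteBatch` for both. A sketch that mirrors the trait shown in the hunk above, with a local `WriteBatch` stand-in and an implementor similar in spirit to the `ReadyContext` used in the tests further down:

trait WriteBatch: Default {
    fn put(&mut self, key: &[u8], value: &[u8]);
}

trait HandleRaftReadyContext<WK, WR>
where
    WK: WriteBatch,
    WR: WriteBatch,
{
    /// Mutable references to the KV and Raft write batches in one call.
    fn wb_mut(&mut self) -> (&mut WK, &mut WR);
    fn kv_wb_mut(&mut self) -> &mut WK;
    fn raft_wb_mut(&mut self) -> &mut WR;
    fn sync_log(&self) -> bool;
    fn set_sync_log(&mut self, sync: bool);
}

struct ReadyContext<WK: WriteBatch, WR: WriteBatch> {
    kv_wb: WK,
    raft_wb: WR,
    sync_log: bool,
}

impl<WK: WriteBatch, WR: WriteBatch> HandleRaftReadyContext<WK, WR> for ReadyContext<WK, WR> {
    fn wb_mut(&mut self) -> (&mut WK, &mut WR) {
        (&mut self.kv_wb, &mut self.raft_wb)
    }
    fn kv_wb_mut(&mut self) -> &mut WK {
        &mut self.kv_wb
    }
    fn raft_wb_mut(&mut self) -> &mut WR {
        &mut self.raft_wb
    }
    fn sync_log(&self) -> bool {
        self.sync_log
    }
    fn set_sync_log(&mut self, sync: bool) {
        self.sync_log = sync;
    }
}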
- fn wb_mut(&mut self) -> (&mut RocksWriteBatch, &mut RocksWriteBatch); - fn kv_wb_mut(&mut self) -> &mut RocksWriteBatch; - fn raft_wb_mut(&mut self) -> &mut RocksWriteBatch; + fn wb_mut(&mut self) -> (&mut WK, &mut WR); + fn kv_wb_mut(&mut self) -> &mut WK; + fn raft_wb_mut(&mut self) -> &mut WR; fn sync_log(&self) -> bool; fn set_sync_log(&mut self, sync: bool); } @@ -273,7 +276,7 @@ pub struct InvokeContext { } impl InvokeContext { - pub fn new(store: &PeerStorage) -> InvokeContext { + pub fn new(store: &PeerStorage) -> InvokeContext { InvokeContext { region_id: store.get_region_id(), raft_state: store.raft_state.clone(), @@ -289,7 +292,7 @@ impl InvokeContext { } #[inline] - pub fn save_raft_state_to(&self, raft_wb: &mut RocksWriteBatch) -> Result<()> { + pub fn save_raft_state_to(&self, raft_wb: &mut impl WriteBatch) -> Result<()> { raft_wb.put_msg(&keys::raft_state_key(self.region_id), &self.raft_state)?; Ok(()) } @@ -298,7 +301,7 @@ impl InvokeContext { pub fn save_snapshot_raft_state_to( &self, snapshot_index: u64, - kv_wb: &mut RocksWriteBatch, + kv_wb: &mut impl WriteBatch, ) -> Result<()> { let mut snapshot_raft_state = self.raft_state.clone(); snapshot_raft_state @@ -315,7 +318,7 @@ impl InvokeContext { } #[inline] - pub fn save_apply_state_to(&self, kv_wb: &mut RocksWriteBatch) -> Result<()> { + pub fn save_apply_state_to(&self, kv_wb: &mut impl WriteBatch) -> Result<()> { kv_wb.put_msg_cf( CF_RAFT, &keys::apply_state_key(self.region_id), @@ -326,8 +329,8 @@ impl InvokeContext { } pub fn recover_from_applying_state( - engines: &KvEngines, - raft_wb: &mut RocksWriteBatch, + engines: &KvEngines, + raft_wb: &mut impl WriteBatch, region_id: u64, ) -> Result<()> { let snapshot_raft_state_key = keys::snapshot_raft_state_key(region_id); @@ -362,7 +365,7 @@ pub fn recover_from_applying_state( } fn init_applied_index_term( - engines: &KvEngines, + engines: &KvEngines, region: &Region, apply_state: &RaftApplyState, ) -> Result { @@ -385,7 +388,7 @@ fn init_applied_index_term( } fn init_raft_state( - engines: &KvEngines, + engines: &KvEngines, region: &Region, ) -> Result { let state_key = keys::raft_state_key(region.get_id()); @@ -406,7 +409,7 @@ fn init_raft_state( } fn init_apply_state( - engines: &KvEngines, + engines: &KvEngines, region: &Region, ) -> Result { Ok( @@ -431,7 +434,7 @@ fn init_apply_state( fn validate_states( region_id: u64, - engines: &KvEngines, + engines: &KvEngines, raft_state: &mut RaftLocalState, apply_state: &RaftApplyState, ) -> Result<()> { @@ -478,7 +481,7 @@ fn validate_states( } fn init_last_term( - engines: &KvEngines, + engines: &KvEngines, region: &Region, raft_state: &RaftLocalState, apply_state: &RaftApplyState, @@ -505,8 +508,11 @@ fn init_last_term( } } -pub struct PeerStorage { - pub engines: KvEngines, +pub struct PeerStorage +where + EK: KvEngine, +{ + pub engines: KvEngines, peer_id: u64, region: metapb::Region, @@ -517,7 +523,7 @@ pub struct PeerStorage { snap_state: RefCell, gen_snap_task: RefCell>, - region_sched: Scheduler, + region_sched: Scheduler>, snap_tried_cnt: RefCell, cache: EntryCache, @@ -526,7 +532,11 @@ pub struct PeerStorage { pub tag: String, } -impl Storage for PeerStorage { +impl Storage for PeerStorage +where + EK: KvEngine, + ER: KvEngine, +{ fn initial_state(&self) -> raft::Result { self.initial_state() } @@ -557,14 +567,18 @@ impl Storage for PeerStorage { } } -impl PeerStorage { +impl PeerStorage +where + EK: KvEngine, + ER: KvEngine, +{ pub fn new( - engines: KvEngines, + engines: KvEngines, region: &metapb::Region, - 
region_sched: Scheduler, + region_sched: Scheduler>, peer_id: u64, tag: String, - ) -> Result { + ) -> Result> { debug!( "creating storage on specified path"; "region_id" => region.get_id(), @@ -760,8 +774,8 @@ impl PeerStorage { self.region = region; } - pub fn raw_snapshot(&self) -> RocksSnapshot { - RocksSnapshot::new(self.engines.kv.as_inner().clone()) + pub fn raw_snapshot(&self) -> EK::Snapshot { + self.engines.kv.snapshot() } fn validate_snap(&self, snap: &Snapshot, request_index: u64) -> bool { @@ -887,7 +901,7 @@ impl PeerStorage { // Append the given entries to the raft log using previous last index or self.last_index. // Return the new last index for later update. After we commit in engine, we can set last_index // to the return one. - pub fn append( + pub fn append>( &mut self, invoke_ctx: &mut InvokeContext, entries: &[Entry], @@ -972,8 +986,8 @@ impl PeerStorage { &mut self, ctx: &mut InvokeContext, snap: &Snapshot, - kv_wb: &mut RocksWriteBatch, - raft_wb: &mut RocksWriteBatch, + kv_wb: &mut EK::WriteBatch, + raft_wb: &mut ER::WriteBatch, ) -> Result<()> { info!( "begin to apply snapshot"; @@ -1032,8 +1046,8 @@ impl PeerStorage { /// Delete all meta belong to the region. Results are stored in `wb`. pub fn clear_meta( &mut self, - kv_wb: &mut RocksWriteBatch, - raft_wb: &mut RocksWriteBatch, + kv_wb: &mut EK::WriteBatch, + raft_wb: &mut ER::WriteBatch, ) -> Result<()> { let region_id = self.get_region_id(); clear_meta(&self.engines, kv_wb, raft_wb, region_id, &self.raft_state)?; @@ -1075,7 +1089,7 @@ impl PeerStorage { Ok(()) } - pub fn get_raft_engine(&self) -> RocksEngine { + pub fn get_raft_engine(&self) -> ER { self.engines.raft.clone() } @@ -1204,7 +1218,7 @@ impl PeerStorage { /// to update the memory states properly. // Using `&Ready` here to make sure `Ready` struct is not modified in this function. This is // a requirement to advance the ready object properly later. - pub fn handle_raft_ready( + pub fn handle_raft_ready>( &mut self, ready_ctx: &mut H, ready: &Ready, @@ -1245,13 +1259,13 @@ impl PeerStorage { // but not write raft_local_state to raft rocksdb in time. // we write raft state to default rocksdb, with last index set to snap index, // in case of recv raft log after snapshot. - ctx.save_snapshot_raft_state_to(snapshot_index, &mut ready_ctx.kv_wb_mut())?; + ctx.save_snapshot_raft_state_to(snapshot_index, ready_ctx.kv_wb_mut())?; } } // only when apply snapshot if snapshot_index != 0 { - ctx.save_apply_state_to(&mut ready_ctx.kv_wb_mut())?; + ctx.save_apply_state_to(ready_ctx.kv_wb_mut())?; } Ok(ctx) @@ -1311,7 +1325,7 @@ fn get_sync_log_from_entry(entry: &Entry) -> bool { } pub fn fetch_entries_to( - engine: &RocksEngine, + engine: &impl KvEngine, region_id: u64, low: u64, high: u64, @@ -1384,13 +1398,17 @@ pub fn fetch_entries_to( } /// Delete all meta belong to the region. Results are stored in `wb`. 
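The peer storage hunks above replace every concrete RocksDB type with trait parameters: the storage becomes generic over a KV engine and a raft engine, `raw_snapshot` returns the engine's associated snapshot type, and the state-saving helpers accept any write batch. A minimal, self-contained sketch of that shape, using simplified stand-in traits rather than the real engine_traits definitions:

// Stand-in traits; the real KvEngine/WriteBatch traits are richer.
trait WriteBatchSketch {
    fn put(&mut self, key: &[u8], value: &[u8]);
}

trait SnapshotSketch {}

trait KvEngineSketch: Clone {
    type Snapshot: SnapshotSketch;
    fn snapshot(&self) -> Self::Snapshot;
}

struct PeerStorageSketch<EK, ER>
where
    EK: KvEngineSketch,
    ER: KvEngineSketch,
{
    kv: EK,
    raft: ER,
}

impl<EK: KvEngineSketch, ER: KvEngineSketch> PeerStorageSketch<EK, ER> {
    // Mirrors `raw_snapshot`: the concrete `RocksSnapshot::new(..)` call
    // becomes a call on the KV engine's associated snapshot type.
    fn raw_snapshot(&self) -> EK::Snapshot {
        self.kv.snapshot()
    }

    // Mirrors `get_raft_engine`: the raft engine is returned as the generic ER.
    fn get_raft_engine(&self) -> ER {
        self.raft.clone()
    }
}

// Mirrors `save_raft_state_to` and friends: any engine's write batch is
// accepted through the trait instead of a concrete RocksWriteBatch.
fn save_raft_state_sketch(region_id: u64, raft_wb: &mut impl WriteBatchSketch) {
    raft_wb.put(&region_id.to_be_bytes(), b"raft_state");
}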
-pub fn clear_meta( - engines: &KvEngines, - kv_wb: &mut RocksWriteBatch, - raft_wb: &mut RocksWriteBatch, +pub fn clear_meta( + engines: &KvEngines, + kv_wb: &mut EK::WriteBatch, + raft_wb: &mut ER::WriteBatch, region_id: u64, raft_state: &RaftLocalState, -) -> Result<()> { +) -> Result<()> +where + EK: KvEngine, + ER: KvEngine, +{ let t = Instant::now(); box_try!(kv_wb.delete_cf(CF_RAFT, &keys::region_state_key(region_id))); box_try!(kv_wb.delete_cf(CF_RAFT, &keys::apply_state_key(region_id))); @@ -1423,7 +1441,7 @@ pub fn clear_meta( } pub fn do_snapshot( - mgr: SnapManager, + mgr: SnapManager, engine: &E, kv_snap: E::Snapshot, region_id: u64, @@ -1485,7 +1503,7 @@ where let conf_state = conf_state_from_region(state.get_region()); snapshot.mut_metadata().set_conf_state(conf_state); - let mut s = mgr.get_snapshot_for_building::(&key)?; + let mut s = mgr.get_snapshot_for_building(&key)?; // Set snapshot data. let mut snap_data = RaftSnapshotData::default(); snap_data.set_region(state.get_region().clone()); @@ -1566,8 +1584,8 @@ mod tests { use crate::store::{bootstrap_store, initial_region, prepare_bootstrap_cluster}; use engine::rocks::util::new_engine; use engine::Engines; - use engine_rocks::{CloneCompat, Compat, RocksWriteBatch}; - use engine_traits::WriteBatchExt; + use engine_rocks::{CloneCompat, Compat, RocksEngine, RocksWriteBatch}; + use engine_traits::{Iterable, SyncMutable, WriteBatchExt}; use engine_traits::{ALL_CFS, CF_DEFAULT}; use kvproto::raft_serverpb::RaftSnapshotData; use raft::eraftpb::HardState; @@ -1584,7 +1602,10 @@ mod tests { use super::*; - fn new_storage(sched: Scheduler, path: &TempDir) -> PeerStorage { + fn new_storage( + sched: Scheduler>, + path: &TempDir, + ) -> PeerStorage { let kv_db = Arc::new(new_engine(path.path().to_str().unwrap(), None, ALL_CFS, None).unwrap()); let raft_path = path.path().join(Path::new("raft")); @@ -1606,7 +1627,7 @@ mod tests { } impl ReadyContext { - fn new(s: &PeerStorage) -> ReadyContext { + fn new(s: &PeerStorage) -> ReadyContext { ReadyContext { kv_wb: s.engines.kv.write_batch(), raft_wb: s.engines.raft.write_batch(), @@ -1615,7 +1636,7 @@ mod tests { } } - impl HandleRaftReadyContext for ReadyContext { + impl HandleRaftReadyContext for ReadyContext { fn wb_mut(&mut self) -> (&mut RocksWriteBatch, &mut RocksWriteBatch) { (&mut self.kv_wb, &mut self.raft_wb) } @@ -1634,10 +1655,10 @@ mod tests { } fn new_storage_from_ents( - sched: Scheduler, + sched: Scheduler>, path: &TempDir, ents: &[Entry], - ) -> PeerStorage { + ) -> PeerStorage { let mut store = new_storage(sched, path); let mut kv_wb = store.engines.kv.write_batch(); let mut ctx = InvokeContext::new(&store); @@ -1659,7 +1680,7 @@ mod tests { store } - fn append_ents(store: &mut PeerStorage, ents: &[Entry]) { + fn append_ents(store: &mut PeerStorage, ents: &[Entry]) { let mut ctx = InvokeContext::new(store); let mut ready_ctx = ReadyContext::new(store); store.append(&mut ctx, ents, &mut ready_ctx).unwrap(); @@ -1668,7 +1689,7 @@ mod tests { store.raft_state = ctx.raft_state; } - fn validate_cache(store: &PeerStorage, exp_ents: &[Entry]) { + fn validate_cache(store: &PeerStorage, exp_ents: &[Entry]) { assert_eq!(store.cache.cache, exp_ents); for e in exp_ents { let key = keys::raft_log_key(store.get_region_id(), e.get_index()); @@ -1712,7 +1733,7 @@ mod tests { } } - fn get_meta_key_count(store: &PeerStorage) -> usize { + fn get_meta_key_count(store: &PeerStorage) -> usize { let region_id = store.get_region_id(); let mut count = 0; let (meta_start, meta_end) = ( @@ 
-1881,7 +1902,7 @@ mod tests { fn generate_and_schedule_snapshot( gen_task: GenSnapTask, engines: &KvEngines, - sched: &Scheduler, + sched: &Scheduler>, ) -> Result<()> { let apply_state: RaftApplyState = engines .kv @@ -1921,7 +1942,7 @@ mod tests { 0, true, Duration::from_secs(0), - CoprocessorHost::default(), + CoprocessorHost::::default(), router, ); worker.start(runner).unwrap(); @@ -2238,7 +2259,7 @@ mod tests { 0, true, Duration::from_secs(0), - CoprocessorHost::default(), + CoprocessorHost::::default(), router, ); worker.start(runner).unwrap(); @@ -2426,7 +2447,7 @@ mod tests { let region = initial_region(1, 1, 1); prepare_bootstrap_cluster(&engines.c(), ®ion).unwrap(); - let build_storage = || -> Result { + let build_storage = || -> Result> { PeerStorage::new(engines.c(), ®ion, sched.clone(), 0, "".to_owned()) }; let mut s = build_storage().unwrap(); diff --git a/components/raftstore/src/store/region_snapshot.rs b/components/raftstore/src/store/region_snapshot.rs index 4413bc6ea56..305992cb8ef 100644 --- a/components/raftstore/src/store/region_snapshot.rs +++ b/components/raftstore/src/store/region_snapshot.rs @@ -34,7 +34,7 @@ where E: KvEngine, { #[allow(clippy::new_ret_no_self)] // temporary until this returns RegionSnapshot - pub fn new(ps: &PeerStorage) -> RegionSnapshot { + pub fn new(ps: &PeerStorage) -> RegionSnapshot { RegionSnapshot::from_snapshot(ps.raw_snapshot().into_sync(), ps.region().clone()) } @@ -407,14 +407,17 @@ mod tests { type DataSet = Vec<(Vec, Vec)>; - fn new_peer_storage(engines: KvEngines, r: &Region) -> PeerStorage { + fn new_peer_storage( + engines: KvEngines, + r: &Region, + ) -> PeerStorage { let (sched, _) = worker::dummy_scheduler(); PeerStorage::new(engines, r, sched, 0, "".to_owned()).unwrap() } fn load_default_dataset( engines: KvEngines, - ) -> (PeerStorage, DataSet) { + ) -> (PeerStorage, DataSet) { let mut r = Region::default(); r.mut_peers().push(Peer::default()); r.set_id(10); @@ -438,7 +441,7 @@ mod tests { fn load_multiple_levels_dataset( engines: KvEngines, - ) -> (PeerStorage, DataSet) { + ) -> (PeerStorage, DataSet) { let mut r = Region::default(); r.mut_peers().push(Peer::default()); r.set_id(10); diff --git a/components/raftstore/src/store/snap.rs b/components/raftstore/src/store/snap.rs index 9acb5fc9526..8ce9141b53d 100644 --- a/components/raftstore/src/store/snap.rs +++ b/components/raftstore/src/store/snap.rs @@ -172,7 +172,7 @@ where pub region: Region, pub abort: Arc, pub write_batch_size: usize, - pub coprocessor_host: CoprocessorHost, + pub coprocessor_host: CoprocessorHost, } /// `Snapshot` is a trait for snapshot. @@ -1183,7 +1183,7 @@ struct SnapManagerCore { snap_size: Arc, } -fn notify_stats(ch: Option<&RaftRouter>) { +fn notify_stats(ch: Option<&RaftRouter>) { if let Some(ch) = ch { if let Err(e) = ch.send_control(StoreMsg::SnapshotStats) { error!( @@ -1196,17 +1196,23 @@ fn notify_stats(ch: Option<&RaftRouter>) { /// `SnapManagerCore` trace all current processing snapshots. #[derive(Clone)] -pub struct SnapManager { +pub struct SnapManager +where + E: KvEngine, +{ // directory to store snapfile. 
core: Arc>, - router: Option>, + router: Option>, limiter: Limiter, max_total_size: u64, encryption_key_manager: Option>, } -impl SnapManager { - pub fn new>(path: T, router: Option>) -> Self { +impl SnapManager +where + E: KvEngine, +{ + pub fn new>(path: T, router: Option>) -> Self { SnapManagerBuilder::default().build(path, router) } @@ -1303,7 +1309,7 @@ impl SnapManager { self.core.rl().registry.contains_key(key) } - pub fn get_snapshot_for_building( + pub fn get_snapshot_for_building( &self, key: &SnapKey, ) -> RaftStoreResult>> { @@ -1420,7 +1426,7 @@ impl SnapManager { Ok(self.get_concrete_snapshot_for_applying(key)?) } - pub fn get_snapshot_for_applying_to_engine( + pub fn get_snapshot_for_applying_to_engine( &self, key: &SnapKey, ) -> RaftStoreResult>> { @@ -1521,7 +1527,10 @@ impl SnapManager { } } -impl SnapshotDeleter for SnapManager { +impl SnapshotDeleter for SnapManager +where + E: KvEngine, +{ fn delete_snapshot( &self, key: &SnapKey, @@ -1576,11 +1585,11 @@ impl SnapManagerBuilder { self.key_manager = m; self } - pub fn build>( + pub fn build, E: KvEngine>( self, path: T, - router: Option>, - ) -> SnapManager { + router: Option>, + ) -> SnapManager { let limiter = Limiter::new(if self.max_write_bytes_per_sec > 0 { self.max_write_bytes_per_sec as f64 } else { @@ -1982,7 +1991,7 @@ pub mod tests { region, abort: Arc::new(AtomicUsize::new(JOB_STATUS_RUNNING)), write_batch_size: TEST_WRITE_BATCH_SIZE, - coprocessor_host: CoprocessorHost::default(), + coprocessor_host: CoprocessorHost::::default(), }; // Verify thte snapshot applying is ok. assert!(s4.apply(options).is_ok()); @@ -2322,7 +2331,7 @@ pub mod tests { region, abort: Arc::new(AtomicUsize::new(JOB_STATUS_RUNNING)), write_batch_size: TEST_WRITE_BATCH_SIZE, - coprocessor_host: CoprocessorHost::default(), + coprocessor_host: CoprocessorHost::::default(), }; assert!(s5.apply(options).is_err()); @@ -2469,7 +2478,7 @@ pub mod tests { let temp_path = temp_dir.path().join("snap1"); let path = temp_path.to_str().unwrap().to_owned(); assert!(!temp_path.exists()); - let mut mgr = SnapManager::new(path, None); + let mut mgr = SnapManager::::new(path, None); mgr.init().unwrap(); assert!(temp_path.exists()); @@ -2485,7 +2494,7 @@ pub mod tests { fn test_snap_mgr_v2() { let temp_dir = Builder::new().prefix("test-snap-mgr-v2").tempdir().unwrap(); let path = temp_dir.path().to_str().unwrap().to_owned(); - let mgr = SnapManager::new(path.clone(), None); + let mgr = SnapManager::::new(path.clone(), None); mgr.init().unwrap(); assert_eq!(mgr.get_total_snap_size(), 0); @@ -2567,7 +2576,7 @@ pub mod tests { assert!(!s3.exists()); assert!(!s4.exists()); - let mgr = SnapManager::new(path, None); + let mgr = SnapManager::::new(path, None); mgr.init().unwrap(); assert_eq!(mgr.get_total_snap_size(), expected_size * 2); @@ -2582,7 +2591,11 @@ pub mod tests { assert_eq!(mgr.get_total_snap_size(), 0); } - fn check_registry_around_deregister(mgr: SnapManager, key: &SnapKey, entry: &SnapEntry) { + fn check_registry_around_deregister( + mgr: SnapManager, + key: &SnapKey, + entry: &SnapEntry, + ) { let snap_keys = mgr.list_idle_snap().unwrap(); assert!(snap_keys.is_empty()); assert!(mgr.has_registered(key)); @@ -2616,9 +2629,7 @@ pub mod tests { // Ensure the snapshot being built will not be deleted on GC. 
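Making the snapshot manager generic has two visible call-site effects in the tests above: `SnapManager::new(path, None)` can no longer infer the engine from a `None` router, so the tests spell it with a turbofish, while `get_snapshot_for_building` drops its per-call annotation because the snapshot type is now fixed by the manager's own parameter. A small illustration with hypothetical stand-in types:

use std::marker::PhantomData;

#[derive(Default)]
struct RocksEngineSketch;

struct SnapManagerSketch<E> {
    _engine: PhantomData<E>,
}

impl<E: Default> SnapManagerSketch<E> {
    fn new(_path: &str, _router: Option<()>) -> Self {
        SnapManagerSketch { _engine: PhantomData }
    }

    // The result type is tied to E, so callers no longer annotate it per call.
    fn get_snapshot_for_building(&self) -> E {
        E::default()
    }
}

fn demo() {
    // The engine is named once, on the manager type itself:
    let mgr = SnapManagerSketch::<RocksEngineSketch>::new("/tmp/snap", None);
    // ...and no per-call turbofish is needed any more:
    let _snap = mgr.get_snapshot_for_building();
}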
src_mgr.register(key.clone(), SnapEntry::Generating); - let mut s1 = src_mgr - .get_snapshot_for_building::(&key) - .unwrap(); + let mut s1 = src_mgr.get_snapshot_for_building(&key).unwrap(); let mut snap_data = RaftSnapshotData::default(); snap_data.set_region(region.clone()); let mut stat = SnapshotStatistics::new(); @@ -2687,7 +2698,7 @@ pub mod tests { let max_total_size = 10240; let snap_mgr = SnapManagerBuilder::default() .max_total_size(max_total_size) - .build::<_>(snapfiles_path.path().to_str().unwrap(), None); + .build::<_, RocksEngine>(snapfiles_path.path().to_str().unwrap(), None); let snapshot = RocksSnapshot::new(engine.kv.clone()); // Add an oldest snapshot for receiving. @@ -2695,9 +2706,7 @@ pub mod tests { let recv_head = { let mut stat = SnapshotStatistics::new(); let mut snap_data = RaftSnapshotData::default(); - let mut s = snap_mgr - .get_snapshot_for_building::(&recv_key) - .unwrap(); + let mut s = snap_mgr.get_snapshot_for_building(&recv_key).unwrap(); s.build( engine.kv.c(), &snapshot, @@ -2725,9 +2734,7 @@ pub mod tests { for (i, region_id) in regions.into_iter().enumerate() { let key = SnapKey::new(region_id, 1, 1); let region = gen_test_region(region_id, 1, 1); - let mut s = snap_mgr - .get_snapshot_for_building::(&key) - .unwrap(); + let mut s = snap_mgr.get_snapshot_for_building(&key).unwrap(); let mut snap_data = RaftSnapshotData::default(); let mut stat = SnapshotStatistics::new(); s.build( diff --git a/components/raftstore/src/store/worker/cleanup.rs b/components/raftstore/src/store/worker/cleanup.rs index 933b87228e7..bf6d2c751f0 100644 --- a/components/raftstore/src/store/worker/cleanup.rs +++ b/components/raftstore/src/store/worker/cleanup.rs @@ -6,6 +6,7 @@ use super::cleanup_sst::{Runner as CleanupSSTRunner, Task as CleanupSSTTask}; use super::compact::{Runner as CompactRunner, Task as CompactTask}; use crate::store::StoreRouter; +use engine_traits::KvEngine; use pd_client::PdClient; use tikv_util::worker::Runnable; @@ -23,13 +24,17 @@ impl Display for Task { } } -pub struct Runner { - compact: CompactRunner, +pub struct Runner { + compact: CompactRunner, cleanup_sst: CleanupSSTRunner, } -impl Runner { - pub fn new(compact: CompactRunner, cleanup_sst: CleanupSSTRunner) -> Runner { +impl Runner +where + C: PdClient, + S: StoreRouter, +{ + pub fn new(compact: CompactRunner, cleanup_sst: CleanupSSTRunner) -> Runner { Runner { compact, cleanup_sst, @@ -37,7 +42,12 @@ impl Runner { } } -impl Runnable for Runner { +impl Runnable for Runner +where + E: KvEngine, + C: PdClient, + S: StoreRouter, +{ fn run(&mut self, task: Task) { match task { Task::Compact(t) => self.compact.run(t), diff --git a/components/raftstore/src/store/worker/compact.rs b/components/raftstore/src/store/worker/compact.rs index a04be4a5d02..d9425d311b9 100644 --- a/components/raftstore/src/store/worker/compact.rs +++ b/components/raftstore/src/store/worker/compact.rs @@ -5,9 +5,8 @@ use std::error; use std::fmt::{self, Display, Formatter}; use std::time::Instant; -use engine_rocks::RocksEngine; +use engine_traits::KvEngine; use engine_traits::CF_WRITE; -use engine_traits::{CompactExt, KvEngine}; use tikv_util::worker::Runnable; use super::metrics::COMPACT_RANGE_CF; @@ -83,12 +82,15 @@ quick_error! 
{ } } -pub struct Runner { - engine: RocksEngine, +pub struct Runner { + engine: E, } -impl Runner { - pub fn new(engine: RocksEngine) -> Runner { +impl Runner +where + E: KvEngine, +{ + pub fn new(engine: E) -> Runner { Runner { engine } } @@ -118,7 +120,10 @@ impl Runner { } } -impl Runnable for Runner { +impl Runnable for Runner +where + E: KvEngine, +{ fn run(&mut self, task: Task) { match task { Task::Compact { diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index 79d6edc6a56..94512d55a0c 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -12,8 +12,7 @@ use futures::Future; use tokio_core::reactor::Handle; use tokio_timer::Delay; -use engine_rocks::RocksEngine; -use engine_traits::MiscExt; +use engine_traits::KvEngine; use fs2; use kvproto::metapb; use kvproto::pdpb; @@ -59,7 +58,10 @@ pub trait FlowStatsReporter: Send + Clone + Sync + 'static { fn report_read_stats(&self, read_stats: HashMap); } -impl FlowStatsReporter for Scheduler { +impl FlowStatsReporter for Scheduler> +where + E: KvEngine, +{ fn report_read_stats(&self, read_stats: HashMap) { if let Err(e) = self.schedule(Task::ReadStats { read_stats }) { error!("Failed to send read flow statistics"; "err" => ?e); @@ -74,14 +76,17 @@ pub trait DynamicConfig: Send + 'static { } /// Uses an asynchronous thread to tell PD something. -pub enum Task { +pub enum Task +where + E: KvEngine, +{ AskSplit { region: metapb::Region, split_key: Vec, peer: metapb::Peer, // If true, right Region derives origin region_id. right_derive: bool, - callback: Callback, + callback: Callback, }, AskBatchSplit { region: metapb::Region, @@ -89,7 +94,7 @@ pub enum Task { peer: metapb::Peer, // If true, right Region derives origin region_id. right_derive: bool, - callback: Callback, + callback: Callback, }, Heartbeat { term: u64, @@ -104,7 +109,7 @@ pub enum Task { }, StoreHeartbeat { stats: pdpb::StoreStats, - store_info: StoreInfo, + store_info: StoreInfo, }, ReportBatchSplit { regions: Vec, @@ -180,7 +185,10 @@ pub struct PeerStat { pub last_report_ts: UnixSecs, } -impl Display for Task { +impl Display for Task +where + E: KvEngine, +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match *self { Task::AskSplit { @@ -259,15 +267,21 @@ fn convert_record_pairs(m: HashMap) -> RecordPairVec { .collect() } -struct StatsMonitor { - scheduler: Scheduler, +struct StatsMonitor +where + E: KvEngine, +{ + scheduler: Scheduler>, handle: Option>, sender: Option>, interval: Duration, } -impl StatsMonitor { - pub fn new(interval: Duration, scheduler: Scheduler) -> Self { +impl StatsMonitor +where + E: KvEngine, +{ + pub fn new(interval: Duration, scheduler: Scheduler>) -> Self { StatsMonitor { scheduler, handle: None, @@ -324,12 +338,16 @@ impl StatsMonitor { } } -pub struct Runner { +pub struct Runner +where + E: KvEngine, + T: PdClient + ConfigClient, +{ store_id: u64, pd_client: Arc, config_handler: Box, - router: RaftRouter, - db: RocksEngine, + router: RaftRouter, + db: E, region_peers: HashMap, store_stat: StoreStat, is_hb_receiver_scheduled: bool, @@ -339,22 +357,26 @@ pub struct Runner { // use for Runner inner handle function to send Task to itself // actually it is the sender connected to Runner's Worker which // calls Runner's run() on Task received. 
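The pd worker above follows the same recipe as the other workers in this patch: the task enum, the scheduler, and the runner are all parameterized by the engine, and `Runnable` is implemented for the engine-specific task type. A rough sketch of that wiring with simplified stand-ins (the real Runnable trait and Task enum carry far more state):

// Simplified stand-ins, not the tikv_util::worker definitions.
trait KvEngineSketch {
    fn path(&self) -> &str;
}

enum PdTaskSketch<E: KvEngineSketch> {
    StoreHeartbeat { store_info: E },
}

trait RunnableSketch<T> {
    fn run(&mut self, task: T);
}

struct PdRunnerSketch<E: KvEngineSketch> {
    db: E,
}

impl<E: KvEngineSketch> RunnableSketch<PdTaskSketch<E>> for PdRunnerSketch<E> {
    fn run(&mut self, task: PdTaskSketch<E>) {
        match task {
            PdTaskSketch::StoreHeartbeat { store_info } => {
                // e.g. read disk usage from the engine's data directory.
                let _ = (self.db.path(), store_info.path());
            }
        }
    }
}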
- scheduler: Scheduler, - stats_monitor: StatsMonitor, + scheduler: Scheduler>, + stats_monitor: StatsMonitor, } -impl Runner { +impl Runner +where + E: KvEngine, + T: PdClient + ConfigClient, +{ const INTERVAL_DIVISOR: u32 = 2; pub fn new( store_id: u64, pd_client: Arc, config_handler: Box, - router: RaftRouter, - db: RocksEngine, - scheduler: Scheduler, + router: RaftRouter, + db: E, + scheduler: Scheduler>, store_heartbeat_interval: u64, - ) -> Runner { + ) -> Runner { let interval = Duration::from_secs(store_heartbeat_interval) / Self::INTERVAL_DIVISOR; let mut stats_monitor = StatsMonitor::new(interval, scheduler.clone()); if let Err(e) = stats_monitor.start() { @@ -383,7 +405,7 @@ impl Runner { split_key: Vec, peer: metapb::Peer, right_derive: bool, - callback: Callback, + callback: Callback, ) { let router = self.router.clone(); let f = self.pd_client.ask_split(region.clone()).then(move |resp| { @@ -424,7 +446,7 @@ impl Runner { mut split_keys: Vec>, peer: metapb::Peer, right_derive: bool, - callback: Callback, + callback: Callback, ) { let router = self.router.clone(); let scheduler = self.scheduler.clone(); @@ -533,7 +555,7 @@ impl Runner { &mut self, handle: &Handle, mut stats: pdpb::StoreStats, - store_info: StoreInfo, + store_info: StoreInfo, ) { let disk_stats = match fs2::statvfs(store_info.engine.path()) { Err(e) => { @@ -838,8 +860,12 @@ impl Runner { } } -impl Runnable for Runner { - fn run(&mut self, task: Task, handle: &Handle) { +impl Runnable> for Runner +where + E: KvEngine, + T: PdClient + ConfigClient, +{ + fn run(&mut self, task: Task, handle: &Handle) { debug!("executing task"; "task" => %task); if !self.is_hb_receiver_scheduled { @@ -1019,14 +1045,16 @@ fn new_merge_request(merge: pdpb::Merge) -> AdminRequest { req } -fn send_admin_request( - router: &RaftRouter, +fn send_admin_request( + router: &RaftRouter, region_id: u64, epoch: metapb::RegionEpoch, peer: metapb::Peer, request: AdminRequest, - callback: Callback, -) { + callback: Callback, +) where + E: KvEngine, +{ let cmd_type = request.get_cmd_type(); let mut req = RaftCmdRequest::default(); @@ -1045,7 +1073,11 @@ fn send_admin_request( } /// Sends merge fail message to gc merge source. 
-fn send_merge_fail(router: &RaftRouter, source_region_id: u64, target: metapb::Peer) { +fn send_merge_fail( + router: &RaftRouter, + source_region_id: u64, + target: metapb::Peer, +) { let target_id = target.get_id(); if let Err(e) = router.force_send( source_region_id, @@ -1063,7 +1095,7 @@ fn send_merge_fail(router: &RaftRouter, source_region_id: u64, targ /// Sends a raft message to destroy the specified stale Peer fn send_destroy_peer_message( - router: &RaftRouter, + router: &RaftRouter, local_region: metapb::Region, peer: metapb::Peer, pd_region: metapb::Region, @@ -1086,6 +1118,7 @@ fn send_destroy_peer_message( #[cfg(not(target_os = "macos"))] #[cfg(test)] mod tests { + use engine_rocks::RocksEngine; use std::sync::Mutex; use std::time::Instant; use tikv_util::worker::FutureWorker; @@ -1094,13 +1127,13 @@ mod tests { struct RunnerTest { store_stat: Arc>, - stats_monitor: StatsMonitor, + stats_monitor: StatsMonitor, } impl RunnerTest { fn new( interval: u64, - scheduler: Scheduler, + scheduler: Scheduler>, store_stat: Arc>, ) -> RunnerTest { let mut stats_monitor = StatsMonitor::new(Duration::from_secs(interval), scheduler); @@ -1127,8 +1160,8 @@ mod tests { } } - impl Runnable for RunnerTest { - fn run(&mut self, task: Task, _handle: &Handle) { + impl Runnable> for RunnerTest { + fn run(&mut self, task: Task, _handle: &Handle) { if let Task::StoreInfos { cpu_usages, read_io_rates, diff --git a/components/raftstore/src/store/worker/region.rs b/components/raftstore/src/store/worker/region.rs index 998220bc5f1..5e5869344da 100644 --- a/components/raftstore/src/store/worker/region.rs +++ b/components/raftstore/src/store/worker/region.rs @@ -9,9 +9,9 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use std::u64; -use engine_rocks::{RocksEngine, RocksSnapshot}; +use engine_rocks::RocksEngine; use engine_traits::CF_RAFT; -use engine_traits::{KvEngines, MiscExt, Mutable, Peekable, WriteBatchExt}; +use engine_traits::{KvEngine, KvEngines, Mutable}; use kvproto::raft_serverpb::{PeerState, RaftApplyState, RegionLocalState}; use raft::eraftpb::Snapshot as RaftSnapshot; @@ -46,12 +46,15 @@ const CLEANUP_MAX_DURATION: Duration = Duration::from_secs(5); /// Region related task #[derive(Debug)] -pub enum Task { +pub enum Task +where + E: KvEngine, +{ Gen { region_id: u64, last_applied_index_term: u64, last_applied_state: RaftApplyState, - kv_snap: RocksSnapshot, + kv_snap: E::Snapshot, notifier: SyncSender, }, Apply { @@ -68,8 +71,11 @@ pub enum Task { }, } -impl Task { - pub fn destroy(region_id: u64, start_key: Vec, end_key: Vec) -> Task { +impl Task +where + E: KvEngine, +{ + pub fn destroy(region_id: u64, start_key: Vec, end_key: Vec) -> Task { Task::Destroy { region_id, start_key, @@ -78,7 +84,10 @@ impl Task { } } -impl Display for Task { +impl Display for Task +where + E: KvEngine, +{ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match *self { Task::Gen { region_id, .. 
} => write!(f, "Snap gen for {}", region_id), @@ -206,29 +215,38 @@ impl PendingDeleteRanges { } #[derive(Clone)] -struct SnapContext { - engines: KvEngines, +struct SnapContext +where + EK: KvEngine, + ER: KvEngine, +{ + engines: KvEngines, batch_size: usize, - mgr: SnapManager, + mgr: SnapManager, use_delete_range: bool, clean_stale_peer_delay: Duration, pending_delete_ranges: PendingDeleteRanges, - coprocessor_host: CoprocessorHost, + coprocessor_host: CoprocessorHost, router: R, } -impl> SnapContext { +impl SnapContext +where + EK: KvEngine, + ER: KvEngine, + R: CasualRouter, +{ /// Generates the snapshot of the Region. fn generate_snap( &self, region_id: u64, last_applied_index_term: u64, last_applied_state: RaftApplyState, - kv_snap: RocksSnapshot, + kv_snap: EK::Snapshot, notifier: SyncSender, ) -> Result<()> { // do we need to check leader here? - let snap = box_try!(store::do_snapshot::( + let snap = box_try!(store::do_snapshot::( self.mgr.clone(), &self.engines.kv, kv_snap, @@ -259,7 +277,7 @@ impl> SnapContext { region_id: u64, last_applied_index_term: u64, last_applied_state: RaftApplyState, - kv_snap: RocksSnapshot, + kv_snap: EK::Snapshot, notifier: SyncSender, ) { SNAP_COUNTER.generate.all.inc(); @@ -526,24 +544,33 @@ impl> SnapContext { } } -pub struct Runner { +pub struct Runner +where + EK: KvEngine, + ER: KvEngine, +{ pool: ThreadPool, - ctx: SnapContext, + ctx: SnapContext, // we may delay some apply tasks if level 0 files to write stall threshold, // pending_applies records all delayed apply task, and will check again later - pending_applies: VecDeque, + pending_applies: VecDeque>, } -impl> Runner { +impl Runner +where + EK: KvEngine, + ER: KvEngine, + R: CasualRouter, +{ pub fn new( - engines: KvEngines, - mgr: SnapManager, + engines: KvEngines, + mgr: SnapManager, batch_size: usize, use_delete_range: bool, clean_stale_peer_delay: Duration, - coprocessor_host: CoprocessorHost, + coprocessor_host: CoprocessorHost, router: R, - ) -> Runner { + ) -> Runner { Runner { pool: Builder::new(thd_name!("snap-generator")) .max_thread_count(GENERATE_POOL_SIZE) @@ -592,11 +619,13 @@ impl> Runner { } } -impl Runnable for Runner +impl Runnable> for Runner where - R: CasualRouter + Send + Clone + 'static, + EK: KvEngine, + ER: KvEngine, + R: CasualRouter + Send + Clone + 'static, { - fn run(&mut self, task: Task) { + fn run(&mut self, task: Task) { match task { Task::Gen { region_id, @@ -658,9 +687,11 @@ pub enum Event { CheckApply, } -impl RunnableWithTimer for Runner +impl RunnableWithTimer, Event> for Runner where - R: CasualRouter + Send + Clone + 'static, + EK: KvEngine, + ER: KvEngine, + R: CasualRouter + Send + Clone + 'static, { fn on_timeout(&mut self, timer: &mut Timer, event: Event) { match event { @@ -698,7 +729,7 @@ mod tests { use engine::rocks; use engine::rocks::{ColumnFamilyOptions, Writable}; use engine::Engines; - use engine_rocks::{CloneCompat, Compat, RocksSnapshot}; + use engine_rocks::{CloneCompat, Compat, RocksEngine, RocksSnapshot}; use engine_traits::{CompactExt, Mutable, Peekable, WriteBatchExt}; use engine_traits::{CF_DEFAULT, CF_RAFT}; use kvproto::raft_serverpb::{PeerState, RaftApplyState, RegionLocalState}; @@ -843,7 +874,7 @@ mod tests { 0, true, Duration::from_secs(0), - CoprocessorHost::default(), + CoprocessorHost::::default(), router, ); let mut timer = Timer::new(1); @@ -884,7 +915,7 @@ mod tests { } let data = s1.get_data(); let key = SnapKey::from_snap(&s1).unwrap(); - let mgr = SnapManager::new(snap_dir.path().to_str().unwrap(), None); + let 
mgr = SnapManager::::new(snap_dir.path().to_str().unwrap(), None); let mut s2 = mgr.get_snapshot_for_sending(&key).unwrap(); let mut s3 = mgr.get_snapshot_for_receiving(&key, &data[..]).unwrap(); io::copy(&mut s2, &mut s3).unwrap(); diff --git a/components/raftstore/src/store/worker/split_check.rs b/components/raftstore/src/store/worker/split_check.rs index 998e6ffec56..acf961d0419 100644 --- a/components/raftstore/src/store/worker/split_check.rs +++ b/components/raftstore/src/store/worker/split_check.rs @@ -5,8 +5,8 @@ use std::collections::BinaryHeap; use std::fmt::{self, Display, Formatter}; use std::mem; -use engine_rocks::{RocksEngine, RocksEngineIterator}; -use engine_traits::{CfName, IterOptions, Iterable, Iterator, CF_WRITE, LARGE_CFS}; +use engine_rocks::RocksEngine; +use engine_traits::{CfName, IterOptions, Iterable, Iterator, KvEngine, CF_WRITE, LARGE_CFS}; use kvproto::metapb::Region; use kvproto::metapb::RegionEpoch; use kvproto::pdpb::CheckPolicy; @@ -66,19 +66,22 @@ impl Ord for KeyEntry { } } -struct MergedIterator { - iters: Vec<(CfName, RocksEngineIterator)>, +struct MergedIterator { + iters: Vec<(CfName, I)>, heap: BinaryHeap, } -impl MergedIterator { - fn new( - db: &RocksEngine, +impl MergedIterator +where + I: Iterator, +{ + fn new( + db: &E, cfs: &[CfName], start_key: &[u8], end_key: &[u8], fill_cache: bool, - ) -> Result { + ) -> Result> { let mut iters = Vec::with_capacity(cfs.len()); let mut heap = BinaryHeap::with_capacity(cfs.len()); for (pos, cf) in cfs.iter().enumerate() { @@ -162,7 +165,7 @@ impl Display for Task { pub struct Runner { engine: RocksEngine, router: S, - coprocessor: CoprocessorHost, + coprocessor: CoprocessorHost, cfg: Config, } @@ -170,7 +173,7 @@ impl> Runner { pub fn new( engine: RocksEngine, router: S, - coprocessor: CoprocessorHost, + coprocessor: CoprocessorHost, cfg: Config, ) -> Runner { Runner { @@ -267,35 +270,40 @@ impl> Runner { end_key: &[u8], ) -> Result>> { let timer = CHECK_SPILT_HISTOGRAM.start_coarse_timer(); - MergedIterator::new(&self.engine, LARGE_CFS, start_key, end_key, false).map( - |mut iter| { - let mut size = 0; - let mut keys = 0; - while let Some(e) = iter.next() { - if host.on_kv(region, &e) { - return; - } - size += e.entry_size() as u64; - keys += 1; + MergedIterator::<::Iterator>::new( + &self.engine, + LARGE_CFS, + start_key, + end_key, + false, + ) + .map(|mut iter| { + let mut size = 0; + let mut keys = 0; + while let Some(e) = iter.next() { + if host.on_kv(region, &e) { + return; } + size += e.entry_size() as u64; + keys += 1; + } - // if we scan the whole range, we can update approximate size and keys with accurate value. - info!( - "update approximate size and keys with accurate value"; - "region_id" => region.get_id(), - "size" => size, - "keys" => keys, - ); - let _ = self.router.send( - region.get_id(), - CasualMessage::RegionApproximateSize { size }, - ); - let _ = self.router.send( - region.get_id(), - CasualMessage::RegionApproximateKeys { keys }, - ); - }, - )?; + // if we scan the whole range, we can update approximate size and keys with accurate value. 
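The split-check change above stops naming a concrete RocksDB iterator struct and instead takes the iterator type from the engine trait, which is why the scan now names the engine's associated iterator (something like `<RocksEngine as Iterable>::Iterator`) at the call site. A minimal sketch of scanning through such an associated iterator type, with stand-in traits:

// Stand-in traits; the real Iterable/Iterator traits are richer.
trait EngineIterSketch {
    fn next_kv(&mut self) -> Option<(Vec<u8>, Vec<u8>)>;
}

trait IterableSketch {
    type Iterator: EngineIterSketch;
    fn iterator_cf(&self, cf: &str) -> Self::Iterator;
}

// Generic over the engine, so no concrete RocksDB iterator type appears.
fn approximate_size<E: IterableSketch>(db: &E, cf: &str) -> u64 {
    let mut iter = db.iterator_cf(cf);
    let mut size = 0;
    while let Some((k, v)) = iter.next_kv() {
        size += (k.len() + v.len()) as u64;
    }
    size
}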
+ info!( + "update approximate size and keys with accurate value"; + "region_id" => region.get_id(), + "size" => size, + "keys" => keys, + ); + let _ = self.router.send( + region.get_id(), + CasualMessage::RegionApproximateSize { size }, + ); + let _ = self.router.send( + region.get_id(), + CasualMessage::RegionApproximateKeys { keys }, + ); + })?; timer.observe_duration(); Ok(host.split_keys()) diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 022470efe47..b46bc14b812 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -32,8 +32,8 @@ use tikv_util::config::VersionTrack; use tikv_util::worker::{FutureWorker, Worker}; pub struct ChannelTransportCore { - snap_paths: HashMap, - routers: HashMap>, + snap_paths: HashMap, TempDir)>, + routers: HashMap>>, } #[derive(Clone)] @@ -121,7 +121,8 @@ pub struct NodeCluster { pd_client: Arc, nodes: HashMap>, simulate_trans: HashMap, - post_create_coprocessor_host: Option>, + #[allow(clippy::type_complexity)] + post_create_coprocessor_host: Option)>>, } impl NodeCluster { @@ -138,7 +139,10 @@ impl NodeCluster { impl NodeCluster { #[allow(dead_code)] - pub fn get_node_router(&self, node_id: u64) -> SimulateTransport { + pub fn get_node_router( + &self, + node_id: u64, + ) -> SimulateTransport> { self.trans .core .lock() @@ -152,7 +156,11 @@ impl NodeCluster { // Set a function that will be invoked after creating each CoprocessorHost. The first argument // of `op` is the node_id. // Set this before invoking `run_node`. - pub fn post_create_coprocessor_host(&mut self, op: Box) { + #[allow(clippy::type_complexity)] + pub fn post_create_coprocessor_host( + &mut self, + op: Box)>, + ) { self.post_create_coprocessor_host = Some(op) } diff --git a/components/test_raftstore/src/router.rs b/components/test_raftstore/src/router.rs index 07e3b0418aa..0a7abc1f774 100644 --- a/components/test_raftstore/src/router.rs +++ b/components/test_raftstore/src/router.rs @@ -29,7 +29,7 @@ impl MockRaftStoreRouter { } } -impl RaftStoreRouter for MockRaftStoreRouter { +impl RaftStoreRouter for MockRaftStoreRouter { fn significant_send(&self, region_id: u64, msg: SignificantMsg) -> RaftStoreResult<()> { let mut senders = self.senders.lock().unwrap(); if let Some(tx) = senders.get_mut(®ion_id) { diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index cb8a2103208..b9600cce80f 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -45,7 +45,7 @@ use tikv_util::config::VersionTrack; use tikv_util::security::SecurityManager; use tikv_util::worker::{FutureWorker, Worker}; -type SimulateStoreTransport = SimulateTransport; +type SimulateStoreTransport = SimulateTransport>; type SimulateServerTransport = SimulateTransport>; @@ -62,7 +62,7 @@ struct ServerMeta { } type PendingServices = Vec Service>>; -type CopHooks = Vec>; +type CopHooks = Vec)>>; pub struct ServerCluster { metas: HashMap, diff --git a/components/test_raftstore/src/transport_simulate.rs b/components/test_raftstore/src/transport_simulate.rs index 28e403165ba..d3a39e9cb11 100644 --- a/components/test_raftstore/src/transport_simulate.rs +++ b/components/test_raftstore/src/transport_simulate.rs @@ -188,7 +188,7 @@ impl Transport for SimulateTransport { } } -impl RaftStoreRouter for SimulateTransport { +impl> RaftStoreRouter for SimulateTransport { fn send_raft_msg(&self, msg: RaftMessage) -> Result<()> { filter_send(&self.filters, msg, |m| 
self.ch.send_raft_msg(m)) } diff --git a/src/config.rs b/src/config.rs index 8677bd247e8..f234d784afe 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2476,7 +2476,7 @@ impl ConfigHandler { pub fn start( id: String, mut controller: ConfigController, - scheduler: FutureScheduler, + scheduler: FutureScheduler>, ) -> CfgResult { if controller.get_current().enable_dynamic_config { if let Err(e) = scheduler.schedule(PdTask::RefreshConfig) { diff --git a/src/import/sst_service.rs b/src/import/sst_service.rs index 5af2c8ff758..dc73fb43363 100644 --- a/src/import/sst_service.rs +++ b/src/import/sst_service.rs @@ -43,7 +43,7 @@ pub struct ImportSSTService { security_mgr: Arc, } -impl ImportSSTService { +impl> ImportSSTService { pub fn new( cfg: Config, router: Router, @@ -68,7 +68,7 @@ impl ImportSSTService { } } -impl ImportSst for ImportSSTService { +impl> ImportSst for ImportSSTService { fn switch_mode( &mut self, ctx: RpcContext<'_>, diff --git a/src/server/debug.rs b/src/server/debug.rs index 00e95a5e0d2..76dc71d20a0 100644 --- a/src/server/debug.rs +++ b/src/server/debug.rs @@ -15,7 +15,7 @@ use engine::rocks::{ }; use engine::IterOptionsExt; use engine::{self, Engines}; -use engine_rocks::{CloneCompat, Compat, RocksWriteBatch}; +use engine_rocks::{CloneCompat, Compat, RocksEngine, RocksWriteBatch}; use engine_traits::{ IterOptions, Iterable, Mutable, Peekable, TableProperties, TablePropertiesCollection, TablePropertiesExt, WriteBatch, WriteOptions, @@ -509,7 +509,7 @@ impl Debugger { })?; let tag = format!("[region {}] {}", region.get_id(), peer_id); - let peer_storage = box_try!(PeerStorage::new( + let peer_storage = box_try!(PeerStorage::::new( self.engines.c(), region, fake_snap_worker.scheduler(), diff --git a/src/server/gc_worker/applied_lock_collector.rs b/src/server/gc_worker/applied_lock_collector.rs index 0ee6b4f5808..838ed1a0816 100644 --- a/src/server/gc_worker/applied_lock_collector.rs +++ b/src/server/gc_worker/applied_lock_collector.rs @@ -6,6 +6,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use txn_types::Key; +use engine_rocks::RocksEngine; use engine_traits::{CfName, CF_LOCK}; use kvproto::kvrpcpb::LockInfo; use kvproto::raft_cmdpb::{CmdType, Request as RaftRequest}; @@ -97,7 +98,7 @@ impl LockObserver { Self { state, sender } } - pub fn register(self, coprocessor_host: &mut CoprocessorHost) { + pub fn register(self, coprocessor_host: &mut CoprocessorHost) { coprocessor_host .registry .register_apply_snapshot_observer(1, BoxApplySnapshotObserver::new(self.clone())); @@ -355,7 +356,7 @@ pub struct AppliedLockCollector { } impl AppliedLockCollector { - pub fn new(coprocessor_host: &mut CoprocessorHost) -> Result { + pub fn new(coprocessor_host: &mut CoprocessorHost) -> Result { let worker = Mutex::new(WorkerBuilder::new("lock-collector").create()); let scheduler = worker.lock().unwrap().scheduler(); @@ -480,7 +481,7 @@ mod tests { res } - fn new_test_collector() -> (AppliedLockCollector, CoprocessorHost) { + fn new_test_collector() -> (AppliedLockCollector, CoprocessorHost) { let mut coprocessor_host = CoprocessorHost::default(); let collector = AppliedLockCollector::new(&mut coprocessor_host).unwrap(); (collector, coprocessor_host) diff --git a/src/server/gc_worker/gc_worker.rs b/src/server/gc_worker/gc_worker.rs index 82b1f3e4a7c..9b580ec4488 100644 --- a/src/server/gc_worker/gc_worker.rs +++ b/src/server/gc_worker/gc_worker.rs @@ -137,7 +137,7 @@ impl Display for GcTask { struct GcRunner { engine: E, local_storage: Option, - 
raft_store_router: Option, + raft_store_router: Option>, region_info_accessor: Option, /// Used to limit the write flow of GC. @@ -153,7 +153,7 @@ impl GcRunner { pub fn new( engine: E, local_storage: Option, - raft_store_router: Option, + raft_store_router: Option>, cfg_tracker: Tracker, region_info_accessor: Option, cfg: GcConfig, @@ -658,7 +658,7 @@ pub struct GcWorker { /// `local_storage` represent the underlying RocksDB of the `engine`. local_storage: Option, /// `raft_store_router` is useful to signal raftstore clean region size informations. - raft_store_router: Option, + raft_store_router: Option>, /// Access the region's meta before getting snapshot, which will wake hibernating regions up. /// This is useful to do the `need_gc` check without waking hibernatin regions up. /// This is not set for tests. @@ -721,7 +721,7 @@ impl GcWorker { pub fn new( engine: E, local_storage: Option, - raft_store_router: Option, + raft_store_router: Option>, region_info_accessor: Option, cfg: GcConfig, ) -> GcWorker { @@ -771,7 +771,7 @@ impl GcWorker { pub fn start_observe_lock_apply( &mut self, - coprocessor_host: &mut CoprocessorHost, + coprocessor_host: &mut CoprocessorHost, ) -> Result<()> { assert!(self.applied_lock_collector.is_none()); let collector = Arc::new(AppliedLockCollector::new(coprocessor_host)?); diff --git a/src/server/lock_manager/deadlock.rs b/src/server/lock_manager/deadlock.rs index ce9fba54bab..46fb82bee7f 100644 --- a/src/server/lock_manager/deadlock.rs +++ b/src/server/lock_manager/deadlock.rs @@ -7,6 +7,7 @@ use super::waiter_manager::Scheduler as WaiterMgrScheduler; use super::{Error, Result}; use crate::server::resolve::StoreAddrResolver; use crate::storage::lock_manager::Lock; +use engine_rocks::RocksEngine; use futures::{Future, Sink, Stream}; use grpcio::{ self, DuplexSink, Environment, RequestStream, RpcContext, RpcStatus, RpcStatusCode, UnarySink, @@ -398,7 +399,7 @@ impl RoleChangeNotifier { } } - pub(crate) fn register(self, host: &mut CoprocessorHost) { + pub(crate) fn register(self, host: &mut CoprocessorHost) { host.registry .register_role_observer(1, BoxRoleObserver::new(self.clone())); host.registry @@ -1085,7 +1086,9 @@ pub mod tests { } } - fn start_deadlock_detector(host: &mut CoprocessorHost) -> (FutureWorker, Scheduler) { + fn start_deadlock_detector( + host: &mut CoprocessorHost, + ) -> (FutureWorker, Scheduler) { let waiter_mgr_worker = FutureWorker::new("dummy-waiter-mgr"); let waiter_mgr_scheduler = WaiterMgrScheduler::new(waiter_mgr_worker.scheduler()); let mut detector_worker = FutureWorker::new("test-deadlock-detector"); diff --git a/src/server/lock_manager/mod.rs b/src/server/lock_manager/mod.rs index 73cc26fc643..087e5918789 100644 --- a/src/server/lock_manager/mod.rs +++ b/src/server/lock_manager/mod.rs @@ -26,6 +26,7 @@ use crate::storage::{ }; use raftstore::coprocessor::CoprocessorHost; +use engine_rocks::RocksEngine; use parking_lot::Mutex; use pd_client::PdClient; use tikv_util::collections::HashSet; @@ -182,7 +183,7 @@ impl LockManager { /// Creates a `RoleChangeNotifier` of the deadlock detector worker and registers it to /// the `CoprocessorHost` to observe the role change events of the leader region. 
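Not everything in this patch becomes generic: components that still only run on RocksDB, such as the lock collector and the deadlock detector here, pin the host's engine parameter at their boundary and register against `CoprocessorHost<RocksEngine>`. A sketch of that pinning with simplified stand-in types:

use std::marker::PhantomData;

struct CoprocessorHostSketch<E> {
    role_observers: Vec<String>,
    _engine: PhantomData<E>,
}

impl<E> CoprocessorHostSketch<E> {
    fn register_role_observer(&mut self, name: &str) {
        self.role_observers.push(name.to_string());
    }
}

struct RocksEngineSketch;

// A RocksDB-only component simply names the engine in its own signature
// instead of taking a generic host.
fn register_role_change_notifier(host: &mut CoprocessorHostSketch<RocksEngineSketch>) {
    host.register_role_observer("deadlock-detector-role-observer");
}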
- pub fn register_detector_role_change_observer(&self, host: &mut CoprocessorHost) { + pub fn register_detector_role_change_observer(&self, host: &mut CoprocessorHost) { let role_change_notifier = RoleChangeNotifier::new(self.detector_scheduler.clone()); role_change_notifier.register(host); } diff --git a/src/server/node.rs b/src/server/node.rs index 37b50d96452..3564881fd9d 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -41,7 +41,7 @@ pub fn create_raft_storage( pipelined_pessimistic_lock: bool, ) -> Result, LockManager>> where - S: RaftStoreRouter + 'static, + S: RaftStoreRouter + 'static, { let store = Storage::from_engine(engine, cfg, read_pool, lock_mgr, pipelined_pessimistic_lock)?; Ok(store) @@ -118,10 +118,10 @@ where &mut self, engines: Engines, trans: T, - snap_mgr: SnapManager, - pd_worker: FutureWorker, + snap_mgr: SnapManager, + pd_worker: FutureWorker>, store_meta: Arc>, - coprocessor_host: CoprocessorHost, + coprocessor_host: CoprocessorHost, importer: Arc, split_check_worker: Worker, dyn_cfg: Box, @@ -334,10 +334,10 @@ where store_id: u64, engines: Engines, trans: T, - snap_mgr: SnapManager, - pd_worker: FutureWorker, + snap_mgr: SnapManager, + pd_worker: FutureWorker>, store_meta: Arc>, - coprocessor_host: CoprocessorHost, + coprocessor_host: CoprocessorHost, importer: Arc, split_check_worker: Worker, dyn_cfg: Box, diff --git a/src/server/raft_client.rs b/src/server/raft_client.rs index 20d4300929d..848fff260d2 100644 --- a/src/server/raft_client.rs +++ b/src/server/raft_client.rs @@ -10,6 +10,7 @@ use super::load_statistics::ThreadLoad; use super::metrics::*; use super::{Config, Result}; use crossbeam::channel::SendError; +use engine_rocks::RocksEngine; use futures::{future, stream, Future, Poll, Sink, Stream}; use grpcio::{ ChannelBuilder, Environment, Error as GrpcError, RpcStatus, RpcStatusCode, WriteFlags, @@ -40,7 +41,7 @@ struct Conn { } impl Conn { - fn new( + fn new + 'static>( env: Arc, router: T, addr: &str, @@ -150,7 +151,7 @@ pub struct RaftClient { timer: Handle, } -impl RaftClient { +impl> RaftClient { pub fn new( env: Arc, cfg: Arc, diff --git a/src/server/raftkv.rs b/src/server/raftkv.rs index 27225d2d0cd..17c41ac17fd 100644 --- a/src/server/raftkv.rs +++ b/src/server/raftkv.rs @@ -105,7 +105,7 @@ impl From for KvError { /// `RaftKv` is a storage engine base on `RaftStore`. #[derive(Clone)] -pub struct RaftKv { +pub struct RaftKv + 'static> { router: S, engine: RocksEngine, } @@ -161,7 +161,7 @@ fn on_read_result( } } -impl RaftKv { +impl> RaftKv { /// Create a RaftKv using specified configuration. 
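RaftKv, the server, and the services above now bound their router as `S: RaftStoreRouter<RocksEngine>`: the router trait itself has gained an engine parameter, and RocksDB is named explicitly wherever the concrete engine is still assumed. A stripped-down sketch of a KV front end bounded by such a router trait (stand-in definitions, not the real raftstore API):

// Stand-in trait; the real RaftStoreRouter<E> sends RaftCmdRequests.
trait RaftStoreRouterSketch<E> {
    fn send_command(&self, cmd: &str) -> Result<(), String>;
}

struct RocksEngineSketch;

struct RaftKvSketch<S>
where
    S: RaftStoreRouterSketch<RocksEngineSketch>,
{
    router: S,
}

impl<S: RaftStoreRouterSketch<RocksEngineSketch>> RaftKvSketch<S> {
    fn new(router: S) -> Self {
        RaftKvSketch { router }
    }

    // Every write proposal goes through the engine-parameterized router.
    fn exec_write(&self, cmd: &str) -> Result<(), String> {
        self.router.send_command(cmd)
    }
}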
pub fn new(router: S, engine: RocksEngine) -> RaftKv { RaftKv { router, engine } @@ -256,19 +256,19 @@ fn invalid_resp_type(exp: CmdType, act: CmdType) -> Error { )) } -impl Display for RaftKv { +impl> Display for RaftKv { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "RaftKv") } } -impl Debug for RaftKv { +impl> Debug for RaftKv { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "RaftKv") } } -impl Engine for RaftKv { +impl> Engine for RaftKv { type Snap = RegionSnapshot; fn async_write( diff --git a/src/server/server.rs b/src/server/server.rs index 324a12728a1..985088967c1 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -18,6 +18,7 @@ use crate::coprocessor::Endpoint; use crate::server::gc_worker::GcWorker; use crate::storage::lock_manager::LockManager; use crate::storage::{Engine, Storage}; +use engine_rocks::RocksEngine; use raftstore::router::RaftStoreRouter; use raftstore::store::SnapManager; use tikv_util::security::SecurityManager; @@ -44,7 +45,7 @@ pub const STATS_THREAD_PREFIX: &str = "transport-stats"; /// /// It hosts various internal components, including gRPC, the raftstore router /// and a snapshot worker. -pub struct Server { +pub struct Server + 'static, S: StoreAddrResolver + 'static> { env: Arc, /// A GrpcServer builder or a GrpcServer. /// @@ -55,7 +56,7 @@ pub struct Server trans: ServerTransport, raft_router: T, // For sending/receiving snapshots. - snap_mgr: SnapManager, + snap_mgr: SnapManager, snap_worker: Worker, // Currently load statistics is done in the thread. @@ -66,7 +67,7 @@ pub struct Server timer: Handle, } -impl Server { +impl, S: StoreAddrResolver + 'static> Server { #[allow(clippy::too_many_arguments)] pub fn new( cfg: &Arc, @@ -75,7 +76,7 @@ impl Server { cop: Endpoint, raft_router: T, resolver: S, - snap_mgr: SnapManager, + snap_mgr: SnapManager, gc_worker: GcWorker, yatp_read_pool: Option, ) -> Result { @@ -319,7 +320,7 @@ mod tests { significant_msg_sender: Sender, } - impl RaftStoreRouter for TestRaftStoreRouter { + impl RaftStoreRouter for TestRaftStoreRouter { fn send_raft_msg(&self, _: RaftMessage) -> RaftStoreResult<()> { self.tx.send(1).unwrap(); Ok(()) diff --git a/src/server/service/debug.rs b/src/server/service/debug.rs index e2773ddb36a..f02d05b72c6 100644 --- a/src/server/service/debug.rs +++ b/src/server/service/debug.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use engine::rocks::util::stats as rocksdb_stats; use engine::Engines; +use engine_rocks::RocksEngine; use fail; use futures::{future, stream, Future, Stream}; use futures_cpupool::CpuPool; @@ -47,7 +48,7 @@ fn error_to_grpc_error(tag: &'static str, e: Error) -> GrpcError { /// Service handles the RPC messages for the `Debug` service. #[derive(Clone)] -pub struct Service { +pub struct Service> { pool: CpuPool, debugger: Debugger, raft_router: T, @@ -55,7 +56,7 @@ pub struct Service { security_mgr: Arc, } -impl Service { +impl> Service { /// Constructs a new `Service` with `Engines`, a `RaftStoreRouter` and a `GcWorker`. 
pub fn new( engines: Engines, @@ -93,7 +94,7 @@ impl Service { } } -impl debugpb::Debug for Service { +impl + 'static> debugpb::Debug for Service { fn get(&mut self, ctx: RpcContext<'_>, mut req: GetRequest, sink: UnarySink) { if !check_common_name(self.security_mgr.cert_allowed_cn(), &ctx) { return; @@ -517,7 +518,7 @@ impl debugpb::Debug for Service { } } -fn region_detail( +fn region_detail>( raft_router: T, region_id: u64, store_id: u64, @@ -555,7 +556,7 @@ fn region_detail( }) } -fn consistency_check( +fn consistency_check>( raft_router: T, mut detail: RegionDetailResponse, ) -> impl Future { diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index 10bfa46388e..def87678c76 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -21,6 +21,7 @@ use crate::storage::{ lock_manager::LockManager, PointGetCommand, Storage, TxnStatus, }; +use engine_rocks::RocksEngine; use futures::executor::{self, Notify, Spawn}; use futures::{future, Async, Future, Sink, Stream}; use grpcio::{ @@ -47,7 +48,7 @@ const GRPC_MSG_NOTIFY_SIZE: usize = 8; /// Service handles the RPC messages for the `Tikv` service. #[derive(Clone)] -pub struct Service { +pub struct Service + 'static, E: Engine, L: LockManager> { /// Used to handle requests related to GC. gc_worker: GcWorker, // For handling KV requests. @@ -72,7 +73,7 @@ pub struct Service { security_mgr: Arc, } -impl Service { +impl + 'static, E: Engine, L: LockManager> Service { /// Constructs a new `Service` which provides the `Tikv` service. pub fn new( storage: Storage, @@ -142,7 +143,9 @@ macro_rules! handle_request { } } -impl Tikv for Service { +impl + 'static, E: Engine, L: LockManager> Tikv + for Service +{ handle_request!(kv_get, future_get, GetRequest, GetResponse); handle_request!(kv_scan, future_scan, ScanRequest, ScanResponse); handle_request!( diff --git a/src/server/snap.rs b/src/server/snap.rs index e617f8bc314..7e81d9f4bc1 100644 --- a/src/server/snap.rs +++ b/src/server/snap.rs @@ -15,6 +15,7 @@ use kvproto::raft_serverpb::RaftMessage; use kvproto::raft_serverpb::{Done, SnapshotChunk}; use kvproto::tikvpb::TikvClient; +use engine_rocks::RocksEngine; use raftstore::router::RaftStoreRouter; use raftstore::store::{GenericSnapshot, SnapEntry, SnapKey, SnapManager}; use tikv_util::security::SecurityManager; @@ -102,7 +103,7 @@ struct SendStat { /// It will first send the normal raft snapshot message and then send the snapshot file. fn send_snap( env: Arc, - mgr: SnapManager, + mgr: SnapManager, security_mgr: Arc, cfg: &Config, addr: &str, @@ -181,7 +182,7 @@ struct RecvSnapContext { } impl RecvSnapContext { - fn new(head_chunk: Option, snap_mgr: &SnapManager) -> Result { + fn new(head_chunk: Option, snap_mgr: &SnapManager) -> Result { // head_chunk is None means the stream is empty. 
let mut head = head_chunk.ok_or_else(|| Error::Other("empty gRPC stream".into()))?; if !head.has_message() { @@ -217,7 +218,7 @@ impl RecvSnapContext { }) } - fn finish(self, raft_router: R) -> Result<()> { + fn finish>(self, raft_router: R) -> Result<()> { let key = self.key; if let Some(mut file) = self.file { info!("saving snapshot file"; "snap_key" => %key, "file" => file.path()); @@ -234,10 +235,10 @@ impl RecvSnapContext { } } -fn recv_snap( +fn recv_snap + 'static>( stream: RequestStream, sink: ClientStreamingSink, - snap_mgr: SnapManager, + snap_mgr: SnapManager, raft_router: R, ) -> impl Future { let stream = stream.map_err(Error::from); @@ -290,9 +291,9 @@ fn recv_snap( .map_err(Error::from) } -pub struct Runner { +pub struct Runner + 'static> { env: Arc, - snap_mgr: SnapManager, + snap_mgr: SnapManager, pool: CpuPool, raft_router: R, security_mgr: Arc, @@ -301,10 +302,10 @@ pub struct Runner { recving_count: Arc, } -impl Runner { +impl + 'static> Runner { pub fn new( env: Arc, - snap_mgr: SnapManager, + snap_mgr: SnapManager, r: R, security_mgr: Arc, cfg: Arc, @@ -325,7 +326,7 @@ impl Runner { } } -impl Runnable for Runner { +impl + 'static> Runnable for Runner { fn run(&mut self, task: Task) { match task { Task::Recv { stream, sink } => { diff --git a/src/server/status_server.rs b/src/server/status_server.rs index 82ef32b77a3..6fb5e0af6f5 100644 --- a/src/server/status_server.rs +++ b/src/server/status_server.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use super::Result; use crate::config::TiKvConfig; +use engine_rocks::RocksEngine; use raftstore::store::PdTask; use tikv_alloc::error::ProfError; use tikv_util::collections::HashMap; @@ -88,11 +89,14 @@ pub struct StatusServer { tx: Sender<()>, rx: Option>, addr: Option, - pd_sender: Arc>, + pd_sender: Arc>>, } impl StatusServer { - pub fn new(status_thread_pool_size: usize, pd_sender: FutureScheduler) -> Self { + pub fn new( + status_thread_pool_size: usize, + pd_sender: FutureScheduler>, + ) -> Self { let thread_pool = Builder::new() .pool_size(status_thread_pool_size) .name_prefix("status-server-") @@ -215,7 +219,7 @@ impl StatusServer { } fn config_handler( - pd_sender: &FutureScheduler, + pd_sender: &FutureScheduler>, ) -> Box, Error = hyper::Error> + Send> { let (cfg_sender, rx) = oneshot::channel(); if pd_sender @@ -618,6 +622,7 @@ mod tests { use crate::config::TiKvConfig; use crate::server::status_server::StatusServer; + use engine_rocks::RocksEngine; use raftstore::store::PdTask; use test_util::new_security_cfg; use tikv_util::collections::HashSet; @@ -672,8 +677,8 @@ mod tests { #[test] fn test_config_endpoint() { struct Runner; - impl FutureRunnable for Runner { - fn run(&mut self, t: PdTask, _: &Handle) { + impl FutureRunnable> for Runner { + fn run(&mut self, t: PdTask, _: &Handle) { match t { PdTask::GetConfig { cfg_sender } => cfg_sender.send(String::new()).unwrap(), _ => unreachable!(), diff --git a/src/server/transport.rs b/src/server/transport.rs index 5e3f7b44072..bde129635fd 100644 --- a/src/server/transport.rs +++ b/src/server/transport.rs @@ -9,6 +9,7 @@ use crate::server::raft_client::RaftClient; use crate::server::resolve::StoreAddrResolver; use crate::server::snap::Task as SnapTask; use crate::server::Result; +use engine_rocks::RocksEngine; use raft::SnapshotStatus; use raftstore::router::RaftStoreRouter; use raftstore::store::Transport; @@ -19,7 +20,7 @@ use tikv_util::HandyRwLock; pub struct ServerTransport where - T: RaftStoreRouter + 'static, + T: RaftStoreRouter + 'static, S: StoreAddrResolver + 
'static, { raft_client: Arc>>, @@ -31,7 +32,7 @@ where impl Clone for ServerTransport where - T: RaftStoreRouter + 'static, + T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static, { fn clone(&self) -> Self { @@ -45,7 +46,9 @@ where } } -impl ServerTransport { +impl + 'static, S: StoreAddrResolver + 'static> + ServerTransport +{ pub fn new( raft_client: Arc>>, snap_scheduler: Scheduler, @@ -228,7 +231,7 @@ impl ServerTranspo impl Transport for ServerTransport where - T: RaftStoreRouter + 'static, + T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static, { fn send(&mut self, msg: RaftMessage) -> RaftStoreResult<()> { @@ -242,14 +245,14 @@ where } } -struct SnapshotReporter { +struct SnapshotReporter + 'static> { raft_router: T, region_id: u64, to_peer_id: u64, to_store_id: u64, } -impl SnapshotReporter { +impl + 'static> SnapshotReporter { pub fn report(&self, status: SnapshotStatus) { debug!( "send snapshot"; diff --git a/tests/benches/misc/raftkv/mod.rs b/tests/benches/misc/raftkv/mod.rs index 63178978121..36634163181 100644 --- a/tests/benches/misc/raftkv/mod.rs +++ b/tests/benches/misc/raftkv/mod.rs @@ -64,7 +64,7 @@ impl SyncBenchRouter { } } -impl RaftStoreRouter for SyncBenchRouter { +impl RaftStoreRouter for SyncBenchRouter { fn send_raft_msg(&self, _: RaftMessage) -> Result<()> { Ok(()) }