Skip to content

Commit

Permalink
raftstore: Convert RocksEngine to type parameters (tikv#7371)
Browse files Browse the repository at this point in the history
Signed-off-by: Brian Anderson <[email protected]>
  • Loading branch information
brson authored Apr 17, 2020
1 parent 7d13ca0 commit e76005b
Show file tree
Hide file tree
Showing 41 changed files with 894 additions and 588 deletions.
19 changes: 11 additions & 8 deletions cmd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,22 +123,22 @@ struct TiKVServer {
engines: Option<Engines>,
servers: Option<Servers>,
region_info_accessor: RegionInfoAccessor,
coprocessor_host: Option<CoprocessorHost>,
coprocessor_host: Option<CoprocessorHost<RocksEngine>>,
to_stop: Vec<Box<dyn Stop>>,
lock_files: Vec<File>,
}

struct Engines {
engines: engine::Engines,
store_meta: Arc<Mutex<StoreMeta>>,
engine: RaftKv<ServerRaftStoreRouter>,
raft_router: ServerRaftStoreRouter,
engine: RaftKv<ServerRaftStoreRouter<RocksEngine>>,
raft_router: ServerRaftStoreRouter<RocksEngine>,
}

struct Servers {
pd_sender: FutureScheduler<PdTask>,
pd_sender: FutureScheduler<PdTask<RocksEngine>>,
lock_mgr: Option<LockManager>,
server: Server<ServerRaftStoreRouter, resolve::PdStoreAddrResolver>,
server: Server<ServerRaftStoreRouter<RocksEngine>, resolve::PdStoreAddrResolver>,
node: Node<RpcClient>,
importer: Arc<SSTImporter>,
cdc_scheduler: tikv_util::worker::Scheduler<cdc::Task>,
Expand Down Expand Up @@ -436,7 +436,7 @@ impl TiKVServer {
});
}

fn init_gc_worker(&mut self) -> GcWorker<RaftKv<ServerRaftStoreRouter>> {
fn init_gc_worker(&mut self) -> GcWorker<RaftKv<ServerRaftStoreRouter<RocksEngine>>> {
let engines = self.engines.as_ref().unwrap();
let mut gc_worker = GcWorker::new(
engines.engine.clone(),
Expand All @@ -457,7 +457,7 @@ impl TiKVServer {

fn init_servers(
&mut self,
gc_worker: &GcWorker<RaftKv<ServerRaftStoreRouter>>,
gc_worker: &GcWorker<RaftKv<ServerRaftStoreRouter<RocksEngine>>>,
) -> Arc<ServerConfig> {
let mut cfg_controller = self.cfg_controller.take().unwrap();
cfg_controller.register(
Expand Down Expand Up @@ -656,7 +656,10 @@ impl TiKVServer {
server_config
}

fn register_services(&mut self, gc_worker: GcWorker<RaftKv<ServerRaftStoreRouter>>) {
fn register_services(
&mut self,
gc_worker: GcWorker<RaftKv<ServerRaftStoreRouter<RocksEngine>>>,
) {
let servers = self.servers.as_mut().unwrap();
let engines = self.engines.as_ref().unwrap();

Expand Down
6 changes: 3 additions & 3 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub struct Endpoint<T> {
min_ts_region_id: u64,
}

impl<T: 'static + RaftStoreRouter> Endpoint<T> {
impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
pub fn new(
pd_client: Arc<dyn PdClient>,
scheduler: Scheduler<Task>,
Expand Down Expand Up @@ -774,7 +774,7 @@ impl Initializer {
}
}

impl<T: 'static + RaftStoreRouter> Runnable<Task> for Endpoint<T> {
impl<T: 'static + RaftStoreRouter<RocksEngine>> Runnable<Task> for Endpoint<T> {
fn run(&mut self, task: Task) {
debug!("run cdc task"; "task" => %task);
match task {
Expand Down Expand Up @@ -808,7 +808,7 @@ impl<T: 'static + RaftStoreRouter> Runnable<Task> for Endpoint<T> {
}
}

impl<T: 'static + RaftStoreRouter> RunnableWithTimer<Task, ()> for Endpoint<T> {
impl<T: 'static + RaftStoreRouter<RocksEngine>> RunnableWithTimer<Task, ()> for Endpoint<T> {
fn on_timeout(&mut self, timer: &mut Timer<()>, _: ()) {
CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64);
if self.min_resolved_ts != TimeStamp::max() {
Expand Down
3 changes: 2 additions & 1 deletion components/cdc/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::cell::RefCell;
use std::sync::{Arc, RwLock};

use engine_rocks::RocksEngine;
use raft::StateRole;
use raftstore::coprocessor::*;
use raftstore::store::fsm::ObserveID;
Expand Down Expand Up @@ -40,7 +41,7 @@ impl CdcObserver {
}
}

pub fn register_to(&self, coprocessor_host: &mut CoprocessorHost) {
pub fn register_to(&self, coprocessor_host: &mut CoprocessorHost<RocksEngine>) {
// 100 is the priority of the observer. CDC should have a high priority.
coprocessor_host
.registry
Expand Down
3 changes: 2 additions & 1 deletion components/cdc/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::rc::Rc;
use std::sync::*;
use std::time::Duration;

use engine_rocks::RocksEngine;
use futures::{Future, Stream};
use grpcio::{ChannelBuilder, Environment};
use grpcio::{ClientDuplexReceiver, ClientDuplexSender, ClientUnaryReceiver};
Expand Down Expand Up @@ -113,7 +114,7 @@ impl TestSuite {
let cdc_ob = cdc::CdcObserver::new(scheduler.clone());
obs.insert(id, cdc_ob.clone());
sim.coprocessor_hooks.entry(id).or_default().push(Box::new(
move |host: &mut CoprocessorHost| {
move |host: &mut CoprocessorHost<RocksEngine>| {
cdc_ob.register_to(host);
},
));
Expand Down
49 changes: 30 additions & 19 deletions components/raftstore/src/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use engine_rocks::RocksEngine;
use engine_traits::{CfName, KvEngine};
use kvproto::metapb::Region;
use kvproto::pdpb::CheckPolicy;
Expand Down Expand Up @@ -151,7 +150,7 @@ impl_box_observer!(BoxCmdObserver, CmdObserver, WrappedCmdObserver);
#[derive(Clone)]
pub struct Registry<E>
where
E: KvEngine,
E: 'static,
{
admin_observers: Vec<Entry<BoxAdminObserver>>,
query_observers: Vec<Entry<BoxQueryObserver>>,
Expand All @@ -163,10 +162,7 @@ where
// TODO: add endpoint
}

impl<E> Default for Registry<E>
where
E: KvEngine,
{
impl<E> Default for Registry<E> {
fn default() -> Registry<E> {
Registry {
admin_observers: Default::default(),
Expand All @@ -193,10 +189,7 @@ macro_rules! push {
};
}

impl<E> Registry<E>
where
E: KvEngine,
{
impl<E> Registry<E> {
pub fn register_admin_observer(&mut self, priority: u32, ao: BoxAdminObserver) {
push!(priority, ao, self.admin_observers);
}
Expand Down Expand Up @@ -275,13 +268,30 @@ macro_rules! loop_ob {
}

/// Admin and invoke all coprocessors.
#[derive(Default, Clone)]
pub struct CoprocessorHost {
pub registry: Registry<RocksEngine>,
#[derive(Clone)]
pub struct CoprocessorHost<E>
where
E: 'static,
{
pub registry: Registry<E>,
}

impl CoprocessorHost {
pub fn new<C: CasualRouter<RocksEngine> + Clone + Send + 'static>(ch: C) -> CoprocessorHost {
impl<E> Default for CoprocessorHost<E>
where
E: 'static,
{
fn default() -> Self {
CoprocessorHost {
registry: Default::default(),
}
}
}

impl<E> CoprocessorHost<E>
where
E: KvEngine,
{
pub fn new<C: CasualRouter<E> + Clone + Send + 'static>(ch: C) -> CoprocessorHost<E> {
let mut registry = Registry::default();
registry.register_split_check_observer(
200,
Expand Down Expand Up @@ -396,10 +406,10 @@ impl CoprocessorHost {
&self,
cfg: &'a Config,
region: &Region,
engine: &RocksEngine,
engine: &E,
auto_split: bool,
policy: CheckPolicy,
) -> SplitCheckerHost<'a, RocksEngine> {
) -> SplitCheckerHost<'a, E> {
let mut host = SplitCheckerHost::new(auto_split, cfg);
loop_ob!(
region,
Expand Down Expand Up @@ -486,6 +496,7 @@ mod tests {
use std::sync::atomic::*;
use std::sync::Arc;

use engine_rocks::RocksEngine;
use kvproto::metapb::Region;
use kvproto::raft_cmdpb::{
AdminRequest, AdminResponse, RaftCmdRequest, RaftCmdResponse, Request, Response,
Expand Down Expand Up @@ -616,7 +627,7 @@ mod tests {

#[test]
fn test_trigger_right_hook() {
let mut host = CoprocessorHost::default();
let mut host = CoprocessorHost::<RocksEngine>::default();
let ob = TestCoprocessor::default();
host.registry
.register_admin_observer(1, BoxAdminObserver::new(ob.clone()));
Expand Down Expand Up @@ -678,7 +689,7 @@ mod tests {

#[test]
fn test_order() {
let mut host = CoprocessorHost::default();
let mut host = CoprocessorHost::<RocksEngine>::default();

let ob1 = TestCoprocessor::default();
host.registry
Expand Down
5 changes: 3 additions & 2 deletions components/raftstore/src/coprocessor/region_info_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use super::{
BoxRegionChangeObserver, BoxRoleObserver, Coprocessor, CoprocessorHost, ObserverContext,
RegionChangeEvent, RegionChangeObserver, Result, RoleObserver,
};
use engine_rocks::RocksEngine;
use keys::{data_end_key, data_key};
use kvproto::metapb::Region;
use raft::StateRole;
Expand Down Expand Up @@ -142,7 +143,7 @@ impl RoleObserver for RegionEventListener {

/// Creates an `RegionEventListener` and register it to given coprocessor host.
fn register_region_event_listener(
host: &mut CoprocessorHost,
host: &mut CoprocessorHost<RocksEngine>,
scheduler: Scheduler<RegionInfoQuery>,
) {
let listener = RegionEventListener { scheduler };
Expand Down Expand Up @@ -457,7 +458,7 @@ impl RegionInfoAccessor {
/// Creates a new `RegionInfoAccessor` and register to `host`.
/// `RegionInfoAccessor` doesn't need, and should not be created more than once. If it's needed
/// in different places, just clone it, and their contents are shared.
pub fn new(host: &mut CoprocessorHost) -> Self {
pub fn new(host: &mut CoprocessorHost<RocksEngine>) -> Self {
let worker = WorkerBuilder::new("region-collector-worker").create();
let scheduler = worker.scheduler();

Expand Down
51 changes: 33 additions & 18 deletions components/raftstore/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@ use crate::store::{
Callback, CasualMessage, LocalReader, PeerMsg, RaftCommand, SignificantMsg, StoreMsg,
};
use crate::{DiscardReason, Error as RaftStoreError, Result as RaftStoreResult};
use engine_rocks::RocksEngine;
use engine_traits::KvEngine;
use raft::SnapshotStatus;

/// Routes messages to the raftstore.
pub trait RaftStoreRouter: Send + Clone {
pub trait RaftStoreRouter<E>: Send + Clone
where
E: KvEngine,
{
/// Sends RaftMessage to local store.
fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()>;

/// Sends RaftCmdRequest to local store.
fn send_command(&self, req: RaftCmdRequest, cb: Callback<RocksEngine>) -> RaftStoreResult<()>;
fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()>;

/// Sends a significant message. We should guarantee that the message can't be dropped.
fn significant_send(&self, region_id: u64, msg: SignificantMsg) -> RaftStoreResult<()>;
Expand Down Expand Up @@ -53,20 +56,23 @@ pub trait RaftStoreRouter: Send + Clone {
)
}

fn casual_send(&self, region_id: u64, msg: CasualMessage<RocksEngine>) -> RaftStoreResult<()>;
fn casual_send(&self, region_id: u64, msg: CasualMessage<E>) -> RaftStoreResult<()>;
}

#[derive(Clone)]
pub struct RaftStoreBlackHole;

impl RaftStoreRouter for RaftStoreBlackHole {
impl<E> RaftStoreRouter<E> for RaftStoreBlackHole
where
E: KvEngine,
{
/// Sends RaftMessage to local store.
fn send_raft_msg(&self, _: RaftMessage) -> RaftStoreResult<()> {
Ok(())
}

/// Sends RaftCmdRequest to local store.
fn send_command(&self, _: RaftCmdRequest, _: Callback<RocksEngine>) -> RaftStoreResult<()> {
fn send_command(&self, _: RaftCmdRequest, _: Callback<E>) -> RaftStoreResult<()> {
Ok(())
}

Expand All @@ -77,24 +83,30 @@ impl RaftStoreRouter for RaftStoreBlackHole {

fn broadcast_unreachable(&self, _: u64) {}

fn casual_send(&self, _: u64, _: CasualMessage<RocksEngine>) -> RaftStoreResult<()> {
fn casual_send(&self, _: u64, _: CasualMessage<E>) -> RaftStoreResult<()> {
Ok(())
}
}

/// A router that routes messages to the raftstore
#[derive(Clone)]
pub struct ServerRaftStoreRouter {
router: RaftRouter<RocksEngine>,
local_reader: LocalReader<RaftRouter<RocksEngine>, RocksEngine>,
pub struct ServerRaftStoreRouter<E>
where
E: KvEngine,
{
router: RaftRouter<E>,
local_reader: LocalReader<RaftRouter<E>, E>,
}

impl ServerRaftStoreRouter {
impl<E> ServerRaftStoreRouter<E>
where
E: KvEngine,
{
/// Creates a new router.
pub fn new(
router: RaftRouter<RocksEngine>,
local_reader: LocalReader<RaftRouter<RocksEngine>, RocksEngine>,
) -> ServerRaftStoreRouter {
router: RaftRouter<E>,
local_reader: LocalReader<RaftRouter<E>, E>,
) -> ServerRaftStoreRouter<E> {
ServerRaftStoreRouter {
router,
local_reader,
Expand All @@ -119,17 +131,20 @@ pub fn handle_send_error<T>(region_id: u64, e: TrySendError<T>) -> RaftStoreErro
}
}

impl RaftStoreRouter for ServerRaftStoreRouter {
impl<E> RaftStoreRouter<E> for ServerRaftStoreRouter<E>
where
E: KvEngine,
{
fn send_raft_msg(&self, msg: RaftMessage) -> RaftStoreResult<()> {
let region_id = msg.get_region_id();
self.router
.send_raft_message(msg)
.map_err(|e| handle_send_error(region_id, e))
}

fn send_command(&self, req: RaftCmdRequest, cb: Callback<RocksEngine>) -> RaftStoreResult<()> {
fn send_command(&self, req: RaftCmdRequest, cb: Callback<E>) -> RaftStoreResult<()> {
let cmd = RaftCommand::new(req, cb);
if LocalReader::<RaftRouter<RocksEngine>, RocksEngine>::acceptable(&cmd.request) {
if LocalReader::<RaftRouter<E>, E>::acceptable(&cmd.request) {
self.local_reader.execute_raft_command(cmd);
Ok(())
} else {
Expand All @@ -153,7 +168,7 @@ impl RaftStoreRouter for ServerRaftStoreRouter {
Ok(())
}

fn casual_send(&self, region_id: u64, msg: CasualMessage<RocksEngine>) -> RaftStoreResult<()> {
fn casual_send(&self, region_id: u64, msg: CasualMessage<E>) -> RaftStoreResult<()> {
self.router
.send(region_id, PeerMsg::CasualMessage(msg))
.map_err(|e| handle_send_error(region_id, e))
Expand Down
Loading

0 comments on commit e76005b

Please sign in to comment.