
Commit

raftstore/pd: replace FutureWorker with Worker for pd worker (tikv#10589)

* replace FutureWorker with Worker for pd worker

Signed-off-by: 5kbpers <[email protected]>

* remove spawn_local

Signed-off-by: 5kbpers <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
5kbpers and ti-chi-bot authored Aug 2, 2021
1 parent 9adb033 commit 479ce68
Showing 12 changed files with 80 additions and 59 deletions.
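
The two commit-message bullets above describe one mechanical migration: the PD worker moves from the tokio-current-thread based FutureWorker/FutureRunnable pair to the standard Worker/LazyWorker machinery, and every tokio::task::spawn_local call in the runner is replaced by spawning onto a shared yatp Remote handle. The sketch below illustrates only that local-vs-shared spawning distinction, using plain tokio as a stand-in executor; it is not TiKV code, and all names in it are hypothetical.

use std::time::Duration;
use tokio::runtime::Builder;
use tokio::task::LocalSet;

fn main() {
    // Old shape: a single-threaded runtime owned by the worker itself, where the
    // runner calls spawn_local and its futures never leave that thread.
    let local_rt = Builder::new_current_thread().enable_all().build().unwrap();
    let local = LocalSet::new();
    local.block_on(&local_rt, async {
        tokio::task::spawn_local(async {
            println!("ran on the worker's private single-threaded runtime");
        })
        .await
        .unwrap();
    });

    // New shape: the runner only holds a cloneable handle to a shared pool and
    // submits Send futures to it, which is what `self.remote.spawn(f)` does in
    // the pd.rs diff below (there the handle is a yatp Remote, not a tokio Handle).
    let shared_rt = Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .unwrap();
    let remote = shared_rt.handle().clone();
    remote.spawn(async {
        println!("ran on the shared pool; no thread-local executor required");
    });

    // Keep the pool alive long enough for the spawned task to run.
    shared_rt.block_on(async { tokio::time::sleep(Duration::from_millis(20)).await });
}

In the commit itself, that handle is the Remote<yatp::task::future::TaskCell> produced by workers.pd_worker.remote() and threaded into Runner::new, as the store.rs and pd.rs diffs below show.
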
4 changes: 2 additions & 2 deletions components/raftstore/src/store/fsm/peer.rs
@@ -34,7 +34,7 @@ use tikv_alloc::trace::TraceEvent;
use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
use tikv_util::sys::{disk, memory_usage_reaches_high_water};
use tikv_util::time::duration_to_sec;
use tikv_util::worker::{Scheduler, Stopped};
use tikv_util::worker::{ScheduleError, Scheduler};
use tikv_util::{box_err, debug, defer, error, info, trace, warn};
use tikv_util::{escape, is_zero_duration, Either};
use txn_types::WriteBatchFlags;
@@ -3724,7 +3724,7 @@ where
right_derive: self.ctx.cfg.right_derive_when_split,
callback: cb,
};
if let Err(Stopped(t)) = self.ctx.pd_scheduler.schedule(task) {
if let Err(ScheduleError::Stopped(t)) = self.ctx.pd_scheduler.schedule(task) {
error!(
"failed to notify pd to split: Stopped";
"region_id" => self.fsm.region_id(),
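
With the plain Scheduler, a rejected task now comes back wrapped in ScheduleError::Stopped instead of the old bare Stopped; that is the whole change in this file. A self-contained stand-in of that error-handling shape (the real Scheduler and ScheduleError live in tikv_util::worker; the simplified types below are for illustration only):

// Simplified stand-ins for tikv_util::worker::{Scheduler, ScheduleError}.
enum ScheduleError<T> {
    // The worker has shut down; the rejected task is handed back to the caller.
    Stopped(T),
}

struct Scheduler<T> {
    stopped: bool,
    _marker: std::marker::PhantomData<T>,
}

impl<T> Scheduler<T> {
    fn schedule(&self, task: T) -> Result<(), ScheduleError<T>> {
        if self.stopped {
            Err(ScheduleError::Stopped(task))
        } else {
            Ok(())
        }
    }
}

fn main() {
    let pd_scheduler = Scheduler::<String> {
        stopped: true,
        _marker: std::marker::PhantomData,
    };
    // Same pattern as the peer.rs hunk above: log and move on if the PD worker
    // has already been stopped.
    if let Err(ScheduleError::Stopped(t)) = pd_scheduler.schedule("ask_batch_split".to_owned()) {
        eprintln!("failed to notify pd to split: Stopped; task: {}", t);
    }
}
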
18 changes: 8 additions & 10 deletions components/raftstore/src/store/fsm/store.rs
@@ -44,7 +44,7 @@ use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
use tikv_util::sys::disk;
use tikv_util::time::{duration_to_sec, Instant as TiInstant};
use tikv_util::timer::SteadyTimer;
use tikv_util::worker::{FutureScheduler, FutureWorker, Scheduler, Worker};
use tikv_util::worker::{LazyWorker, Scheduler, Worker};
use tikv_util::{
box_err, box_try, debug, defer, error, info, is_zero_duration, slow_log, sys as sys_util, warn,
Either, RingQueue,
@@ -350,7 +350,7 @@
pub processed_fsm_count: usize,
pub cfg: Config,
pub store: metapb::Store,
pub pd_scheduler: FutureScheduler<PdTask<EK>>,
pub pd_scheduler: Scheduler<PdTask<EK>>,
pub consistency_check_scheduler: Scheduler<ConsistencyCheckTask<EK::Snapshot>>,
pub split_check_scheduler: Scheduler<SplitCheckTask>,
// handle Compact, CleanupSST task
@@ -906,7 +906,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
pub struct RaftPollerBuilder<EK: KvEngine, ER: RaftEngine, T> {
pub cfg: Arc<VersionTrack<Config>>,
pub store: metapb::Store,
pd_scheduler: FutureScheduler<PdTask<EK>>,
pd_scheduler: Scheduler<PdTask<EK>>,
consistency_check_scheduler: Scheduler<ConsistencyCheckTask<EK::Snapshot>>,
split_check_scheduler: Scheduler<SplitCheckTask>,
cleanup_scheduler: Scheduler<CleanupTask>,
@@ -1160,7 +1160,7 @@ where
}

struct Workers<EK: KvEngine> {
pd_worker: FutureWorker<PdTask<EK>>,
pd_worker: LazyWorker<PdTask<EK>>,
background_worker: Worker,

// Both of cleanup tasks and region tasks get their own workers, instead of reusing
@@ -1198,7 +1198,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
trans: T,
pd_client: Arc<C>,
mgr: SnapManager,
pd_worker: FutureWorker<PdTask<EK>>,
pd_worker: LazyWorker<PdTask<EK>>,
store_meta: Arc<Mutex<StoreMeta>>,
mut coprocessor_host: CoprocessorHost<EK>,
importer: Arc<SSTImporter>,
@@ -1378,8 +1378,9 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
auto_split_controller,
concurrency_manager,
snap_mgr,
workers.pd_worker.remote(),
);
box_try!(workers.pd_worker.start(pd_runner));
assert!(workers.pd_worker.start(pd_runner));

if let Err(e) = sys_util::thread::set_priority(sys_util::HIGH_PRI) {
warn!("set thread priority for raftstore failed"; "error" => ?e);
@@ -1396,7 +1397,7 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
}
let mut workers = self.workers.take().unwrap();
// Wait all workers finish.
let handle = workers.pd_worker.stop();
workers.pd_worker.stop();

self.apply_system.shutdown();
MEMTRACE_APPLY_ROUTER_ALIVE.trace(TraceEvent::Reset(0));
@@ -1408,9 +1409,6 @@ impl<EK: KvEngine, ER: RaftEngine> RaftBatchSystem<EK, ER> {
MEMTRACE_RAFT_ROUTER_ALIVE.trace(TraceEvent::Reset(0));
MEMTRACE_RAFT_ROUTER_LEAK.trace(TraceEvent::Reset(0));

if let Some(h) = handle {
h.join().unwrap();
}
workers.coprocessor_host.shutdown();
workers.cleanup_worker.stop();
workers.region_worker.stop();
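
The store.rs changes above are mostly plumbing for the new worker lifecycle: a Scheduler is handed out before the worker starts, the runner receives a yatp Remote via pd_worker.remote(), start() returns a bool (hence assert! instead of box_try!), and stop() no longer hands back a JoinHandle, so the explicit join disappears. A self-contained stand-in of that lifecycle built on std threads (not the tikv_util implementation, just the same call shape):

use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

// Stand-in for tikv_util::worker::Runnable, which now has an associated Task type.
trait Runnable {
    type Task: Send + 'static;
    fn run(&mut self, task: Self::Task);
}

// Stand-in LazyWorker: scheduling works before start(), start() returns a bool,
// and stop() joins the thread itself instead of returning a JoinHandle.
struct LazyWorker<T> {
    tx: Sender<Option<T>>,
    rx: Option<Receiver<Option<T>>>,
    handle: Option<JoinHandle<()>>,
}

impl<T: Send + 'static> LazyWorker<T> {
    fn new(_name: &str) -> Self {
        let (tx, rx) = channel();
        LazyWorker { tx, rx: Some(rx), handle: None }
    }

    fn scheduler(&self) -> Sender<Option<T>> {
        self.tx.clone()
    }

    fn start<R: Runnable<Task = T> + Send + 'static>(&mut self, mut runner: R) -> bool {
        let rx = match self.rx.take() {
            Some(rx) => rx,
            None => return false, // already started
        };
        self.handle = Some(thread::spawn(move || {
            // None is the stop signal; everything else is a task for the runner.
            while let Ok(Some(task)) = rx.recv() {
                runner.run(task);
            }
        }));
        true
    }

    fn stop(&mut self) {
        let _ = self.tx.send(None);
        if let Some(h) = self.handle.take() {
            // In this stand-in the join happens inside stop(); the new store.rs
            // likewise just calls stop() and does no join of its own.
            h.join().unwrap();
        }
    }
}

struct PdRunner;

impl Runnable for PdRunner {
    type Task = String;
    fn run(&mut self, task: String) {
        println!("pd task: {}", task);
    }
}

fn main() {
    let mut pd_worker = LazyWorker::new("pd-worker");
    let scheduler = pd_worker.scheduler(); // available before start, as in server.rs
    assert!(pd_worker.start(PdRunner)); // bool result, hence assert! rather than box_try!
    scheduler.send(Some("store_heartbeat".to_owned())).unwrap();
    pd_worker.stop(); // the caller no longer joins anything afterwards
}
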
4 changes: 2 additions & 2 deletions components/raftstore/src/store/peer.rs
@@ -61,7 +61,7 @@ use tikv_util::codec::number::decode_u64;
use tikv_util::sys::disk;
use tikv_util::time::{duration_to_sec, monotonic_raw_now};
use tikv_util::time::{Instant as TiInstant, InstantExt, ThreadReadId};
use tikv_util::worker::{FutureScheduler, Scheduler};
use tikv_util::worker::Scheduler;
use tikv_util::Either;
use tikv_util::{box_err, debug, error, info, warn};
use txn_types::WriteBatchFlags;
@@ -3638,7 +3638,7 @@ where
self.send_extra_message(extra_msg, &mut ctx.trans, &to_peer);
}

pub fn require_updating_max_ts(&self, pd_scheduler: &FutureScheduler<PdTask<EK>>) {
pub fn require_updating_max_ts(&self, pd_scheduler: &Scheduler<PdTask<EK>>) {
let epoch = self.region().get_region_epoch();
let term_low_bits = self.term() & ((1 << 32) - 1); // 32 bits
let version_lot_bits = epoch.get_version() & ((1 << 31) - 1); // 31 bits
79 changes: 49 additions & 30 deletions components/raftstore/src/store/worker/pd.rs
@@ -11,12 +11,9 @@ use std::thread::{Builder, JoinHandle};
use std::time::{Duration, Instant};
use std::{cmp, io};

use engine_traits::{KvEngine, RaftEngine};
#[cfg(feature = "failpoints")]
use fail::fail_point;
use futures::future::TryFutureExt;
use tokio::task::spawn_local;

use engine_traits::{KvEngine, RaftEngine};
use kvproto::raft_cmdpb::{
AdminCmdType, AdminRequest, ChangePeerRequest, ChangePeerV2Request, RaftCmdRequest,
SplitRequest,
@@ -26,6 +23,7 @@ use kvproto::replication_modepb::RegionReplicationStatus;
use kvproto::{metapb, pdpb};
use prometheus::local::LocalHistogram;
use raft::eraftpb::ConfChangeType;
use yatp::Remote;

use crate::store::cmd_resp::new_error;
use crate::store::metrics::*;
@@ -48,7 +46,7 @@ use tikv_util::sys::disk;
use tikv_util::time::UnixSecs;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tikv_util::topn::TopN;
use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler, Stopped};
use tikv_util::worker::{Runnable, ScheduleError, Scheduler};
use tikv_util::{box_err, debug, error, info, thd_name, warn};

type RecordPairVec = Vec<pdpb::RecordPair>;
@@ -521,6 +519,7 @@

concurrency_manager: ConcurrencyManager,
snap_mgr: SnapManager,
remote: Remote<yatp::task::future::TaskCell>,
}

impl<EK, ER, T> Runner<EK, ER, T>
@@ -540,6 +539,7 @@
auto_split_controller: AutoSplitController,
concurrency_manager: ConcurrencyManager,
snap_mgr: SnapManager,
remote: Remote<yatp::task::future::TaskCell>,
) -> Runner<EK, ER, T> {
let interval = store_heartbeat_interval / Self::INTERVAL_DIVISOR;
let mut stats_monitor = StatsMonitor::new(interval, scheduler.clone());
@@ -559,6 +559,7 @@
stats_monitor,
concurrency_manager,
snap_mgr,
remote,
}
}

@@ -603,7 +604,7 @@
}
}
};
spawn_local(f);
self.remote.spawn(f);
}

// Note: The parameter doesn't contain `self` because this function may
@@ -618,6 +619,7 @@
right_derive: bool,
callback: Callback<EK::Snapshot>,
task: String,
remote: Remote<yatp::task::future::TaskCell>,
) {
if split_keys.is_empty() {
info!("empty split key, skip ask batch split";
@@ -661,7 +663,7 @@
right_derive,
callback,
};
if let Err(Stopped(t)) = scheduler.schedule(task) {
if let Err(ScheduleError::Stopped(t)) = scheduler.schedule(task) {
error!(
"failed to notify pd to split: Stopped";
"region_id" => region_id,
@@ -686,7 +688,7 @@
}
}
};
spawn_local(f);
remote.spawn(f);
}

fn handle_heartbeat(
@@ -710,17 +712,23 @@
.region_keys_read
.observe(region_stat.read_keys as f64);

let f = self
.pd_client
.region_heartbeat(term, region.clone(), peer, region_stat, replication_status)
.map_err(move |e| {
let resp = self.pd_client.region_heartbeat(
term,
region.clone(),
peer,
region_stat,
replication_status,
);
let f = async move {
if let Err(e) = resp.await {
debug!(
"failed to send heartbeat";
"region_id" => region.get_id(),
"err" => ?e
);
});
spawn_local(f);
}
};
self.remote.spawn(f);
}

fn handle_store_heartbeat(&mut self, mut stats: pdpb::StoreStats, store_info: StoreInfo<EK>) {
@@ -843,14 +851,17 @@
}
}
};
spawn_local(f);
self.remote.spawn(f);
}

fn handle_report_batch_split(&self, regions: Vec<metapb::Region>) {
let f = self.pd_client.report_batch_split(regions).map_err(|e| {
warn!("report split failed"; "err" => ?e);
});
spawn_local(f);
let resp = self.pd_client.report_batch_split(regions);
let f = async move {
if let Err(e) = resp.await {
warn!("report split failed"; "err" => ?e);
}
};
self.remote.spawn(f);
}

fn handle_validate_peer(&self, local_region: metapb::Region, peer: metapb::Peer) {
@@ -920,7 +931,7 @@
}
}
};
spawn_local(f);
self.remote.spawn(f);
}

fn schedule_heartbeat_receiver(&mut self) {
@@ -1024,7 +1035,7 @@
Err(e) => panic!("unexpected error: {:?}", e),
}
};
spawn_local(f);
self.remote.spawn(f);
self.is_hb_receiver_scheduled = true;
}

@@ -1127,9 +1138,10 @@
if delay {
info!("[failpoint] delay update max ts for 1s"; "region_id" => region_id);
let deadline = Instant::now() + Duration::from_secs(1);
spawn_local(GLOBAL_TIMER_HANDLE.delay(deadline).compat().then(|_| f));
self.remote
.spawn(GLOBAL_TIMER_HANDLE.delay(deadline).compat().then(|_| f));
} else {
spawn_local(f);
self.remote.spawn(f);
}
}

@@ -1150,16 +1162,18 @@
}
}
};
spawn_local(f);
self.remote.spawn(f);
}
}

impl<EK, ER, T> Runnable<Task<EK>> for Runner<EK, ER, T>
impl<EK, ER, T> Runnable for Runner<EK, ER, T>
where
EK: KvEngine,
ER: RaftEngine,
T: PdClient,
{
type Task = Task<EK>;

fn run(&mut self, task: Task<EK>) {
debug!("executing task"; "task" => %task);

@@ -1199,11 +1213,13 @@
right_derive,
callback,
String::from("batch_split"),
self.remote.clone(),
),
Task::AutoSplit { split_infos } => {
let pd_client = self.pd_client.clone();
let router = self.router.clone();
let scheduler = self.scheduler.clone();
let remote = self.remote.clone();

let f = async move {
for split_info in split_infos {
@@ -1220,11 +1236,12 @@
true,
Callback::None,
String::from("auto_split"),
remote.clone(),
);
}
}
};
spawn_local(f);
self.remote.spawn(f);
}

Task::Heartbeat(hb_task) => {
@@ -1513,7 +1530,7 @@ mod tests {
use kvproto::pdpb::QueryKind;
use std::sync::Mutex;
use std::time::Instant;
use tikv_util::worker::FutureWorker;
use tikv_util::worker::LazyWorker;

use super::*;

@@ -1553,7 +1570,9 @@
}
}

impl Runnable<Task<KvTestEngine>> for RunnerTest {
impl Runnable for RunnerTest {
type Task = Task<KvTestEngine>;

fn run(&mut self, task: Task<KvTestEngine>) {
if let Task::StoreInfos {
cpu_usages,
@@ -1580,10 +1599,10 @@

#[test]
fn test_collect_stats() {
let mut pd_worker = FutureWorker::new("test-pd-worker");
let mut pd_worker = LazyWorker::new("test-pd-worker");
let store_stat = Arc::new(Mutex::new(StoreStat::default()));
let runner = RunnerTest::new(1, pd_worker.scheduler(), Arc::clone(&store_stat));
pd_worker.start(runner).unwrap();
assert!(pd_worker.start(runner));

let start = Instant::now();
loop {
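
The pd.rs call sites above all follow one recipe: build the response future from the PD client up front, move the error handling into an async move block that awaits it, and hand the resulting future to self.remote.spawn(...) instead of tokio::task::spawn_local. A self-contained before/after sketch of that rewrite, again using tokio as the stand-in pool and a hypothetical fake_heartbeat RPC:

use std::time::Duration;
// Combinator style used by the removed code paths.
use futures::future::TryFutureExt;

// Hypothetical stand-in for pd_client.region_heartbeat(...): a future the caller awaits.
async fn fake_heartbeat(region_id: u64) -> Result<(), String> {
    if region_id == 0 {
        Err("invalid region".to_owned())
    } else {
        Ok(())
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    let remote = tokio::runtime::Handle::current();

    // Before: error handling attached with a future combinator, then spawn_local(f)
    // on the worker's own single-threaded runtime.
    let f = fake_heartbeat(0).map_err(|e| eprintln!("failed to send heartbeat: {}", e));
    remote.spawn(f);

    // After: the response future is created first, awaited inside an async block,
    // and the whole thing is spawned on the shared pool, mirroring
    // `self.remote.spawn(f)` in the diff above.
    let resp = fake_heartbeat(42);
    let f = async move {
        if let Err(e) = resp.await {
            eprintln!("failed to send heartbeat: {}", e);
        }
    };
    remote.spawn(f);

    // Give the spawned tasks time to finish before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(20)).await;
}

The remaining pd.rs changes are plumbing for the same goal: the Runnable impl gains type Task = Task<EK>, and the Remote is cloned into helpers such as ask_batch_split so that every spawn site has a handle to the shared pool.
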
4 changes: 2 additions & 2 deletions components/server/src/server.rs
@@ -90,7 +90,7 @@ use tikv_util::{
sys::{disk, register_memory_usage_high_water, SysQuota},
thread_group::GroupProperties,
time::{Instant, Monitor},
worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker},
worker::{Builder as WorkerBuilder, LazyWorker, Worker},
};
use tokio::runtime::Builder;

@@ -562,7 +562,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {

let engines = self.engines.as_ref().unwrap();

let pd_worker = FutureWorker::new("pd-worker");
let pd_worker = LazyWorker::new("pd-worker");
let pd_sender = pd_worker.scheduler();

let unified_read_pool = if self.config.readpool.is_unified_pool_enabled() {