From 479ce6856dfaf8d12786b477c43fe05f407f782e Mon Sep 17 00:00:00 2001 From: 5kbpers Date: Mon, 2 Aug 2021 13:01:06 +0800 Subject: [PATCH] raftstore/pd: replace FutureWorker with Worker for pd worker (#10589) * replace FutureWorker with Worker for pd worker Signed-off-by: 5kbpers * remove spawn_local Signed-off-by: 5kbpers Co-authored-by: Ti Chi Robot --- components/raftstore/src/store/fsm/peer.rs | 4 +- components/raftstore/src/store/fsm/store.rs | 18 ++--- components/raftstore/src/store/peer.rs | 4 +- components/raftstore/src/store/worker/pd.rs | 79 ++++++++++++------- components/server/src/server.rs | 4 +- components/test_raftstore/src/node.rs | 4 +- components/test_raftstore/src/server.rs | 4 +- components/tikv_util/src/worker/pool.rs | 4 + src/server/node.rs | 6 +- .../integrations/config/dynamic/raftstore.rs | 4 +- .../integrations/raftstore/test_bootstrap.rs | 4 +- tests/integrations/server/kv_service.rs | 4 +- 12 files changed, 80 insertions(+), 59 deletions(-) diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index 9759846de9e..4d677a20f28 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -34,7 +34,7 @@ use tikv_alloc::trace::TraceEvent; use tikv_util::mpsc::{self, LooseBoundedSender, Receiver}; use tikv_util::sys::{disk, memory_usage_reaches_high_water}; use tikv_util::time::duration_to_sec; -use tikv_util::worker::{Scheduler, Stopped}; +use tikv_util::worker::{ScheduleError, Scheduler}; use tikv_util::{box_err, debug, defer, error, info, trace, warn}; use tikv_util::{escape, is_zero_duration, Either}; use txn_types::WriteBatchFlags; @@ -3724,7 +3724,7 @@ where right_derive: self.ctx.cfg.right_derive_when_split, callback: cb, }; - if let Err(Stopped(t)) = self.ctx.pd_scheduler.schedule(task) { + if let Err(ScheduleError::Stopped(t)) = self.ctx.pd_scheduler.schedule(task) { error!( "failed to notify pd to split: Stopped"; "region_id" => self.fsm.region_id(), diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 5473cd68bd7..404fbfc09aa 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -44,7 +44,7 @@ use tikv_util::mpsc::{self, LooseBoundedSender, Receiver}; use tikv_util::sys::disk; use tikv_util::time::{duration_to_sec, Instant as TiInstant}; use tikv_util::timer::SteadyTimer; -use tikv_util::worker::{FutureScheduler, FutureWorker, Scheduler, Worker}; +use tikv_util::worker::{LazyWorker, Scheduler, Worker}; use tikv_util::{ box_err, box_try, debug, defer, error, info, is_zero_duration, slow_log, sys as sys_util, warn, Either, RingQueue, @@ -350,7 +350,7 @@ where pub processed_fsm_count: usize, pub cfg: Config, pub store: metapb::Store, - pub pd_scheduler: FutureScheduler>, + pub pd_scheduler: Scheduler>, pub consistency_check_scheduler: Scheduler>, pub split_check_scheduler: Scheduler, // handle Compact, CleanupSST task @@ -906,7 +906,7 @@ impl PollHandler, St pub struct RaftPollerBuilder { pub cfg: Arc>, pub store: metapb::Store, - pd_scheduler: FutureScheduler>, + pd_scheduler: Scheduler>, consistency_check_scheduler: Scheduler>, split_check_scheduler: Scheduler, cleanup_scheduler: Scheduler, @@ -1160,7 +1160,7 @@ where } struct Workers { - pd_worker: FutureWorker>, + pd_worker: LazyWorker>, background_worker: Worker, // Both of cleanup tasks and region tasks get their own workers, instead of reusing @@ -1198,7 +1198,7 @@ impl RaftBatchSystem { trans: T, 
pd_client: Arc, mgr: SnapManager, - pd_worker: FutureWorker>, + pd_worker: LazyWorker>, store_meta: Arc>, mut coprocessor_host: CoprocessorHost, importer: Arc, @@ -1378,8 +1378,9 @@ impl RaftBatchSystem { auto_split_controller, concurrency_manager, snap_mgr, + workers.pd_worker.remote(), ); - box_try!(workers.pd_worker.start(pd_runner)); + assert!(workers.pd_worker.start(pd_runner)); if let Err(e) = sys_util::thread::set_priority(sys_util::HIGH_PRI) { warn!("set thread priority for raftstore failed"; "error" => ?e); @@ -1396,7 +1397,7 @@ impl RaftBatchSystem { } let mut workers = self.workers.take().unwrap(); // Wait all workers finish. - let handle = workers.pd_worker.stop(); + workers.pd_worker.stop(); self.apply_system.shutdown(); MEMTRACE_APPLY_ROUTER_ALIVE.trace(TraceEvent::Reset(0)); @@ -1408,9 +1409,6 @@ impl RaftBatchSystem { MEMTRACE_RAFT_ROUTER_ALIVE.trace(TraceEvent::Reset(0)); MEMTRACE_RAFT_ROUTER_LEAK.trace(TraceEvent::Reset(0)); - if let Some(h) = handle { - h.join().unwrap(); - } workers.coprocessor_host.shutdown(); workers.cleanup_worker.stop(); workers.region_worker.stop(); diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index b56f73b52b0..5f0de36ee43 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -61,7 +61,7 @@ use tikv_util::codec::number::decode_u64; use tikv_util::sys::disk; use tikv_util::time::{duration_to_sec, monotonic_raw_now}; use tikv_util::time::{Instant as TiInstant, InstantExt, ThreadReadId}; -use tikv_util::worker::{FutureScheduler, Scheduler}; +use tikv_util::worker::Scheduler; use tikv_util::Either; use tikv_util::{box_err, debug, error, info, warn}; use txn_types::WriteBatchFlags; @@ -3638,7 +3638,7 @@ where self.send_extra_message(extra_msg, &mut ctx.trans, &to_peer); } - pub fn require_updating_max_ts(&self, pd_scheduler: &FutureScheduler>) { + pub fn require_updating_max_ts(&self, pd_scheduler: &Scheduler>) { let epoch = self.region().get_region_epoch(); let term_low_bits = self.term() & ((1 << 32) - 1); // 32 bits let version_lot_bits = epoch.get_version() & ((1 << 31) - 1); // 31 bits diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index 3c8ccd599e1..91244d434cc 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -11,12 +11,9 @@ use std::thread::{Builder, JoinHandle}; use std::time::{Duration, Instant}; use std::{cmp, io}; +use engine_traits::{KvEngine, RaftEngine}; #[cfg(feature = "failpoints")] use fail::fail_point; -use futures::future::TryFutureExt; -use tokio::task::spawn_local; - -use engine_traits::{KvEngine, RaftEngine}; use kvproto::raft_cmdpb::{ AdminCmdType, AdminRequest, ChangePeerRequest, ChangePeerV2Request, RaftCmdRequest, SplitRequest, @@ -26,6 +23,7 @@ use kvproto::replication_modepb::RegionReplicationStatus; use kvproto::{metapb, pdpb}; use prometheus::local::LocalHistogram; use raft::eraftpb::ConfChangeType; +use yatp::Remote; use crate::store::cmd_resp::new_error; use crate::store::metrics::*; @@ -48,7 +46,7 @@ use tikv_util::sys::disk; use tikv_util::time::UnixSecs; use tikv_util::timer::GLOBAL_TIMER_HANDLE; use tikv_util::topn::TopN; -use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler, Stopped}; +use tikv_util::worker::{Runnable, ScheduleError, Scheduler}; use tikv_util::{box_err, debug, error, info, thd_name, warn}; type RecordPairVec = Vec; @@ -521,6 +519,7 @@ where concurrency_manager: 
ConcurrencyManager, snap_mgr: SnapManager, + remote: Remote, } impl Runner @@ -540,6 +539,7 @@ where auto_split_controller: AutoSplitController, concurrency_manager: ConcurrencyManager, snap_mgr: SnapManager, + remote: Remote, ) -> Runner { let interval = store_heartbeat_interval / Self::INTERVAL_DIVISOR; let mut stats_monitor = StatsMonitor::new(interval, scheduler.clone()); @@ -559,6 +559,7 @@ where stats_monitor, concurrency_manager, snap_mgr, + remote, } } @@ -603,7 +604,7 @@ where } } }; - spawn_local(f); + self.remote.spawn(f); } // Note: The parameter doesn't contain `self` because this function may @@ -618,6 +619,7 @@ where right_derive: bool, callback: Callback, task: String, + remote: Remote, ) { if split_keys.is_empty() { info!("empty split key, skip ask batch split"; @@ -661,7 +663,7 @@ where right_derive, callback, }; - if let Err(Stopped(t)) = scheduler.schedule(task) { + if let Err(ScheduleError::Stopped(t)) = scheduler.schedule(task) { error!( "failed to notify pd to split: Stopped"; "region_id" => region_id, @@ -686,7 +688,7 @@ where } } }; - spawn_local(f); + remote.spawn(f); } fn handle_heartbeat( @@ -710,17 +712,23 @@ where .region_keys_read .observe(region_stat.read_keys as f64); - let f = self - .pd_client - .region_heartbeat(term, region.clone(), peer, region_stat, replication_status) - .map_err(move |e| { + let resp = self.pd_client.region_heartbeat( + term, + region.clone(), + peer, + region_stat, + replication_status, + ); + let f = async move { + if let Err(e) = resp.await { debug!( "failed to send heartbeat"; "region_id" => region.get_id(), "err" => ?e ); - }); - spawn_local(f); + } + }; + self.remote.spawn(f); } fn handle_store_heartbeat(&mut self, mut stats: pdpb::StoreStats, store_info: StoreInfo) { @@ -843,14 +851,17 @@ where } } }; - spawn_local(f); + self.remote.spawn(f); } fn handle_report_batch_split(&self, regions: Vec) { - let f = self.pd_client.report_batch_split(regions).map_err(|e| { - warn!("report split failed"; "err" => ?e); - }); - spawn_local(f); + let resp = self.pd_client.report_batch_split(regions); + let f = async move { + if let Err(e) = resp.await { + warn!("report split failed"; "err" => ?e); + } + }; + self.remote.spawn(f); } fn handle_validate_peer(&self, local_region: metapb::Region, peer: metapb::Peer) { @@ -920,7 +931,7 @@ where } } }; - spawn_local(f); + self.remote.spawn(f); } fn schedule_heartbeat_receiver(&mut self) { @@ -1024,7 +1035,7 @@ where Err(e) => panic!("unexpected error: {:?}", e), } }; - spawn_local(f); + self.remote.spawn(f); self.is_hb_receiver_scheduled = true; } @@ -1127,9 +1138,10 @@ where if delay { info!("[failpoint] delay update max ts for 1s"; "region_id" => region_id); let deadline = Instant::now() + Duration::from_secs(1); - spawn_local(GLOBAL_TIMER_HANDLE.delay(deadline).compat().then(|_| f)); + self.remote + .spawn(GLOBAL_TIMER_HANDLE.delay(deadline).compat().then(|_| f)); } else { - spawn_local(f); + self.remote.spawn(f); } } @@ -1150,16 +1162,18 @@ where } } }; - spawn_local(f); + self.remote.spawn(f); } } -impl Runnable> for Runner +impl Runnable for Runner where EK: KvEngine, ER: RaftEngine, T: PdClient, { + type Task = Task; + fn run(&mut self, task: Task) { debug!("executing task"; "task" => %task); @@ -1199,11 +1213,13 @@ where right_derive, callback, String::from("batch_split"), + self.remote.clone(), ), Task::AutoSplit { split_infos } => { let pd_client = self.pd_client.clone(); let router = self.router.clone(); let scheduler = self.scheduler.clone(); + let remote = self.remote.clone(); let f = 
async move { for split_info in split_infos { @@ -1220,11 +1236,12 @@ where true, Callback::None, String::from("auto_split"), + remote.clone(), ); } } }; - spawn_local(f); + self.remote.spawn(f); } Task::Heartbeat(hb_task) => { @@ -1513,7 +1530,7 @@ mod tests { use kvproto::pdpb::QueryKind; use std::sync::Mutex; use std::time::Instant; - use tikv_util::worker::FutureWorker; + use tikv_util::worker::LazyWorker; use super::*; @@ -1553,7 +1570,9 @@ mod tests { } } - impl Runnable> for RunnerTest { + impl Runnable for RunnerTest { + type Task = Task; + fn run(&mut self, task: Task) { if let Task::StoreInfos { cpu_usages, @@ -1580,10 +1599,10 @@ mod tests { #[test] fn test_collect_stats() { - let mut pd_worker = FutureWorker::new("test-pd-worker"); + let mut pd_worker = LazyWorker::new("test-pd-worker"); let store_stat = Arc::new(Mutex::new(StoreStat::default())); let runner = RunnerTest::new(1, pd_worker.scheduler(), Arc::clone(&store_stat)); - pd_worker.start(runner).unwrap(); + assert!(pd_worker.start(runner)); let start = Instant::now(); loop { diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 43e6d151a78..29ff642c398 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -90,7 +90,7 @@ use tikv_util::{ sys::{disk, register_memory_usage_high_water, SysQuota}, thread_group::GroupProperties, time::{Instant, Monitor}, - worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker}, + worker::{Builder as WorkerBuilder, LazyWorker, Worker}, }; use tokio::runtime::Builder; @@ -562,7 +562,7 @@ impl TiKVServer { let engines = self.engines.as_ref().unwrap(); - let pd_worker = FutureWorker::new("pd-worker"); + let pd_worker = LazyWorker::new("pd-worker"); let pd_sender = pd_worker.scheduler(); let unified_read_pool = if self.config.readpool.is_unified_pool_enabled() { diff --git a/components/test_raftstore/src/node.rs b/components/test_raftstore/src/node.rs index 42a21c33fa0..6f000bb0dcd 100644 --- a/components/test_raftstore/src/node.rs +++ b/components/test_raftstore/src/node.rs @@ -34,7 +34,7 @@ use tikv::server::Node; use tikv::server::Result as ServerResult; use tikv_util::config::VersionTrack; use tikv_util::time::ThreadReadId; -use tikv_util::worker::{Builder as WorkerBuilder, FutureWorker}; +use tikv_util::worker::{Builder as WorkerBuilder, LazyWorker}; pub struct ChannelTransportCore { snap_paths: HashMap, @@ -198,7 +198,7 @@ impl Simulator for NodeCluster { system: RaftBatchSystem, ) -> ServerResult { assert!(node_id == 0 || !self.nodes.contains_key(&node_id)); - let pd_worker = FutureWorker::new("test-pd-worker"); + let pd_worker = LazyWorker::new("test-pd-worker"); let simulate_trans = SimulateTransport::new(self.trans.clone()); let mut raft_store = cfg.raft_store.clone(); diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs index 8e043eec56b..afb14985630 100644 --- a/components/test_raftstore/src/server.rs +++ b/components/test_raftstore/src/server.rs @@ -59,7 +59,7 @@ use tikv::{ }; use tikv_util::config::VersionTrack; use tikv_util::time::ThreadReadId; -use tikv_util::worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker}; +use tikv_util::worker::{Builder as WorkerBuilder, LazyWorker}; use tikv_util::HandyRwLock; use txn_types::TxnExtraScheduler; @@ -249,7 +249,7 @@ impl Simulator for ServerCluster { } // Create storage. 
- let pd_worker = FutureWorker::new("test-pd-worker"); + let pd_worker = LazyWorker::new("test-pd-worker"); let pd_sender = pd_worker.scheduler(); let storage_read_pool = ReadPool::from(storage::build_read_pool( &tikv::config::StorageReadPoolConfig::default_for_test(), diff --git a/components/tikv_util/src/worker/pool.rs b/components/tikv_util/src/worker/pool.rs index 0ad38bada75..4b4d86ba96b 100644 --- a/components/tikv_util/src/worker/pool.rs +++ b/components/tikv_util/src/worker/pool.rs @@ -198,6 +198,10 @@ impl LazyWorker { self.stop(); self.worker.stop() } + + pub fn remote(&self) -> Remote { + self.worker.remote.clone() + } } pub struct ReceiverWrapper { diff --git a/src/server/node.rs b/src/server/node.rs index 77ceb3f1fe4..2689ce79abe 100644 --- a/src/server/node.rs +++ b/src/server/node.rs @@ -26,7 +26,7 @@ use raftstore::store::AutoSplitController; use raftstore::store::{self, initial_region, Config as StoreConfig, SnapManager, Transport}; use raftstore::store::{GlobalReplicationState, PdTask, SplitCheckTask}; use tikv_util::config::VersionTrack; -use tikv_util::worker::{FutureWorker, Scheduler, Worker}; +use tikv_util::worker::{LazyWorker, Scheduler, Worker}; const MAX_CHECK_CLUSTER_BOOTSTRAPPED_RETRY_COUNT: u64 = 60; const CHECK_CLUSTER_BOOTSTRAPPED_RETRY_SECONDS: u64 = 3; @@ -155,7 +155,7 @@ where engines: Engines, trans: T, snap_mgr: SnapManager, - pd_worker: FutureWorker>, + pd_worker: LazyWorker>, store_meta: Arc>, coprocessor_host: CoprocessorHost, importer: Arc, @@ -371,7 +371,7 @@ where engines: Engines, trans: T, snap_mgr: SnapManager, - pd_worker: FutureWorker>, + pd_worker: LazyWorker>, store_meta: Arc>, coprocessor_host: CoprocessorHost, importer: Arc, diff --git a/tests/integrations/config/dynamic/raftstore.rs b/tests/integrations/config/dynamic/raftstore.rs index 75e2295f525..b36717da647 100644 --- a/tests/integrations/config/dynamic/raftstore.rs +++ b/tests/integrations/config/dynamic/raftstore.rs @@ -19,7 +19,7 @@ use engine_traits::{Engines, ALL_CFS}; use tempfile::TempDir; use test_raftstore::TestPdClient; use tikv_util::config::VersionTrack; -use tikv_util::worker::{dummy_scheduler, FutureWorker, Worker}; +use tikv_util::worker::{dummy_scheduler, LazyWorker, Worker}; #[derive(Clone)] struct MockTransport; @@ -94,7 +94,7 @@ fn start_raftstore( Module::Raftstore, Box::new(RaftstoreConfigManager(cfg_track.clone())), ); - let pd_worker = FutureWorker::new("store-config"); + let pd_worker = LazyWorker::new("store-config"); let (split_check_scheduler, _) = dummy_scheduler(); system diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs index b065f65b603..277c66f777f 100644 --- a/tests/integrations/raftstore/test_bootstrap.rs +++ b/tests/integrations/raftstore/test_bootstrap.rs @@ -18,7 +18,7 @@ use test_raftstore::*; use tikv::import::SSTImporter; use tikv::server::Node; use tikv_util::config::VersionTrack; -use tikv_util::worker::{dummy_scheduler, Builder as WorkerBuilder, FutureWorker}; +use tikv_util::worker::{dummy_scheduler, Builder as WorkerBuilder, LazyWorker}; fn test_bootstrap_idempotent(cluster: &mut Cluster) { // assume that there is a node bootstrap the cluster and add region in pd successfully @@ -67,7 +67,7 @@ fn test_node_bootstrap_with_prepared_data() { bg_worker, ); let snap_mgr = SnapManager::new(tmp_mgr.path().to_str().unwrap()); - let pd_worker = FutureWorker::new("test-pd-worker"); + let pd_worker = LazyWorker::new("test-pd-worker"); // assume there is a node has bootstrapped the cluster and 
add region in pd successfully bootstrap_with_first_region(Arc::clone(&pd_client)).unwrap(); diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index 59cadacccb1..e7fc6fc8a43 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -34,7 +34,7 @@ use tikv::import::SSTImporter; use tikv::server; use tikv::server::gc_worker::sync_gc; use tikv::server::service::{batch_commands_request, batch_commands_response}; -use tikv_util::worker::{dummy_scheduler, FutureWorker}; +use tikv_util::worker::{dummy_scheduler, LazyWorker}; use tikv_util::HandyRwLock; use txn_types::{Key, Lock, LockType, TimeStamp}; @@ -941,7 +941,7 @@ fn test_double_run_node() { let router = cluster.sim.rl().get_router(id).unwrap(); let mut sim = cluster.sim.wl(); let node = sim.get_node(id).unwrap(); - let pd_worker = FutureWorker::new("test-pd-worker"); + let pd_worker = LazyWorker::new("test-pd-worker"); let simulate_trans = SimulateTransport::new(ChannelTransport::new()); let tmp = Builder::new().prefix("test_cluster").tempdir().unwrap(); let snap_mgr = SnapManager::new(tmp.path().to_str().unwrap());