resolved_ts: fix high tail latency caused by time-consuming operation in raftstore (tikv#10354)

* using a fine-grained mutex for region_read_progress (see the sketch after this list)

Signed-off-by: linning <[email protected]>

* rename register to registry

Signed-off-by: linning <[email protected]>

* dispatch read_state to update safe_ts

Signed-off-by: linning <[email protected]>

* fix add peer optimization

Signed-off-by: linning <[email protected]>

* send read_state to learner

Signed-off-by: linning <[email protected]>

* check leader in background worker

Signed-off-by: linning <[email protected]>

* refactor

Signed-off-by: linning <[email protected]>

* make clippy happy

Signed-off-by: linning <[email protected]>

* fix test

Signed-off-by: linning <[email protected]>

* redirect request at KvService

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>

* remove comment

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>

* add test case for learner

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>

* address comment

Signed-off-by: linning <[email protected]>
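
The first item above carries the core change: give each region's read progress its own small lock, so advancing `safe_ts` no longer contends on the store-wide `store_meta` mutex. A minimal sketch of that shape, assuming hypothetical names (`RegionReadProgressCore`, `pending_items`) for the real fields:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;

pub struct RegionReadProgress {
    // Fine-grained lock: contended only by updates to this one region,
    // never by the rest of the store.
    core: Mutex<RegionReadProgressCore>,
    // Lock-free read path for the published safe_ts.
    safe_ts: AtomicU64,
}

struct RegionReadProgressCore {
    applied_index: u64,
    // (apply index, safe ts) pairs waiting for apply to catch up.
    pending_items: Vec<(u64, u64)>,
}

impl RegionReadProgress {
    pub fn update_safe_ts(&self, apply_index: u64, ts: u64) {
        let mut core = self.core.lock().unwrap();
        if apply_index <= core.applied_index {
            // Already applied far enough; publish, keeping safe_ts monotonic.
            self.safe_ts.fetch_max(ts, Ordering::Release);
        } else {
            // Defer until the region applies up to `apply_index`.
            core.pending_items.push((apply_index, ts));
        }
    }

    pub fn update_applied(&self, applied: u64) {
        let mut core = self.core.lock().unwrap();
        core.applied_index = applied;
        // Publish every deferred safe_ts whose apply index is now reached.
        while let Some(&(idx, ts)) = core.pending_items.first() {
            if idx > applied {
                break;
            }
            core.pending_items.remove(0);
            self.safe_ts.fetch_max(ts, Ordering::Release);
        }
    }

    pub fn safe_ts(&self) -> u64 {
        self.safe_ts.load(Ordering::Acquire)
    }
}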
NingLin-P authored Jun 22, 2021
1 parent 7ce16e9 commit 6aafdae
Showing 18 changed files with 501 additions and 280 deletions.
9 changes: 8 additions & 1 deletion components/cdc/src/endpoint.rs
@@ -896,6 +896,8 @@ impl<T: 'static + RaftStoreRouter<RocksEngine>> Endpoint<T> {
Some(id) => id,
None => return vec![],
};
// TODO: should use `RegionReadProgressRegistry` to dump leader info, like `resolved-ts` does,
// to reduce the time spent holding the `store_meta` mutex
for (region_id, _) in regions {
if let Some(region) = meta.regions.get(&region_id) {
if let Some((term, leader_id)) = meta.leaders.get(&region_id) {
@@ -1493,7 +1495,12 @@ mod tests {
txn_extra_op: Arc::new(AtomicCell::new(TxnExtraOp::default())),
max_ts_sync_status: Arc::new(AtomicU64::new(0)),
track_ver: TrackVer::new(),
read_progress: Arc::new(RegionReadProgress::new(0, 0, "".to_owned())),
read_progress: Arc::new(RegionReadProgress::new(
&Region::default(),
0,
0,
"".to_owned(),
)),
};
store_meta.lock().unwrap().readers.insert(1, read_delegate);
let (task_sched, task_rx) = dummy_scheduler();
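
On the TODO above: a registry-level dump could visit every region's progress in one short critical section, instead of reading `meta.regions` and `meta.leaders` while holding `store_meta`. A hedged sketch of such a helper, extending the `RegionReadProgressRegistry` sketched under the `store.rs` hunks below (the `dump` method and its signature are assumptions, not the committed API):

use std::sync::Arc;

impl RegionReadProgressRegistry {
    // Visit every region's progress under a single, short-lived lock,
    // e.g. to build `LeaderInfo` for a resolved-ts style dump.
    pub fn dump<T>(&self, mut f: impl FnMut(u64, &Arc<RegionReadProgress>) -> T) -> Vec<T> {
        self.registry
            .lock()
            .unwrap()
            .iter()
            .map(|(region_id, progress)| f(*region_id, progress))
            .collect()
    }
}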
9 changes: 7 additions & 2 deletions components/raftstore/src/store/fsm/peer.rs
@@ -3088,11 +3088,16 @@
{
panic!("{} unexpected region {:?}", self.fsm.peer.tag, r);
}
let prev = meta.regions.insert(region.get_id(), region);
let prev = meta.regions.insert(region.get_id(), region.clone());
assert_eq!(prev, Some(prev_region));

drop(meta);

self.fsm.peer.read_progress.update_leader_info(
self.fsm.peer.leader_id(),
self.fsm.peer.term(),
&region,
);

for r in &apply_result.destroyed_regions {
if let Err(e) = self.ctx.router.force_send(
r.get_id(),
159 changes: 4 additions & 155 deletions components/raftstore/src/store/fsm/store.rs
@@ -21,8 +21,6 @@ use fail::fail_point;
use futures::compat::Future01CompatExt;
use futures::FutureExt;
use kvproto::import_sstpb::SstMeta;
use kvproto::kvrpcpb::KeyRange;
use kvproto::kvrpcpb::LeaderInfo;
use kvproto::metapb::{self, Region, RegionEpoch};
use kvproto::pdpb::QueryStats;
use kvproto::pdpb::StoreStats;
@@ -66,7 +64,7 @@ use crate::store::memory::*;
use crate::store::metrics::*;
use crate::store::peer_storage::{self, HandleRaftReadyContext};
use crate::store::transport::Transport;
use crate::store::util::{is_initial_msg, RegionReadProgress};
use crate::store::util::{is_initial_msg, RegionReadProgressRegistry};
use crate::store::worker::{
AutoSplitController, CleanupRunner, CleanupSSTRunner, CleanupSSTTask, CleanupTask,
CompactRunner, CompactTask, ConsistencyCheckRunner, ConsistencyCheckTask, PdRunner,
@@ -126,8 +124,8 @@ pub struct StoreMeta {
/// source_region_id -> need_atomic
/// Used for reminding the source peer to switch to ready in `atomic_snap_regions`.
pub destroyed_region_for_snap: HashMap<u64, bool>,

pub region_read_progress: HashMap<u64, Arc<RegionReadProgress>>,
/// region_id -> `RegionReadProgress`
pub region_read_progress: RegionReadProgressRegistry,
}

impl StoreMeta {
@@ -144,7 +142,7 @@ impl StoreMeta {
targets_map: HashMap::default(),
atomic_snap_regions: HashMap::default(),
destroyed_region_for_snap: HashMap::default(),
region_read_progress: HashMap::default(),
region_read_progress: RegionReadProgressRegistry::new(),
}
}
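
`StoreMeta` now carries a `RegionReadProgressRegistry` rather than a bare `HashMap`, so registering and looking up a region's progress takes the registry's own small mutex instead of `store_meta`. A minimal sketch of such a registry, reusing the `RegionReadProgress` shape sketched under the commit message (the exact method set is illustrative):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
pub struct RegionReadProgressRegistry {
    registry: Arc<Mutex<HashMap<u64, Arc<RegionReadProgress>>>>,
}

impl RegionReadProgressRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn insert(&self, region_id: u64, progress: Arc<RegionReadProgress>) {
        self.registry.lock().unwrap().insert(region_id, progress);
    }

    pub fn remove(&self, region_id: &u64) -> Option<Arc<RegionReadProgress>> {
        self.registry.lock().unwrap().remove(region_id)
    }

    // Clone the Arc and release the map lock immediately; the caller then
    // works against the per-region mutex only.
    pub fn get(&self, region_id: &u64) -> Option<Arc<RegionReadProgress>> {
        self.registry.lock().unwrap().get(region_id).cloned()
    }
}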

@@ -614,10 +612,6 @@ impl<'a, EK: KvEngine + 'static, ER: RaftEngine + 'static, T: Transport>
self.on_store_unreachable(store_id);
}
StoreMsg::Start { store } => self.start(store),
StoreMsg::CheckLeader { leaders, cb } => self.on_check_leader(leaders, cb),
StoreMsg::GetStoreSafeTS { key_range, cb } => {
self.on_get_store_safe_ts(key_range, cb)
}
#[cfg(any(test, feature = "testexport"))]
StoreMsg::Validate(f) => f(&self.ctx.cfg),
StoreMsg::UpdateReplicationMode(status) => self.on_update_replication_mode(status),
@@ -2427,106 +2421,6 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> StoreFsmDelegate<'a, EK, ER
let _ = scheduler.schedule(RaftlogGcTask::Purge);
self.register_raft_engine_purge_tick();
}

fn on_check_leader(&self, leaders: Vec<LeaderInfo>, cb: Box<dyn FnOnce(Vec<u64>) + Send>) {
let timer = TiInstant::now_coarse();
let meta = self.ctx.store_meta.lock().unwrap();
let regions = leaders
.into_iter()
.map(|leader_info| {
if let Some((term, leader_id)) = meta.leaders.get(&leader_info.region_id) {
if let Some(region) = meta.regions.get(&leader_info.region_id) {
if *term == leader_info.term
&& *leader_id == leader_info.peer_id
&& util::compare_region_epoch(
leader_info.get_region_epoch(),
region,
true,
true,
false,
)
.is_ok()
{
if leader_info.has_read_state() {
// TODO: instead of `store_meta`, use another fine-grained mutex to protect
// `region_read_progress`
if let Some(pr) =
meta.region_read_progress.get(&leader_info.region_id)
{
// TODO: `update_safe_ts` requires acquiring a mutex; although the mutex
// won't be contended, acquiring a large number of uncontended mutexes
// could still be time-consuming. This operation should move to another
// thread to avoid blocking `raftstore` threads
pr.update_safe_ts(
leader_info.get_read_state().get_applied_index(),
leader_info.get_read_state().get_safe_ts(),
);
}
}
return Some(leader_info.region_id);
}
debug!("check leader failed";
"leader_info" => ?leader_info,
"current_leader" => leader_id,
"current_term" => term,
"current_region" => ?region,
"store_id" => self.fsm.store.id,
);
return None;
}
}
debug!("check leader failed, meta not found";
"leader_info" => ?leader_info,
"store_id" => self.fsm.store.id,
);
None
})
.flatten()
.collect();
cb(regions);

self.ctx
.raft_metrics
.check_leader
.observe(duration_to_sec(timer.elapsed()) as f64);
}

fn on_get_store_safe_ts(&self, key_range: KeyRange, cb: Box<dyn FnOnce(u64) + Send>) {
// `store_safe_ts` won't be accessed frequently (i.e. not per-request or per-transaction),
// so it is okay to get `store_safe_ts` from `store_meta` (behind a mutex)
let meta = self.ctx.store_meta.lock().unwrap();
cb(get_range_safe_ts(&meta, key_range));
}
}

// Get the minimal `safe_ts` from regions that overlap with the key range [`start_key`, `end_key`)
fn get_range_safe_ts(meta: &StoreMeta, key_range: KeyRange) -> u64 {
if key_range.get_start_key().is_empty() && key_range.get_end_key().is_empty() {
// Fast path to get the min `safe_ts` of all regions in this store
meta.region_read_progress
.iter()
.map(|(_, rrp)| rrp.safe_ts())
.min()
.unwrap_or(0)
} else {
let (start_key, end_key) = (
data_key(key_range.get_start_key()),
data_end_key(key_range.get_end_key()),
);
meta.region_ranges
// get overlapped regions
.range((Excluded(start_key), Unbounded))
.take_while(|(_, id)| end_key > enc_start_key(&meta.regions[id]))
// get the min `safe_ts`
.map(|(_, id)| {
meta.region_read_progress
.get(id)
.unwrap()
.safe_ts()
})
.min()
.unwrap_or(0)
}
}
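
With the registry in place, the fast path of the removed `get_range_safe_ts` (an empty key range, i.e. the whole store) no longer needs `store_meta` at all. A hedged sketch of the equivalent, with a hypothetical helper name; the range-filtered path still needs region ranges, which per the commit message is now reached through KvService instead of a `StoreMsg`:

impl RegionReadProgressRegistry {
    // Minimal safe_ts across all regions on the store: one short-lived
    // lock on the registry map, no store-wide mutex involved.
    pub fn get_min_safe_ts(&self) -> u64 {
        self.registry
            .lock()
            .unwrap()
            .values()
            .map(|rrp| rrp.safe_ts())
            .min()
            .unwrap_or(0)
    }
}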

#[cfg(test)]
@@ -2584,49 +2478,4 @@ mod tests {
let expected_declined_bytes = vec![(2, 8192), (3, 4096)];
assert_eq!(declined_bytes, expected_declined_bytes);
}

#[test]
fn test_get_range_min_safe_ts() {
fn add_region(meta: &mut StoreMeta, id: u64, kr: KeyRange, safe_ts: u64) {
let mut region = Region::default();
region.set_id(id);
region.set_start_key(kr.get_start_key().to_vec());
region.set_end_key(kr.get_end_key().to_vec());
region.set_peers(vec![kvproto::metapb::Peer::default()].into());
let rrp = RegionReadProgress::new(1, 1, "".to_owned());
rrp.update_safe_ts(1, safe_ts);
assert_eq!(rrp.safe_ts(), safe_ts);
meta.region_ranges.insert(enc_end_key(&region), id);
meta.regions.insert(id, region);
meta.region_read_progress.insert(id, Arc::new(rrp));
}

fn key_range(start_key: &[u8], end_key: &[u8]) -> KeyRange {
let mut kr = KeyRange::default();
kr.set_start_key(start_key.to_vec());
kr.set_end_key(end_key.to_vec());
kr
}

let mut meta = StoreMeta::new(0);
assert_eq!(0, get_range_safe_ts(&meta, key_range(b"", b"")));
add_region(&mut meta, 1, key_range(b"", b"k1"), 100);
assert_eq!(100, get_range_safe_ts(&meta, key_range(b"", b"")));
assert_eq!(0, get_range_safe_ts(&meta, key_range(b"k1", b"")));

add_region(&mut meta, 2, key_range(b"k5", b"k6"), 80);
add_region(&mut meta, 3, key_range(b"k6", b"k9"), 70);
add_region(&mut meta, 4, key_range(b"k9", b""), 90);
assert_eq!(70, get_range_safe_ts(&meta, key_range(b"", b"")));
assert_eq!(80, get_range_safe_ts(&meta, key_range(b"", b"k6")));
assert_eq!(90, get_range_safe_ts(&meta, key_range(b"k99", b"")));
assert_eq!(70, get_range_safe_ts(&meta, key_range(b"k5", b"k99")));
assert_eq!(70, get_range_safe_ts(&meta, key_range(b"k", b"k9")));
assert_eq!(80, get_range_safe_ts(&meta, key_range(b"k4", b"k6")));
assert_eq!(100, get_range_safe_ts(&meta, key_range(b"", b"k1")));
assert_eq!(90, get_range_safe_ts(&meta, key_range(b"k9", b"")));
assert_eq!(80, get_range_safe_ts(&meta, key_range(b"k5", b"k6")));
assert_eq!(0, get_range_safe_ts(&meta, key_range(b"k1", b"k4")));
assert_eq!(0, get_range_safe_ts(&meta, key_range(b"k2", b"k3")));
}
}
3 changes: 2 additions & 1 deletion components/raftstore/src/store/mod.rs
@@ -54,10 +54,11 @@ pub use self::snap::{
SnapManagerBuilder, Snapshot, SnapshotStatistics,
};
pub use self::transport::{CasualRouter, ProposalRouter, StoreRouter, Transport};
pub use self::util::RegionReadProgress;
pub use self::util::{RegionReadProgress, RegionReadProgressRegistry};
pub use self::worker::{
AutoSplitController, FlowStatistics, FlowStatsReporter, PdTask, QueryStats, ReadDelegate,
ReadStats, SplitConfig, SplitConfigManager, TrackVer,
};
pub use self::worker::{CheckLeaderRunner, CheckLeaderTask};
pub use self::worker::{KeyEntry, LocalReader, RegionTask};
pub use self::worker::{SplitCheckRunner, SplitCheckTask};
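
The newly exported `CheckLeaderRunner`/`CheckLeaderTask` take over the `on_check_leader` logic deleted from `store.rs`, moving it from the raftstore threads onto a background worker. A sketch of the shape such a runner could take, assuming the per-region progress caches leader info and exposes a hypothetical `leader_info_matches` helper for the term/peer/epoch comparison the old code did inline:

use kvproto::kvrpcpb::LeaderInfo;

pub struct CheckLeaderRunner {
    // A clone of the store's registry; no `store_meta` access required.
    region_read_progress: RegionReadProgressRegistry,
}

impl CheckLeaderRunner {
    pub fn handle_check_leader(
        &self,
        leaders: Vec<LeaderInfo>,
        cb: Box<dyn FnOnce(Vec<u64>) + Send>,
    ) {
        let mut valid_regions = Vec::with_capacity(leaders.len());
        for leader_info in leaders {
            let progress = match self.region_read_progress.get(&leader_info.region_id) {
                Some(p) => p,
                None => continue,
            };
            // Compare against the leader info cached by `update_leader_info`.
            if progress.leader_info_matches(&leader_info) {
                if leader_info.has_read_state() {
                    // Advancing safe_ts now takes only the per-region mutex.
                    let rs = leader_info.get_read_state();
                    progress.update_safe_ts(rs.get_applied_index(), rs.get_safe_ts());
                }
                valid_regions.push(leader_info.region_id);
            }
        }
        cb(valid_regions);
    }
}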
15 changes: 1 addition & 14 deletions components/raftstore/src/store/msg.rs
@@ -7,7 +7,7 @@ use std::time::Instant;
use bitflags::bitflags;
use engine_traits::{CompactedEvent, KvEngine, Snapshot};
use kvproto::import_sstpb::SstMeta;
use kvproto::kvrpcpb::{ExtraOp as TxnExtraOp, KeyRange, LeaderInfo};
use kvproto::kvrpcpb::ExtraOp as TxnExtraOp;
use kvproto::metapb;
use kvproto::metapb::RegionEpoch;
use kvproto::pdpb::CheckPolicy;
@@ -505,15 +505,6 @@
Start {
store: metapb::Store,
},
CheckLeader {
leaders: Vec<LeaderInfo>,
cb: Box<dyn FnOnce(Vec<u64>) + Send>,
},
// Get the minimal `safe_ts` from regions that overlap with the key range [`start_key`, `end_key`)
GetStoreSafeTS {
key_range: KeyRange,
cb: Box<dyn FnOnce(u64) + Send>,
},
/// Message only used for test.
#[cfg(any(test, feature = "testexport"))]
Validate(Box<dyn FnOnce(&crate::store::Config) + Send>),
@@ -543,10 +534,6 @@
),
StoreMsg::Tick(tick) => write!(fmt, "StoreTick {:?}", tick),
StoreMsg::Start { ref store } => write!(fmt, "Start store {:?}", store),
StoreMsg::CheckLeader { ref leaders, .. } => write!(fmt, "CheckLeader {:?}", leaders),
StoreMsg::GetStoreSafeTS { ref key_range, .. } => {
write!(fmt, "GetStoreSafeTS {:?}", key_range)
}
#[cfg(any(test, feature = "testexport"))]
StoreMsg::Validate(_) => write!(fmt, "Validate config"),
StoreMsg::UpdateReplicationMode(_) => write!(fmt, "UpdateReplicationMode"),
9 changes: 9 additions & 0 deletions components/raftstore/src/store/peer.rs
@@ -618,6 +618,7 @@
cmd_epoch_checker: Default::default(),
last_unpersisted_number: 0,
read_progress: Arc::new(RegionReadProgress::new(
region,
applied_index,
REGION_READ_PROGRESS_CAP,
tag,
@@ -982,6 +983,10 @@
// becoming a leader.
self.maybe_update_read_progress(reader, progress);

// Update leader info
self.read_progress
.update_leader_info(self.leader_id(), self.term(), self.region());

if !self.pending_remove {
host.on_region_changed(self.region(), RegionChangeEvent::Update, self.get_role());
}
@@ -1498,6 +1503,10 @@
"term" => term,
"peer_id" => self.peer_id(),
);

self.read_progress
.update_leader_info(leader_id, term, self.region());

let mut meta = ctx.store_meta.lock().unwrap();
meta.leaders.insert(self.region_id, (term, leader_id));
}
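
Both new hunks funnel leadership changes into `read_progress.update_leader_info`, keeping the cached leader info fresh for the background check-leader path. Continuing the earlier sketch, and assuming the per-region core also holds the leader state (the `LocalLeaderInfo` shape is an assumption):

use kvproto::metapb::{Region, RegionEpoch};

struct LocalLeaderInfo {
    leader_id: u64,
    term: u64,
    epoch: RegionEpoch,
}

impl RegionReadProgress {
    // Runs on role, term, or epoch changes, under the per-region mutex only
    // (assumes a hypothetical `leader_info` field on the core).
    pub fn update_leader_info(&self, leader_id: u64, term: u64, region: &Region) {
        let mut core = self.core.lock().unwrap();
        core.leader_info = LocalLeaderInfo {
            leader_id,
            term,
            epoch: region.get_region_epoch().clone(),
        };
    }
}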
