From 47866aa842acd97c3afd750aa95d3453833ffd62 Mon Sep 17 00:00:00 2001
From: Wallace
Date: Thu, 10 Jun 2021 23:02:29 +0800
Subject: [PATCH] Fix test: test_region_heartbeat_report_approximate_size (#10328)

* fix unstable test

Signed-off-by: Little-Wallace

* increase batch size

Signed-off-by: Little-Wallace

* fix fmt

Signed-off-by: Little-Wallace

* fix fmt

Signed-off-by: Little-Wallace

* reduce send message

Signed-off-by: Little-Wallace

* reduce election timeout

Signed-off-by: Little-Wallace

* fix split check diff size

Signed-off-by: Little-Wallace

* reduce election timeout

Signed-off-by: Little-Wallace

* reduce send message

Signed-off-by: Little-Wallace

* remove fail point

Signed-off-by: Little-Wallace

* increase channel size

Signed-off-by: Little-Wallace

* fix tests fmt

Signed-off-by: Little-Wallace

* add some comment

Signed-off-by: Little-Wallace

Co-authored-by: Jay
---
 components/raftstore/src/store/fsm/peer.rs  |  2 +
 components/test_raftstore/src/cluster.rs    | 36 +++++-----
 components/test_raftstore/src/pd.rs         | 17 ++---
 tests/failpoints/cases/test_split_region.rs | 69 ++++++++++++++++++
 .../raftstore/test_region_heartbeat.rs      | 71 -------------------
 5 files changed, 99 insertions(+), 96 deletions(-)

diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index 03375a98369..0078ac2df2e 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -3612,6 +3612,7 @@ where
             return;
         }
 
+        fail_point!("on_split_region_check_tick");
         self.register_split_region_check_tick();
 
         // To avoid frequent scan, we only add new scan tasks if all previous tasks
@@ -3768,6 +3769,7 @@ where
         self.fsm.peer.has_calculated_region_size = true;
         self.register_split_region_check_tick();
         self.register_pd_heartbeat_tick();
+        fail_point!("on_approximate_region_size");
     }
 
     fn on_approximate_region_keys(&mut self, keys: u64) {
diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs
index aa58bde9d17..7fe9fdb9b97 100644
--- a/components/test_raftstore/src/cluster.rs
+++ b/components/test_raftstore/src/cluster.rs
@@ -928,30 +928,32 @@ impl<T: Simulator> Cluster<T> {
     }
 
     pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) {
-        let resp = self.request(
-            key,
-            vec![new_put_cf_cmd(cf, key, value)],
-            false,
-            Duration::from_secs(5),
-        );
-        if resp.get_header().has_error() {
-            panic!("response {:?} has error", resp);
+        match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
+            Ok(resp) => {
+                assert_eq!(resp.get_responses().len(), 1);
+                assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
+            }
+            Err(e) => {
+                panic!("has error: {:?}", e);
+            }
         }
-        assert_eq!(resp.get_responses().len(), 1);
-        assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
     }
 
     pub fn put(&mut self, key: &[u8], value: &[u8]) -> result::Result<(), PbError> {
-        let resp = self.request(
-            key,
-            vec![new_put_cf_cmd(CF_DEFAULT, key, value)],
-            false,
-            Duration::from_secs(5),
-        );
+        self.batch_put(key, vec![new_put_cf_cmd(CF_DEFAULT, key, value)])
+            .map(|_| ())
+    }
+
+    pub fn batch_put(
+        &mut self,
+        region_key: &[u8],
+        reqs: Vec<Request>,
+    ) -> result::Result<RaftCmdResponse, PbError> {
+        let resp = self.request(region_key, reqs, false, Duration::from_secs(5));
         if resp.get_header().has_error() {
             Err(resp.get_header().get_error().clone())
         } else {
-            Ok(())
+            Ok(resp)
         }
     }
 
diff --git a/components/test_raftstore/src/pd.rs b/components/test_raftstore/src/pd.rs
index e297911d32d..657e68f26d5 100644
--- a/components/test_raftstore/src/pd.rs
+++ b/components/test_raftstore/src/pd.rs
@@ -128,7 +128,7 @@ impl Operator {
     fn make_region_heartbeat_response(
         &self,
         region_id: u64,
-        cluster: &Cluster,
+        cluster: &PdCluster,
     ) -> pdpb::RegionHeartbeatResponse {
         match *self {
             Operator::AddPeer { ref peer, .. } => {
@@ -200,7 +200,7 @@ impl Operator {
 
     fn try_finished(
         &mut self,
-        cluster: &Cluster,
+        cluster: &PdCluster,
         region: &metapb::Region,
         leader: &metapb::Peer,
     ) -> bool {
@@ -282,7 +282,7 @@ impl Operator {
     }
 }
 
-struct Cluster {
+struct PdCluster {
     meta: metapb::Cluster,
     stores: HashMap<u64, Store>,
     regions: BTreeMap<Key, metapb::Region>,
@@ -316,13 +316,13 @@ struct Cluster {
     pub check_merge_target_integrity: bool,
 }
 
-impl Cluster {
-    fn new(cluster_id: u64) -> Cluster {
+impl PdCluster {
+    fn new(cluster_id: u64) -> PdCluster {
         let mut meta = metapb::Cluster::default();
         meta.set_id(cluster_id);
         meta.set_max_peer_count(5);
 
-        Cluster {
+        PdCluster {
             meta,
             stores: HashMap::default(),
             regions: BTreeMap::new(),
@@ -678,6 +678,7 @@ impl Cluster {
         if let Some(status) = replication_status {
             self.region_replication_status.insert(region.id, status);
         }
+        fail_point!("test_raftstore::pd::region_heartbeat");
 
         self.handle_heartbeat(region, leader)
     }
@@ -724,7 +725,7 @@ pub fn bootstrap_with_first_region(pd_client: Arc<TestPdClient>) -> Result<()> {
 
 pub struct TestPdClient {
     cluster_id: u64,
-    cluster: Arc<RwLock<Cluster>>,
+    cluster: Arc<RwLock<PdCluster>>,
     timer: Handle,
     is_incompatible: bool,
     tso: AtomicU64,
@@ -739,7 +740,7 @@ impl TestPdClient {
         feature_gate.set_version("999.0.0").unwrap();
         TestPdClient {
             cluster_id,
-            cluster: Arc::new(RwLock::new(Cluster::new(cluster_id))),
+            cluster: Arc::new(RwLock::new(PdCluster::new(cluster_id))),
             timer: GLOBAL_TIMER_HANDLE.clone(),
             is_incompatible,
             tso: AtomicU64::new(1),
diff --git a/tests/failpoints/cases/test_split_region.rs b/tests/failpoints/cases/test_split_region.rs
index 8c0ae6568db..b33e644df75 100644
--- a/tests/failpoints/cases/test_split_region.rs
+++ b/tests/failpoints/cases/test_split_region.rs
@@ -9,6 +9,7 @@ use kvproto::metapb::Region;
 use kvproto::raft_serverpb::RaftMessage;
 use pd_client::PdClient;
 use raft::eraftpb::MessageType;
+use raftstore::store::config::Config as RaftstoreConfig;
 use raftstore::store::util::is_vote_msg;
 use raftstore::Result;
 use tikv_util::HandyRwLock;
@@ -659,3 +660,71 @@ fn test_split_duplicated_batch() {
     rx2.recv_timeout(Duration::from_secs(3)).unwrap();
     must_get_equal(&cluster.get_engine(3), b"k11", b"v11");
 }
+
+/// We depend on the split-check task to update the approximate size of a region even if the
+/// region does not need to split.
+#[test]
+fn test_report_approximate_size_after_split_check() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.cfg.raft_store = RaftstoreConfig::default();
+    cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(100);
+    cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(100);
+    cluster.cfg.raft_store.region_split_check_diff = ReadableSize::kb(64);
+    cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(50);
+    cluster.cfg.raft_store.raft_store_max_leader_lease = ReadableDuration::millis(300);
+    cluster.run();
+    cluster.must_put_cf("write", b"k0", b"k1");
+    let region_id = cluster.get_region_id(b"k0");
+    let approximate_size = cluster
+        .pd_client
+        .get_region_approximate_size(region_id)
+        .unwrap_or_default();
+    let approximate_keys = cluster
+        .pd_client
+        .get_region_approximate_keys(region_id)
+        .unwrap_or_default();
+    assert!(approximate_size == 0 && approximate_keys == 0);
+    let (tx, rx) = mpsc::sync_channel(10);
+    let tx = Arc::new(Mutex::new(tx));
+
+    fail::cfg_callback("on_split_region_check_tick", move || {
+        // notify split region tick
+        let _ = tx.lock().unwrap().send(());
+        let tx1 = tx.clone();
+        fail::cfg_callback("on_approximate_region_size", move || {
+            // notify split check finished
+            let _ = tx1.lock().unwrap().send(());
+            let tx2 = tx1.clone();
+            fail::cfg_callback("test_raftstore::pd::region_heartbeat", move || {
+                // notify heartbeat region
+                let _ = tx2.lock().unwrap().send(());
+            })
+            .unwrap();
+        })
+        .unwrap();
+    })
+    .unwrap();
+    let value = vec![1_u8; 8096];
+    for i in 0..10 {
+        let mut reqs = vec![];
+        for j in 0..10 {
+            let k = format!("k{}", i * 10 + j);
+            reqs.push(new_put_cf_cmd("write", k.as_bytes(), &value));
+        }
+        cluster.batch_put("k100".as_bytes(), reqs).unwrap();
+    }
+    rx.recv().unwrap();
+    fail::remove("on_split_region_check_tick");
+    rx.recv().unwrap();
+    fail::remove("on_approximate_region_size");
+    rx.recv().unwrap();
+    fail::remove("test_raftstore::pd::region_heartbeat");
+    let size = cluster
+        .pd_client
+        .get_region_approximate_size(region_id)
+        .unwrap_or_default();
+    // The region does not split, but it still refreshes the approximate_size.
+    let region_number = cluster.pd_client.get_regions_number();
+    assert_eq!(region_number, 1);
+    assert!(size > approximate_size);
+}
diff --git a/tests/integrations/raftstore/test_region_heartbeat.rs b/tests/integrations/raftstore/test_region_heartbeat.rs
index 8e555b829c4..843d9e20424 100644
--- a/tests/integrations/raftstore/test_region_heartbeat.rs
+++ b/tests/integrations/raftstore/test_region_heartbeat.rs
@@ -5,7 +5,6 @@ use std::sync::Arc;
 use std::thread::sleep;
 use std::time::{Duration, Instant};
 
-use std::ops::Add;
 use test_raftstore::*;
 use tikv_util::config::*;
 use tikv_util::time::UnixSecs as PdInstant;
@@ -204,73 +203,3 @@ fn test_region_heartbeat_term() {
     }
     panic!("reported term should be updated");
 }
-
-#[test]
-fn test_region_heartbeat_report_approximate_size() {
-    let mut cluster = new_server_cluster(0, 3);
-    cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(100);
-    cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(100);
-    cluster.cfg.raft_store.region_split_check_diff = ReadableSize::mb(2);
-    cluster.run();
-    for i in 0..100 {
-        let (k, v) = (format!("k{}", i), format!("v{}", i));
-        cluster.must_put_cf("write", k.as_bytes(), v.as_bytes());
-    }
-    cluster.must_flush_cf("write", true);
-
-    let ids = cluster.get_node_ids();
-    for id in ids {
-        cluster.stop_node(id);
-        cluster.run_node(id).unwrap();
-    }
-
-    let region_id = cluster.get_region_id(b"");
-    let mut approximate_size = 0;
-    let mut approximate_keys = 0;
-    for _ in 0..10 {
-        approximate_size = cluster
-            .pd_client
-            .get_region_approximate_size(region_id)
-            .unwrap_or_default();
-        approximate_keys = cluster
-            .pd_client
-            .get_region_approximate_keys(region_id)
-            .unwrap_or_default();
-        if approximate_size > 0 && approximate_keys > 0 {
-            break;
-        }
-        sleep_ms(100);
-    }
-    assert!(approximate_size > 0 && approximate_keys > 0);
-
-    for i in 100..1000 {
-        let v = (i as u64) * (i as u64) * (i as u64);
-        let v_str = format!("v{}", v);
-        let mut value = "".to_string();
-        for _ in 0..100 {
-            value = value.add(v_str.as_str());
-        }
-        let k = format!("k{}", i);
-        cluster.must_put_cf("write", k.as_bytes(), value.as_bytes());
-    }
-    let mut size = 0;
-    let mut keys = 0;
-    for _ in 0..10 {
-        size = cluster
-            .pd_client
-            .get_region_approximate_size(region_id)
-            .unwrap_or_default();
-        keys = cluster
-            .pd_client
-            .get_region_approximate_keys(region_id)
-            .unwrap_or_default();
-        if size > approximate_size && keys > approximate_keys {
-            break;
-        }
-        sleep_ms(100);
-    }
-    // The region does not split, but it still refreshes the approximate_size.
-    let region_number = cluster.pd_client.get_regions_number();
-    assert_eq!(region_number, 1);
-    assert!(size > approximate_size && keys > approximate_keys);
-}