Fix test: test_region_heartbeat_report_approximate_size (tikv#10328)
* fix unstable test

Signed-off-by: Little-Wallace <[email protected]>

* increase batch size

Signed-off-by: Little-Wallace <[email protected]>

* fix fmt

Signed-off-by: Little-Wallace <[email protected]>

* fix fmt

Signed-off-by: Little-Wallace <[email protected]>

* reduce send message

Signed-off-by: Little-Wallace <[email protected]>

* reduce election timeout

Signed-off-by: Little-Wallace <[email protected]>

* fix split check diff size

Signed-off-by: Little-Wallace <[email protected]>

* reduce election timeout

Signed-off-by: Little-Wallace <[email protected]>

* reduce send message

Signed-off-by: Little-Wallace <[email protected]>

* remove fail point

Signed-off-by: Little-Wallace <[email protected]>

* increase channel size

Signed-off-by: Little-Wallace <[email protected]>

* fix tests fmt

Signed-off-by: Little-Wallace <[email protected]>

* add some comment

Signed-off-by: Little-Wallace <[email protected]>

Co-authored-by: Jay <[email protected]>
Little-Wallace and BusyJay authored Jun 10, 2021
1 parent a4a96e5 commit 47866aa
Showing 5 changed files with 99 additions and 96 deletions.
2 changes: 2 additions & 0 deletions components/raftstore/src/store/fsm/peer.rs
@@ -3612,6 +3612,7 @@ where
return;
}

fail_point!("on_split_region_check_tick");
self.register_split_region_check_tick();

// To avoid frequent scan, we only add new scan tasks if all previous tasks
@@ -3768,6 +3769,7 @@ where
self.fsm.peer.has_calculated_region_size = true;
self.register_split_region_check_tick();
self.register_pd_heartbeat_tick();
fail_point!("on_approximate_region_size");
}

fn on_approximate_region_keys(&mut self, keys: u64) {
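The two `fail_point!` markers added above are what the new failpoint test hooks into: a callback attached to each marker tells the test exactly when the split-check tick fires and when the approximate region size has been recalculated. A minimal sketch of that pattern, assuming the `fail` crate with its `failpoints` feature enabled; the `split_region_check_tick` function below is a stand-in, not TiKV code:

```rust
use std::sync::{mpsc, Arc, Mutex};

use fail::fail_point;

// Stand-in for the instrumented raftstore code path; `fail_point!` expands to
// a no-op unless the `failpoints` feature of the `fail` crate is enabled.
fn split_region_check_tick() {
    fail_point!("on_split_region_check_tick");
    // ... the real tick work would follow here ...
}

#[test]
fn observe_split_check_tick() {
    let (tx, rx) = mpsc::sync_channel(1);
    let tx = Arc::new(Mutex::new(tx));
    // The callback runs every time the marker above is reached.
    fail::cfg_callback("on_split_region_check_tick", move || {
        let _ = tx.lock().unwrap().send(());
    })
    .unwrap();
    split_region_check_tick();
    rx.recv().unwrap();
    // Fail point registrations are process-global, so always clean up.
    fail::remove("on_split_region_check_tick");
}
```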
36 changes: 19 additions & 17 deletions components/test_raftstore/src/cluster.rs
@@ -928,30 +928,32 @@ impl<T: Simulator> Cluster<T> {
}

pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) {
let resp = self.request(
key,
vec![new_put_cf_cmd(cf, key, value)],
false,
Duration::from_secs(5),
);
if resp.get_header().has_error() {
panic!("response {:?} has error", resp);
match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
Ok(resp) => {
assert_eq!(resp.get_responses().len(), 1);
assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
}
Err(e) => {
panic!("has error: {:?}", e);
}
}
assert_eq!(resp.get_responses().len(), 1);
assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
}

pub fn put(&mut self, key: &[u8], value: &[u8]) -> result::Result<(), PbError> {
let resp = self.request(
key,
vec![new_put_cf_cmd(CF_DEFAULT, key, value)],
false,
Duration::from_secs(5),
);
self.batch_put(key, vec![new_put_cf_cmd(CF_DEFAULT, key, value)])
.map(|_| ())
}

pub fn batch_put(
&mut self,
region_key: &[u8],
reqs: Vec<Request>,
) -> result::Result<RaftCmdResponse, PbError> {
let resp = self.request(region_key, reqs, false, Duration::from_secs(5));
if resp.get_header().has_error() {
Err(resp.get_header().get_error().clone())
} else {
Ok(())
Ok(resp)
}
}

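The new `batch_put` helper proposes several writes to the same region in a single `RaftCmdRequest`, so a test can load data with far fewer Raft proposals in flight (the "reduce send message" and "increase batch size" commits above). A rough usage sketch, assuming a fresh cluster where one region owns all of the keys; the test name and value size here are illustrative only:

```rust
use test_raftstore::{new_put_cf_cmd, new_server_cluster};

#[test]
fn batch_put_example() {
    let mut cluster = new_server_cluster(0, 3);
    cluster.run();
    // Build ten put requests against the "write" CF and propose them as a
    // single batched command, routed through the region that owns b"k0".
    let value = vec![1_u8; 1024];
    let reqs: Vec<_> = (0..10)
        .map(|i| new_put_cf_cmd("write", format!("k{}", i).as_bytes(), &value))
        .collect();
    let resp = cluster.batch_put(b"k0", reqs).unwrap();
    // The response should carry one entry per request in the batch.
    assert_eq!(resp.get_responses().len(), 10);
}
```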
17 changes: 9 additions & 8 deletions components/test_raftstore/src/pd.rs
@@ -128,7 +128,7 @@
fn make_region_heartbeat_response(
&self,
region_id: u64,
cluster: &Cluster,
cluster: &PdCluster,
) -> pdpb::RegionHeartbeatResponse {
match *self {
Operator::AddPeer { ref peer, .. } => {
@@ -200,7 +200,7 @@

fn try_finished(
&mut self,
cluster: &Cluster,
cluster: &PdCluster,
region: &metapb::Region,
leader: &metapb::Peer,
) -> bool {
@@ -282,7 +282,7 @@
}
}

struct Cluster {
struct PdCluster {
meta: metapb::Cluster,
stores: HashMap<u64, Store>,
regions: BTreeMap<Key, metapb::Region>,
@@ -316,13 +316,13 @@ struct Cluster {
pub check_merge_target_integrity: bool,
}

impl Cluster {
fn new(cluster_id: u64) -> Cluster {
impl PdCluster {
fn new(cluster_id: u64) -> PdCluster {
let mut meta = metapb::Cluster::default();
meta.set_id(cluster_id);
meta.set_max_peer_count(5);

Cluster {
PdCluster {
meta,
stores: HashMap::default(),
regions: BTreeMap::new(),
@@ -678,6 +678,7 @@ impl Cluster {
if let Some(status) = replication_status {
self.region_replication_status.insert(region.id, status);
}
fail_point!("test_raftstore::pd::region_heartbeat");

self.handle_heartbeat(region, leader)
}
@@ -724,7 +725,7 @@ pub fn bootstrap_with_first_region(pd_client: Arc<TestPdClient>) -> Result<()> {

pub struct TestPdClient {
cluster_id: u64,
cluster: Arc<RwLock<Cluster>>,
cluster: Arc<RwLock<PdCluster>>,
timer: Handle,
is_incompatible: bool,
tso: AtomicU64,
@@ -739,7 +740,7 @@ impl TestPdClient {
feature_gate.set_version("999.0.0").unwrap();
TestPdClient {
cluster_id,
cluster: Arc::new(RwLock::new(Cluster::new(cluster_id))),
cluster: Arc::new(RwLock::new(PdCluster::new(cluster_id))),
timer: GLOBAL_TIMER_HANDLE.clone(),
is_incompatible,
tso: AtomicU64::new(1),
69 changes: 69 additions & 0 deletions tests/failpoints/cases/test_split_region.rs
@@ -9,6 +9,7 @@ use kvproto::metapb::Region;
use kvproto::raft_serverpb::RaftMessage;
use pd_client::PdClient;
use raft::eraftpb::MessageType;
use raftstore::store::config::Config as RaftstoreConfig;
use raftstore::store::util::is_vote_msg;
use raftstore::Result;
use tikv_util::HandyRwLock;
@@ -659,3 +660,71 @@ fn test_split_duplicated_batch() {
rx2.recv_timeout(Duration::from_secs(3)).unwrap();
must_get_equal(&cluster.get_engine(3), b"k11", b"v11");
}

/// We depend on the split-check task to update the approximate size of a region even if the
/// region does not need to split.
#[test]
fn test_report_approximate_size_after_split_check() {
let mut cluster = new_server_cluster(0, 3);
cluster.cfg.raft_store = RaftstoreConfig::default();
cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.region_split_check_diff = ReadableSize::kb(64);
cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(50);
cluster.cfg.raft_store.raft_store_max_leader_lease = ReadableDuration::millis(300);
cluster.run();
cluster.must_put_cf("write", b"k0", b"k1");
let region_id = cluster.get_region_id(b"k0");
let approximate_size = cluster
.pd_client
.get_region_approximate_size(region_id)
.unwrap_or_default();
let approximate_keys = cluster
.pd_client
.get_region_approximate_keys(region_id)
.unwrap_or_default();
assert!(approximate_size == 0 && approximate_keys == 0);
let (tx, rx) = mpsc::sync_channel(10);
let tx = Arc::new(Mutex::new(tx));

fail::cfg_callback("on_split_region_check_tick", move || {
// notify that the split-region-check tick has fired
let _ = tx.lock().unwrap().send(());
let tx1 = tx.clone();
fail::cfg_callback("on_approximate_region_size", move || {
// notify that the split check has finished
let _ = tx1.lock().unwrap().send(());
let tx2 = tx1.clone();
fail::cfg_callback("test_raftstore::pd::region_heartbeat", move || {
// notify that the region heartbeat has been reported to PD
let _ = tx2.lock().unwrap().send(());
})
.unwrap();
})
.unwrap();
})
.unwrap();
let value = vec![1_u8; 8096];
for i in 0..10 {
let mut reqs = vec![];
for j in 0..10 {
let k = format!("k{}", i * 10 + j);
reqs.push(new_put_cf_cmd("write", k.as_bytes(), &value));
}
cluster.batch_put("k100".as_bytes(), reqs).unwrap();
}
rx.recv().unwrap();
fail::remove("on_split_region_check_tick");
rx.recv().unwrap();
fail::remove("on_approximate_region_size");
rx.recv().unwrap();
fail::remove("test_raftstore::pd::region_heartbeat");
let size = cluster
.pd_client
.get_region_approximate_size(region_id)
.unwrap_or_default();
// The region does not split, but it still refreshes the approximate_size.
let region_number = cluster.pd_client.get_regions_number();
assert_eq!(region_number, 1);
assert!(size > approximate_size);
}
71 changes: 0 additions & 71 deletions tests/integrations/raftstore/test_region_heartbeat.rs
@@ -5,7 +5,6 @@ use std::sync::Arc;
use std::thread::sleep;
use std::time::{Duration, Instant};

use std::ops::Add;
use test_raftstore::*;
use tikv_util::config::*;
use tikv_util::time::UnixSecs as PdInstant;
@@ -204,73 +203,3 @@ fn test_region_heartbeat_term() {
}
panic!("reported term should be updated");
}

#[test]
fn test_region_heartbeat_report_approximate_size() {
let mut cluster = new_server_cluster(0, 3);
cluster.cfg.raft_store.pd_heartbeat_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(100);
cluster.cfg.raft_store.region_split_check_diff = ReadableSize::mb(2);
cluster.run();
for i in 0..100 {
let (k, v) = (format!("k{}", i), format!("v{}", i));
cluster.must_put_cf("write", k.as_bytes(), v.as_bytes());
}
cluster.must_flush_cf("write", true);

let ids = cluster.get_node_ids();
for id in ids {
cluster.stop_node(id);
cluster.run_node(id).unwrap();
}

let region_id = cluster.get_region_id(b"");
let mut approximate_size = 0;
let mut approximate_keys = 0;
for _ in 0..10 {
approximate_size = cluster
.pd_client
.get_region_approximate_size(region_id)
.unwrap_or_default();
approximate_keys = cluster
.pd_client
.get_region_approximate_keys(region_id)
.unwrap_or_default();
if approximate_size > 0 && approximate_keys > 0 {
break;
}
sleep_ms(100);
}
assert!(approximate_size > 0 && approximate_keys > 0);

for i in 100..1000 {
let v = (i as u64) * (i as u64) * (i as u64);
let v_str = format!("v{}", v);
let mut value = "".to_string();
for _ in 0..100 {
value = value.add(v_str.as_str());
}
let k = format!("k{}", i);
cluster.must_put_cf("write", k.as_bytes(), value.as_bytes());
}
let mut size = 0;
let mut keys = 0;
for _ in 0..10 {
size = cluster
.pd_client
.get_region_approximate_size(region_id)
.unwrap_or_default();
keys = cluster
.pd_client
.get_region_approximate_keys(region_id)
.unwrap_or_default();
if size > approximate_size && keys > approximate_keys {
break;
}
sleep_ms(100);
}
// The region does not split, but it still refreshes the approximate_size.
let region_number = cluster.pd_client.get_regions_number();
assert_eq!(region_number, 1);
assert!(size > approximate_size && keys > approximate_keys);
}
