From 46e539aee168fda8996301ec0e3bd00730064f9c Mon Sep 17 00:00:00 2001 From: tier-cap <80742231+tier-cap@users.noreply.github.com> Date: Wed, 23 Jun 2021 16:55:22 +0800 Subject: [PATCH] *: support read only and recovery when disk full (#10264) * [feature] support read only and recovery when disk full. 1. when disk full, all the business write traffic will not be allowed. 2. no mather minority or majority or all servers disk full happen, you can recove by adding machines or storage, then service normally. Signed-off-by: tier-cap * [feature] support read only and recovery when disk full. adjust impl on reviews. Signed-off-by: tier-cap * [feature] support read only and recovery when disk full. opt one test case taking long time prbs. Signed-off-by: tier-cap * [feature] support read only and recovery when disk full. change ut impl on reviews. Signed-off-by: tier-cap * [feature] support read only and recovery when disk full. change ut impl on reviews. Signed-off-by: tier-cap * [feature] support read only and recovery when disk full. remote the unused code and commend Signed-off-by: tier-cap * clean up tests and fix a bug about transfer leader Signed-off-by: qupeng * a little fix Signed-off-by: qupeng * disk full recovery change 3 details 1. config change allowed 2. campaign success log allowed 3. add the raft engine size stats. Signed-off-by: tier-cap * fix one bug Signed-off-by: tier-cap * change details by review comment. Signed-off-by: tier-cap * simplify some impls by comments. Signed-off-by: tier-cap * change the atomic var access mode by review comments Signed-off-by: tier-cap Co-authored-by: tier-cap Co-authored-by: qupeng --- .gitignore | 2 + Cargo.toml | 1 + components/engine_panic/src/raft_engine.rs | 4 + components/engine_rocks/src/raft_engine.rs | 9 +- components/engine_traits/src/raft_engine.rs | 2 + components/raft_log_engine/src/engine.rs | 5 + components/raftstore/src/store/fsm/peer.rs | 31 ++++- components/raftstore/src/store/fsm/store.rs | 4 + components/raftstore/src/store/peer.rs | 12 +- components/raftstore/src/store/worker/pd.rs | 5 + components/server/src/server.rs | 74 +++++++++++- components/tikv_util/src/sys/disk.rs | 22 ++++ components/tikv_util/src/sys/mod.rs | 4 +- tests/failpoints/cases/mod.rs | 1 + tests/failpoints/cases/test_disk_full.rs | 127 ++++++++++++++++++++ 15 files changed, 294 insertions(+), 9 deletions(-) create mode 100644 components/tikv_util/src/sys/disk.rs create mode 100644 tests/failpoints/cases/test_disk_full.rs diff --git a/.gitignore b/.gitignore index f84645f5b1e..af89a9bef28 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,5 @@ fuzz-incremental/ /db/ /last_tikv.toml /raft/ +core.* + diff --git a/Cargo.toml b/Cargo.toml index f97d6708084..8b1dc892745 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ mem-profiling = ["tikv_alloc/mem-profiling"] failpoints = [ "fail/failpoints", "raftstore/failpoints", + "tikv_util/failpoints", "engine_rocks/failpoints" ] cloud-aws = [ diff --git a/components/engine_panic/src/raft_engine.rs b/components/engine_panic/src/raft_engine.rs index 0b08957b1f0..7316a236d56 100644 --- a/components/engine_panic/src/raft_engine.rs +++ b/components/engine_panic/src/raft_engine.rs @@ -92,6 +92,10 @@ impl RaftEngine for PanicEngine { fn dump_stats(&self) -> Result { panic!() } + + fn get_engine_size(&self) -> Result { + panic!() + } } impl RaftLogBatch for PanicWriteBatch { diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs index bbc7c986c69..db093bb151f 100644 --- a/components/engine_rocks/src/raft_engine.rs +++ b/components/engine_rocks/src/raft_engine.rs @@ -1,6 +1,6 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -use crate::{RocksEngine, RocksWriteBatch}; +use crate::{util, RocksEngine, RocksWriteBatch}; use engine_traits::{ Error, Iterable, KvEngine, MiscExt, Mutable, Peekable, RaftEngine, RaftEngineReadOnly, @@ -221,6 +221,13 @@ impl RaftEngine for RocksEngine { fn dump_stats(&self) -> Result { MiscExt::dump_stats(self) } + + fn get_engine_size(&self) -> Result { + let handle = util::get_cf_handle(self.as_inner(), CF_DEFAULT)?; + let used_size = util::get_engine_cf_used_size(self.as_inner(), handle); + + Ok(used_size) + } } impl RaftLogBatch for RocksWriteBatch { diff --git a/components/engine_traits/src/raft_engine.rs b/components/engine_traits/src/raft_engine.rs index 5f7e5356b7c..cc13f317ff5 100644 --- a/components/engine_traits/src/raft_engine.rs +++ b/components/engine_traits/src/raft_engine.rs @@ -80,6 +80,8 @@ pub trait RaftEngine: RaftEngineReadOnly + Clone + Sync + Send + 'static { fn stop(&self) {} fn dump_stats(&self) -> Result; + + fn get_engine_size(&self) -> Result; } pub trait RaftLogBatch: Send { diff --git a/components/raft_log_engine/src/engine.rs b/components/raft_log_engine/src/engine.rs index 5c4bf7669be..bb0ad00132c 100644 --- a/components/raft_log_engine/src/engine.rs +++ b/components/raft_log_engine/src/engine.rs @@ -189,6 +189,11 @@ impl RaftEngine for RaftLogEngine { // Raft engine won't dump anything. Ok("".to_owned()) } + + fn get_engine_size(&self) -> Result { + //TODO impl this when RaftLogEngine is ready to go online. + Ok(0) + } } fn transfer_error(e: RaftEngineError) -> engine_traits::Error { diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs index e4ed680799a..383d1e89fe3 100644 --- a/components/raftstore/src/store/fsm/peer.rs +++ b/components/raftstore/src/store/fsm/peer.rs @@ -27,11 +27,11 @@ use kvproto::raft_serverpb::{ }; use kvproto::replication_modepb::{DrAutoSyncState, ReplicationMode}; use protobuf::Message; -use raft::eraftpb::{ConfChangeType, Entry, MessageType}; +use raft::eraftpb::{ConfChangeType, Entry, EntryType, MessageType}; use raft::{self, Progress, ReadState, Ready, SnapshotStatus, StateRole, INVALID_INDEX, NO_LIMIT}; use tikv_alloc::trace::TraceEvent; use tikv_util::mpsc::{self, LooseBoundedSender, Receiver}; -use tikv_util::sys::memory_usage_reaches_high_water; +use tikv_util::sys::{disk, memory_usage_reaches_high_water}; use tikv_util::time::duration_to_sec; use tikv_util::worker::{Scheduler, Stopped}; use tikv_util::{box_err, debug, error, info, trace, warn}; @@ -1233,6 +1233,33 @@ where "to_peer_id" => msg.get_to_peer().get_id(), ); + let msg_type = msg.get_message().get_msg_type(); + let store_id = self.ctx.store_id(); + + if disk::disk_full_precheck(store_id) || self.ctx.is_disk_full { + let mut flag = false; + if MessageType::MsgAppend == msg_type { + let entries = msg.get_message().get_entries(); + for i in entries { + let entry_type = i.get_entry_type(); + if EntryType::EntryNormal == entry_type && !i.get_data().is_empty() { + flag = true; + break; + } + } + } else if MessageType::MsgTimeoutNow == msg_type { + flag = true; + } + + if flag { + debug!( + "skip {:?} because of disk full", msg_type; + "region_id" => self.region_id(), "peer_id" => self.fsm.peer_id() + ); + return Err(Error::Timeout("disk full".to_owned())); + } + } + if !self.validate_raft_msg(&msg) { return Ok(()); } diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs index 9615a18dfe9..3ff8a7ab66a 100644 --- a/components/raftstore/src/store/fsm/store.rs +++ b/components/raftstore/src/store/fsm/store.rs @@ -40,6 +40,7 @@ use sst_importer::SSTImporter; use tikv_alloc::trace::TraceEvent; use tikv_util::config::{Tracker, VersionTrack}; use tikv_util::mpsc::{self, LooseBoundedSender, Receiver}; +use tikv_util::sys::disk; use tikv_util::time::{duration_to_sec, Instant as TiInstant}; use tikv_util::timer::SteadyTimer; use tikv_util::worker::{FutureScheduler, FutureWorker, Scheduler, Worker}; @@ -386,6 +387,7 @@ where pub perf_context: EK::PerfContext, pub tick_batch: Vec, pub node_start_time: Option, + pub is_disk_full: bool, } impl HandleRaftReadyContext for PollContext @@ -771,6 +773,7 @@ impl PollHandler, St self.poll_ctx.pending_count = 0; self.poll_ctx.sync_log = false; self.poll_ctx.has_ready = false; + self.poll_ctx.is_disk_full = disk::is_disk_full(); self.timer = TiInstant::now_coarse(); // update config self.poll_ctx.perf_context.start_observe(); @@ -1134,6 +1137,7 @@ where tick_batch: vec![PeerTickBatch::default(); 256], node_start_time: Some(TiInstant::now_coarse()), feature_gate: self.feature_gate.clone(), + is_disk_full: false, }; ctx.update_ticks_timeout(); let tag = format!("[store {}]", ctx.store.get_id()); diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 4e7b460550b..8151f4462f1 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -57,6 +57,7 @@ use crate::{Error, Result}; use collections::{HashMap, HashSet}; use pd_client::INVALID_ID; use tikv_util::codec::number::decode_u64; +use tikv_util::sys::disk; use tikv_util::time::{duration_to_sec, monotonic_raw_now}; use tikv_util::time::{Instant as UtilInstant, ThreadReadId}; use tikv_util::worker::{FutureScheduler, Scheduler}; @@ -2350,7 +2351,14 @@ where return false; } Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb), - Ok(RequestPolicy::ProposeNormal) => self.propose_normal(ctx, req), + Ok(RequestPolicy::ProposeNormal) => { + let store_id = ctx.store_id(); + if disk::disk_full_precheck(store_id) || ctx.is_disk_full { + Err(Error::Timeout("disk full".to_owned())) + } else { + self.propose_normal(ctx, req) + } + } Ok(RequestPolicy::ProposeTransferLeader) => { return self.propose_transfer_leader(ctx, req, cb); } @@ -3126,6 +3134,8 @@ where if self.is_applying_snapshot() || self.has_pending_snapshot() || msg.get_from() != self.leader_id() + // For followers whose disk is full. + || disk::disk_full_precheck(ctx.store_id()) || ctx.is_disk_full { info!( "reject transferring leader"; diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index cdf88bbc9ea..d76fc9e6c89 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -43,6 +43,7 @@ use futures::FutureExt; use pd_client::metrics::*; use pd_client::{Error, PdClient, RegionStat}; use tikv_util::metrics::ThreadInfoStatistics; +use tikv_util::sys::disk; use tikv_util::time::UnixSecs; use tikv_util::timer::GLOBAL_TIMER_HANDLE; use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler, Stopped}; @@ -742,6 +743,10 @@ where let mut available = capacity.checked_sub(used_size).unwrap_or_default(); // We only care about rocksdb SST file size, so we should check disk available here. available = cmp::min(available, disk_stats.available_space()); + + if disk::is_disk_full() { + available = 0; + } if available == 0 { warn!("no available space"); } diff --git a/components/server/src/server.rs b/components/server/src/server.rs index fc2f0d806d3..0caa7ddd2c8 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -22,6 +22,7 @@ use std::{ Arc, Mutex, }, time::{Duration, Instant}, + u64, }; use cdc::MemoryQuota; @@ -60,7 +61,7 @@ use raftstore::{ fsm, fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_MSG_CAP}, memory::MEMTRACE_ROOT, - AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, + AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, SnapManager, SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMsg, }, }; @@ -89,7 +90,7 @@ use tikv_util::{ check_environment_variables, config::{ensure_dir_exist, VersionTrack}, math::MovingAvgU32, - sys::{register_memory_usage_high_water, SysQuota}, + sys::{disk, register_memory_usage_high_water, SysQuota}, thread_group::GroupProperties, time::Monitor, worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker}, @@ -136,10 +137,11 @@ pub fn run_tikv(config: TiKvConfig) { let (limiter, fetcher) = tikv.init_io_utility(); let (engines, engines_info) = tikv.init_raw_engines(Some(limiter.clone())); limiter.set_low_priority_io_adjustor_if_needed(Some(engines_info.clone())); - tikv.init_engines(engines); + tikv.init_engines(engines.clone()); let server_config = tikv.init_servers(); tikv.register_services(); tikv.init_metrics_flusher(fetcher, engines_info); + tikv.init_storage_stats_task(engines); tikv.run_server(server_config); tikv.run_status_server(); @@ -159,6 +161,7 @@ const RESERVED_OPEN_FDS: u64 = 1000; const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000); const DEFAULT_ENGINE_METRICS_RESET_INTERVAL: Duration = Duration::from_millis(60_000); +const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1); /// A complete TiKV server. struct TiKVServer { @@ -171,6 +174,7 @@ struct TiKVServer { resolver: resolve::PdStoreAddrResolver, state: Arc>, store_path: PathBuf, + snap_mgr: Option, // Will be filled in `init_servers`. encryption_key_manager: Option>, engines: Option>, servers: Option>, @@ -253,6 +257,7 @@ impl TiKVServer { resolver, state, store_path, + snap_mgr: None, encryption_key_manager: None, engines: None, servers: None, @@ -405,6 +410,7 @@ impl TiKVServer { if self.config.raft_store.capacity.0 > 0 { capacity = cmp::min(capacity, self.config.raft_store.capacity.0); } + //TODO after disk full readonly impl, such file should be removed. file_system::reserve_space_for_recover( &self.config.storage.data_dir, if self.config.storage.reserve_space.0 == 0 { @@ -681,6 +687,7 @@ impl TiKVServer { node.try_bootstrap_store(engines.engines.clone()) .unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e)); + self.snap_mgr = Some(snap_mgr.clone()); // Create server let server = Server::new( node.id(), @@ -1021,6 +1028,67 @@ impl TiKVServer { }); } + fn init_storage_stats_task(&self, engines: Engines) { + let config_disk_capacity: u64 = self.config.raft_store.capacity.0; + let store_path = self.store_path.clone(); + let snap_mgr = self.snap_mgr.clone().unwrap(); + let disk_reserved = self.config.storage.reserve_space.0; + if disk_reserved == 0 { + info!("disk space checker not enabled"); + return; + } + //TODO wal size ignore? + self.background_worker + .spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || { + let disk_stats = match fs2::statvfs(&store_path) { + Err(e) => { + error!( + "get disk stat for kv store failed"; + "kv path" => store_path.to_str(), + "err" => ?e + ); + return; + } + Ok(stats) => stats, + }; + let disk_cap = disk_stats.total_space(); + let snap_size = snap_mgr.get_total_snap_size().unwrap(); + + let kv_size = engines + .kv + .get_engine_used_size() + .expect("get kv engine size"); + + let raft_size = engines + .raft + .get_engine_size() + .expect("get raft engine size"); + + let used_size = snap_size + kv_size + raft_size; + let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity { + disk_cap + } else { + config_disk_capacity + }; + + let mut available = capacity.checked_sub(used_size).unwrap_or_default(); + available = cmp::min(available, disk_stats.available_space()); + if available <= disk_reserved { + warn!( + "disk full, available={},snap={},engine={},capacity={}", + available, snap_size, kv_size, capacity + ); + disk::set_disk_full(); + } else if disk::is_disk_full() { + info!( + "disk normalized, available={},snap={},engine={},capacity={}", + available, snap_size, kv_size, capacity + ); + disk::clear_disk_full(); + } + }) + } + fn run_server(&mut self, server_config: Arc>) { let server = self.servers.as_mut().unwrap(); server diff --git a/components/tikv_util/src/sys/disk.rs b/components/tikv_util/src/sys/disk.rs new file mode 100644 index 00000000000..b3a65c0d3ff --- /dev/null +++ b/components/tikv_util/src/sys/disk.rs @@ -0,0 +1,22 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. +use fail::fail_point; +use std::sync::atomic::{AtomicBool, Ordering}; +static DISK_FULL: AtomicBool = AtomicBool::new(false); + +pub fn set_disk_full() { + DISK_FULL.store(true, Ordering::Release); +} + +pub fn clear_disk_full() { + DISK_FULL.store(false, Ordering::Release); +} + +pub fn is_disk_full() -> bool { + DISK_FULL.load(Ordering::Acquire) +} + +pub fn disk_full_precheck(_store_id: u64) -> bool { + fail_point!("disk_full_peer_1", _store_id == 1, |_| true); + fail_point!("disk_full_peer_2", _store_id == 2, |_| true); + false +} diff --git a/components/tikv_util/src/sys/mod.rs b/components/tikv_util/src/sys/mod.rs index b0a1442358f..2800ef71d03 100644 --- a/components/tikv_util/src/sys/mod.rs +++ b/components/tikv_util/src/sys/mod.rs @@ -1,9 +1,9 @@ // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0. -pub mod cpu_time; - #[cfg(target_os = "linux")] mod cgroup; +pub mod cpu_time; +pub mod disk; // re-export some traits for ease of use use crate::config::{ReadableSize, KIB}; diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs index 7f454966839..75a52757518 100644 --- a/tests/failpoints/cases/mod.rs +++ b/tests/failpoints/cases/mod.rs @@ -6,6 +6,7 @@ mod test_cmd_epoch_checker; mod test_compact_log; mod test_conf_change; mod test_coprocessor; +mod test_disk_full; mod test_early_apply; mod test_encryption; mod test_gc_worker; diff --git a/tests/failpoints/cases/test_disk_full.rs b/tests/failpoints/cases/test_disk_full.rs new file mode 100644 index 00000000000..dbf6096d28e --- /dev/null +++ b/tests/failpoints/cases/test_disk_full.rs @@ -0,0 +1,127 @@ +// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. + +use kvproto::raft_cmdpb::*; +use raft::eraftpb::MessageType; +use raftstore::store::msg::*; +use std::sync::mpsc; +use std::time::Duration; +use test_raftstore::*; + +fn assert_disk_full(resp: &RaftCmdResponse) { + let msg = resp.get_header().get_error().get_message(); + assert!(msg.contains("disk full")); +} + +#[test] +fn test_disk_full_unallowed_leader_behaviors() { + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + cluster.run(); + + // To ensure all replicas are not pending. + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + fail::cfg("disk_full_peer_1", "return").unwrap(); + + // Test new normal proposals won't be allowed when disk is full. + let old_last_index = cluster.raft_local_state(1, 1).last_index; + let rx = cluster.async_put(b"k2", b"v2").unwrap(); + assert_disk_full(&rx.recv_timeout(Duration::from_secs(2)).unwrap()); + let new_last_index = cluster.raft_local_state(1, 1).last_index; + assert_eq!(old_last_index, new_last_index); + + // Test split won't be allowed when disk is full. + let old_last_index = cluster.raft_local_state(1, 1).last_index; + let region = cluster.get_region(b"k1"); + let (tx, rx) = mpsc::sync_channel(1); + cluster.split_region( + ®ion, + b"k1", + Callback::write(Box::new(move |resp| tx.send(resp.response).unwrap())), + ); + assert_disk_full(&rx.recv_timeout(Duration::from_secs(2)).unwrap()); + let new_last_index = cluster.raft_local_state(1, 1).last_index; + assert_eq!(old_last_index, new_last_index); + + fail::remove("disk_full_peer_1"); +} + +#[test] +fn test_disk_full_allowed_leader_behaviors() { + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + cluster.run(); + + // To ensure all replicas are not pending. + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + fail::cfg("disk_full_peer_1", "return").unwrap(); + + // Test transfer leader should be allowed. + cluster.must_transfer_leader(1, new_peer(2, 2)); + + // Transfer the leadership back to store 1. + fail::remove("disk_full_peer_1"); + cluster.must_transfer_leader(1, new_peer(1, 1)); + fail::cfg("disk_full_peer_1", "return").unwrap(); + + // Test remove peer should be allowed. + cluster.pd_client.must_remove_peer(1, new_peer(3, 3)); + must_get_none(&cluster.get_engine(3), b"k1"); + + // Test add peer should be allowed. + cluster.pd_client.must_add_peer(1, new_peer(3, 3)); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + fail::remove("disk_full_peer_1"); +} + +#[test] +fn test_disk_full_follower_behaviors() { + let mut cluster = new_server_cluster(0, 3); + cluster.pd_client.disable_default_operator(); + cluster.run(); + + // To ensure all replicas are not pending. + cluster.must_put(b"k1", b"v1"); + must_get_equal(&cluster.get_engine(1), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(2), b"k1", b"v1"); + must_get_equal(&cluster.get_engine(3), b"k1", b"v1"); + + cluster.must_transfer_leader(1, new_peer(1, 1)); + fail::cfg("disk_full_peer_2", "return").unwrap(); + + // Test followers will reject pre-transfer-leader command. + let epoch = cluster.get_region_epoch(1); + let transfer = new_admin_request(1, &epoch, new_transfer_leader_cmd(new_peer(2, 2))); + cluster + .call_command_on_leader(transfer, Duration::from_secs(3)) + .unwrap(); + assert_eq!(cluster.leader_of_region(1).unwrap(), new_peer(1, 1)); + cluster.must_put(b"k2", b"v2"); + + // Test followers will drop entries when disk is full. + let old_last_index = cluster.raft_local_state(1, 2).last_index; + cluster.must_put(b"k3", b"v3"); + let new_last_index = cluster.raft_local_state(1, 2).last_index; + assert_eq!(old_last_index, new_last_index); + must_get_none(&cluster.get_engine(2), b"k3"); + + // Test followers will response votes when disk is full. + cluster.add_send_filter(CloneFilterFactory( + RegionPacketFilter::new(1, 1) + .direction(Direction::Send) + .msg_type(MessageType::MsgRequestVoteResponse), + )); + cluster.must_transfer_leader(1, new_peer(3, 3)); + + fail::remove("disk_full_peer_2"); +}