diff --git a/.gitignore b/.gitignore
index f84645f5b1e..af89a9bef28 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,3 +38,5 @@ fuzz-incremental/
 /db/
 /last_tikv.toml
 /raft/
+core.*
+
diff --git a/Cargo.toml b/Cargo.toml
index f97d6708084..8b1dc892745 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,6 +23,7 @@ mem-profiling = ["tikv_alloc/mem-profiling"]
 failpoints = [
   "fail/failpoints",
   "raftstore/failpoints",
+  "tikv_util/failpoints",
   "engine_rocks/failpoints"
 ]
 cloud-aws = [
diff --git a/components/engine_panic/src/raft_engine.rs b/components/engine_panic/src/raft_engine.rs
index 0b08957b1f0..7316a236d56 100644
--- a/components/engine_panic/src/raft_engine.rs
+++ b/components/engine_panic/src/raft_engine.rs
@@ -92,6 +92,10 @@ impl RaftEngine for PanicEngine {
     fn dump_stats(&self) -> Result<String> {
         panic!()
     }
+
+    fn get_engine_size(&self) -> Result<u64> {
+        panic!()
+    }
 }
 
 impl RaftLogBatch for PanicWriteBatch {
diff --git a/components/engine_rocks/src/raft_engine.rs b/components/engine_rocks/src/raft_engine.rs
index bbc7c986c69..db093bb151f 100644
--- a/components/engine_rocks/src/raft_engine.rs
+++ b/components/engine_rocks/src/raft_engine.rs
@@ -1,6 +1,6 @@
 // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
 
-use crate::{RocksEngine, RocksWriteBatch};
+use crate::{util, RocksEngine, RocksWriteBatch};
 
 use engine_traits::{
     Error, Iterable, KvEngine, MiscExt, Mutable, Peekable, RaftEngine, RaftEngineReadOnly,
@@ -221,6 +221,13 @@ impl RaftEngine for RocksEngine {
     fn dump_stats(&self) -> Result<String> {
         MiscExt::dump_stats(self)
     }
+
+    fn get_engine_size(&self) -> Result<u64> {
+        let handle = util::get_cf_handle(self.as_inner(), CF_DEFAULT)?;
+        let used_size = util::get_engine_cf_used_size(self.as_inner(), handle);
+
+        Ok(used_size)
+    }
 }
 
 impl RaftLogBatch for RocksWriteBatch {
diff --git a/components/engine_traits/src/raft_engine.rs b/components/engine_traits/src/raft_engine.rs
index 5f7e5356b7c..cc13f317ff5 100644
--- a/components/engine_traits/src/raft_engine.rs
+++ b/components/engine_traits/src/raft_engine.rs
@@ -80,6 +80,8 @@ pub trait RaftEngine: RaftEngineReadOnly + Clone + Sync + Send + 'static {
     fn stop(&self) {}
 
     fn dump_stats(&self) -> Result<String>;
+
+    fn get_engine_size(&self) -> Result<u64>;
 }
 
 pub trait RaftLogBatch: Send {
diff --git a/components/raft_log_engine/src/engine.rs b/components/raft_log_engine/src/engine.rs
index 5c4bf7669be..bb0ad00132c 100644
--- a/components/raft_log_engine/src/engine.rs
+++ b/components/raft_log_engine/src/engine.rs
@@ -189,6 +189,11 @@ impl RaftEngine for RaftLogEngine {
         // Raft engine won't dump anything.
         Ok("".to_owned())
     }
+
+    fn get_engine_size(&self) -> Result<u64> {
+        // TODO: implement this when RaftLogEngine is ready to go online.
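+        // Until then, reporting 0 simply leaves the raft-log-engine data out of
+        // the disk-usage accounting done by the storage-stats task in server.rs.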
+        Ok(0)
+    }
 }
 
 fn transfer_error(e: RaftEngineError) -> engine_traits::Error {
diff --git a/components/raftstore/src/store/fsm/peer.rs b/components/raftstore/src/store/fsm/peer.rs
index e4ed680799a..383d1e89fe3 100644
--- a/components/raftstore/src/store/fsm/peer.rs
+++ b/components/raftstore/src/store/fsm/peer.rs
@@ -27,11 +27,11 @@ use kvproto::raft_serverpb::{
 };
 use kvproto::replication_modepb::{DrAutoSyncState, ReplicationMode};
 use protobuf::Message;
-use raft::eraftpb::{ConfChangeType, Entry, MessageType};
+use raft::eraftpb::{ConfChangeType, Entry, EntryType, MessageType};
 use raft::{self, Progress, ReadState, Ready, SnapshotStatus, StateRole, INVALID_INDEX, NO_LIMIT};
 use tikv_alloc::trace::TraceEvent;
 use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
-use tikv_util::sys::memory_usage_reaches_high_water;
+use tikv_util::sys::{disk, memory_usage_reaches_high_water};
 use tikv_util::time::duration_to_sec;
 use tikv_util::worker::{Scheduler, Stopped};
 use tikv_util::{box_err, debug, error, info, trace, warn};
@@ -1233,6 +1233,33 @@ where
             "to_peer_id" => msg.get_to_peer().get_id(),
         );
 
+        let msg_type = msg.get_message().get_msg_type();
+        let store_id = self.ctx.store_id();
+
+        if disk::disk_full_precheck(store_id) || self.ctx.is_disk_full {
+            let mut flag = false;
+            if MessageType::MsgAppend == msg_type {
+                let entries = msg.get_message().get_entries();
+                for i in entries {
+                    let entry_type = i.get_entry_type();
+                    if EntryType::EntryNormal == entry_type && !i.get_data().is_empty() {
+                        flag = true;
+                        break;
+                    }
+                }
+            } else if MessageType::MsgTimeoutNow == msg_type {
+                flag = true;
+            }
+
+            if flag {
+                debug!(
+                    "skip {:?} because of disk full", msg_type;
+                    "region_id" => self.region_id(), "peer_id" => self.fsm.peer_id()
+                );
+                return Err(Error::Timeout("disk full".to_owned()));
+            }
+        }
+
         if !self.validate_raft_msg(&msg) {
             return Ok(());
         }
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index 9615a18dfe9..3ff8a7ab66a 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -40,6 +40,7 @@ use sst_importer::SSTImporter;
 use tikv_alloc::trace::TraceEvent;
 use tikv_util::config::{Tracker, VersionTrack};
 use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
+use tikv_util::sys::disk;
 use tikv_util::time::{duration_to_sec, Instant as TiInstant};
 use tikv_util::timer::SteadyTimer;
 use tikv_util::worker::{FutureScheduler, FutureWorker, Scheduler, Worker};
@@ -386,6 +387,7 @@ where
     pub perf_context: EK::PerfContext,
     pub tick_batch: Vec<PeerTickBatch>,
     pub node_start_time: Option<TiInstant>,
+    pub is_disk_full: bool,
 }
 
 impl HandleRaftReadyContext for PollContext
@@ -771,6 +773,7 @@ impl PollHandler, St
         self.poll_ctx.pending_count = 0;
         self.poll_ctx.sync_log = false;
         self.poll_ctx.has_ready = false;
+        self.poll_ctx.is_disk_full = disk::is_disk_full();
         self.timer = TiInstant::now_coarse();
         // update config
         self.poll_ctx.perf_context.start_observe();
@@ -1134,6 +1137,7 @@ where
             tick_batch: vec![PeerTickBatch::default(); 256],
             node_start_time: Some(TiInstant::now_coarse()),
            feature_gate: self.feature_gate.clone(),
+            is_disk_full: false,
         };
         ctx.update_ticks_timeout();
         let tag = format!("[store {}]", ctx.store.get_id());
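// The MsgAppend / MsgTimeoutNow gate added to fsm/peer.rs above can be read as a
// standalone predicate. A minimal sketch with a hypothetical helper name (no such
// function exists in the codebase): drop appends that carry non-empty normal
// entries and drop MsgTimeoutNow, while heartbeats, votes, empty appends and
// conf-change entries still pass.
fn should_drop_when_disk_full(msg: &raft::eraftpb::Message) -> bool {
    use raft::eraftpb::{EntryType, MessageType};
    match msg.get_msg_type() {
        MessageType::MsgTimeoutNow => true,
        MessageType::MsgAppend => msg
            .get_entries()
            .iter()
            .any(|e| e.get_entry_type() == EntryType::EntryNormal && !e.get_data().is_empty()),
        _ => false,
    }
}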
diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs
index 4e7b460550b..8151f4462f1 100644
--- a/components/raftstore/src/store/peer.rs
+++ b/components/raftstore/src/store/peer.rs
@@ -57,6 +57,7 @@ use crate::{Error, Result};
 use collections::{HashMap, HashSet};
 use pd_client::INVALID_ID;
 use tikv_util::codec::number::decode_u64;
+use tikv_util::sys::disk;
 use tikv_util::time::{duration_to_sec, monotonic_raw_now};
 use tikv_util::time::{Instant as UtilInstant, ThreadReadId};
 use tikv_util::worker::{FutureScheduler, Scheduler};
@@ -2350,7 +2351,14 @@ where
                 return false;
             }
             Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb),
-            Ok(RequestPolicy::ProposeNormal) => self.propose_normal(ctx, req),
+            Ok(RequestPolicy::ProposeNormal) => {
+                let store_id = ctx.store_id();
+                if disk::disk_full_precheck(store_id) || ctx.is_disk_full {
+                    Err(Error::Timeout("disk full".to_owned()))
+                } else {
+                    self.propose_normal(ctx, req)
+                }
+            }
             Ok(RequestPolicy::ProposeTransferLeader) => {
                 return self.propose_transfer_leader(ctx, req, cb);
             }
@@ -3126,6 +3134,8 @@ where
         if self.is_applying_snapshot()
             || self.has_pending_snapshot()
             || msg.get_from() != self.leader_id()
+            // A follower whose disk is full must not take over leadership.
+            || disk::disk_full_precheck(ctx.store_id()) || ctx.is_disk_full
         {
             info!(
                 "reject transferring leader";
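// The Peer::propose change above only gates one request class. A minimal sketch of
// the rule, using a local stand-in enum (raftstore's real RequestPolicy has more
// variants): normal writes fail fast with "disk full", while reads, leader transfer
// and conf-change proposals still go through so data can be moved off the node.
enum Policy {
    ReadIndex,
    ProposeNormal,
    ProposeTransferLeader,
    ProposeConfChange,
}

fn allowed_when_disk_full(policy: &Policy) -> bool {
    !matches!(policy, Policy::ProposeNormal)
}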
diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs
index cdf88bbc9ea..d76fc9e6c89 100644
--- a/components/raftstore/src/store/worker/pd.rs
+++ b/components/raftstore/src/store/worker/pd.rs
@@ -43,6 +43,7 @@ use futures::FutureExt;
 use pd_client::metrics::*;
 use pd_client::{Error, PdClient, RegionStat};
 use tikv_util::metrics::ThreadInfoStatistics;
+use tikv_util::sys::disk;
 use tikv_util::time::UnixSecs;
 use tikv_util::timer::GLOBAL_TIMER_HANDLE;
 use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler, Stopped};
@@ -742,6 +743,10 @@ where
         let mut available = capacity.checked_sub(used_size).unwrap_or_default();
         // We only care about rocksdb SST file size, so we should check disk available here.
         available = cmp::min(available, disk_stats.available_space());
+
+        if disk::is_disk_full() {
+            available = 0;
+        }
         if available == 0 {
             warn!("no available space");
         }
diff --git a/components/server/src/server.rs b/components/server/src/server.rs
index fc2f0d806d3..0caa7ddd2c8 100644
--- a/components/server/src/server.rs
+++ b/components/server/src/server.rs
@@ -22,6 +22,7 @@ use std::{
         Arc, Mutex,
     },
     time::{Duration, Instant},
+    u64,
 };
 
 use cdc::MemoryQuota;
@@ -60,7 +61,7 @@ use raftstore::{
         fsm,
         fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_MSG_CAP},
         memory::MEMTRACE_ROOT,
-        AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader,
+        AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, SnapManager,
         SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMsg,
     },
 };
@@ -89,7 +90,7 @@ use tikv_util::{
     check_environment_variables,
     config::{ensure_dir_exist, VersionTrack},
     math::MovingAvgU32,
-    sys::{register_memory_usage_high_water, SysQuota},
+    sys::{disk, register_memory_usage_high_water, SysQuota},
     thread_group::GroupProperties,
     time::Monitor,
     worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker},
@@ -136,10 +137,11 @@ pub fn run_tikv(config: TiKvConfig) {
     let (limiter, fetcher) = tikv.init_io_utility();
     let (engines, engines_info) = tikv.init_raw_engines(Some(limiter.clone()));
     limiter.set_low_priority_io_adjustor_if_needed(Some(engines_info.clone()));
-    tikv.init_engines(engines);
+    tikv.init_engines(engines.clone());
     let server_config = tikv.init_servers();
     tikv.register_services();
     tikv.init_metrics_flusher(fetcher, engines_info);
+    tikv.init_storage_stats_task(engines);
     tikv.run_server(server_config);
 
     tikv.run_status_server();
@@ -159,6 +161,7 @@ const RESERVED_OPEN_FDS: u64 = 1000;
 
 const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
 const DEFAULT_ENGINE_METRICS_RESET_INTERVAL: Duration = Duration::from_millis(60_000);
+const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);
 
 /// A complete TiKV server.
 struct TiKVServer {
@@ -171,6 +174,7 @@ struct TiKVServer {
     resolver: resolve::PdStoreAddrResolver,
     state: Arc>,
     store_path: PathBuf,
+    snap_mgr: Option<SnapManager>, // Will be filled in `init_servers`.
     encryption_key_manager: Option>,
     engines: Option>,
     servers: Option>,
@@ -253,6 +257,7 @@ impl TiKVServer {
             resolver,
             state,
             store_path,
+            snap_mgr: None,
             encryption_key_manager: None,
             engines: None,
             servers: None,
@@ -405,6 +410,7 @@ impl TiKVServer {
         if self.config.raft_store.capacity.0 > 0 {
             capacity = cmp::min(capacity, self.config.raft_store.capacity.0);
         }
+        // TODO: remove this reserved space file once disk-full read-only mode is implemented.
        file_system::reserve_space_for_recover(
             &self.config.storage.data_dir,
             if self.config.storage.reserve_space.0 == 0 {
@@ -681,6 +687,7 @@ impl TiKVServer {
         node.try_bootstrap_store(engines.engines.clone())
             .unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));
 
+        self.snap_mgr = Some(snap_mgr.clone());
         // Create server
         let server = Server::new(
             node.id(),
@@ -1021,6 +1028,67 @@ impl TiKVServer {
         });
     }
 
+    fn init_storage_stats_task(&self, engines: Engines<RocksEngine, ER>) {
+        let config_disk_capacity: u64 = self.config.raft_store.capacity.0;
+        let store_path = self.store_path.clone();
+        let snap_mgr = self.snap_mgr.clone().unwrap();
+        let disk_reserved = self.config.storage.reserve_space.0;
+        if disk_reserved == 0 {
+            info!("disk space checker not enabled");
+            return;
+        }
+        // TODO: WAL size is ignored here; should it be counted as well?
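+        // The periodic task below recomputes used space (snapshots + kv engine +
+        // raft engine) every DEFAULT_STORAGE_STATS_INTERVAL and toggles the global
+        // disk-full flag in tikv_util::sys::disk accordingly.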
+        self.background_worker
+            .spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || {
+                let disk_stats = match fs2::statvfs(&store_path) {
+                    Err(e) => {
+                        error!(
+                            "get disk stat for kv store failed";
+                            "kv path" => store_path.to_str(),
+                            "err" => ?e
+                        );
+                        return;
+                    }
+                    Ok(stats) => stats,
+                };
+                let disk_cap = disk_stats.total_space();
+                let snap_size = snap_mgr.get_total_snap_size().unwrap();
+
+                let kv_size = engines
+                    .kv
+                    .get_engine_used_size()
+                    .expect("get kv engine size");
+
+                let raft_size = engines
+                    .raft
+                    .get_engine_size()
+                    .expect("get raft engine size");
+
+                let used_size = snap_size + kv_size + raft_size;
+                let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity {
+                    disk_cap
+                } else {
+                    config_disk_capacity
+                };
+
+                let mut available = capacity.checked_sub(used_size).unwrap_or_default();
+                available = cmp::min(available, disk_stats.available_space());
+                if available <= disk_reserved {
+                    warn!(
+                        "disk full, available={},snap={},engine={},capacity={}",
+                        available, snap_size, kv_size, capacity
+                    );
+                    disk::set_disk_full();
+                } else if disk::is_disk_full() {
+                    info!(
+                        "disk normalized, available={},snap={},engine={},capacity={}",
+                        available, snap_size, kv_size, capacity
+                    );
+                    disk::clear_disk_full();
+                }
+            })
+    }
 
     fn run_server(&mut self, server_config: Arc>) {
         let server = self.servers.as_mut().unwrap();
         server
diff --git a/components/tikv_util/src/sys/disk.rs b/components/tikv_util/src/sys/disk.rs
new file mode 100644
index 00000000000..b3a65c0d3ff
--- /dev/null
+++ b/components/tikv_util/src/sys/disk.rs
@@ -0,0 +1,22 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+use fail::fail_point;
+use std::sync::atomic::{AtomicBool, Ordering};
+static DISK_FULL: AtomicBool = AtomicBool::new(false);
+
+pub fn set_disk_full() {
+    DISK_FULL.store(true, Ordering::Release);
+}
+
+pub fn clear_disk_full() {
+    DISK_FULL.store(false, Ordering::Release);
+}
+
+pub fn is_disk_full() -> bool {
+    DISK_FULL.load(Ordering::Acquire)
+}
+
+pub fn disk_full_precheck(_store_id: u64) -> bool {
+    fail_point!("disk_full_peer_1", _store_id == 1, |_| true);
+    fail_point!("disk_full_peer_2", _store_id == 2, |_| true);
+    false
+}
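// How the flag above is meant to flow, as a minimal sketch (the two helper names
// are illustrative, not part of the patch): the storage-stats task in server.rs is
// the only writer, while raftstore reads it either directly or through the cached
// PollContext::is_disk_full copy taken once per poll.
use tikv_util::sys::disk;

fn on_stats_tick(available: u64, reserved: u64) {
    if available <= reserved {
        disk::set_disk_full();
    } else if disk::is_disk_full() {
        disk::clear_disk_full();
    }
}

fn should_reject_new_writes(store_id: u64) -> bool {
    // disk_full_precheck only fires through the disk_full_peer_* failpoints, so in
    // production builds this reduces to the global flag.
    disk::disk_full_precheck(store_id) || disk::is_disk_full()
}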
diff --git a/components/tikv_util/src/sys/mod.rs b/components/tikv_util/src/sys/mod.rs
index b0a1442358f..2800ef71d03 100644
--- a/components/tikv_util/src/sys/mod.rs
+++ b/components/tikv_util/src/sys/mod.rs
@@ -1,9 +1,9 @@
 // Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.
 
-pub mod cpu_time;
-
 #[cfg(target_os = "linux")]
 mod cgroup;
+pub mod cpu_time;
+pub mod disk;
 
 // re-export some traits for ease of use
 use crate::config::{ReadableSize, KIB};
diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs
index 7f454966839..75a52757518 100644
--- a/tests/failpoints/cases/mod.rs
+++ b/tests/failpoints/cases/mod.rs
@@ -6,6 +6,7 @@ mod test_cmd_epoch_checker;
 mod test_compact_log;
 mod test_conf_change;
 mod test_coprocessor;
+mod test_disk_full;
 mod test_early_apply;
 mod test_encryption;
 mod test_gc_worker;
diff --git a/tests/failpoints/cases/test_disk_full.rs b/tests/failpoints/cases/test_disk_full.rs
new file mode 100644
index 00000000000..dbf6096d28e
--- /dev/null
+++ b/tests/failpoints/cases/test_disk_full.rs
@@ -0,0 +1,127 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use kvproto::raft_cmdpb::*;
+use raft::eraftpb::MessageType;
+use raftstore::store::msg::*;
+use std::sync::mpsc;
+use std::time::Duration;
+use test_raftstore::*;
+
+fn assert_disk_full(resp: &RaftCmdResponse) {
+    let msg = resp.get_header().get_error().get_message();
+    assert!(msg.contains("disk full"));
+}
+
+#[test]
+fn test_disk_full_unallowed_leader_behaviors() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.pd_client.disable_default_operator();
+    cluster.run();
+
+    // To ensure all replicas are not pending.
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(1), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    fail::cfg("disk_full_peer_1", "return").unwrap();
+
+    // Test new normal proposals won't be allowed when disk is full.
+    let old_last_index = cluster.raft_local_state(1, 1).last_index;
+    let rx = cluster.async_put(b"k2", b"v2").unwrap();
+    assert_disk_full(&rx.recv_timeout(Duration::from_secs(2)).unwrap());
+    let new_last_index = cluster.raft_local_state(1, 1).last_index;
+    assert_eq!(old_last_index, new_last_index);
+
+    // Test split won't be allowed when disk is full.
+    let old_last_index = cluster.raft_local_state(1, 1).last_index;
+    let region = cluster.get_region(b"k1");
+    let (tx, rx) = mpsc::sync_channel(1);
+    cluster.split_region(
+        &region,
+        b"k1",
+        Callback::write(Box::new(move |resp| tx.send(resp.response).unwrap())),
+    );
+    assert_disk_full(&rx.recv_timeout(Duration::from_secs(2)).unwrap());
+    let new_last_index = cluster.raft_local_state(1, 1).last_index;
+    assert_eq!(old_last_index, new_last_index);
+
+    fail::remove("disk_full_peer_1");
+}
+
+#[test]
+fn test_disk_full_allowed_leader_behaviors() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.pd_client.disable_default_operator();
+    cluster.run();
+
+    // To ensure all replicas are not pending.
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(1), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    fail::cfg("disk_full_peer_1", "return").unwrap();
+
+    // Test transfer leader should be allowed.
+    cluster.must_transfer_leader(1, new_peer(2, 2));
+
+    // Transfer the leadership back to store 1.
+    fail::remove("disk_full_peer_1");
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    fail::cfg("disk_full_peer_1", "return").unwrap();
+
+    // Test remove peer should be allowed.
+    cluster.pd_client.must_remove_peer(1, new_peer(3, 3));
+    must_get_none(&cluster.get_engine(3), b"k1");
+
+    // Test add peer should be allowed.
+    cluster.pd_client.must_add_peer(1, new_peer(3, 3));
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    fail::remove("disk_full_peer_1");
+}
+
+#[test]
+fn test_disk_full_follower_behaviors() {
+    let mut cluster = new_server_cluster(0, 3);
+    cluster.pd_client.disable_default_operator();
+    cluster.run();
+
+    // To ensure all replicas are not pending.
+    cluster.must_put(b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(1), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(2), b"k1", b"v1");
+    must_get_equal(&cluster.get_engine(3), b"k1", b"v1");
+
+    cluster.must_transfer_leader(1, new_peer(1, 1));
+    fail::cfg("disk_full_peer_2", "return").unwrap();
+
+    // Test followers will reject the pre-transfer-leader command.
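+    // (The follower refuses the transfer because of the disk-full condition added
+    // in peer.rs, and any MsgTimeoutNow sent to it would be dropped by the check
+    // in fsm/peer.rs, so store 1 keeps the leadership.)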
+    let epoch = cluster.get_region_epoch(1);
+    let transfer = new_admin_request(1, &epoch, new_transfer_leader_cmd(new_peer(2, 2)));
+    cluster
+        .call_command_on_leader(transfer, Duration::from_secs(3))
+        .unwrap();
+    assert_eq!(cluster.leader_of_region(1).unwrap(), new_peer(1, 1));
+    cluster.must_put(b"k2", b"v2");
+
+    // Test followers will drop entries when disk is full.
+    let old_last_index = cluster.raft_local_state(1, 2).last_index;
+    cluster.must_put(b"k3", b"v3");
+    let new_last_index = cluster.raft_local_state(1, 2).last_index;
+    assert_eq!(old_last_index, new_last_index);
+    must_get_none(&cluster.get_engine(2), b"k3");
+
+    // Test followers will still respond to votes when disk is full.
+    cluster.add_send_filter(CloneFilterFactory(
+        RegionPacketFilter::new(1, 1)
+            .direction(Direction::Send)
+            .msg_type(MessageType::MsgRequestVoteResponse),
+    ));
+    cluster.must_transfer_leader(1, new_peer(3, 3));
+
+    fail::remove("disk_full_peer_2");
+}
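// These cases never fill a real disk: the disk_full_peer_* failpoints make
// tikv_util::sys::disk::disk_full_precheck report a full disk for a single store.
// A minimal recap with illustrative helper names (only stores 1 and 2 have such
// failpoints registered in disk.rs):
fn simulate_store_disk_full(store_id: u64) {
    // Makes disk_full_precheck(store_id) return true for this store.
    fail::cfg(format!("disk_full_peer_{}", store_id), "return").unwrap();
}

fn simulate_store_recovered(store_id: u64) {
    fail::remove(format!("disk_full_peer_{}", store_id));
}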