diff --git a/components/file_system/src/lib.rs b/components/file_system/src/lib.rs index 37e0008f4b2..7d398f2e734 100644 --- a/components/file_system/src/lib.rs +++ b/components/file_system/src/lib.rs @@ -412,7 +412,7 @@ impl Read for Sha256Reader { } } -const SPACE_PLACEHOLDER_FILE: &str = "space_placeholder_file"; +pub const SPACE_PLACEHOLDER_FILE: &str = "space_placeholder_file"; /// Create a file with hole, to reserve space for TiKV. pub fn reserve_space_for_recover>(data_dir: P, file_size: u64) -> io::Result<()> { diff --git a/components/raftstore/src/store/peer.rs b/components/raftstore/src/store/peer.rs index 0bf8cc9565a..96890b92987 100644 --- a/components/raftstore/src/store/peer.rs +++ b/components/raftstore/src/store/peer.rs @@ -2329,7 +2329,10 @@ where if self.check_disk_usages_before_propose(ctx, disk_full_opt, &mut stores) { self.propose_normal(ctx, req) } else { - let errmsg = String::from("propose failed: disk full"); + let errmsg = format!( + "propose failed: tikv disk full, cmd-disk_full_opt={:?}, leader-diskUsage={:?}", + disk_full_opt, ctx.self_disk_usage + ); Err(Error::DiskFull(stores, errmsg)) } } @@ -2339,7 +2342,10 @@ where if self.check_disk_usages_before_propose(ctx, disk_full_opt, &mut stores) { self.propose_conf_change(ctx, &req) } else { - let errmsg = String::from("propose failed: disk full"); + let errmsg = format!( + "propose failed: tikv disk full, cmd-disk_full_opt={:?}, leader-diskUsage={:?}", + disk_full_opt, ctx.self_disk_usage + ); Err(Error::DiskFull(stores, errmsg)) } } diff --git a/components/raftstore/src/store/worker/pd.rs b/components/raftstore/src/store/worker/pd.rs index 2e798950dfd..77eab6fe30d 100644 --- a/components/raftstore/src/store/worker/pd.rs +++ b/components/raftstore/src/store/worker/pd.rs @@ -46,7 +46,6 @@ use futures::FutureExt; use pd_client::metrics::*; use pd_client::{Error, PdClient, RegionStat}; use tikv_util::metrics::ThreadInfoStatistics; -use tikv_util::sys::disk; use tikv_util::time::UnixSecs; use tikv_util::timer::GLOBAL_TIMER_HANDLE; use tikv_util::topn::TopN; @@ -895,13 +894,6 @@ where let mut available = capacity.checked_sub(used_size).unwrap_or_default(); // We only care about rocksdb SST file size, so we should check disk available here. available = cmp::min(available, disk_stats.available_space()); - available = available - .checked_sub(disk::get_disk_reserved_space()) - .unwrap_or_default(); - - if disk::is_disk_full() { - available = 0; - } if available == 0 { warn!("no available space"); diff --git a/components/server/src/server.rs b/components/server/src/server.rs index 676a05be412..5f55685621b 100644 --- a/components/server/src/server.rs +++ b/components/server/src/server.rs @@ -17,6 +17,7 @@ use std::{ fs::{self, File}, net::SocketAddr, path::{Path, PathBuf}, + str::FromStr, sync::{ atomic::{AtomicU32, AtomicU64, Ordering}, mpsc, Arc, Mutex, @@ -1075,6 +1076,7 @@ impl TiKVServer { fn init_storage_stats_task(&self, engines: Engines) { let config_disk_capacity: u64 = self.config.raft_store.capacity.0; + let data_dir = self.config.storage.data_dir.clone(); let store_path = self.store_path.clone(); let snap_mgr = self.snap_mgr.clone().unwrap(); let reserve_space = disk::get_disk_reserved_space(); @@ -1111,7 +1113,14 @@ impl TiKVServer { .get_engine_size() .expect("get raft engine size"); - let used_size = snap_size + kv_size + raft_size; + let placeholer_file_path = PathBuf::from_str(&data_dir) + .unwrap() + .join(Path::new(file_system::SPACE_PLACEHOLDER_FILE)); + + let placeholder_size: u64 = + file_system::get_file_size(&placeholer_file_path).unwrap_or_else(|_| 0); + + let used_size = snap_size + kv_size + raft_size + placeholder_size; let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity { disk_cap } else { diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs index 47b57d661da..442b5011298 100644 --- a/components/tikv_kv/src/lib.rs +++ b/components/tikv_kv/src/lib.rs @@ -140,10 +140,6 @@ impl WriteData { self.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull } - pub fn set_allowed_on_disk_already_full(&mut self) { - self.disk_full_opt = DiskFullOpt::AllowedOnAlreadyFull - } - pub fn set_disk_full_opt(&mut self, level: DiskFullOpt) { self.disk_full_opt = level } diff --git a/components/tikv_util/src/sys/disk.rs b/components/tikv_util/src/sys/disk.rs index eb68d8e17c3..c41b8ff3406 100644 --- a/components/tikv_util/src/sys/disk.rs +++ b/components/tikv_util/src/sys/disk.rs @@ -55,8 +55,3 @@ pub fn get_disk_status(_store_id: u64) -> DiskUsage { _ => panic!("Disk Status Value not meet expectations"), } } - -pub fn is_disk_full() -> bool { - let s = DISK_STATUS.load(Ordering::Acquire); - !matches!(s, 0) -} diff --git a/src/storage/txn/commands/check_txn_status.rs b/src/storage/txn/commands/check_txn_status.rs index 18028d775c6..c2974323509 100644 --- a/src/storage/txn/commands/check_txn_status.rs +++ b/src/storage/txn/commands/check_txn_status.rs @@ -121,7 +121,7 @@ impl WriteCommand for CheckTxnStatus { let pr = ProcessResult::TxnStatus { txn_status }; let mut write_data = WriteData::from_modifies(txn.into_modifies()); - write_data.set_allowed_on_disk_already_full(); + write_data.set_allowed_on_disk_almost_full(); Ok(WriteResult { ctx: self.ctx, to_be_write: write_data, diff --git a/src/storage/txn/commands/commit.rs b/src/storage/txn/commands/commit.rs index a1131e240ce..ee36935162b 100644 --- a/src/storage/txn/commands/commit.rs +++ b/src/storage/txn/commands/commit.rs @@ -63,7 +63,7 @@ impl WriteCommand for Commit { txn_status: TxnStatus::committed(self.commit_ts), }; let mut write_data = WriteData::from_modifies(txn.into_modifies()); - write_data.set_allowed_on_disk_already_full(); + write_data.set_allowed_on_disk_almost_full(); Ok(WriteResult { ctx: self.ctx, to_be_write: write_data, diff --git a/src/storage/txn/commands/pessimistic_rollback.rs b/src/storage/txn/commands/pessimistic_rollback.rs index 5c5d5bceb9a..a2735f2b5c2 100644 --- a/src/storage/txn/commands/pessimistic_rollback.rs +++ b/src/storage/txn/commands/pessimistic_rollback.rs @@ -80,7 +80,7 @@ impl WriteCommand for PessimisticRollback { released_locks.wake_up(context.lock_mgr); let mut write_data = WriteData::from_modifies(txn.into_modifies()); - write_data.set_allowed_on_disk_already_full(); + write_data.set_allowed_on_disk_almost_full(); Ok(WriteResult { ctx, to_be_write: write_data, diff --git a/src/storage/txn/commands/rollback.rs b/src/storage/txn/commands/rollback.rs index 83c01fb65f6..fce9d10f5e7 100644 --- a/src/storage/txn/commands/rollback.rs +++ b/src/storage/txn/commands/rollback.rs @@ -52,7 +52,7 @@ impl WriteCommand for Rollback { released_locks.wake_up(context.lock_mgr); let mut write_data = WriteData::from_modifies(txn.into_modifies()); - write_data.set_allowed_on_disk_already_full(); + write_data.set_allowed_on_disk_almost_full(); Ok(WriteResult { ctx: self.ctx, to_be_write: write_data, diff --git a/src/storage/txn/commands/txn_heart_beat.rs b/src/storage/txn/commands/txn_heart_beat.rs index 49dc2c0a1c6..fa0a281658c 100644 --- a/src/storage/txn/commands/txn_heart_beat.rs +++ b/src/storage/txn/commands/txn_heart_beat.rs @@ -79,7 +79,7 @@ impl WriteCommand for TxnHeartBeat { txn_status: TxnStatus::uncommitted(lock, false), }; let mut write_data = WriteData::from_modifies(txn.into_modifies()); - write_data.set_allowed_on_disk_already_full(); + write_data.set_allowed_on_disk_almost_full(); Ok(WriteResult { ctx: self.ctx, to_be_write: write_data, diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs index a56ea6a1ff0..82168d9628a 100644 --- a/src/storage/txn/scheduler.rs +++ b/src/storage/txn/scheduler.rs @@ -728,18 +728,9 @@ impl Scheduler { response_policy, }) => { SCHED_STAGE_COUNTER_VEC.get(tag).write.inc(); - match ctx.get_disk_full_opt() { - DiskFullOpt::AllowedOnAlreadyFull => { - to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlreadyFull - } - DiskFullOpt::AllowedOnAlmostFull => { - // Like Delete operation, TiDB marks it with AllowedOnAlmostFull - // But TiKV just treats it as Normal prewrite. - if to_be_write.disk_full_opt != DiskFullOpt::AllowedOnAlreadyFull { - to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull - } - } - _ => {} + + if ctx.get_disk_full_opt() == DiskFullOpt::AllowedOnAlmostFull { + to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull } if let Some(lock_info) = lock_info { diff --git a/tests/failpoints/cases/test_disk_full.rs b/tests/failpoints/cases/test_disk_full.rs index 0fe5b566b31..429931d29fe 100644 --- a/tests/failpoints/cases/test_disk_full.rs +++ b/tests/failpoints/cases/test_disk_full.rs @@ -224,16 +224,15 @@ fn test_disk_full_txn_behaviors(usage: DiskUsage) { lead_client.must_kv_pessimistic_lock(b"k7".to_vec(), start_ts); // Test pessimistic commit is allowed. - // FIXME: the case can't pass. - // fail::cfg(get_fp(usage, 1), "return").unwrap(); - // let res = lead_client.try_kv_prewrite( - // vec![new_mutation(Op::Put, b"k5", b"v5")], - // b"k4".to_vec(), - // start_ts, - // DiskFullOpt::NotAllowedOnFull, - // ); - // assert!(!res.get_region_error().has_disk_full()); - // lead_client.must_kv_commit(vec![b"k7".to_vec()], start_ts, get_tso(&pd_client)); + fail::cfg(get_fp(usage, 1), "return").unwrap(); + let res = lead_client.try_kv_prewrite( + vec![new_mutation(Op::Put, b"k5", b"v5")], + b"k4".to_vec(), + start_ts, + DiskFullOpt::AllowedOnAlmostFull, + ); + assert!(!res.get_region_error().has_disk_full()); + lead_client.must_kv_commit(vec![b"k7".to_vec()], start_ts, get_tso(&pd_client)); fail::remove(get_fp(usage, 1)); let lock_ts = get_tso(&pd_client);