Skip to content

Commit

Permalink
*: fix one bug about drop table/truncate table failed when TiKV is al…
Browse files Browse the repository at this point in the history
…most disk full (tikv#10712)

Signed-off-by: tier-cap <[email protected]>

Co-authored-by: qupeng <[email protected]>
  • Loading branch information
tier-cap and hicqu authored Aug 13, 2021
1 parent 834ec11 commit d1a199e
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 48 deletions.
2 changes: 1 addition & 1 deletion components/file_system/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ impl<R: Read> Read for Sha256Reader<R> {
}
}

const SPACE_PLACEHOLDER_FILE: &str = "space_placeholder_file";
pub const SPACE_PLACEHOLDER_FILE: &str = "space_placeholder_file";

/// Create a file with hole, to reserve space for TiKV.
pub fn reserve_space_for_recover<P: AsRef<Path>>(data_dir: P, file_size: u64) -> io::Result<()> {
Expand Down
10 changes: 8 additions & 2 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2329,7 +2329,10 @@ where
if self.check_disk_usages_before_propose(ctx, disk_full_opt, &mut stores) {
self.propose_normal(ctx, req)
} else {
let errmsg = String::from("propose failed: disk full");
let errmsg = format!(
"propose failed: tikv disk full, cmd-disk_full_opt={:?}, leader-diskUsage={:?}",
disk_full_opt, ctx.self_disk_usage
);
Err(Error::DiskFull(stores, errmsg))
}
}
Expand All @@ -2339,7 +2342,10 @@ where
if self.check_disk_usages_before_propose(ctx, disk_full_opt, &mut stores) {
self.propose_conf_change(ctx, &req)
} else {
let errmsg = String::from("propose failed: disk full");
let errmsg = format!(
"propose failed: tikv disk full, cmd-disk_full_opt={:?}, leader-diskUsage={:?}",
disk_full_opt, ctx.self_disk_usage
);
Err(Error::DiskFull(stores, errmsg))
}
}
Expand Down
8 changes: 0 additions & 8 deletions components/raftstore/src/store/worker/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use futures::FutureExt;
use pd_client::metrics::*;
use pd_client::{Error, PdClient, RegionStat};
use tikv_util::metrics::ThreadInfoStatistics;
use tikv_util::sys::disk;
use tikv_util::time::UnixSecs;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tikv_util::topn::TopN;
Expand Down Expand Up @@ -895,13 +894,6 @@ where
let mut available = capacity.checked_sub(used_size).unwrap_or_default();
// We only care about rocksdb SST file size, so we should check disk available here.
available = cmp::min(available, disk_stats.available_space());
available = available
.checked_sub(disk::get_disk_reserved_space())
.unwrap_or_default();

if disk::is_disk_full() {
available = 0;
}

if available == 0 {
warn!("no available space");
Expand Down
11 changes: 10 additions & 1 deletion components/server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
fs::{self, File},
net::SocketAddr,
path::{Path, PathBuf},
str::FromStr,
sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
mpsc, Arc, Mutex,
Expand Down Expand Up @@ -1075,6 +1076,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {

fn init_storage_stats_task(&self, engines: Engines<RocksEngine, ER>) {
let config_disk_capacity: u64 = self.config.raft_store.capacity.0;
let data_dir = self.config.storage.data_dir.clone();
let store_path = self.store_path.clone();
let snap_mgr = self.snap_mgr.clone().unwrap();
let reserve_space = disk::get_disk_reserved_space();
Expand Down Expand Up @@ -1111,7 +1113,14 @@ impl<ER: RaftEngine> TiKVServer<ER> {
.get_engine_size()
.expect("get raft engine size");

let used_size = snap_size + kv_size + raft_size;
let placeholer_file_path = PathBuf::from_str(&data_dir)
.unwrap()
.join(Path::new(file_system::SPACE_PLACEHOLDER_FILE));

let placeholder_size: u64 =
file_system::get_file_size(&placeholer_file_path).unwrap_or_else(|_| 0);

let used_size = snap_size + kv_size + raft_size + placeholder_size;
let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity {
disk_cap
} else {
Expand Down
4 changes: 0 additions & 4 deletions components/tikv_kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ impl WriteData {
self.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull
}

pub fn set_allowed_on_disk_already_full(&mut self) {
self.disk_full_opt = DiskFullOpt::AllowedOnAlreadyFull
}

pub fn set_disk_full_opt(&mut self, level: DiskFullOpt) {
self.disk_full_opt = level
}
Expand Down
5 changes: 0 additions & 5 deletions components/tikv_util/src/sys/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,3 @@ pub fn get_disk_status(_store_id: u64) -> DiskUsage {
_ => panic!("Disk Status Value not meet expectations"),
}
}

pub fn is_disk_full() -> bool {
let s = DISK_STATUS.load(Ordering::Acquire);
!matches!(s, 0)
}
2 changes: 1 addition & 1 deletion src/storage/txn/commands/check_txn_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for CheckTxnStatus {

let pr = ProcessResult::TxnStatus { txn_status };
let mut write_data = WriteData::from_modifies(txn.into_modifies());
write_data.set_allowed_on_disk_already_full();
write_data.set_allowed_on_disk_almost_full();
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/commands/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for Commit {
txn_status: TxnStatus::committed(self.commit_ts),
};
let mut write_data = WriteData::from_modifies(txn.into_modifies());
write_data.set_allowed_on_disk_already_full();
write_data.set_allowed_on_disk_almost_full();
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/commands/pessimistic_rollback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for PessimisticRollback {
released_locks.wake_up(context.lock_mgr);

let mut write_data = WriteData::from_modifies(txn.into_modifies());
write_data.set_allowed_on_disk_already_full();
write_data.set_allowed_on_disk_almost_full();
Ok(WriteResult {
ctx,
to_be_write: write_data,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/commands/rollback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for Rollback {
released_locks.wake_up(context.lock_mgr);

let mut write_data = WriteData::from_modifies(txn.into_modifies());
write_data.set_allowed_on_disk_already_full();
write_data.set_allowed_on_disk_almost_full();
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/commands/txn_heart_beat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<S: Snapshot, L: LockManager> WriteCommand<S, L> for TxnHeartBeat {
txn_status: TxnStatus::uncommitted(lock, false),
};
let mut write_data = WriteData::from_modifies(txn.into_modifies());
write_data.set_allowed_on_disk_already_full();
write_data.set_allowed_on_disk_almost_full();
Ok(WriteResult {
ctx: self.ctx,
to_be_write: write_data,
Expand Down
15 changes: 3 additions & 12 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,18 +728,9 @@ impl<E: Engine, L: LockManager> Scheduler<E, L> {
response_policy,
}) => {
SCHED_STAGE_COUNTER_VEC.get(tag).write.inc();
match ctx.get_disk_full_opt() {
DiskFullOpt::AllowedOnAlreadyFull => {
to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlreadyFull
}
DiskFullOpt::AllowedOnAlmostFull => {
// Like Delete operation, TiDB marks it with AllowedOnAlmostFull
// But TiKV just treats it as Normal prewrite.
if to_be_write.disk_full_opt != DiskFullOpt::AllowedOnAlreadyFull {
to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull
}
}
_ => {}

if ctx.get_disk_full_opt() == DiskFullOpt::AllowedOnAlmostFull {
to_be_write.disk_full_opt = DiskFullOpt::AllowedOnAlmostFull
}

if let Some(lock_info) = lock_info {
Expand Down
19 changes: 9 additions & 10 deletions tests/failpoints/cases/test_disk_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,16 +224,15 @@ fn test_disk_full_txn_behaviors(usage: DiskUsage) {
lead_client.must_kv_pessimistic_lock(b"k7".to_vec(), start_ts);

// Test pessimistic commit is allowed.
// FIXME: the case can't pass.
// fail::cfg(get_fp(usage, 1), "return").unwrap();
// let res = lead_client.try_kv_prewrite(
// vec![new_mutation(Op::Put, b"k5", b"v5")],
// b"k4".to_vec(),
// start_ts,
// DiskFullOpt::NotAllowedOnFull,
// );
// assert!(!res.get_region_error().has_disk_full());
// lead_client.must_kv_commit(vec![b"k7".to_vec()], start_ts, get_tso(&pd_client));
fail::cfg(get_fp(usage, 1), "return").unwrap();
let res = lead_client.try_kv_prewrite(
vec![new_mutation(Op::Put, b"k5", b"v5")],
b"k4".to_vec(),
start_ts,
DiskFullOpt::AllowedOnAlmostFull,
);
assert!(!res.get_region_error().has_disk_full());
lead_client.must_kv_commit(vec![b"k7".to_vec()], start_ts, get_tso(&pd_client));

fail::remove(get_fp(usage, 1));
let lock_ts = get_tso(&pd_client);
Expand Down

0 comments on commit d1a199e

Please sign in to comment.