*: support read only and recovery when disk full (tikv#10264)
* [feature] support read only and recovery when disk full.

1. when the disk is full, all business write traffic is rejected.
2. no matter whether a minority, a majority, or all of the servers hit disk full, you
	can recover by adding machines or storage, after which the service resumes normally.

Signed-off-by: tier-cap <[email protected]>

* [feature] support read only and recovery when disk full.

adjust the implementation based on review comments.

Signed-off-by: tier-cap <[email protected]>

* [feature] support read only and recovery when disk full.

optimize one test case that was taking a long time to run.

Signed-off-by: tier-cap <[email protected]>

* [feature] support read only and recovery when disk full.

change the unit test implementation based on review comments.

Signed-off-by: tier-cap <[email protected]>

* [feature] support read only and recovery when disk full.

change the unit test implementation based on review comments.

Signed-off-by: tier-cap <[email protected]>

* [feature] support read only and recovery when disk full.

remove the unused code and comments

Signed-off-by: tier-cap <[email protected]>

* clean up tests and fix a bug about transfer leader

Signed-off-by: qupeng <[email protected]>

* a little fix

Signed-off-by: qupeng <[email protected]>

* disk full recovery: change three details
1. config change proposals are still allowed
2. the log entry appended on campaign success is still allowed
3. add raft engine size stats.

Signed-off-by: tier-cap <[email protected]>

* fix one bug

Signed-off-by: tier-cap <[email protected]>

change details based on review comments.

Signed-off-by: tier-cap <[email protected]>

simplify some implementations based on review comments.

Signed-off-by: tier-cap <[email protected]>

change the atomic variable memory ordering based on review comments

Signed-off-by: tier-cap <[email protected]>

Co-authored-by: tier-cap <[email protected]>
Co-authored-by: qupeng <[email protected]>
3 people authored Jun 23, 2021
1 parent 338aabf commit 46e539a
Showing 15 changed files with 294 additions and 9 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -38,3 +38,5 @@ fuzz-incremental/
/db/
/last_tikv.toml
/raft/
core.*

1 change: 1 addition & 0 deletions Cargo.toml
@@ -23,6 +23,7 @@ mem-profiling = ["tikv_alloc/mem-profiling"]
failpoints = [
"fail/failpoints",
"raftstore/failpoints",
"tikv_util/failpoints",
"engine_rocks/failpoints"
]
cloud-aws = [
4 changes: 4 additions & 0 deletions components/engine_panic/src/raft_engine.rs
@@ -92,6 +92,10 @@ impl RaftEngine for PanicEngine {
fn dump_stats(&self) -> Result<String> {
panic!()
}

fn get_engine_size(&self) -> Result<u64> {
panic!()
}
}

impl RaftLogBatch for PanicWriteBatch {
9 changes: 8 additions & 1 deletion components/engine_rocks/src/raft_engine.rs
@@ -1,6 +1,6 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{RocksEngine, RocksWriteBatch};
use crate::{util, RocksEngine, RocksWriteBatch};

use engine_traits::{
Error, Iterable, KvEngine, MiscExt, Mutable, Peekable, RaftEngine, RaftEngineReadOnly,
@@ -221,6 +221,13 @@ impl RaftEngine for RocksEngine {
fn dump_stats(&self) -> Result<String> {
MiscExt::dump_stats(self)
}

fn get_engine_size(&self) -> Result<u64> {
let handle = util::get_cf_handle(self.as_inner(), CF_DEFAULT)?;
let used_size = util::get_engine_cf_used_size(self.as_inner(), handle);

Ok(used_size)
}
}

impl RaftLogBatch for RocksWriteBatch {
2 changes: 2 additions & 0 deletions components/engine_traits/src/raft_engine.rs
@@ -80,6 +80,8 @@ pub trait RaftEngine: RaftEngineReadOnly + Clone + Sync + Send + 'static {
fn stop(&self) {}

fn dump_stats(&self) -> Result<String>;

fn get_engine_size(&self) -> Result<u64>;
}

pub trait RaftLogBatch: Send {
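
An aside on the trait change above: get_engine_size is a new required method on RaftEngine, so every engine backend now has to report its on-disk footprint for the storage-stats task introduced later in this commit. The following is a simplified, hypothetical stand-in (not the real RaftEngine trait, which has many more required methods) illustrating only the shape of the new method:

use std::io::Result;

// Stand-in trait with just the new method, for illustration only.
trait EngineSizeReport {
    fn get_engine_size(&self) -> Result<u64>;
}

struct InMemoryEngine {
    bytes: Vec<u8>,
}

impl EngineSizeReport for InMemoryEngine {
    fn get_engine_size(&self) -> Result<u64> {
        // Report the total number of bytes this engine currently occupies.
        Ok(self.bytes.len() as u64)
    }
}

fn main() {
    let engine = InMemoryEngine { bytes: vec![0u8; 4096] };
    assert_eq!(engine.get_engine_size().unwrap(), 4096);
}
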
5 changes: 5 additions & 0 deletions components/raft_log_engine/src/engine.rs
@@ -189,6 +189,11 @@ impl RaftEngine for RaftLogEngine {
// Raft engine won't dump anything.
Ok("".to_owned())
}

fn get_engine_size(&self) -> Result<u64> {
//TODO impl this when RaftLogEngine is ready to go online.
Ok(0)
}
}

fn transfer_error(e: RaftEngineError) -> engine_traits::Error {
31 changes: 29 additions & 2 deletions components/raftstore/src/store/fsm/peer.rs
@@ -27,11 +27,11 @@ use kvproto::raft_serverpb::{
};
use kvproto::replication_modepb::{DrAutoSyncState, ReplicationMode};
use protobuf::Message;
use raft::eraftpb::{ConfChangeType, Entry, MessageType};
use raft::eraftpb::{ConfChangeType, Entry, EntryType, MessageType};
use raft::{self, Progress, ReadState, Ready, SnapshotStatus, StateRole, INVALID_INDEX, NO_LIMIT};
use tikv_alloc::trace::TraceEvent;
use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
use tikv_util::sys::memory_usage_reaches_high_water;
use tikv_util::sys::{disk, memory_usage_reaches_high_water};
use tikv_util::time::duration_to_sec;
use tikv_util::worker::{Scheduler, Stopped};
use tikv_util::{box_err, debug, error, info, trace, warn};
@@ -1233,6 +1233,33 @@ where
"to_peer_id" => msg.get_to_peer().get_id(),
);

let msg_type = msg.get_message().get_msg_type();
let store_id = self.ctx.store_id();

if disk::disk_full_precheck(store_id) || self.ctx.is_disk_full {
let mut flag = false;
if MessageType::MsgAppend == msg_type {
let entries = msg.get_message().get_entries();
for i in entries {
let entry_type = i.get_entry_type();
if EntryType::EntryNormal == entry_type && !i.get_data().is_empty() {
flag = true;
break;
}
}
} else if MessageType::MsgTimeoutNow == msg_type {
flag = true;
}

if flag {
debug!(
"skip {:?} because of disk full", msg_type;
"region_id" => self.region_id(), "peer_id" => self.fsm.peer_id()
);
return Err(Error::Timeout("disk full".to_owned()));
}
}

if !self.validate_raft_msg(&msg) {
return Ok(());
}
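
The hunk above drops exactly two kinds of incoming raft messages while the local disk is full: appends that carry non-empty normal entries (that is, messages that would persist new user data) and MsgTimeoutNow, which would otherwise push this full peer into campaigning and becoming leader. A standalone restatement of that drop rule, as a sketch using the raft crate's eraftpb types shown in the hunk (not code from this commit):

use raft::eraftpb::{Entry, EntryType, MessageType};

fn should_drop_when_disk_full(msg_type: MessageType, entries: &[Entry]) -> bool {
    match msg_type {
        // Drop appends only when they contain non-empty normal entries, so
        // heartbeats, empty appends and commit-index advances still flow.
        MessageType::MsgAppend => entries
            .iter()
            .any(|e| e.get_entry_type() == EntryType::EntryNormal && !e.get_data().is_empty()),
        // Never hand leadership to a peer that cannot accept writes.
        MessageType::MsgTimeoutNow => true,
        _ => false,
    }
}

Conf-change entries use EntryType::EntryConfChange and therefore pass this check, which matches the later "config change allowed" change in this commit: membership changes can still be replicated to help the cluster recover.
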
4 changes: 4 additions & 0 deletions components/raftstore/src/store/fsm/store.rs
@@ -40,6 +40,7 @@ use sst_importer::SSTImporter;
use tikv_alloc::trace::TraceEvent;
use tikv_util::config::{Tracker, VersionTrack};
use tikv_util::mpsc::{self, LooseBoundedSender, Receiver};
use tikv_util::sys::disk;
use tikv_util::time::{duration_to_sec, Instant as TiInstant};
use tikv_util::timer::SteadyTimer;
use tikv_util::worker::{FutureScheduler, FutureWorker, Scheduler, Worker};
@@ -386,6 +387,7 @@
pub perf_context: EK::PerfContext,
pub tick_batch: Vec<PeerTickBatch>,
pub node_start_time: Option<TiInstant>,
pub is_disk_full: bool,
}

impl<EK, ER, T> HandleRaftReadyContext<EK::WriteBatch, ER::LogBatch> for PollContext<EK, ER, T>
@@ -771,6 +773,7 @@ impl<EK: KvEngine, ER: RaftEngine, T: Transport> PollHandler<PeerFsm<EK, ER>, St
self.poll_ctx.pending_count = 0;
self.poll_ctx.sync_log = false;
self.poll_ctx.has_ready = false;
self.poll_ctx.is_disk_full = disk::is_disk_full();
self.timer = TiInstant::now_coarse();
// update config
self.poll_ctx.perf_context.start_observe();
@@ -1134,6 +1137,7 @@
tick_batch: vec![PeerTickBatch::default(); 256],
node_start_time: Some(TiInstant::now_coarse()),
feature_gate: self.feature_gate.clone(),
is_disk_full: false,
};
ctx.update_ticks_timeout();
let tag = format!("[store {}]", ctx.store.get_id());
12 changes: 11 additions & 1 deletion components/raftstore/src/store/peer.rs
@@ -57,6 +57,7 @@ use crate::{Error, Result};
use collections::{HashMap, HashSet};
use pd_client::INVALID_ID;
use tikv_util::codec::number::decode_u64;
use tikv_util::sys::disk;
use tikv_util::time::{duration_to_sec, monotonic_raw_now};
use tikv_util::time::{Instant as UtilInstant, ThreadReadId};
use tikv_util::worker::{FutureScheduler, Scheduler};
@@ -2350,7 +2351,14 @@
return false;
}
Ok(RequestPolicy::ReadIndex) => return self.read_index(ctx, req, err_resp, cb),
Ok(RequestPolicy::ProposeNormal) => self.propose_normal(ctx, req),
Ok(RequestPolicy::ProposeNormal) => {
let store_id = ctx.store_id();
if disk::disk_full_precheck(store_id) || ctx.is_disk_full {
Err(Error::Timeout("disk full".to_owned()))
} else {
self.propose_normal(ctx, req)
}
}
Ok(RequestPolicy::ProposeTransferLeader) => {
return self.propose_transfer_leader(ctx, req, cb);
}
@@ -3126,6 +3134,8 @@
if self.is_applying_snapshot()
|| self.has_pending_snapshot()
|| msg.get_from() != self.leader_id()
// For followers whose disk is full.
|| disk::disk_full_precheck(ctx.store_id()) || ctx.is_disk_full
{
info!(
"reject transferring leader";
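
Taken together, the two hunks above gate the write path on both ends: a leader whose disk is full fails normal proposals fast with a Timeout("disk full") error instead of queueing them, while reads and ReadIndex requests are untouched; and a peer whose disk is full refuses to accept a leadership transfer. A simplified, hypothetical restatement of the two predicates (illustrative names, not code from this commit):

// `precheck` mirrors disk::disk_full_precheck (true only under failpoints in
// tests); `is_disk_full` mirrors the flag sampled once per poll round from the
// background storage-stats task.
fn reject_normal_proposal(precheck: bool, is_disk_full: bool) -> bool {
    precheck || is_disk_full
}

fn reject_transfer_leader(
    applying_snapshot: bool,
    has_pending_snapshot: bool,
    msg_from_current_leader: bool,
    disk_full: bool,
) -> bool {
    applying_snapshot || has_pending_snapshot || !msg_from_current_leader || disk_full
}
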
5 changes: 5 additions & 0 deletions components/raftstore/src/store/worker/pd.rs
@@ -43,6 +43,7 @@ use futures::FutureExt;
use pd_client::metrics::*;
use pd_client::{Error, PdClient, RegionStat};
use tikv_util::metrics::ThreadInfoStatistics;
use tikv_util::sys::disk;
use tikv_util::time::UnixSecs;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tikv_util::worker::{FutureRunnable as Runnable, FutureScheduler as Scheduler, Stopped};
@@ -742,6 +743,10 @@
let mut available = capacity.checked_sub(used_size).unwrap_or_default();
// We only care about rocksdb SST file size, so we should check disk available here.
available = cmp::min(available, disk_stats.available_space());

if disk::is_disk_full() {
available = 0;
}
if available == 0 {
warn!("no available space");
}
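
The pd worker change affects scheduling rather than the write path: once the local checker flags the disk as full, the store heartbeat reports zero available bytes, so PD stops placing new replicas on this store. A hedged sketch of the reported-available calculation (illustrative function, not this file's code; the checked_sub(..).unwrap_or_default() above is written as saturating_sub here):

fn reported_available(capacity: u64, used: u64, fs_available: u64, disk_full: bool) -> u64 {
    if disk_full {
        // Force PD to treat the store as having no room at all.
        return 0;
    }
    // Bounded both by the configured capacity minus what TiKV has used and by
    // what the filesystem actually has left.
    capacity.saturating_sub(used).min(fs_available)
}
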
74 changes: 71 additions & 3 deletions components/server/src/server.rs
@@ -22,6 +22,7 @@ use std::{
Arc, Mutex,
},
time::{Duration, Instant},
u64,
};

use cdc::MemoryQuota;
@@ -60,7 +61,7 @@ use raftstore::{
fsm,
fsm::store::{RaftBatchSystem, RaftRouter, StoreMeta, PENDING_MSG_CAP},
memory::MEMTRACE_ROOT,
AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader,
AutoSplitController, CheckLeaderRunner, GlobalReplicationState, LocalReader, SnapManager,
SnapManagerBuilder, SplitCheckRunner, SplitConfigManager, StoreMsg,
},
};
@@ -89,7 +90,7 @@ use tikv_util::{
check_environment_variables,
config::{ensure_dir_exist, VersionTrack},
math::MovingAvgU32,
sys::{register_memory_usage_high_water, SysQuota},
sys::{disk, register_memory_usage_high_water, SysQuota},
thread_group::GroupProperties,
time::Monitor,
worker::{Builder as WorkerBuilder, FutureWorker, LazyWorker, Worker},
@@ -136,10 +137,11 @@ pub fn run_tikv(config: TiKvConfig) {
let (limiter, fetcher) = tikv.init_io_utility();
let (engines, engines_info) = tikv.init_raw_engines(Some(limiter.clone()));
limiter.set_low_priority_io_adjustor_if_needed(Some(engines_info.clone()));
tikv.init_engines(engines);
tikv.init_engines(engines.clone());
let server_config = tikv.init_servers();
tikv.register_services();
tikv.init_metrics_flusher(fetcher, engines_info);
tikv.init_storage_stats_task(engines);
tikv.run_server(server_config);
tikv.run_status_server();

@@ -159,6 +161,7 @@ const RESERVED_OPEN_FDS: u64 = 1000;

const DEFAULT_METRICS_FLUSH_INTERVAL: Duration = Duration::from_millis(10_000);
const DEFAULT_ENGINE_METRICS_RESET_INTERVAL: Duration = Duration::from_millis(60_000);
const DEFAULT_STORAGE_STATS_INTERVAL: Duration = Duration::from_secs(1);

/// A complete TiKV server.
struct TiKVServer<ER: RaftEngine> {
@@ -171,6 +174,7 @@ struct TiKVServer<ER: RaftEngine> {
resolver: resolve::PdStoreAddrResolver,
state: Arc<Mutex<GlobalReplicationState>>,
store_path: PathBuf,
snap_mgr: Option<SnapManager>, // Will be filled in `init_servers`.
encryption_key_manager: Option<Arc<DataKeyManager>>,
engines: Option<TiKVEngines<ER>>,
servers: Option<Servers<ER>>,
@@ -253,6 +257,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
resolver,
state,
store_path,
snap_mgr: None,
encryption_key_manager: None,
engines: None,
servers: None,
@@ -405,6 +410,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
if self.config.raft_store.capacity.0 > 0 {
capacity = cmp::min(capacity, self.config.raft_store.capacity.0);
}
//TODO after disk full readonly impl, such file should be removed.
file_system::reserve_space_for_recover(
&self.config.storage.data_dir,
if self.config.storage.reserve_space.0 == 0 {
@@ -681,6 +687,7 @@ impl<ER: RaftEngine> TiKVServer<ER> {
node.try_bootstrap_store(engines.engines.clone())
.unwrap_or_else(|e| fatal!("failed to bootstrap node id: {}", e));

self.snap_mgr = Some(snap_mgr.clone());
// Create server
let server = Server::new(
node.id(),
@@ -1021,6 +1028,67 @@ impl<ER: RaftEngine> TiKVServer<ER> {
});
}

fn init_storage_stats_task(&self, engines: Engines<RocksEngine, ER>) {
let config_disk_capacity: u64 = self.config.raft_store.capacity.0;
let store_path = self.store_path.clone();
let snap_mgr = self.snap_mgr.clone().unwrap();
let disk_reserved = self.config.storage.reserve_space.0;
if disk_reserved == 0 {
info!("disk space checker not enabled");
return;
}
//TODO wal size ignore?
self.background_worker
.spawn_interval_task(DEFAULT_STORAGE_STATS_INTERVAL, move || {
let disk_stats = match fs2::statvfs(&store_path) {
Err(e) => {
error!(
"get disk stat for kv store failed";
"kv path" => store_path.to_str(),
"err" => ?e
);
return;
}
Ok(stats) => stats,
};
let disk_cap = disk_stats.total_space();
let snap_size = snap_mgr.get_total_snap_size().unwrap();

let kv_size = engines
.kv
.get_engine_used_size()
.expect("get kv engine size");

let raft_size = engines
.raft
.get_engine_size()
.expect("get raft engine size");

let used_size = snap_size + kv_size + raft_size;
let capacity = if config_disk_capacity == 0 || disk_cap < config_disk_capacity {
disk_cap
} else {
config_disk_capacity
};

let mut available = capacity.checked_sub(used_size).unwrap_or_default();
available = cmp::min(available, disk_stats.available_space());
if available <= disk_reserved {
warn!(
"disk full, available={},snap={},engine={},capacity={}",
available, snap_size, kv_size, capacity
);
disk::set_disk_full();
} else if disk::is_disk_full() {
info!(
"disk normalized, available={},snap={},engine={},capacity={}",
available, snap_size, kv_size, capacity
);
disk::clear_disk_full();
}
})
}

fn run_server(&mut self, server_config: Arc<VersionTrack<ServerConfig>>) {
let server = self.servers.as_mut().unwrap();
server
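
The task spawned in init_storage_stats_task above is the single writer of the global disk-full flag: once per second it recomputes used and available space and flips the flag when the remaining space drops to the configured reserve. A standalone sketch of that decision (illustrative function name and comments; the real task also logs and calls disk::set_disk_full / disk::clear_disk_full):

fn storage_is_full(
    disk_cap: u64,        // total filesystem space from fs2::statvfs
    config_capacity: u64, // raft_store.capacity; 0 means "use the disk size"
    fs_available: u64,    // free filesystem space from fs2::statvfs
    snap_size: u64,       // snapshot files tracked by SnapManager
    kv_size: u64,         // kv engine used size
    raft_size: u64,       // raft engine used size (the new get_engine_size)
    reserved: u64,        // storage.reserve_space; 0 disables the checker
) -> bool {
    let capacity = if config_capacity == 0 || disk_cap < config_capacity {
        disk_cap
    } else {
        config_capacity
    };
    let used = snap_size + kv_size + raft_size;
    let available = capacity.saturating_sub(used).min(fs_available);
    available <= reserved
}
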
22 changes: 22 additions & 0 deletions components/tikv_util/src/sys/disk.rs
@@ -0,0 +1,22 @@
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
use fail::fail_point;
use std::sync::atomic::{AtomicBool, Ordering};
static DISK_FULL: AtomicBool = AtomicBool::new(false);

pub fn set_disk_full() {
DISK_FULL.store(true, Ordering::Release);
}

pub fn clear_disk_full() {
DISK_FULL.store(false, Ordering::Release);
}

pub fn is_disk_full() -> bool {
DISK_FULL.load(Ordering::Acquire)
}

pub fn disk_full_precheck(_store_id: u64) -> bool {
fail_point!("disk_full_peer_1", _store_id == 1, |_| true);
fail_point!("disk_full_peer_2", _store_id == 2, |_| true);
false
}
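
The whole new module is a process-global AtomicBool plus a failpoint hook: the storage-stats task is the writer, raftstore hot paths are the readers, and disk_full_precheck only returns true in failpoint tests that simulate a full disk on store 1 or 2. Illustrative usage only (assumes the module is exported as tikv_util::sys::disk, which the mod.rs change below provides):

use tikv_util::sys::disk;

fn demo(store_id: u64) {
    // The background checker decides the disk is full...
    disk::set_disk_full();
    assert!(disk::is_disk_full());

    // ...and clears the flag once space has been reclaimed.
    disk::clear_disk_full();
    assert!(!disk::is_disk_full());

    // Without an active failpoint this is always false.
    assert!(!disk::disk_full_precheck(store_id));
}
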
4 changes: 2 additions & 2 deletions components/tikv_util/src/sys/mod.rs
@@ -1,9 +1,9 @@
// Copyright 2017 TiKV Project Authors. Licensed under Apache-2.0.

pub mod cpu_time;

#[cfg(target_os = "linux")]
mod cgroup;
pub mod cpu_time;
pub mod disk;

// re-export some traits for ease of use
use crate::config::{ReadableSize, KIB};
1 change: 1 addition & 0 deletions tests/failpoints/cases/mod.rs
@@ -6,6 +6,7 @@ mod test_cmd_epoch_checker;
mod test_compact_log;
mod test_conf_change;
mod test_coprocessor;
mod test_disk_full;
mod test_early_apply;
mod test_encryption;
mod test_gc_worker;
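
The new test_disk_full module (its file is one of the 15 changed files but is not expanded in this view) can exercise the whole path without actually filling a disk, by enabling the failpoints declared in disk_full_precheck. A hypothetical sketch of such a test, assuming the fail crate and the usual failpoint-test setup; this is not the actual test file:

#[test]
fn disk_full_rejects_writes_on_store_1() {
    // Make disk_full_precheck(1) return true, simulating a full disk on store 1.
    fail::cfg("disk_full_peer_1", "return").unwrap();

    // ... bring up a test cluster, place the region leader on store 1, and
    // assert that a normal write fails with a Timeout("disk full") error while
    // reads and config changes still succeed ...

    // Restore a healthy disk for subsequent tests.
    fail::remove("disk_full_peer_1");
}
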