From 7a7ee93b82f01ebec9ec3c13f865eac11bf27672 Mon Sep 17 00:00:00 2001 From: john Date: Sat, 11 Oct 2025 14:28:44 +0800 Subject: [PATCH] feat: supports capacity eviction management for the entire curvine cluster --- Cargo.lock | 12 +- Cargo.toml | 3 +- curvine-common/src/conf/master_conf.rs | 18 ++ curvine-server/Cargo.toml | 1 + .../src/master/fs/heartbeat_checker.rs | 86 ++++--- curvine-server/src/master/fs/master_actor.rs | 14 +- .../src/master/journal/journal_system.rs | 33 ++- curvine-server/src/master/master_server.rs | 2 + curvine-server/src/master/meta/fs_dir.rs | 8 + curvine-server/src/master/mod.rs | 3 + .../src/master/quota/eviction/evictor.rs | 78 ++++++ .../src/master/quota/eviction/mod.rs | 18 ++ .../src/master/quota/eviction/types.rs | 87 +++++++ curvine-server/src/master/quota/mod.rs | 18 ++ .../src/master/quota/quota_manager.rs | 217 +++++++++++++++++ curvine-tests/tests/quota_eviction_test.rs | 226 ++++++++++++++++++ 16 files changed, 781 insertions(+), 43 deletions(-) create mode 100644 curvine-server/src/master/quota/eviction/evictor.rs create mode 100644 curvine-server/src/master/quota/eviction/mod.rs create mode 100644 curvine-server/src/master/quota/eviction/types.rs create mode 100644 curvine-server/src/master/quota/mod.rs create mode 100644 curvine-server/src/master/quota/quota_manager.rs create mode 100644 curvine-tests/tests/quota_eviction_test.rs diff --git a/Cargo.lock b/Cargo.lock index 3908d13b..7c354e04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -311,7 +311,7 @@ dependencies = [ "http 0.2.12", "http 1.3.1", "http-body 0.4.6", - "lru", + "lru 0.12.5", "percent-encoding", "regex-lite", "sha2", @@ -1450,6 +1450,7 @@ dependencies = [ "indexmap 2.11.0", "linked-hash-map", "log", + "lru 0.16.1", "mini-moka", "num_enum", "once_cell", @@ -2722,6 +2723,15 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "lru" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe949189f46fabb938b3a9a0be30fdd93fd8a09260da863399a8cf3db756ec8" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "lz4-sys" version = "1.11.1+lz4-1.10.0" diff --git a/Cargo.toml b/Cargo.toml index 6dc21fa2..2a305a85 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,4 +120,5 @@ bigdecimal = "0.4.8" bitvec = "1.0.1" config = "0.13.4" tempfile = "3.21.0" -md-5 = "0.10.6" \ No newline at end of file +md-5 = "0.10.6" +lru = "0.16.1" \ No newline at end of file diff --git a/curvine-common/src/conf/master_conf.rs b/curvine-common/src/conf/master_conf.rs index 9a6cc2ac..93dc3024 100644 --- a/curvine-common/src/conf/master_conf.rs +++ b/curvine-common/src/conf/master_conf.rs @@ -107,6 +107,15 @@ pub struct MasterConf { pub ttl_retry_interval: String, #[serde(skip)] pub ttl_retry_interval_unit: DurationUnit, + + // Eviction configuration + pub enable_quota_eviction: bool, + pub quota_eviction_mode: String, + pub quota_eviction_policy: String, + pub quota_eviction_high_rate: f64, + pub quota_eviction_low_rate: f64, + pub quota_eviction_scan_page: i32, + pub quota_eviction_dry_run: bool, } impl MasterConf { @@ -252,6 +261,15 @@ impl Default for MasterConf { ttl_retry_interval: "1s".to_string(), ttl_retry_interval_unit: Default::default(), + + // Eviction configuration defaults + enable_quota_eviction: false, + quota_eviction_mode: "free".to_string(), + quota_eviction_policy: "lru".to_string(), + quota_eviction_high_rate: 0.8, + quota_eviction_low_rate: 0.6, + quota_eviction_scan_page: 2, + quota_eviction_dry_run: false, }; conf.init().unwrap(); diff --git a/curvine-server/Cargo.toml b/curvine-server/Cargo.toml index 63f54384..b5bd02bd 100644 --- a/curvine-server/Cargo.toml +++ b/curvine-server/Cargo.toml @@ -45,3 +45,4 @@ chrono = { workspace = true } rand = { workspace = true } url = { workspace = true } parking_lot = { workspace = true } +lru = { workspace = true } diff --git a/curvine-server/src/master/fs/heartbeat_checker.rs b/curvine-server/src/master/fs/heartbeat_checker.rs index bc904a91..857a9fab 100644 --- a/curvine-server/src/master/fs/heartbeat_checker.rs +++ b/curvine-server/src/master/fs/heartbeat_checker.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::master::fs::MasterFilesystem; +use crate::master::quota::QuotaManager; use crate::master::replication::master_replication_manager::MasterReplicationManager; use crate::master::MasterMonitor; use curvine_common::error::FsError; @@ -30,6 +31,7 @@ pub struct HeartbeatChecker { worker_blacklist_ms: u64, worker_lost_ms: u64, replication_manager: Arc, + quota_manager: Arc, } impl HeartbeatChecker { @@ -38,6 +40,7 @@ impl HeartbeatChecker { monitor: MasterMonitor, executor: Arc, replication_manager: Arc, + quota_manager: Arc, ) -> Self { let worker_blacklist_ms = fs.conf.worker_blacklist_interval_ms(); let worker_lost_ms = fs.conf.worker_lost_interval_ms(); @@ -48,6 +51,7 @@ impl HeartbeatChecker { worker_blacklist_ms, worker_lost_ms, replication_manager, + quota_manager, } } } @@ -60,50 +64,56 @@ impl LoopTask for HeartbeatChecker { return Ok(()); } - let mut wm = self.fs.worker_manager.write(); - let workers = wm.get_last_heartbeat(); - let now = LocalTime::mills(); + { + let mut wm = self.fs.worker_manager.write(); + let workers = wm.get_last_heartbeat(); + let now = LocalTime::mills(); - for (id, last_update) in workers { - if now > last_update + self.worker_blacklist_ms { - // Worker blacklist timeout - let worker = wm.add_blacklist_worker(id); - warn!( - "Worker {:?} has no heartbeat for more than {} ms and will be blacklisted", - worker, self.worker_blacklist_ms - ); - } + for (id, last_update) in workers { + if now > last_update + self.worker_blacklist_ms { + // Worker blacklist timeout + let worker = wm.add_blacklist_worker(id); + warn!( + "Worker {:?} has no heartbeat for more than {} ms and will be blacklisted", + worker, self.worker_blacklist_ms + ); + } - if now > last_update + self.worker_lost_ms { - // Heartbeat timeout - let removed = wm.remove_expired_worker(id); - warn!( - "Worker {:?} has no heartbeat for more than {} ms and will be removed", - removed, self.worker_lost_ms - ); - // Asynchronously delete all block location data. - let fs = self.fs.clone(); - let rm = self.replication_manager.clone(); - let res = self.executor.spawn(move || { - let spend = TimeSpent::new(); - let block_ids = try_log!(fs.delete_locations(id), vec![]); - let block_num = block_ids.len(); - if let Err(e) = rm.report_under_replicated_blocks(id, block_ids) { - error!( - "Errors on reporting under-replicated {} blocks. err: {:?}", - block_num, e - ); - } - info!( - "Delete worker {} all locations used {} ms", - id, - spend.used_ms() + if now > last_update + self.worker_lost_ms { + // Heartbeat timeout + let removed = wm.remove_expired_worker(id); + warn!( + "Worker {:?} has no heartbeat for more than {} ms and will be removed", + removed, self.worker_lost_ms ); - }); - let _ = try_log!(res); + // Asynchronously delete all block location data. + let fs = self.fs.clone(); + let rm = self.replication_manager.clone(); + let res = self.executor.spawn(move || { + let spend = TimeSpent::new(); + let block_ids = try_log!(fs.delete_locations(id), vec![]); + let block_num = block_ids.len(); + if let Err(e) = rm.report_under_replicated_blocks(id, block_ids) { + error!( + "Errors on reporting under-replicated {} blocks. err: {:?}", + block_num, e + ); + } + info!( + "Delete worker {} all locations used {} ms", + id, + spend.used_ms() + ); + }); + let _ = try_log!(res); + } } } + if let Ok(info) = self.fs.master_info() { + self.quota_manager.detector(Some(info)); + }; + Ok(()) } diff --git a/curvine-server/src/master/fs/master_actor.rs b/curvine-server/src/master/fs/master_actor.rs index 1f18abcb..a9dcaade 100644 --- a/curvine-server/src/master/fs/master_actor.rs +++ b/curvine-server/src/master/fs/master_actor.rs @@ -17,6 +17,7 @@ use crate::master::fs::master_filesystem::MasterFilesystem; use crate::master::meta::inode::ttl::ttl_manager::InodeTtlManager; use crate::master::meta::inode::ttl::ttl_scheduler::TtlHeartbeatChecker; use crate::master::meta::inode::ttl_scheduler::TtlHeartbeatConfig; +use crate::master::quota::QuotaManager; use crate::master::replication::master_replication_manager::MasterReplicationManager; use crate::master::MasterMonitor; use curvine_common::executor::ScheduledExecutor; @@ -30,6 +31,7 @@ pub struct MasterActor { pub master_monitor: MasterMonitor, pub executor: Arc, pub replication_manager: Arc, + pub quota_manager: Arc, } impl MasterActor { @@ -38,12 +40,14 @@ impl MasterActor { master_monitor: MasterMonitor, executor: Arc, replication_manager: &Arc, + quota_manager: Arc, ) -> Self { Self { fs, master_monitor, executor, replication_manager: replication_manager.clone(), + quota_manager, } } @@ -54,6 +58,7 @@ impl MasterActor { self.master_monitor.clone(), self.executor.clone(), self.replication_manager.clone(), + self.quota_manager.clone(), ) .unwrap(); @@ -105,11 +110,18 @@ impl MasterActor { master_monitor: MasterMonitor, executor: Arc, replication_manager: Arc, + quota_manager: Arc, ) -> CommonResult<()> { let check_ms = fs.conf.worker_check_interval_ms(); let scheduler = ScheduledExecutor::new("worker-heartbeat", check_ms); - let task = HeartbeatChecker::new(fs, master_monitor, executor, replication_manager); + let task = HeartbeatChecker::new( + fs, + master_monitor, + executor, + replication_manager, + quota_manager, + ); scheduler.start(task)?; Ok(()) diff --git a/curvine-server/src/master/journal/journal_system.rs b/curvine-server/src/master/journal/journal_system.rs index 6cc986bc..1e5f94c5 100644 --- a/curvine-server/src/master/journal/journal_system.rs +++ b/curvine-server/src/master/journal/journal_system.rs @@ -12,11 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::master::eviction::evictor::LRUEvictor; +use crate::master::eviction::types::EvictionPolicy; +use crate::master::eviction::EvictionConf; use crate::master::fs::{MasterFilesystem, WorkerManager}; use crate::master::journal::{JournalLoader, JournalWriter}; use crate::master::meta::inode::ttl::ttl_bucket::TtlBucketList; use crate::master::meta::FsDir; -use crate::master::{MasterMonitor, MetaRaftJournal, MountManager, SyncFsDir, SyncWorkerManager}; +use crate::master::{ + MasterMonitor, MetaRaftJournal, MountManager, QuotaManager, SyncFsDir, SyncWorkerManager, +}; use curvine_common::conf::ClusterConf; use curvine_common::proto::raft::SnapshotData; use curvine_common::raft::storage::{AppStorage, LogStorage, RocksLogStorage}; @@ -39,6 +44,7 @@ pub struct JournalSystem { raft_journal: MetaRaftJournal, master_monitor: MasterMonitor, mount_manager: Arc, + quota_manager: Arc, } impl JournalSystem { @@ -49,6 +55,7 @@ impl JournalSystem { raft_journal: MetaRaftJournal, master_monitor: MasterMonitor, mount_manager: Arc, + quota_manager: Arc, ) -> Self { Self { rt, @@ -57,6 +64,7 @@ impl JournalSystem { raft_journal, master_monitor, mount_manager, + quota_manager, } } @@ -88,7 +96,20 @@ impl JournalSystem { // Create TTL bucket list early with configuration let ttl_bucket_list = Arc::new(TtlBucketList::new(conf.master.ttl_bucket_interval_ms())); - let fs_dir = SyncFsDir::new(FsDir::new(conf, journal_writer, ttl_bucket_list)?); + let eviction_conf = EvictionConf::from_conf(conf); + let evictor = match eviction_conf.policy { + EvictionPolicy::Lru | EvictionPolicy::Lfu | EvictionPolicy::Arc => { + Arc::new(LRUEvictor::new(eviction_conf.clone())) + } + }; + + let fs_dir = SyncFsDir::new(FsDir::new( + conf, + journal_writer, + ttl_bucket_list, + evictor.clone(), + )?); + let fs = MasterFilesystem::new( conf, fs_dir.clone(), @@ -98,6 +119,9 @@ impl JournalSystem { let mount_manager = Arc::new(MountManager::new(fs.clone())); + let quota_manager = + QuotaManager::new(eviction_conf, fs.clone(), evictor.clone(), rt.clone()); + let raft_journal = MetaRaftJournal::new( rt.clone(), log_store, @@ -113,6 +137,7 @@ impl JournalSystem { raft_journal, master_monitor, mount_manager, + quota_manager, ); Ok(js) @@ -149,6 +174,10 @@ impl JournalSystem { self.mount_manager.clone() } + pub fn quota_manager(&self) -> Arc { + self.quota_manager.clone() + } + // Create a snapshot manually, dedicated for testing. pub fn create_snapshot(&self) -> RaftResult<()> { let data = self.raft_journal.app_store().create_snapshot(1, 1)?; diff --git a/curvine-server/src/master/master_server.rs b/curvine-server/src/master/master_server.rs index 6ca0d74b..1a2ec5a0 100644 --- a/curvine-server/src/master/master_server.rs +++ b/curvine-server/src/master/master_server.rs @@ -139,6 +139,7 @@ impl Master { let fs = journal_system.fs(); let worker_manager = journal_system.worker_manager(); let mount_manager = journal_system.mount_manager(); + let quota_manager = journal_system.quota_manager(); let rt = Arc::new(conf.master_server_conf().create_runtime()); @@ -149,6 +150,7 @@ impl Master { journal_system.master_monitor(), conf.master.new_executor(), &replication_manager, + quota_manager, ); let job_manager = Arc::new(JobManager::from_cluster_conf( diff --git a/curvine-server/src/master/meta/fs_dir.rs b/curvine-server/src/master/meta/fs_dir.rs index 3aa742b9..23179759 100644 --- a/curvine-server/src/master/meta/fs_dir.rs +++ b/curvine-server/src/master/meta/fs_dir.rs @@ -19,6 +19,7 @@ use crate::master::meta::inode::InodeView::{Dir, File, FileEntry}; use crate::master::meta::inode::*; use crate::master::meta::store::{InodeStore, RocksInodeStore}; use crate::master::meta::{BlockMeta, InodeId}; +use crate::master::quota::eviction::evictor::Evictor; use curvine_common::conf::ClusterConf; use curvine_common::error::FsError; use curvine_common::state::{ @@ -39,6 +40,7 @@ pub struct FsDir { pub(crate) inode_id: InodeId, pub(crate) store: InodeStore, pub(crate) journal_writer: JournalWriter, + pub(crate) evictor: Arc, } impl FsDir { @@ -46,6 +48,7 @@ impl FsDir { conf: &ClusterConf, journal_writer: JournalWriter, ttl_bucket_list: Arc, + evictor: Arc, ) -> FsResult { let db_conf = conf.meta_rocks_conf(); @@ -58,6 +61,7 @@ impl FsDir { inode_id: InodeId::new(), store: state, journal_writer, + evictor, }; fs_dir.update_last_inode_id(last_inode_id)?; @@ -513,6 +517,8 @@ impl FsDir { file.features.complete_write(); file.len = len; + self.evictor.on_access(file.id()); + self.store .apply_complete_file(inode.as_ref(), commit_block.as_ref())?; self.journal_writer.log_complete_file( @@ -521,6 +527,7 @@ impl FsDir { inode.as_file_ref()?, commit_block, )?; + Ok(true) } @@ -529,6 +536,7 @@ impl FsDir { file: &InodeFile, ) -> FsResult>> { let locs = self.store.get_file_locations(file)?; + self.evictor.on_access(file.id()); Ok(locs) } diff --git a/curvine-server/src/master/mod.rs b/curvine-server/src/master/mod.rs index a7c68763..b50616bf 100644 --- a/curvine-server/src/master/mod.rs +++ b/curvine-server/src/master/mod.rs @@ -50,6 +50,9 @@ pub use rpc_context::RpcContext; pub mod mount; +pub mod quota; +pub use self::quota::*; + pub type MetaRaftJournal = RaftJournal; pub type SyncFsDir = ArcRwLock; pub type SyncWorkerManager = ArcRwLock; diff --git a/curvine-server/src/master/quota/eviction/evictor.rs b/curvine-server/src/master/quota/eviction/evictor.rs new file mode 100644 index 00000000..21f0fcf3 --- /dev/null +++ b/curvine-server/src/master/quota/eviction/evictor.rs @@ -0,0 +1,78 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Mutex; + +use crate::master::quota::eviction::types::EvictionConf; + +pub trait Evictor: Send + Sync { + fn on_access(&self, inode_id: i64); + fn select_victims(&self, limit: usize) -> Vec; + fn remove_victims(&self, inode_ids: &[i64]); +} + +pub struct LRUEvictor { + caches: Mutex>, + conf: EvictionConf, +} + +impl LRUEvictor { + pub fn new(conf: EvictionConf) -> Self { + Self { + caches: Mutex::new(lru::LruCache::unbounded()), + conf, + } + } + + fn peek_victims(&self, limit: usize) -> Vec { + if let Ok(caches) = self.caches.lock() { + caches + .iter() + .rev() + .take(limit) + .map(|(&inode_id, _)| inode_id) + .collect() + } else { + Vec::new() + } + } + + fn remove_victims(&self, inode_ids: &[i64]) { + if let Ok(mut caches) = self.caches.lock() { + for &inode_id in inode_ids { + caches.pop(&inode_id); + } + } + } +} + +impl Evictor for LRUEvictor { + fn on_access(&self, inode_id: i64) { + if !self.conf.enable_quota_eviction { + return; + } + + if let Ok(mut caches) = self.caches.lock() { + caches.put(inode_id, ()); + } + } + + fn select_victims(&self, limit: usize) -> Vec { + self.peek_victims(limit) + } + + fn remove_victims(&self, inode_ids: &[i64]) { + self.remove_victims(inode_ids) + } +} diff --git a/curvine-server/src/master/quota/eviction/mod.rs b/curvine-server/src/master/quota/eviction/mod.rs new file mode 100644 index 00000000..ebcb950a --- /dev/null +++ b/curvine-server/src/master/quota/eviction/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod evictor; +pub mod types; + +pub use types::{EvictPlan, EvictionConf, EvictionMode}; diff --git a/curvine-server/src/master/quota/eviction/types.rs b/curvine-server/src/master/quota/eviction/types.rs new file mode 100644 index 00000000..5d30670f --- /dev/null +++ b/curvine-server/src/master/quota/eviction/types.rs @@ -0,0 +1,87 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use curvine_common::conf::ClusterConf; +use std::fmt; + +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum EvictionPolicy { + Lru, + Lfu, + Arc, +} + +#[derive(Clone, Copy, Debug)] +pub enum EvictionMode { + FreeFile, + DeleteFile, +} + +#[derive(Clone, Debug)] +pub struct EvictionConf { + pub enable_quota_eviction: bool, + pub eviction_mode: EvictionMode, + pub policy: EvictionPolicy, + pub high_watermark: f64, + pub low_watermark: f64, + pub candidate_scan_page: usize, + pub dry_run: bool, +} + +impl EvictionConf { + pub fn from_conf(conf: &ClusterConf) -> Self { + let master_conf = &conf.master; + + // Parse eviction mode from string + let eviction_mode = match master_conf.quota_eviction_mode.as_str() { + "delete" => EvictionMode::DeleteFile, + _ => EvictionMode::FreeFile, + }; + + // Parse eviction policy from string (case-insensitive) + let policy = match master_conf.quota_eviction_policy.to_lowercase().as_str() { + "lru" => EvictionPolicy::Lru, + "lfu" => EvictionPolicy::Lfu, + "arc" => EvictionPolicy::Arc, + _ => EvictionPolicy::Lru, + }; + + Self { + enable_quota_eviction: master_conf.enable_quota_eviction, + eviction_mode, + policy, + high_watermark: master_conf.quota_eviction_high_rate, + low_watermark: master_conf.quota_eviction_low_rate, + candidate_scan_page: master_conf.quota_eviction_scan_page as usize, + dry_run: master_conf.quota_eviction_dry_run, + } + } +} + +#[derive(Clone, Debug, Default)] +pub struct EvictPlan { + pub trigger_used: i64, + pub quota_size: i64, + pub target_free_bytes: i64, +} + +impl fmt::Display for EvictPlan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "EvictPlan(trigger_used={}, quota_size={}, target_free_bytes={})", + self.trigger_used, self.quota_size, self.target_free_bytes + ) + } +} diff --git a/curvine-server/src/master/quota/mod.rs b/curvine-server/src/master/quota/mod.rs new file mode 100644 index 00000000..b230f744 --- /dev/null +++ b/curvine-server/src/master/quota/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod quota_manager; +pub use quota_manager::QuotaManager; + +pub mod eviction; diff --git a/curvine-server/src/master/quota/quota_manager.rs b/curvine-server/src/master/quota/quota_manager.rs new file mode 100644 index 00000000..0bab64cd --- /dev/null +++ b/curvine-server/src/master/quota/quota_manager.rs @@ -0,0 +1,217 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::master::fs::MasterFilesystem; +use crate::master::meta::inode::ttl_executor::InodeTtlExecutor; +use crate::master::meta::inode::InodeView; +use crate::master::quota::eviction::evictor::Evictor; +use crate::master::quota::eviction::types::EvictPlan; +use crate::master::quota::eviction::EvictionConf; +use crate::master::quota::eviction::EvictionMode; +use curvine_common::state::MasterInfo; +use orpc::runtime::{RpcRuntime, Runtime}; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; + +pub struct QuotaManager { + eviction_conf: EvictionConf, + fs: MasterFilesystem, + evictor: Arc, + ttl_executor: InodeTtlExecutor, + tx: Sender>, +} + +impl QuotaManager { + pub fn new( + eviction_conf: EvictionConf, + fs: MasterFilesystem, + evictor: Arc, + rt: Arc, + ) -> Arc { + let (tx, mut rx): (Sender>, Receiver>) = + mpsc::channel(1024); + + let manager = Arc::new(QuotaManager { + evictor, + ttl_executor: InodeTtlExecutor::new(fs.clone()), + eviction_conf, + fs: fs.clone(), + tx, + }); + + let mgr = manager.clone(); + + rt.spawn(async move { + while let Some(cluster_info) = rx.recv().await { + mgr.handle_trigger(cluster_info); + } + }); + + manager + } + + fn is_eviction_enabled(&self) -> bool { + self.eviction_conf.enable_quota_eviction + } + + pub fn detector(&self, info: Option) { + let _ = self.tx.try_send(info); + } + + fn handle_trigger(&self, cluster_info: Option) { + if !self.is_eviction_enabled() { + return; + } + + let Some(info) = cluster_info else { + log::warn!("cluster-evict: failed to fetch master_info"); + return; + }; + + let curvine_used = info.fs_used; + let curvine_quota = info.available + info.fs_used; + + if curvine_quota <= 0 { + return; + } + + if curvine_used <= 0 { + log::debug!("cluster-evict: curvine_used <= 0, stopping eviction"); + return; + } + + let Some(mut plan) = self.create_evict_plan(curvine_used, curvine_quota) else { + log::debug!( + "cluster-evict: no eviction needed, curvine_used={}, curvine_quota={}, usage_ratio={:.2}%", + curvine_used, curvine_quota, (curvine_used as f64 / curvine_quota as f64) * 100.0 + ); + return; + }; + + log::info!( + "cluster-evict: starting eviction, curvine_used={}, curvine_quota={}, usage_ratio={:.2}%, target_free={}", + curvine_used, curvine_quota, (curvine_used as f64 / curvine_quota as f64) * 100.0, plan.target_free_bytes + ); + + loop { + let step_free = plan.target_free_bytes; + + if step_free <= 0 { + log::debug!("cluster-evict: step_free <= 0, stopping eviction"); + break; + } + + let inode_ids = self + .evictor + .select_victims(self.eviction_conf.candidate_scan_page); + + if inode_ids.is_empty() { + log::debug!("cluster-evict: no more victims available, stopping eviction"); + break; + } + + if self.eviction_conf.dry_run { + log::debug!( + "cluster-evict: dry_run=true, would process inode_ids_step={}", + inode_ids.len() + ); + break; + } + + let total_freed = { + let fs_guard = self.fs.fs_dir.read(); + let freed = inode_ids + .iter() + .filter_map(|&inode_id| fs_guard.store.get_inode(inode_id, None).ok().flatten()) + .map(|inode_view| match &inode_view { + InodeView::File(_, f) => f.len.max(0), + _ => 0, + }) + .sum::(); + freed + }; + + if total_freed <= 0 { + log::debug!("cluster-evict: total_freed <= 0, stopping eviction"); + break; + } + + self.execute_eviction(self.eviction_conf.eviction_mode, &inode_ids); + + plan.target_free_bytes = plan.target_free_bytes.saturating_sub(total_freed); + + if plan.target_free_bytes <= 0 { + log::info!("cluster-evict: reached target_free_bytes, stopping eviction"); + break; + } + } + } + + fn create_evict_plan(&self, used: i64, quota: i64) -> Option { + if quota <= 0 { + return None; + } + + let usage_ratio = used as f64 / quota as f64; + if usage_ratio < self.eviction_conf.high_watermark { + return None; + } + + let target_ratio = self + .eviction_conf + .low_watermark + .min(self.eviction_conf.high_watermark) + .min(1.0); + + let target_used = (target_ratio * quota as f64) as i64; + let target_free_bytes = (used - target_used).max(0); + + Some(EvictPlan { + trigger_used: used, + quota_size: quota, + target_free_bytes, + }) + } + + fn execute_eviction(&self, mode: EvictionMode, inode_ids: &[i64]) { + let mut successfully_evicted = Vec::with_capacity(inode_ids.len()); + + for inode_id_i64 in inode_ids { + let inode_id = *inode_id_i64 as u64; + let res = match mode { + EvictionMode::FreeFile => self.ttl_executor.free_inode(inode_id), + EvictionMode::DeleteFile => self.ttl_executor.delete_inode(inode_id), + }; + + match res { + Ok(_) => { + successfully_evicted.push(*inode_id_i64); + } + Err(e) => { + log::warn!( + "prequota-evict: executor failed for inode_id={}, err={}", + inode_id, + e + ); + } + } + } + + if !successfully_evicted.is_empty() { + self.evictor.remove_victims(&successfully_evicted); + } + } +} diff --git a/curvine-tests/tests/quota_eviction_test.rs b/curvine-tests/tests/quota_eviction_test.rs new file mode 100644 index 00000000..162068fc --- /dev/null +++ b/curvine-tests/tests/quota_eviction_test.rs @@ -0,0 +1,226 @@ +// Copyright 2025 OPPO. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use curvine_client::file::CurvineFileSystem; +use curvine_common::conf::ClusterConf; +use curvine_common::fs::{Path, Reader, Writer}; +use curvine_server::test::MiniCluster; +use log::info; +use orpc::common::Utils; +use orpc::runtime::RpcRuntime; +use orpc::CommonResult; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +/// Test LRU eviction functionality with comprehensive scenarios +#[test] +fn quota_lru_eviction_integration_test() -> CommonResult<()> { + // Clean up test directory before starting + let test_data_dir = "/tmp/curvine-test-data-lru"; + let _ = std::fs::remove_dir_all(test_data_dir); + std::fs::create_dir_all(test_data_dir)?; + + let mut conf = ClusterConf::default(); + + conf.worker.data_dir = vec![test_data_dir.to_string()]; + + // Enable cluster-level eviction + conf.master.enable_quota_eviction = true; + conf.master.quota_eviction_mode = "delete".to_string(); + conf.master.quota_eviction_policy = "lru".to_string(); + + // Set watermarks to trigger eviction: high=0.155%, low=0.1447% + // Adjusted watermarks to target ~375MB release (5 files × 75MB) + // With 750MB used and ~254GB quota: target = 750MB - (0.001447 * 254GB) ≈ 375MB + conf.master.quota_eviction_high_rate = 0.00155; + conf.master.quota_eviction_low_rate = 0.001447; + + // Set candidate_scan_page=1 for precise LRU testing (delete files one by one) + // This ensures exact LRU behavior verification without over-eviction + conf.master.quota_eviction_scan_page = 1; + conf.master.quota_eviction_dry_run = false; + + // Reduce heartbeat interval to trigger eviction checks more frequently + conf.master.worker_check_interval = "2s".to_string(); + + let cluster = MiniCluster::with_num(&conf, 1, 1); + let conf = cluster.master_conf().clone(); + + cluster.start_cluster(); + + // Wait for cluster to stabilize + info!("Waiting for cluster to stabilize..."); + Utils::sleep(10000); + + // Use multi-threaded runtime for async operations (required for FsWriterBuffer background tasks) + let rt = Arc::new(conf.client_rpc_conf().create_runtime()); + let rt1 = rt.clone(); + let res: CommonResult<()> = rt.block_on(async move { + let fs = CurvineFileSystem::with_rt(conf, rt1)?; + + info!("=== LRU Eviction Test Suite ==="); + + // Test 1: Create multiple files with different access patterns + info!("Test 1: Creating files with LRU access pattern"); + let test_dir = Path::new("/lru-test")?; + fs.mkdir(&test_dir, true).await?; + + // Create 10 files, each 75MB (total 750MB) to trigger eviction + // With target ~370MB, deleting 5 files (5×75MB=375MB) should satisfy the target + let file_size = 75 * 1024 * 1024; // 75MB + let num_files = 10; + let content = vec![b'A'; file_size]; + + let mut file_paths = Vec::new(); + for i in 0..num_files { + let file_path = Path::new(format!("/lru-test/file_{}.dat", i))?; + info!("Creating file: {}", file_path); + + let mut writer = fs.create(&file_path, true).await?; + writer.write(&content).await?; + writer.complete().await?; + + file_paths.push(file_path); + + // Small delay between file creations + tokio::time::sleep(Duration::from_millis(100)).await; + } + + info!( + "Created {} files, total size: {} MB", + num_files, + (num_files * file_size) / (1024 * 1024) + ); + + // Test 2: Access files in a specific order to establish LRU pattern + // Access files 5-9 to make them more recently used + // Files 0-4 should be LRU candidates + info!("Test 2: Establishing LRU pattern by accessing files 5-9"); + for file_path in file_paths.iter().skip(5) { + info!("Accessing file: {}", file_path); + + let mut reader = fs.open(file_path).await?; + let mut buffer = vec![0u8; 1024]; + reader.read(&mut buffer).await?; + reader.complete().await?; + tokio::time::sleep(Duration::from_millis(50)).await; + } + + // Test 3: Check initial file status + info!("Test 3: Verifying all files exist before eviction"); + for (i, file_path) in file_paths.iter().enumerate() { + let exists = fs.exists(file_path).await?; + assert!(exists, "File {} should exist before eviction", i); + } + + // Test 4: Trigger eviction by waiting for heartbeat checker + info!("Test 4: Waiting for eviction to be triggered (up to 30 seconds)..."); + let start = Instant::now(); + let timeout = Duration::from_secs(30); + + let mut eviction_occurred = false; + while start.elapsed() < timeout { + tokio::time::sleep(Duration::from_secs(2)).await; + + // Check if any LRU files (0-4) have been evicted + let mut evicted_count = 0; + for (i, file_path) in file_paths.iter().enumerate().take(5) { + let exists = fs.exists(file_path).await?; + if !exists { + evicted_count += 1; + info!("File {} has been evicted", i); + } + } + + if evicted_count > 0 { + eviction_occurred = true; + info!("Eviction detected: {} files evicted", evicted_count); + break; + } + } + + if !eviction_occurred { + info!("Warning: No eviction occurred within timeout."); + info!(" This may be expected if:"); + info!(" 1. Test directory size (750MB) is less than 0.155% of disk capacity"); + info!(" 2. Disk usage is below 0.155% high watermark"); + info!(" Current config: high_watermark=0.155%, low_watermark=0.1447%, candidate_scan_page=1 (precise LRU)"); + } else { + info!("Eviction successfully triggered"); + } + + // Test 5: Verify LRU behavior - recently accessed files should still exist + info!("Test 5: Verifying LRU behavior - recently accessed files (5-9) should remain"); + let mut recent_files_remaining = 0; + for file_path in file_paths.iter().skip(5) { + let exists = fs.exists(file_path).await?; + if exists { + recent_files_remaining += 1; + } + } + + info!( + "Recently accessed files remaining: {}/5", + recent_files_remaining + ); + + // Validate the core logic: LRU files (0-4) should be evicted, recent files (5-9) should remain + if eviction_occurred { + // Check that LRU files were evicted + for (i, file_path) in file_paths.iter().enumerate().take(5) { + let exists = fs.exists(file_path).await?; + assert!(!exists, "LRU file {} should have been evicted", i); + } + + // Check that recently accessed files largely remain + // Over-deletion is allowed; the target may require evicting one of 5..9. + // So require at least 4 of 5 to remain. + assert!( + recent_files_remaining >= 4, + "At least 4 of the recently accessed files (5..9) should remain, actual: {}", + recent_files_remaining + ); + + info!("LRU eviction logic validated: correct files were evicted"); + } + + // Test 6: Cleanup + info!("Test 6: Cleaning up test directory"); + for file_path in &file_paths { + let exists = fs.exists(file_path).await?; + if exists { + fs.delete(file_path, false).await?; + } + } + fs.delete(&test_dir, true).await?; + + info!("=== Test Summary ==="); + info!("- Files created: {}", num_files); + info!("- Eviction triggered: {}", eviction_occurred); + info!("- Recent files remaining: {}/{}", recent_files_remaining, 5); + + Ok(()) + }); + + match res { + Ok(_) => { + info!("All LRU eviction tests completed"); + Ok(()) + } + Err(e) => { + eprintln!("LRU eviction tests failed: {:?}", e); + Err(e) + } + } +}