Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,5 @@ bigdecimal = "0.4.8"
bitvec = "1.0.1"
config = "0.13.4"
tempfile = "3.21.0"
md-5 = "0.10.6"
md-5 = "0.10.6"
lru = "0.16.1"
18 changes: 18 additions & 0 deletions curvine-common/src/conf/master_conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ pub struct MasterConf {
pub ttl_retry_interval: String,
#[serde(skip)]
pub ttl_retry_interval_unit: DurationUnit,

// Eviction configuration
pub enable_quota_eviction: bool,
pub quota_eviction_mode: String,
pub quota_eviction_policy: String,
pub quota_eviction_high_rate: f64,
pub quota_eviction_low_rate: f64,
pub quota_eviction_scan_page: i32,
pub quota_eviction_dry_run: bool,
}

impl MasterConf {
Expand Down Expand Up @@ -252,6 +261,15 @@ impl Default for MasterConf {

ttl_retry_interval: "1s".to_string(),
ttl_retry_interval_unit: Default::default(),

// Eviction configuration defaults
enable_quota_eviction: false,
quota_eviction_mode: "free".to_string(),
quota_eviction_policy: "lru".to_string(),
quota_eviction_high_rate: 0.8,
quota_eviction_low_rate: 0.6,
quota_eviction_scan_page: 2,
quota_eviction_dry_run: false,
};

conf.init().unwrap();
Expand Down
1 change: 1 addition & 0 deletions curvine-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ chrono = { workspace = true }
rand = { workspace = true }
url = { workspace = true }
parking_lot = { workspace = true }
lru = { workspace = true }
86 changes: 48 additions & 38 deletions curvine-server/src/master/fs/heartbeat_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use crate::master::fs::MasterFilesystem;
use crate::master::quota::QuotaManager;
use crate::master::replication::master_replication_manager::MasterReplicationManager;
use crate::master::MasterMonitor;
use curvine_common::error::FsError;
Expand All @@ -30,6 +31,7 @@ pub struct HeartbeatChecker {
worker_blacklist_ms: u64,
worker_lost_ms: u64,
replication_manager: Arc<MasterReplicationManager>,
quota_manager: Arc<QuotaManager>,
}

impl HeartbeatChecker {
Expand All @@ -38,6 +40,7 @@ impl HeartbeatChecker {
monitor: MasterMonitor,
executor: Arc<GroupExecutor>,
replication_manager: Arc<MasterReplicationManager>,
quota_manager: Arc<QuotaManager>,
) -> Self {
let worker_blacklist_ms = fs.conf.worker_blacklist_interval_ms();
let worker_lost_ms = fs.conf.worker_lost_interval_ms();
Expand All @@ -48,6 +51,7 @@ impl HeartbeatChecker {
worker_blacklist_ms,
worker_lost_ms,
replication_manager,
quota_manager,
}
}
}
Expand All @@ -60,50 +64,56 @@ impl LoopTask for HeartbeatChecker {
return Ok(());
}

let mut wm = self.fs.worker_manager.write();
let workers = wm.get_last_heartbeat();
let now = LocalTime::mills();
{
let mut wm = self.fs.worker_manager.write();
let workers = wm.get_last_heartbeat();
let now = LocalTime::mills();

for (id, last_update) in workers {
if now > last_update + self.worker_blacklist_ms {
// Worker blacklist timeout
let worker = wm.add_blacklist_worker(id);
warn!(
"Worker {:?} has no heartbeat for more than {} ms and will be blacklisted",
worker, self.worker_blacklist_ms
);
}
for (id, last_update) in workers {
if now > last_update + self.worker_blacklist_ms {
// Worker blacklist timeout
let worker = wm.add_blacklist_worker(id);
warn!(
"Worker {:?} has no heartbeat for more than {} ms and will be blacklisted",
worker, self.worker_blacklist_ms
);
}

if now > last_update + self.worker_lost_ms {
// Heartbeat timeout
let removed = wm.remove_expired_worker(id);
warn!(
"Worker {:?} has no heartbeat for more than {} ms and will be removed",
removed, self.worker_lost_ms
);
// Asynchronously delete all block location data.
let fs = self.fs.clone();
let rm = self.replication_manager.clone();
let res = self.executor.spawn(move || {
let spend = TimeSpent::new();
let block_ids = try_log!(fs.delete_locations(id), vec![]);
let block_num = block_ids.len();
if let Err(e) = rm.report_under_replicated_blocks(id, block_ids) {
error!(
"Errors on reporting under-replicated {} blocks. err: {:?}",
block_num, e
);
}
info!(
"Delete worker {} all locations used {} ms",
id,
spend.used_ms()
if now > last_update + self.worker_lost_ms {
// Heartbeat timeout
let removed = wm.remove_expired_worker(id);
warn!(
"Worker {:?} has no heartbeat for more than {} ms and will be removed",
removed, self.worker_lost_ms
);
});
let _ = try_log!(res);
// Asynchronously delete all block location data.
let fs = self.fs.clone();
let rm = self.replication_manager.clone();
let res = self.executor.spawn(move || {
let spend = TimeSpent::new();
let block_ids = try_log!(fs.delete_locations(id), vec![]);
let block_num = block_ids.len();
if let Err(e) = rm.report_under_replicated_blocks(id, block_ids) {
error!(
"Errors on reporting under-replicated {} blocks. err: {:?}",
block_num, e
);
}
info!(
"Delete worker {} all locations used {} ms",
id,
spend.used_ms()
);
});
let _ = try_log!(res);
}
}
}

if let Ok(info) = self.fs.master_info() {
self.quota_manager.detector(Some(info));
};

Ok(())
}

Expand Down
14 changes: 13 additions & 1 deletion curvine-server/src/master/fs/master_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::master::fs::master_filesystem::MasterFilesystem;
use crate::master::meta::inode::ttl::ttl_manager::InodeTtlManager;
use crate::master::meta::inode::ttl::ttl_scheduler::TtlHeartbeatChecker;
use crate::master::meta::inode::ttl_scheduler::TtlHeartbeatConfig;
use crate::master::quota::QuotaManager;
use crate::master::replication::master_replication_manager::MasterReplicationManager;
use crate::master::MasterMonitor;
use curvine_common::executor::ScheduledExecutor;
Expand All @@ -30,6 +31,7 @@ pub struct MasterActor {
pub master_monitor: MasterMonitor,
pub executor: Arc<GroupExecutor>,
pub replication_manager: Arc<MasterReplicationManager>,
pub quota_manager: Arc<QuotaManager>,
}

impl MasterActor {
Expand All @@ -38,12 +40,14 @@ impl MasterActor {
master_monitor: MasterMonitor,
executor: Arc<GroupExecutor>,
replication_manager: &Arc<MasterReplicationManager>,
quota_manager: Arc<QuotaManager>,
) -> Self {
Self {
fs,
master_monitor,
executor,
replication_manager: replication_manager.clone(),
quota_manager,
}
}

Expand All @@ -54,6 +58,7 @@ impl MasterActor {
self.master_monitor.clone(),
self.executor.clone(),
self.replication_manager.clone(),
self.quota_manager.clone(),
)
.unwrap();

Expand Down Expand Up @@ -105,11 +110,18 @@ impl MasterActor {
master_monitor: MasterMonitor,
executor: Arc<GroupExecutor>,
replication_manager: Arc<MasterReplicationManager>,
quota_manager: Arc<QuotaManager>,
) -> CommonResult<()> {
let check_ms = fs.conf.worker_check_interval_ms();
let scheduler = ScheduledExecutor::new("worker-heartbeat", check_ms);

let task = HeartbeatChecker::new(fs, master_monitor, executor, replication_manager);
let task = HeartbeatChecker::new(
fs,
master_monitor,
executor,
replication_manager,
quota_manager,
);

scheduler.start(task)?;
Ok(())
Expand Down
33 changes: 31 additions & 2 deletions curvine-server/src/master/journal/journal_system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::master::eviction::evictor::LRUEvictor;
use crate::master::eviction::types::EvictionPolicy;
use crate::master::eviction::EvictionConf;
use crate::master::fs::{MasterFilesystem, WorkerManager};
use crate::master::journal::{JournalLoader, JournalWriter};
use crate::master::meta::inode::ttl::ttl_bucket::TtlBucketList;
use crate::master::meta::FsDir;
use crate::master::{MasterMonitor, MetaRaftJournal, MountManager, SyncFsDir, SyncWorkerManager};
use crate::master::{
MasterMonitor, MetaRaftJournal, MountManager, QuotaManager, SyncFsDir, SyncWorkerManager,
};
use curvine_common::conf::ClusterConf;
use curvine_common::proto::raft::SnapshotData;
use curvine_common::raft::storage::{AppStorage, LogStorage, RocksLogStorage};
Expand All @@ -39,6 +44,7 @@ pub struct JournalSystem {
raft_journal: MetaRaftJournal,
master_monitor: MasterMonitor,
mount_manager: Arc<MountManager>,
quota_manager: Arc<QuotaManager>,
}

impl JournalSystem {
Expand All @@ -49,6 +55,7 @@ impl JournalSystem {
raft_journal: MetaRaftJournal,
master_monitor: MasterMonitor,
mount_manager: Arc<MountManager>,
quota_manager: Arc<QuotaManager>,
) -> Self {
Self {
rt,
Expand All @@ -57,6 +64,7 @@ impl JournalSystem {
raft_journal,
master_monitor,
mount_manager,
quota_manager,
}
}

Expand Down Expand Up @@ -88,7 +96,20 @@ impl JournalSystem {
// Create TTL bucket list early with configuration
let ttl_bucket_list = Arc::new(TtlBucketList::new(conf.master.ttl_bucket_interval_ms()));

let fs_dir = SyncFsDir::new(FsDir::new(conf, journal_writer, ttl_bucket_list)?);
let eviction_conf = EvictionConf::from_conf(conf);
let evictor = match eviction_conf.policy {
EvictionPolicy::Lru | EvictionPolicy::Lfu | EvictionPolicy::Arc => {
Arc::new(LRUEvictor::new(eviction_conf.clone()))
}
};

let fs_dir = SyncFsDir::new(FsDir::new(
conf,
journal_writer,
ttl_bucket_list,
evictor.clone(),
)?);

let fs = MasterFilesystem::new(
conf,
fs_dir.clone(),
Expand All @@ -98,6 +119,9 @@ impl JournalSystem {

let mount_manager = Arc::new(MountManager::new(fs.clone()));

let quota_manager =
QuotaManager::new(eviction_conf, fs.clone(), evictor.clone(), rt.clone());

let raft_journal = MetaRaftJournal::new(
rt.clone(),
log_store,
Expand All @@ -113,6 +137,7 @@ impl JournalSystem {
raft_journal,
master_monitor,
mount_manager,
quota_manager,
);

Ok(js)
Expand Down Expand Up @@ -149,6 +174,10 @@ impl JournalSystem {
self.mount_manager.clone()
}

/// Returns a shared handle to this journal system's quota manager.
///
/// The handle is reference-counted, so cloning it is cheap (an atomic
/// refcount bump) and the returned `Arc` shares the same underlying
/// `QuotaManager` instance.
pub fn quota_manager(&self) -> Arc<QuotaManager> {
    Arc::clone(&self.quota_manager)
}

// Create a snapshot manually, dedicated for testing.
pub fn create_snapshot(&self) -> RaftResult<()> {
let data = self.raft_journal.app_store().create_snapshot(1, 1)?;
Expand Down
2 changes: 2 additions & 0 deletions curvine-server/src/master/master_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl Master {
let fs = journal_system.fs();
let worker_manager = journal_system.worker_manager();
let mount_manager = journal_system.mount_manager();
let quota_manager = journal_system.quota_manager();

let rt = Arc::new(conf.master_server_conf().create_runtime());

Expand All @@ -149,6 +150,7 @@ impl Master {
journal_system.master_monitor(),
conf.master.new_executor(),
&replication_manager,
quota_manager,
);

let job_manager = Arc::new(JobManager::from_cluster_conf(
Expand Down
Loading