Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion curvine-cli/src/cmds/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ pub struct MountCommand {
#[arg(long, default_value = "0")]
ttl_ms: String,

#[arg(long, default_value = "none")]
#[arg(
long,
default_value = "none",
help = "TTL expiration action when file expires:\n none - No action\n delete - Delete file\n persist - Export to UFS (skip if exists), keep CV cache\n evict - Export to UFS (skip if exists), delete CV cache\n flush - Force export to UFS (overwrite), delete CV cache"
)]
ttl_action: String,

#[arg(long)]
Expand Down
17 changes: 17 additions & 0 deletions curvine-common/src/state/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,13 @@ pub struct JobStatus {
/// Serializable request describing a load job.
///
/// Only `source_path` is mandatory; every other setting is optional.
/// NOTE(review): how `None` values are defaulted by the receiver is not
/// visible in this file — confirm against the job manager.
#[derive(Default, Debug, Deserialize, Serialize)]
pub struct LoadJobCommand {
/// Path the job starts from, carried as a plain string.
pub source_path: String,
/// Optional destination path.
/// NOTE(review): resolution when unset is not shown here — confirm.
pub target_path: Option<String>,
/// Optional replica count override.
pub replicas: Option<i32>,
/// Optional block size override (presumably in bytes — TODO confirm).
pub block_size: Option<i64>,
/// Optional storage type override.
pub storage_type: Option<StorageType>,
/// Optional time-to-live; the name suggests milliseconds — confirm.
pub ttl_ms: Option<i64>,
/// Optional action to take once the TTL expires.
pub ttl_action: Option<TtlAction>,
/// Optional overwrite flag.
/// NOTE(review): presumably decides whether an existing target is replaced — confirm.
pub overwrite: Option<bool>,
}

impl LoadJobCommand {
Expand All @@ -93,11 +95,13 @@ impl LoadJobCommand {
/// Builder that accumulates the settings of a `LoadJobCommand`.
///
/// The fields mirror `LoadJobCommand` one-to-one. Because the type derives
/// `Default`, every optional setting starts out as `None`.
#[derive(Default)]
pub struct LoadJobCommandBuilder {
// Mandatory source path of the job.
source_path: String,
// Optional destination path.
target_path: Option<String>,
// Optional replica count override.
replicas: Option<i32>,
// Optional block size override.
block_size: Option<i64>,
// Optional storage type override.
storage_type: Option<StorageType>,
// Optional time-to-live (name suggests milliseconds — confirm).
ttl_ms: Option<i64>,
// Optional action applied when the TTL expires.
ttl_action: Option<TtlAction>,
// Optional overwrite flag.
overwrite: Option<bool>,
}

impl LoadJobCommandBuilder {
Expand All @@ -108,6 +112,11 @@ impl LoadJobCommandBuilder {
}
}

/// Records the target path on the builder, replacing any earlier value,
/// and hands the builder back so calls can be chained.
pub fn target_path(mut self, target_path: impl Into<String>) -> Self {
    self.target_path = Some(target_path.into());
    self
}

pub fn replicas(mut self, replicas: i32) -> Self {
let _ = self.replicas.insert(replicas);
self
Expand All @@ -133,14 +142,21 @@ impl LoadJobCommandBuilder {
self
}

/// Records the overwrite flag on the builder, replacing any earlier value,
/// and hands the builder back so calls can be chained.
pub fn overwrite(mut self, overwrite: bool) -> Self {
    self.overwrite = Some(overwrite);
    self
}

pub fn build(self) -> LoadJobCommand {
LoadJobCommand {
source_path: self.source_path,
target_path: self.target_path,
replicas: self.replicas,
block_size: self.block_size,
storage_type: self.storage_type,
ttl_ms: self.ttl_ms,
ttl_action: self.ttl_action,
overwrite: self.overwrite,
}
}
}
Expand All @@ -157,6 +173,7 @@ pub struct LoadJobInfo {
pub ttl_action: TtlAction,
pub mount_info: MountInfo,
pub create_time: i64,
pub overwrite: Option<bool>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
70 changes: 70 additions & 0 deletions curvine-common/src/state/mount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,74 @@ mod tests {
"s3://spark/a/b/c/dt=2025/1.csv"
);
}

/// Checks that a mount maps paths in both directions (UFS <-> CV) and that
/// converting there and back yields the starting path.
#[test]
fn test_bidirectional_path_conversion() {
    // Mount under test: s3://flink/user is attached at /mnt/s3.
    let mount = MountInfo {
        ufs_path: "s3://flink/user".to_string(),
        cv_path: "/mnt/s3".to_string(),
        ..Default::default()
    };

    // (UFS path, matching CV path): root-level file, nested directories,
    // and a path containing `=` and `-`.
    let cases = [
        (
            "s3://flink/user/batch_add_path_migrate_task.py",
            "/mnt/s3/batch_add_path_migrate_task.py",
        ),
        (
            "s3://flink/user/dir1/dir2/file.txt",
            "/mnt/s3/dir1/dir2/file.txt",
        ),
        (
            "s3://flink/user/test_data/dt=2025-01-30/part-00000.parquet",
            "/mnt/s3/test_data/dt=2025-01-30/part-00000.parquet",
        ),
    ];

    for &(ufs_str, cv_str) in cases.iter() {
        // Import direction: UFS -> CV.
        let ufs_in = Path::from_str(ufs_str).unwrap();
        let cv_out = mount.get_cv_path(&ufs_in).unwrap();
        assert_eq!(cv_out.full_path(), cv_str);

        // Export direction: CV -> UFS.
        let cv_in = Path::from_str(cv_str).unwrap();
        let ufs_out = mount.get_ufs_path(&cv_in).unwrap();
        assert_eq!(ufs_out.full_path(), ufs_str);
    }

    // is_cv() must tell the two path kinds apart (checked on the root-level file).
    let (first_ufs, first_cv) = cases[0];
    assert!(Path::from_str(first_cv).unwrap().is_cv());
    assert!(!Path::from_str(first_ufs).unwrap().is_cv());

    // Round trip starting from the UFS side: UFS -> CV -> UFS.
    let ufs_start = Path::from_str("s3://flink/user/data/test.csv").unwrap();
    let ufs_as_cv = mount.get_cv_path(&ufs_start).unwrap();
    let ufs_again = mount.get_ufs_path(&ufs_as_cv).unwrap();
    assert_eq!(ufs_start.full_path(), ufs_again.full_path());

    // Round trip starting from the CV side: CV -> UFS -> CV.
    let cv_start = Path::from_str("/mnt/s3/data/test.csv").unwrap();
    let cv_as_ufs = mount.get_ufs_path(&cv_start).unwrap();
    let cv_again = mount.get_cv_path(&cv_as_ufs).unwrap();
    assert_eq!(cv_start.full_path(), cv_again.full_path());
}
}
20 changes: 7 additions & 13 deletions curvine-common/src/state/ttl_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,12 @@ use serde::{Deserialize, Serialize};
Hash,
)]
pub enum TtlAction {
/// No action is performed.
#[default]
None = 0,

/// Try moving to slow storage such as disk
Move = 1,

/// Move to the underlying storage after expiration
Ufs = 2,

/// Delete after expiration
Delete = 3,
Delete = 1,
Persist = 2,
Evict = 3,
Flush = 4,
}

impl TryFrom<&str> for TtlAction {
Expand All @@ -51,10 +45,10 @@ impl TryFrom<&str> for TtlAction {
fn try_from(value: &str) -> Result<Self, Self::Error> {
let action = match value.to_uppercase().as_str() {
"NONE" => TtlAction::None,
"MOVE" => TtlAction::Move,
"UFS" => TtlAction::Ufs,
"DELETE" => TtlAction::Delete,

"PERSIST" => TtlAction::Persist,
"EVICT" => TtlAction::Evict,
"FLUSH" => TtlAction::Flush,
_ => return err_box!("invalid ttl action: {}", value),
};

Expand Down
5 changes: 3 additions & 2 deletions curvine-s3-gateway/src/utils/s3_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ pub fn fill_ttl_info(head: &mut HeadObjectResult, file_status: &FileStatus) {
head.expiration = Some(
match file_status.storage_policy.ttl_action {
TtlAction::Delete => "expiry-date=\"{}\" rule-id=\"ttl-delete\"",
TtlAction::Move => "expiry-date=\"{}\" rule-id=\"ttl-move\"",
TtlAction::Ufs => "expiry-date=\"{}\" rule-id=\"ttl-ufs\"",
TtlAction::Persist => "expiry-date=\"{}\" rule-id=\"ttl-persist\"",
TtlAction::Evict => "expiry-date=\"{}\" rule-id=\"ttl-evict\"",
TtlAction::Flush => "expiry-date=\"{}\" rule-id=\"ttl-flush\"",
_ => "rule-id=\"no-expiry\"",
}
.to_string(),
Expand Down
25 changes: 17 additions & 8 deletions curvine-server/src/master/fs/master_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::common::UfsFactory;
use crate::master::fs::heartbeat_checker::HeartbeatChecker;
use crate::master::fs::master_filesystem::MasterFilesystem;
use crate::master::job::JobManager;
use crate::master::meta::inode::ttl::ttl_manager::InodeTtlManager;
use crate::master::meta::inode::ttl::ttl_scheduler::TtlHeartbeatChecker;
use crate::master::meta::inode::ttl_scheduler::TtlHeartbeatConfig;
use crate::master::mount::MountManager;
use crate::master::replication::master_replication_manager::MasterReplicationManager;
use crate::master::MasterMonitor;
use curvine_common::executor::ScheduledExecutor;
use log::{error, info};
use log::info;
use orpc::runtime::GroupExecutor;
use orpc::CommonResult;
use std::sync::Arc;
Expand Down Expand Up @@ -56,13 +59,14 @@ impl MasterActor {
self.replication_manager.clone(),
)
.unwrap();

if let Err(e) = self.start_ttl_scheduler() {
error!("Failed to start inode ttl scheduler: {}", e);
}
}

pub fn start_ttl_scheduler(&mut self) -> CommonResult<()> {
pub fn start_ttl_scheduler(
&mut self,
mount_manager: Arc<MountManager>,
factory: Arc<UfsFactory>,
job_manager: Arc<JobManager>,
) -> CommonResult<()> {
info!("Starting inode ttl scheduler.");

let ttl_bucket_list = {
Expand All @@ -71,10 +75,15 @@ impl MasterActor {
fs_dir.get_ttl_bucket_list()
};

let ttl_manager = InodeTtlManager::new(self.fs.clone(), ttl_bucket_list)?;
let ttl_manager = InodeTtlManager::new(
self.fs.clone(),
ttl_bucket_list,
mount_manager,
factory,
job_manager,
)?;
let ttl_manager_arc = Arc::new(ttl_manager);

// TTL manager is ready for use immediately after creation
self.start_ttl_heartbeat_checker(ttl_manager_arc)?;

info!("Inode ttl scheduler started successfully.");
Expand Down
2 changes: 2 additions & 0 deletions curvine-server/src/master/job/job_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl TaskDetail {
}
}

#[derive(Clone)]
pub struct JobContext {
pub info: LoadJobInfo,
pub state: StateCtl,
Expand Down Expand Up @@ -82,6 +83,7 @@ impl JobContext {
ttl_action,
mount_info: mnt.clone(),
create_time: LocalTime::mills() as i64,
overwrite: job_conf.overwrite,
};

JobContext {
Expand Down
15 changes: 11 additions & 4 deletions curvine-server/src/master/job/job_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,10 @@ impl JobManager {

pub fn submit_load_job(&self, command: LoadJobCommand) -> FsResult<LoadJobResult> {
let source_path = Path::from_str(&command.source_path)?;
if source_path.is_cv() {
return err_box!("No need to load cv path");
}

// check mount
// Check mount info for both UFS and CV paths
// - For UFS path: Import (UFS → Curvine)
// - For CV path: Export (Curvine → UFS), requires mount info to determine target UFS
let mnt = if let Some(mnt) = self.mount_manager.get_mount_info(&source_path)? {
mnt
} else {
Expand Down Expand Up @@ -175,6 +174,14 @@ impl JobManager {
) -> FsResult<()> {
self.jobs.update_progress(job_id, task_id, progress)
}

/// Returns a shared reference to the job store held in the `jobs` field.
pub fn jobs(&self) -> &JobStore {
&self.jobs
}

/// Returns a reference to the shared `UfsFactory` handle held in the
/// `factory` field; callers can clone the `Arc` if they need ownership.
pub fn factory(&self) -> &Arc<UfsFactory> {
&self.factory
}
}

struct JobCleanupTask {
Expand Down
Loading