Skip to content

Commit 0843f60

Browse files
committed
feat: implement bidirectional LoadJob support
1 parent 97855d5 commit 0843f60

File tree

6 files changed

+207
-39
lines changed

6 files changed

+207
-39
lines changed

curvine-common/src/state/job.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,13 @@ pub struct JobStatus {
7777
/// Parameters of a load job request, as accepted by `JobManager::submit_load_job`.
///
/// Only `source_path` is mandatory; every other field is an `Option`.
/// `LoadJobCommandBuilder` produces values of this type.
#[derive(Default, Debug, Deserialize, Serialize)]
7878
pub struct LoadJobCommand {
7979
/// Path of the data to load. The job manager parses it with `Path::from_str`
/// and looks up the mount that covers it.
pub source_path: String,
80+
/// Optional explicit target path. When `None`, the job runner derives the target
/// from the mount info: `get_ufs_path` if the source is a Curvine path,
/// `get_cv_path` otherwise.
pub target_path: Option<String>,
8081
pub replicas: Option<i32>,
8182
pub block_size: Option<i64>,
8283
pub storage_type: Option<StorageType>,
8384
pub ttl_ms: Option<i64>,
8485
pub ttl_action: Option<TtlAction>,
86+
/// Copied unchanged into `LoadJobInfo::overwrite` when the job context is built.
/// NOTE(review): presumably controls whether existing target data is replaced — confirm
/// against the code that consumes `LoadJobInfo`.
pub overwrite: Option<bool>,
8587
}
8688

8789
impl LoadJobCommand {
@@ -93,11 +95,13 @@ impl LoadJobCommand {
9395
/// Chainable builder for `LoadJobCommand`.
///
/// Holds the same fields as `LoadJobCommand`; `build` moves each of them across
/// unchanged. Optional fields stay `None` until the matching setter is called.
#[derive(Default)]
9496
pub struct LoadJobCommandBuilder {
9597
source_path: String,
98+
// Explicit target path; `None` unless `target_path()` was called.
target_path: Option<String>,
9699
replicas: Option<i32>,
97100
block_size: Option<i64>,
98101
storage_type: Option<StorageType>,
99102
ttl_ms: Option<i64>,
100103
ttl_action: Option<TtlAction>,
104+
// Overwrite flag; `None` unless `overwrite()` was called.
overwrite: Option<bool>,
101105
}
102106

103107
impl LoadJobCommandBuilder {
@@ -108,6 +112,11 @@ impl LoadJobCommandBuilder {
108112
}
109113
}
110114

115+
/// Sets an explicit target path for the load job.
///
/// Accepts any value convertible into a `String`. Consumes the builder and
/// returns it so that calls can be chained. A previously set target path is replaced.
pub fn target_path(mut self, target_path: impl Into<String>) -> Self {
116+
// `Option::insert` returns a mutable reference to the stored value; it is not needed here.
let _ = self.target_path.insert(target_path.into());
117+
self
118+
}
119+
111120
pub fn replicas(mut self, replicas: i32) -> Self {
112121
let _ = self.replicas.insert(replicas);
113122
self
@@ -133,14 +142,21 @@ impl LoadJobCommandBuilder {
133142
self
134143
}
135144

145+
/// Sets the `overwrite` flag carried by the resulting `LoadJobCommand`.
///
/// Consumes the builder and returns it so that calls can be chained. A previously
/// set value is replaced.
pub fn overwrite(mut self, overwrite: bool) -> Self {
146+
// `Option::insert` returns a mutable reference to the stored value; it is not needed here.
let _ = self.overwrite.insert(overwrite);
147+
self
148+
}
149+
136150
/// Consumes the builder and returns the finished `LoadJobCommand`.
///
/// Every field is moved across as-is; no validation or defaulting happens here,
/// so options that were never set remain `None`.
pub fn build(self) -> LoadJobCommand {
137151
LoadJobCommand {
138152
source_path: self.source_path,
153+
target_path: self.target_path,
138154
replicas: self.replicas,
140155
block_size: self.block_size,
141156
storage_type: self.storage_type,
142157
ttl_ms: self.ttl_ms,
143158
ttl_action: self.ttl_action,
159+
overwrite: self.overwrite,
144160
}
145161
}
146162
}
@@ -157,6 +173,7 @@ pub struct LoadJobInfo {
157173
pub ttl_action: TtlAction,
158174
pub mount_info: MountInfo,
159175
pub create_time: i64,
176+
pub overwrite: Option<bool>,
160177
}
161178

162179
#[derive(Debug, Clone, Serialize, Deserialize)]

curvine-common/src/state/mount.rs

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,4 +347,74 @@ mod tests {
347347
"s3://spark/a/b/c/dt=2025/1.csv"
348348
);
349349
}
350+
351+
// Exercises `MountInfo::get_cv_path` (UFS → Curvine) and `MountInfo::get_ufs_path`
// (Curvine → UFS) against a single mount mapping `s3://flink/user` → `/mnt/s3`.
// Covers a file directly under the mount root, a nested file, a path containing
// `=` and `-`, `Path::is_cv` detection, and round trips in both directions.
#[test]
352+
fn test_bidirectional_path_conversion() {
353+
// Mount config: s3://flink/user → /mnt/s3
354+
let info = MountInfo {
355+
ufs_path: "s3://flink/user".to_string(),
356+
cv_path: "/mnt/s3".to_string(),
357+
..Default::default()
358+
};
359+
360+
// Test 1: UFS → CV (Import) - root level file
361+
let ufs_path = Path::from_str("s3://flink/user/batch_add_path_migrate_task.py").unwrap();
362+
let cv_result = info.get_cv_path(&ufs_path).unwrap();
363+
assert_eq!(
364+
cv_result.full_path(),
365+
"/mnt/s3/batch_add_path_migrate_task.py"
366+
);
367+
368+
// Test 2: CV → UFS (Export) - root level file
369+
let cv_path = Path::from_str("/mnt/s3/batch_add_path_migrate_task.py").unwrap();
370+
let ufs_result = info.get_ufs_path(&cv_path).unwrap();
371+
assert_eq!(
372+
ufs_result.full_path(),
373+
"s3://flink/user/batch_add_path_migrate_task.py"
374+
);
375+
376+
// Test 3: UFS → CV (Import) - nested directory
377+
let ufs_nested = Path::from_str("s3://flink/user/dir1/dir2/file.txt").unwrap();
378+
let cv_nested = info.get_cv_path(&ufs_nested).unwrap();
379+
assert_eq!(cv_nested.full_path(), "/mnt/s3/dir1/dir2/file.txt");
380+
381+
// Test 4: CV → UFS (Export) - nested directory
382+
let cv_nested = Path::from_str("/mnt/s3/dir1/dir2/file.txt").unwrap();
383+
let ufs_nested = info.get_ufs_path(&cv_nested).unwrap();
384+
assert_eq!(ufs_nested.full_path(), "s3://flink/user/dir1/dir2/file.txt");
385+
386+
// Test 5: UFS → CV (Import) - special characters in path
387+
let ufs_special =
388+
Path::from_str("s3://flink/user/test_data/dt=2025-01-30/part-00000.parquet").unwrap();
389+
let cv_special = info.get_cv_path(&ufs_special).unwrap();
390+
assert_eq!(
391+
cv_special.full_path(),
392+
"/mnt/s3/test_data/dt=2025-01-30/part-00000.parquet"
393+
);
394+
395+
// Test 6: CV → UFS (Export) - special characters in path
396+
let cv_special =
397+
Path::from_str("/mnt/s3/test_data/dt=2025-01-30/part-00000.parquet").unwrap();
398+
let ufs_special = info.get_ufs_path(&cv_special).unwrap();
399+
assert_eq!(
400+
ufs_special.full_path(),
401+
"s3://flink/user/test_data/dt=2025-01-30/part-00000.parquet"
402+
);
403+
404+
// Test 7: Verify is_cv() detection
// Reuses `cv_path` from Test 2 and `ufs_path` from Test 1.
405+
assert!(cv_path.is_cv());
406+
assert!(!ufs_path.is_cv());
407+
408+
// Test 8: Round-trip conversion (UFS → CV → UFS)
409+
let original_ufs = Path::from_str("s3://flink/user/data/test.csv").unwrap();
410+
let to_cv = info.get_cv_path(&original_ufs).unwrap();
411+
let back_to_ufs = info.get_ufs_path(&to_cv).unwrap();
412+
assert_eq!(original_ufs.full_path(), back_to_ufs.full_path());
413+
414+
// Test 9: Round-trip conversion (CV → UFS → CV)
415+
let original_cv = Path::from_str("/mnt/s3/data/test.csv").unwrap();
416+
let to_ufs = info.get_ufs_path(&original_cv).unwrap();
417+
let back_to_cv = info.get_cv_path(&to_ufs).unwrap();
418+
assert_eq!(original_cv.full_path(), back_to_cv.full_path());
419+
}
350420
}

curvine-server/src/master/job/job_context.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl JobContext {
8282
ttl_action,
8383
mount_info: mnt.clone(),
8484
create_time: LocalTime::mills() as i64,
85+
overwrite: job_conf.overwrite,
8586
};
8687

8788
JobContext {

curvine-server/src/master/job/job_manager.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,10 @@ impl JobManager {
110110

111111
pub fn submit_load_job(&self, command: LoadJobCommand) -> FsResult<LoadJobResult> {
112112
let source_path = Path::from_str(&command.source_path)?;
113-
if source_path.is_cv() {
114-
return err_box!("No need to load cv path");
115-
}
116113

117-
// check mount
114+
// Check mount info for both UFS and CV paths
115+
// - For UFS path: Import (UFS → Curvine)
116+
// - For CV path: Export (Curvine → UFS), requires mount info to determine target UFS
118117
let mnt = if let Some(mnt) = self.mount_manager.get_mount_info(&source_path)? {
119118
mnt
120119
} else {

curvine-server/src/master/job/job_runner.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use crate::common::UfsFactory;
1616
use crate::master::fs::policy::ChooseContext;
1717
use crate::master::fs::MasterFilesystem;
1818
use crate::master::{JobContext, JobStore, TaskDetail};
19-
use curvine_client::unified::UfsFileSystem;
2019
use curvine_common::conf::ClientConf;
2120
use curvine_common::error::FsError;
2221
use curvine_common::fs::{FileSystem, Path};
@@ -110,18 +109,30 @@ impl LoadJobRunner {
110109
mnt: MountInfo,
111110
) -> FsResult<LoadJobResult> {
112111
let source_path = Path::from_str(&command.source_path)?;
113-
let target_path = mnt.get_cv_path(&source_path)?;
112+
113+
let target_path = if let Some(ref target) = command.target_path {
114+
Path::from_str(target)?
115+
} else {
116+
if source_path.is_cv() {
117+
mnt.get_ufs_path(&source_path)?
118+
} else {
119+
mnt.get_cv_path(&source_path)?
120+
}
121+
};
114122

115123
let job_id = Self::create_job_id(source_path.full_path());
116124
let result = LoadJobResult {
117125
job_id: job_id.clone(),
118126
target_path: target_path.clone_path(),
119127
};
120128

121-
let ufs = self.factory.get_ufs(&mnt)?;
122-
let source_status = ufs.get_status(&source_path).await?;
129+
let source_status = if source_path.is_cv() {
130+
self.master_fs.file_status(source_path.path())?
131+
} else {
132+
let ufs = self.factory.get_ufs(&mnt)?;
133+
ufs.get_status(&source_path).await?
134+
};
123135

124-
// check job status
125136
if self.check_job_exists(&job_id, &source_status, &target_path) {
126137
info!(
127138
"job {}, source_path {} already exists",
@@ -136,13 +147,13 @@ impl LoadJobRunner {
136147
&command,
137148
job_id.clone(),
138149
source_path.clone_uri(),
139-
target_path.clone_path(),
150+
target_path.clone_uri(),
140151
&mnt,
141152
&ClientConf::default(),
142153
);
143154

144155
let res = self
145-
.create_all_tasks(&mut job_context, source_status, &ufs, &mnt)
156+
.create_all_tasks(&mut job_context, source_status, &mnt)
146157
.await;
147158

148159
match res {
@@ -189,7 +200,6 @@ impl LoadJobRunner {
189200
&self,
190201
job: &mut JobContext,
191202
source_status: FileStatus,
192-
ufs: &UfsFileSystem,
193203
mnt: &MountInfo,
194204
) -> FsResult<i64> {
195205
job.update_state(JobTaskState::Pending, "Assigning workers");
@@ -199,18 +209,46 @@ impl LoadJobRunner {
199209
let mut stack = LinkedList::new();
200210
let mut task_index = 0;
201211
stack.push_back(source_status);
212+
213+
// Get target base path for direction detection
214+
let target_base = Path::from_str(&job.info.target_path)?;
215+
202216
while let Some(status) = stack.pop_front() {
203217
if status.is_dir {
218+
// List directory based on path type
204219
let dir_path = Path::from_str(status.path)?;
205-
let childs = ufs.list_status(&dir_path).await?;
220+
let childs = if dir_path.is_cv() {
221+
// Traverse Curvine directory
222+
self.master_fs.list_status(dir_path.path())?
223+
} else {
224+
// Traverse UFS directory
225+
let ufs = self.factory.get_ufs(mnt)?;
226+
ufs.list_status(&dir_path).await?
227+
};
228+
206229
for child in childs {
207230
stack.push_back(child);
208231
}
209232
} else {
210233
let worker = self.choose_worker(block_size)?;
211234

212235
let source_path = Path::from_str(status.path)?;
213-
let target_path = mnt.get_cv_path(&source_path)?;
236+
237+
// Calculate target_path based on source and target types
238+
let target_path = if source_path.is_cv() && !target_base.is_cv() {
239+
// Export: Curvine → UFS
240+
mnt.get_ufs_path(&source_path)?
241+
} else if !source_path.is_cv() && target_base.is_cv() {
242+
// Import: UFS → Curvine
243+
mnt.get_cv_path(&source_path)?
244+
} else {
245+
// Same type (Curvine→Curvine or UFS→UFS), not supported yet
246+
return err_box!(
247+
"Unsupported path combination: source={}, target={}",
248+
source_path.full_path(),
249+
target_base.full_path()
250+
);
251+
};
214252

215253
let task_id = format!("{}_task_{}", job.info.job_id, task_index);
216254
task_index += 1;
@@ -221,7 +259,7 @@ impl LoadJobRunner {
221259
task_id: task_id.clone(),
222260
worker: worker.clone(),
223261
source_path: source_path.clone_uri(),
224-
target_path: target_path.clone_path(),
262+
target_path: target_path.clone_uri(),
225263
create_time: LocalTime::mills() as i64,
226264
};
227265
job.add_task(task.clone());

0 commit comments

Comments
 (0)