Skip to content

Commit 4fb1271

Browse files
committed
publish task progress
1 parent 517cc5f commit 4fb1271

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

proto/engine.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ message TaskRequest {
2626
// bytes payload = 2;
2727
}
2828
message Task {
29+
string id = 4; // the task unique identifier
2930
bytes task_payload = 1;
3031
string task_id = 2; // namespace:task
3132
bytes payload = 3;

src/bin/server.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,15 @@ impl Engine for EngineService {
133133
bytes: task_payload.clone(),
134134
user_id: uid.clone(),
135135
given_at: Utc::now(),
136-
id: ttask.id,
136+
id: ttask.id.clone(),
137137
});
138138
api.executing_tasks
139139
.tasks
140140
.insert(ID(namespace, task_name), exec_tsks);
141141
let store = bincode::serialize(&api.executing_tasks.clone()).unwrap();
142142
api.db.insert("executing_tasks", store).unwrap();
143143
let response = proto::Task {
144+
id: ttask.id,
144145
task_id: input.task_id.clone(),
145146
task_payload,
146147
payload: Vec::new(),
@@ -151,6 +152,47 @@ impl Engine for EngineService {
151152
&self,
152153
request: tonic::Request<proto::Task>,
153154
) -> Result<tonic::Response<proto::Empty>, tonic::Status> {
155+
let mut api = self.EngineAPI.write().await;
156+
let challenge = get_auth(&request);
157+
let uid = get_uid(&request);
158+
let db = api.db.clone();
159+
160+
let task_id = request.get_ref().task_id.clone();
161+
let alen = &task_id.split(":").collect::<Vec<&str>>().len();
162+
if *alen != 2 {
163+
return Err(Status::invalid_argument("Invalid Params"));
164+
}
165+
let namespace = &task_id.split(":").collect::<Vec<&str>>()[0];
166+
let task_name = &task_id.split(":").collect::<Vec<&str>>()[1];
167+
168+
if !Events::CheckAuth(&mut api, uid.clone(), challenge, db) {
169+
info!("Aquire Task denied due to Invalid Auth");
170+
return Err(Status::permission_denied("invalid auth"));
171+
};
172+
let mem_tsk = api
173+
.executing_tasks
174+
.tasks
175+
.get(&ID(namespace, task_name))
176+
.unwrap();
177+
let tsk = mem_tsk
178+
.iter()
179+
.find(|f| f.id == task_id.clone() && f.user_id == uid.clone());
180+
if let Some(tsk) = tsk {
181+
// Exec Tasks -> DB
182+
let mut nmem_tsk = mem_tsk.clone();
183+
nmem_tsk.retain(|f| f.id != task_id.clone() && f.user_id != uid.clone());
184+
api.executing_tasks
185+
.tasks
186+
.insert(ID(namespace, task_name), nmem_tsk.clone());
187+
let t_mem_execs = api.executing_tasks.clone();
188+
api.db
189+
.insert("executing_tasks", bincode::serialize(&t_mem_execs).unwrap())
190+
.unwrap();
191+
// tsk-> solved Tsks
192+
// Solved tsks -> DB
193+
} else {
194+
return Err(tonic::Status::not_found("Invalid taskid or userid"));
195+
}
154196
Err(tonic::Status::ok("message"))
155197
}
156198
async fn create_task(

0 commit comments

Comments
 (0)