Skip to content

Commit 15526f9

Browse files
yahoNanJing and yangzhong authored
Fix stuck issue for the load testing of Push-based task scheduling (#2006)
* Fix concurrently updating ExecutorData
* Fix get_available_executors_data() to avoid losing event of SchedulerServerEvent::JobSubmitted in offer_resources()
* Fix missing update JobStatus from Queued to Running
* Fix for PR review

Co-authored-by: yangzhong <yangzhong@ebay.com>
1 parent 1d49dc9 commit 15526f9

File tree

6 files changed

+94
-16
lines changed

6 files changed

+94
-16
lines changed

ballista/rust/core/src/serde/scheduler/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ pub struct ExecutorData {
147147
pub available_task_slots: u32,
148148
}
149149

150+
/// A signed adjustment to the available task slots of a single executor.
///
/// Unlike `ExecutorData`, which carries an absolute `available_task_slots`
/// value, this carries a delta that the scheduler state adds to whatever
/// value is currently stored for the executor.
pub struct ExecutorDataChange {
151+
/// Identifier of the executor whose slot count should be adjusted.
pub executor_id: String,
152+
/// Signed number of task slots to add to the executor's available count;
/// a negative value reduces the count.
pub task_slots: i32,
153+
}
154+
150155
struct ExecutorResourcePair {
151156
total: protobuf::executor_resource::Resource,
152157
available: protobuf::executor_resource::Resource,

ballista/rust/scheduler/src/scheduler_server/event_loop.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use log::{debug, warn};
2424
use ballista_core::error::{BallistaError, Result};
2525
use ballista_core::event_loop::EventAction;
2626
use ballista_core::serde::protobuf::{LaunchTaskParams, TaskDefinition};
27-
use ballista_core::serde::scheduler::ExecutorData;
27+
use ballista_core::serde::scheduler::ExecutorDataChange;
2828
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
2929

3030
use crate::scheduler_server::ExecutorsClient;
@@ -70,12 +70,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
7070
return Ok(Some(SchedulerServerEvent::JobSubmitted(job_id)));
7171
}
7272

73+
let mut executors_data_change: Vec<ExecutorDataChange> = available_executors
74+
.iter()
75+
.map(|executor_data| ExecutorDataChange {
76+
executor_id: executor_data.executor_id.clone(),
77+
task_slots: executor_data.available_task_slots as i32,
78+
})
79+
.collect();
80+
7381
let (tasks_assigment, num_tasks) = self
7482
.state
7583
.fetch_tasks(&mut available_executors, &job_id)
7684
.await?;
85+
for (data_change, data) in executors_data_change
86+
.iter_mut()
87+
.zip(available_executors.iter())
88+
{
89+
data_change.task_slots =
90+
data.available_task_slots as i32 - data_change.task_slots;
91+
}
92+
7793
if num_tasks > 0 {
78-
self.launch_tasks(&available_executors, tasks_assigment)
94+
self.launch_tasks(&executors_data_change, tasks_assigment)
7995
.await?;
8096
}
8197

@@ -84,12 +100,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
84100

85101
async fn launch_tasks(
86102
&self,
87-
executors: &[ExecutorData],
103+
executors: &[ExecutorDataChange],
88104
tasks_assigment: Vec<Vec<TaskDefinition>>,
89105
) -> Result<()> {
90106
for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() {
91107
if !tasks.is_empty() {
92-
let executor_data = &executors[idx_executor];
108+
let executor_data_change = &executors[idx_executor];
93109
debug!(
94110
"Start to launch tasks {:?} to executor {:?}",
95111
tasks
@@ -107,14 +123,17 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
107123
}
108124
})
109125
.collect::<Vec<String>>(),
110-
executor_data.executor_id
126+
executor_data_change.executor_id
111127
);
112128
let mut client = {
113129
let clients = self.executors_client.read().await;
114-
clients.get(&executor_data.executor_id).unwrap().clone()
130+
clients
131+
.get(&executor_data_change.executor_id)
132+
.unwrap()
133+
.clone()
115134
};
116135
// Update the resources first
117-
self.state.save_executor_data(executor_data.clone());
136+
self.state.update_executor_data(executor_data_change);
118137
// TODO check whether launching task is successful or not
119138
client.launch_task(LaunchTaskParams { task: tasks }).await?;
120139
} else {

ballista/rust/scheduler/src/scheduler_server/grpc.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use ballista_core::serde::protobuf::{
3030
TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
3131
};
3232
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
33-
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
33+
use ballista_core::serde::scheduler::{
34+
ExecutorData, ExecutorDataChange, ExecutorMetadata,
35+
};
3436
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
3537
use datafusion::datasource::file_format::parquet::ParquetFormat;
3638
use datafusion::datasource::file_format::FileFormat;
@@ -290,9 +292,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
290292
jobs.insert(task_id.job_id.clone());
291293
}
292294
}
293-
if let Some(mut executor_data) = self.state.get_executor_data(&executor_id) {
294-
executor_data.available_task_slots += num_tasks as u32;
295-
self.state.save_executor_data(executor_data);
295+
296+
if let Some(executor_data) = self.state.get_executor_data(&executor_id) {
297+
self.state.update_executor_data(&ExecutorDataChange {
298+
executor_id: executor_data.executor_id,
299+
task_slots: num_tasks as i32,
300+
});
296301
} else {
297302
error!("Fail to get executor data for {:?}", &executor_id);
298303
}

ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ use std::sync::Arc;
1919
use std::time::Instant;
2020

2121
use async_trait::async_trait;
22-
use log::{debug, error, info};
22+
use log::{debug, error, info, warn};
2323
use tokio::sync::RwLock;
2424

2525
use ballista_core::error::{BallistaError, Result};
2626
use ballista_core::event_loop::{EventAction, EventSender};
27-
use ballista_core::serde::protobuf::{PartitionId, TaskStatus};
27+
use ballista_core::serde::protobuf::{
28+
job_status, JobStatus, PartitionId, RunningJob, TaskStatus,
29+
};
2830
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
2931
use datafusion::logical_plan::LogicalPlan;
3032
use datafusion::physical_plan::ExecutionPlan;
@@ -160,7 +162,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
160162
) -> Result<Option<QueryStageSchedulerEvent>> {
161163
match event {
162164
QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => {
165+
info!("Job {} submitted", job_id);
163166
let plan = self.create_physical_plan(plan).await?;
167+
if let Err(e) = self
168+
.state
169+
.save_job_metadata(
170+
&job_id,
171+
&JobStatus {
172+
status: Some(job_status::Status::Running(RunningJob {})),
173+
},
174+
)
175+
.await
176+
{
177+
warn!("Could not update job {} status to running: {}", job_id, e);
178+
}
164179
self.generate_stages(&job_id, plan).await?;
165180

166181
if let Some(event_sender) = self.event_sender.as_ref() {

ballista/rust/scheduler/src/state/in_memory_state.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
// under the License.
1717

1818
use ballista_core::serde::protobuf::{ExecutorHeartbeat, TaskStatus};
19-
use ballista_core::serde::scheduler::ExecutorData;
19+
use ballista_core::serde::scheduler::{ExecutorData, ExecutorDataChange};
20+
use log::{error, info, warn};
2021
use parking_lot::RwLock;
2122
use std::collections::{HashMap, HashSet};
2223
use std::sync::Arc;
@@ -85,6 +86,33 @@ impl InMemorySchedulerState {
8586
executors_data.insert(executor_data.executor_id.clone(), executor_data);
8687
}
8788

89+
/// Applies a signed task-slot delta to the stored data of one executor.
///
/// Looks up `executor_data_change.executor_id` in `executors_data` and adds
/// `executor_data_change.task_slots` to that executor's
/// `available_task_slots`.
///
/// * If the result would be negative, an error is logged and the stored
///   value is left unchanged.
/// * If no entry exists for the executor, a warning is logged and nothing
///   is modified.
///
/// The write guard obtained at the top stays alive until the function
/// returns, so the read, the addition and the store all happen under that
/// single guard.
pub(crate) fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
90+
let mut executors_data = self.executors_data.write();
91+
if let Some(executor_data) =
92+
executors_data.get_mut(&executor_data_change.executor_id)
93+
{
94+
// Work in i32 so that a negative delta can be applied and a negative
// result can be detected before anything is written back.
// NOTE(review): `as i32` would wrap for slot counts above i32::MAX —
// presumably never reached in practice; confirm.
let available_task_slots = executor_data.available_task_slots as i32
95+
+ executor_data_change.task_slots;
96+
if available_task_slots < 0 {
97+
// Refuse to store a negative count; the previous value is kept.
error!(
98+
"Available task slots {} for executor {} is less than 0",
99+
available_task_slots, executor_data.executor_id
100+
);
101+
} else {
102+
info!(
103+
"available_task_slots for executor {} becomes {}",
104+
executor_data.executor_id, available_task_slots
105+
);
106+
// Non-negative here, so the cast back to u32 keeps the value.
executor_data.available_task_slots = available_task_slots as u32;
107+
}
108+
} else {
109+
warn!(
110+
"Could not find executor data for {}",
111+
executor_data_change.executor_id
112+
);
113+
}
114+
}
115+
88116
pub(crate) fn get_executor_data(&self, executor_id: &str) -> Option<ExecutorData> {
89117
let executors_data = self.executors_data.read();
90118
executors_data.get(executor_id).cloned()
@@ -100,7 +128,8 @@ impl InMemorySchedulerState {
100128
executors_data
101129
.iter()
102130
.filter_map(|(exec, data)| {
103-
alive_executors.contains(exec).then(|| data.clone())
131+
(data.available_task_slots > 0 && alive_executors.contains(exec))
132+
.then(|| data.clone())
104133
})
105134
.collect::<Vec<ExecutorData>>()
106135
};

ballista/rust/scheduler/src/state/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use ballista_core::serde::protobuf::{
3232
FailedTask, JobStatus, RunningJob, RunningTask, TaskStatus,
3333
};
3434
use ballista_core::serde::scheduler::{
35-
ExecutorData, ExecutorMetadata, PartitionId, PartitionStats,
35+
ExecutorData, ExecutorDataChange, ExecutorMetadata, PartitionId, PartitionStats,
3636
};
3737
use ballista_core::serde::{protobuf, AsExecutionPlan, AsLogicalPlan, BallistaCodec};
3838
use datafusion::prelude::ExecutionContext;
@@ -227,6 +227,11 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerState<T,
227227
self.in_memory_state.save_executor_data(executor_data);
228228
}
229229

230+
/// Applies a signed task-slot delta for one executor by forwarding
/// `executor_data_change` unchanged to the in-memory state's
/// `update_executor_data`.
pub fn update_executor_data(&self, executor_data_change: &ExecutorDataChange) {
231+
self.in_memory_state
232+
.update_executor_data(executor_data_change);
233+
}
234+
230235
pub fn get_available_executors_data(&self) -> Vec<ExecutorData> {
231236
self.in_memory_state.get_available_executors_data()
232237
}

0 commit comments

Comments
 (0)