Skip to content

Commit e160520

Browse files
Introduce query stage scheduler (#1935)
Co-authored-by: yangzhong <[email protected]>
1 parent cb00a2d commit e160520

File tree

4 files changed

+225
-121
lines changed

4 files changed

+225
-121
lines changed

ballista/rust/core/src/event_loop.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ pub trait EventAction<E>: Send + Sync {
3737

3838
/// Handle for an event loop over events of type `E`.
///
/// Holds the loop's name, a buffer size, a shared stop flag, the
/// [`EventAction`] that processes events, and an optional `mpsc::Sender<E>`.
// NOTE(review): this is a diff view, so removed (`-`) and added (`+`) field
// lines appear side by side. The change makes `name` and `buffer_size` public
// and moves `buffer_size` ahead of `stopped`.
#[derive(Clone)]
3939
pub struct EventLoop<E> {
40-
name: String,
40+
pub name: String,
41+
pub buffer_size: usize,
4142
// Stop flag behind an `Arc`, so clones of this handle share it.
stopped: Arc<AtomicBool>,
42-
buffer_size: usize,
4343
action: Arc<dyn EventAction<E>>,
4444
// presumably populated once the loop is started — TODO confirm against `start()`
tx_event: Option<mpsc::Sender<E>>,
4545
}
@@ -52,8 +52,8 @@ impl<E: Send + 'static> EventLoop<E> {
5252
) -> Self {
5353
Self {
5454
name,
55-
stopped: Arc::new(AtomicBool::new(false)),
5655
buffer_size,
56+
stopped: Arc::new(AtomicBool::new(false)),
5757
action,
5858
tx_event: None,
5959
}

ballista/rust/scheduler/src/scheduler_server/grpc.rs

Lines changed: 12 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
use anyhow::Context;
1919
use ballista_core::config::TaskSchedulingPolicy;
20-
use ballista_core::error::BallistaError;
2120
use ballista_core::execution_plans::ShuffleWriterExec;
2221
use ballista_core::serde::protobuf::execute_query_params::Query;
2322
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -26,30 +25,27 @@ use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
2625
use ballista_core::serde::protobuf::{
2726
job_status, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, FailedJob,
2827
FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams,
29-
GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PartitionId,
30-
PollWorkParams, PollWorkResult, QueuedJob, RegisterExecutorParams,
31-
RegisterExecutorResult, RunningJob, TaskDefinition, TaskStatus,
32-
UpdateTaskStatusParams, UpdateTaskStatusResult,
28+
GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams,
29+
PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
30+
TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
3331
};
3432
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
3533
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
3634
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
3735
use datafusion::datasource::file_format::parquet::ParquetFormat;
3836
use datafusion::datasource::file_format::FileFormat;
3937
use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore};
40-
use datafusion::logical_plan::LogicalPlan;
41-
use datafusion::physical_plan::ExecutionPlan;
4238
use futures::StreamExt;
4339
use log::{debug, error, info, trace, warn};
4440
use rand::{distributions::Alphanumeric, thread_rng, Rng};
4541
use std::collections::HashSet;
4642
use std::convert::TryInto;
4743
use std::sync::Arc;
48-
use std::time::{Instant, SystemTime, UNIX_EPOCH};
44+
use std::time::{SystemTime, UNIX_EPOCH};
4945
use tonic::{Request, Response, Status};
5046

51-
use crate::planner::DistributedPlanner;
5247
use crate::scheduler_server::event_loop::SchedulerServerEvent;
48+
use crate::scheduler_server::query_stage_scheduler::QueryStageSchedulerEvent;
5349
use crate::scheduler_server::SchedulerServer;
5450

5551
#[tonic::async_trait]
@@ -422,9 +418,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
422418
tonic::Status::internal(format!("Could not save job metadata: {}", e))
423419
})?;
424420

425-
// Create job details for the plan, like stages, tasks, etc
426-
// TODO To achieve more throughput, maybe change it to be event-based processing in the future
427-
match create_job(self, job_id.clone(), plan).await {
421+
match self
422+
.post_event(QueryStageSchedulerEvent::JobSubmitted(
423+
job_id.clone(),
424+
Box::new(plan),
425+
))
426+
.await
427+
{
428428
Err(error) => {
429429
let msg = format!("Job {} failed due to {}", job_id, error);
430430
warn!("{}", msg);
@@ -470,112 +470,6 @@ fn generate_job_id() -> String {
470470
.collect()
471471
}
472472

473-
/// Plans a submitted job and records its stages and tasks in scheduler state.
///
/// Steps, in order:
/// 1. Optimize `plan` and build a physical plan with the scheduler's
///    `ExecutionContext`.
/// 2. Save the job's status as `Running`.
/// 3. Split the physical plan into query stages with `DistributedPlanner`.
/// 4. Save each stage plan, plus one `TaskStatus` with `status: None` per
///    output partition of that stage.
/// 5. If the scheduler has an event loop, post
///    `SchedulerServerEvent::JobSubmitted(job_id)` to it.
///
/// # Errors
/// Returns a `BallistaError` if optimization, physical planning, stage
/// planning, any state write, or `get_sender()` fails.
///
/// # Panics
/// Panics if posting the `JobSubmitted` event fails, because that result is
/// `unwrap()`ed.
// NOTE(review): every line of this function carries the `-` diff marker, so
// this commit removes it from grpc.rs.
async fn create_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
474-
scheduler_server: &SchedulerServer<T, U>,
475-
job_id: String,
476-
plan: LogicalPlan,
477-
) -> Result<(), BallistaError> {
478-
// create physical plan using DataFusion
479-
let plan = async move {
480-
let start = Instant::now();
481-
482-
let ctx = scheduler_server.ctx.read().await.clone();
483-
let optimized_plan = ctx.optimize(&plan).map_err(|e| {
484-
let msg = format!("Could not create optimized logical plan: {}", e);
485-
error!("{}", msg);
486-
BallistaError::General(msg)
487-
})?;
488-
489-
debug!("Calculated optimized plan: {:?}", optimized_plan);
490-
491-
// No `?` on this result: it is kept as a `Result` so the timing log below
// runs whether physical planning succeeded or failed. The error is
// propagated by the `.await?` after the async block.
let plan = ctx
492-
.create_physical_plan(&optimized_plan)
493-
.await
494-
.map_err(|e| {
495-
let msg = format!("Could not create physical plan: {}", e);
496-
error!("{}", msg);
497-
BallistaError::General(msg)
498-
});
499-
500-
info!(
501-
"DataFusion created physical plan in {} milliseconds",
502-
start.elapsed().as_millis()
503-
);
504-
505-
plan
506-
}
507-
.await?;
508-
509-
scheduler_server
510-
.state
511-
.save_job_metadata(
512-
&job_id,
513-
&JobStatus {
514-
status: Some(job_status::Status::Running(RunningJob {})),
515-
},
516-
)
517-
.await
518-
.map_err(|e| {
519-
warn!("Could not update job {} status to running: {}", job_id, e);
520-
e
521-
})?;
522-
523-
// create distributed physical plan using Ballista
524-
let mut planner = DistributedPlanner::new();
525-
let stages = planner
526-
.plan_query_stages(&job_id, plan)
527-
.await
528-
.map_err(|e| {
529-
let msg = format!("Could not plan query stages: {}", e);
530-
error!("{}", msg);
531-
BallistaError::General(msg)
532-
})?;
533-
534-
// save stages into state
535-
for shuffle_writer in stages {
536-
scheduler_server
537-
.state
538-
.save_stage_plan(&job_id, shuffle_writer.stage_id(), shuffle_writer.clone())
539-
.await
540-
.map_err(|e| {
541-
let msg = format!("Could not save stage plan: {}", e);
542-
error!("{}", msg);
543-
BallistaError::General(msg)
544-
})?;
545-
let num_partitions = shuffle_writer.output_partitioning().partition_count();
546-
// One task entry per output partition, saved with `status: None`.
for partition_id in 0..num_partitions {
547-
let pending_status = TaskStatus {
548-
task_id: Some(PartitionId {
549-
job_id: job_id.clone(),
550-
stage_id: shuffle_writer.stage_id() as u32,
551-
partition_id: partition_id as u32,
552-
}),
553-
status: None,
554-
};
555-
scheduler_server
556-
.state
557-
.save_task_status(&pending_status)
558-
.await
559-
.map_err(|e| {
560-
let msg = format!("Could not save task status: {}", e);
561-
error!("{}", msg);
562-
BallistaError::General(msg)
563-
})?;
564-
}
565-
}
566-
567-
if let Some(event_loop) = scheduler_server.event_loop.as_ref() {
568-
// Send job_id to the scheduler channel
569-
event_loop
570-
.get_sender()?
571-
.post_event(SchedulerServerEvent::JobSubmitted(job_id))
572-
.await
573-
// A failed post panics here instead of being returned to the caller.
.unwrap();
574-
};
575-
576-
Ok(())
577-
}
578-
579473
#[cfg(all(test, feature = "sled"))]
580474
mod test {
581475
use std::sync::Arc;

ballista/rust/scheduler/src/scheduler_server/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};
3333
use crate::scheduler_server::event_loop::{
3434
SchedulerServerEvent, SchedulerServerEventAction,
3535
};
36+
use crate::scheduler_server::query_stage_scheduler::{
37+
QueryStageScheduler, QueryStageSchedulerEvent,
38+
};
3639
use crate::state::backend::StateBackendClient;
3740
use crate::state::SchedulerState;
3841

@@ -45,6 +48,7 @@ pub mod externalscaler {
4548
mod event_loop;
4649
mod external_scaler;
4750
mod grpc;
51+
mod query_stage_scheduler;
4852

4953
type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
5054

@@ -55,6 +59,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
5559
policy: TaskSchedulingPolicy,
5660
executors_client: Option<ExecutorsClient>,
5761
event_loop: Option<EventLoop<SchedulerServerEvent>>,
62+
query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
5863
ctx: Arc<RwLock<ExecutionContext>>,
5964
codec: BallistaCodec<T, U>,
6065
}
@@ -98,6 +103,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
98103
} else {
99104
(None, None)
100105
};
106+
let query_stage_scheduler =
107+
Arc::new(QueryStageScheduler::new(ctx.clone(), state.clone(), None));
108+
let query_stage_event_loop =
109+
EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
101110
Self {
102111
state,
103112
start_time: SystemTime::now()
@@ -107,6 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
107116
policy,
108117
executors_client,
109118
event_loop,
119+
query_stage_event_loop,
110120
ctx,
111121
codec,
112122
}
@@ -122,11 +132,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
122132
{
123133
if let Some(event_loop) = self.event_loop.as_mut() {
124134
event_loop.start()?;
135+
136+
let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
137+
self.ctx.clone(),
138+
self.state.clone(),
139+
Some(event_loop.get_sender()?),
140+
));
141+
let query_stage_event_loop = EventLoop::new(
142+
self.query_stage_event_loop.name.clone(),
143+
self.query_stage_event_loop.buffer_size,
144+
query_stage_scheduler,
145+
);
146+
self.query_stage_event_loop = query_stage_event_loop;
125147
}
148+
149+
self.query_stage_event_loop.start()?;
126150
}
127151

128152
Ok(())
129153
}
154+
155+
/// Posts `event` to the query-stage event loop.
///
/// Gets a sender from `query_stage_event_loop`, posts the event through it,
/// and returns the awaited result.
///
/// # Errors
/// Returns the error from `get_sender()` or from posting the event.
// NOTE(review): presumably `get_sender()` fails when the loop has not been
// started — confirm against `EventLoop::get_sender`.
async fn post_event(&self, event: QueryStageSchedulerEvent) -> Result<()> {
156+
self.query_stage_event_loop
157+
.get_sender()?
158+
.post_event(event)
159+
.await
160+
}
130161
}
131162

132163
/// Create a DataFusion context that is compatible with Ballista

0 commit comments

Comments
 (0)