Introduce Ballista query stage scheduler #1935

Merged 1 commit on Mar 12, 2022
6 changes: 3 additions & 3 deletions ballista/rust/core/src/event_loop.rs
@@ -37,9 +37,9 @@ pub trait EventAction<E>: Send + Sync {

#[derive(Clone)]
pub struct EventLoop<E> {
- name: String,
+ pub name: String,
+ pub buffer_size: usize,
stopped: Arc<AtomicBool>,
- buffer_size: usize,
action: Arc<dyn EventAction<E>>,
tx_event: Option<mpsc::Sender<E>>,
}
@@ -52,8 +52,8 @@ impl<E: Send + 'static> EventLoop<E> {
) -> Self {
Self {
name,
- stopped: Arc::new(AtomicBool::new(false)),
buffer_size,
+ stopped: Arc::new(AtomicBool::new(false)),
action,
tx_event: None,
}
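
The only functional change in this file is visibility: name and buffer_size become pub (and buffer_size moves above stopped) so that code outside event_loop.rs can read them back. The scheduler relies on this in scheduler_server/mod.rs below to recreate the query-stage event loop with the same name and buffer size once the executor-facing event loop's sender exists. A minimal sketch of that rebuild pattern, with the module path assumed from the file location and a hypothetical helper name; only EventLoop::new, name, and buffer_size come from this PR:

use std::sync::Arc;

use ballista_core::event_loop::{EventAction, EventLoop};

// Recreate an event loop with the same name and buffer size but a different action.
fn rebuild_with_action<E: Send + 'static>(
    old: &EventLoop<E>,
    action: Arc<dyn EventAction<E>>,
) -> EventLoop<E> {
    EventLoop::new(old.name.clone(), old.buffer_size, action)
}
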
130 changes: 12 additions & 118 deletions ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -17,7 +17,6 @@

use anyhow::Context;
use ballista_core::config::TaskSchedulingPolicy;
- use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf::execute_query_params::Query;
use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -26,30 +25,27 @@ use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
use ballista_core::serde::protobuf::{
job_status, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, FailedJob,
FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams,
- GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PartitionId,
- PollWorkParams, PollWorkResult, QueuedJob, RegisterExecutorParams,
- RegisterExecutorResult, RunningJob, TaskDefinition, TaskStatus,
- UpdateTaskStatusParams, UpdateTaskStatusResult,
+ GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams,
+ PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
+ TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore};
- use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::ExecutionPlan;
use futures::StreamExt;
use log::{debug, error, info, trace, warn};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use std::collections::HashSet;
use std::convert::TryInto;
use std::sync::Arc;
- use std::time::{Instant, SystemTime, UNIX_EPOCH};
+ use std::time::{SystemTime, UNIX_EPOCH};
use tonic::{Request, Response, Status};

- use crate::planner::DistributedPlanner;
- use crate::scheduler_server::event_loop::SchedulerServerEvent;
+ use crate::scheduler_server::query_stage_scheduler::QueryStageSchedulerEvent;
use crate::scheduler_server::SchedulerServer;

#[tonic::async_trait]
@@ -422,9 +418,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
tonic::Status::internal(format!("Could not save job metadata: {}", e))
})?;

- // Create job details for the plan, like stages, tasks, etc
- // TODO To achieve more throughput, maybe change it to be event-based processing in the future
- match create_job(self, job_id.clone(), plan).await {
+ match self
+ .post_event(QueryStageSchedulerEvent::JobSubmitted(
+ job_id.clone(),
+ Box::new(plan),
+ ))
+ .await
+ {
Err(error) => {
let msg = format!("Job {} failed due to {}", job_id, error);
warn!("{}", msg);
@@ -470,112 +470,6 @@ fn generate_job_id() -> String {
.collect()
}

async fn create_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
scheduler_server: &SchedulerServer<T, U>,
job_id: String,
plan: LogicalPlan,
) -> Result<(), BallistaError> {
// create physical plan using DataFusion
let plan = async move {
let start = Instant::now();

let ctx = scheduler_server.ctx.read().await.clone();
let optimized_plan = ctx.optimize(&plan).map_err(|e| {
let msg = format!("Could not create optimized logical plan: {}", e);
error!("{}", msg);
BallistaError::General(msg)
})?;

debug!("Calculated optimized plan: {:?}", optimized_plan);

let plan = ctx
.create_physical_plan(&optimized_plan)
.await
.map_err(|e| {
let msg = format!("Could not create physical plan: {}", e);
error!("{}", msg);
BallistaError::General(msg)
});

info!(
"DataFusion created physical plan in {} milliseconds",
start.elapsed().as_millis()
);

plan
}
.await?;

scheduler_server
.state
.save_job_metadata(
&job_id,
&JobStatus {
status: Some(job_status::Status::Running(RunningJob {})),
},
)
.await
.map_err(|e| {
warn!("Could not update job {} status to running: {}", job_id, e);
e
})?;

// create distributed physical plan using Ballista
let mut planner = DistributedPlanner::new();
let stages = planner
.plan_query_stages(&job_id, plan)
.await
.map_err(|e| {
let msg = format!("Could not plan query stages: {}", e);
error!("{}", msg);
BallistaError::General(msg)
})?;

// save stages into state
for shuffle_writer in stages {
scheduler_server
.state
.save_stage_plan(&job_id, shuffle_writer.stage_id(), shuffle_writer.clone())
.await
.map_err(|e| {
let msg = format!("Could not save stage plan: {}", e);
error!("{}", msg);
BallistaError::General(msg)
})?;
let num_partitions = shuffle_writer.output_partitioning().partition_count();
for partition_id in 0..num_partitions {
let pending_status = TaskStatus {
task_id: Some(PartitionId {
job_id: job_id.clone(),
stage_id: shuffle_writer.stage_id() as u32,
partition_id: partition_id as u32,
}),
status: None,
};
scheduler_server
.state
.save_task_status(&pending_status)
.await
.map_err(|e| {
let msg = format!("Could not save task status: {}", e);
error!("{}", msg);
BallistaError::General(msg)
})?;
}
}

if let Some(event_loop) = scheduler_server.event_loop.as_ref() {
// Send job_id to the scheduler channel
event_loop
.get_sender()?
.post_event(SchedulerServerEvent::JobSubmitted(job_id))
.await
.unwrap();
};

Ok(())
}

#[cfg(all(test, feature = "sled"))]
mod test {
use std::sync::Arc;
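
With this change, submit_job no longer builds the physical plan and query stages inline: it saves the job metadata and posts QueryStageSchedulerEvent::JobSubmitted(job_id, Box::new(plan)) to the new query-stage event loop. The planning work moves into the new query_stage_scheduler module, which this diff view does not include. Below is a hedged sketch of what that handler presumably does with the event, reconstructed from the create_job function deleted above; the method name, the generic bounds, and the ctx/state/event_sender field names are assumptions, not code from this PR:

use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
    job_status, JobStatus, PartitionId, RunningJob, TaskStatus,
};
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
use datafusion::logical_plan::LogicalPlan;

use crate::planner::DistributedPlanner;
use crate::scheduler_server::event_loop::SchedulerServerEvent;

impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageScheduler<T, U> {
    // Hypothetical handler: field names (ctx, state, event_sender) and the method name
    // are guesses; the steps mirror the create_job function removed from grpc.rs above.
    async fn handle_job_submitted(
        &self,
        job_id: String,
        plan: LogicalPlan,
    ) -> Result<(), BallistaError> {
        // 1. Optimize the logical plan and create the physical plan with DataFusion.
        let ctx = self.ctx.read().await.clone();
        let optimized_plan = ctx.optimize(&plan).map_err(|e| {
            BallistaError::General(format!("Could not create optimized logical plan: {}", e))
        })?;
        let plan = ctx.create_physical_plan(&optimized_plan).await.map_err(|e| {
            BallistaError::General(format!("Could not create physical plan: {}", e))
        })?;

        // 2. Mark the job as running.
        self.state
            .save_job_metadata(
                &job_id,
                &JobStatus {
                    status: Some(job_status::Status::Running(RunningJob {})),
                },
            )
            .await?;

        // 3. Plan the distributed query stages, then persist each stage plan and one
        //    pending task status per output partition.
        let mut planner = DistributedPlanner::new();
        for shuffle_writer in planner.plan_query_stages(&job_id, plan).await? {
            self.state
                .save_stage_plan(&job_id, shuffle_writer.stage_id(), shuffle_writer.clone())
                .await?;
            let num_partitions = shuffle_writer.output_partitioning().partition_count();
            for partition_id in 0..num_partitions {
                let pending_status = TaskStatus {
                    task_id: Some(PartitionId {
                        job_id: job_id.clone(),
                        stage_id: shuffle_writer.stage_id() as u32,
                        partition_id: partition_id as u32,
                    }),
                    status: None,
                };
                self.state.save_task_status(&pending_status).await?;
            }
        }

        // 4. If the executor-facing event loop is configured (push-based scheduling),
        //    hand the job over to it, as the removed code did.
        if let Some(sender) = self.event_sender.as_ref() {
            sender
                .post_event(SchedulerServerEvent::JobSubmitted(job_id))
                .await?;
        }
        Ok(())
    }
}

The net effect is what the removed TODO comment asked for: submit_job returns quickly, and stage planning runs asynchronously on the query-stage event loop instead of inside the gRPC request handler.
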
31 changes: 31 additions & 0 deletions ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -33,6 +33,9 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};
use crate::scheduler_server::event_loop::{
SchedulerServerEvent, SchedulerServerEventAction,
};
use crate::scheduler_server::query_stage_scheduler::{
QueryStageScheduler, QueryStageSchedulerEvent,
};
use crate::state::backend::StateBackendClient;
use crate::state::SchedulerState;

@@ -45,6 +48,7 @@ pub mod externalscaler {
mod event_loop;
mod external_scaler;
mod grpc;
mod query_stage_scheduler;

type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;

@@ -55,6 +59,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
policy: TaskSchedulingPolicy,
executors_client: Option<ExecutorsClient>,
event_loop: Option<EventLoop<SchedulerServerEvent>>,
query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
ctx: Arc<RwLock<ExecutionContext>>,
codec: BallistaCodec<T, U>,
}
@@ -98,6 +103,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
} else {
(None, None)
};
let query_stage_scheduler =
Arc::new(QueryStageScheduler::new(ctx.clone(), state.clone(), None));
let query_stage_event_loop =
EventLoop::new("query_stage".to_owned(), 10000, query_stage_scheduler);
Self {
state,
start_time: SystemTime::now()
@@ -107,6 +116,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
policy,
executors_client,
event_loop,
query_stage_event_loop,
ctx,
codec,
}
@@ -122,11 +132,32 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
{
if let Some(event_loop) = self.event_loop.as_mut() {
event_loop.start()?;

let query_stage_scheduler = Arc::new(QueryStageScheduler::new(
self.ctx.clone(),
self.state.clone(),
Some(event_loop.get_sender()?),
));
let query_stage_event_loop = EventLoop::new(
self.query_stage_event_loop.name.clone(),
self.query_stage_event_loop.buffer_size,
query_stage_scheduler,
);
self.query_stage_event_loop = query_stage_event_loop;
}

self.query_stage_event_loop.start()?;
}

Ok(())
}

async fn post_event(&self, event: QueryStageSchedulerEvent) -> Result<()> {
self.query_stage_event_loop
.get_sender()?
.post_event(event)
.await
}
}

/// Create a DataFusion context that is compatible with Ballista
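
Taken together, the wiring is: new() builds the query-stage event loop with no downstream sender (the executor-facing loop does not exist yet), start() recreates it with Some(sender) when that loop is running and then starts it, and post_event is the entry point the gRPC layer uses to hand a job over. A condensed usage sketch, assuming job_id and plan are in scope; it mirrors the submit_job change above:

// Inside the scheduler, after the job metadata has been saved:
scheduler_server
    .post_event(QueryStageSchedulerEvent::JobSubmitted(
        job_id.clone(),
        Box::new(plan),
    ))
    .await?;
// The QueryStageScheduler plans the stages asynchronously and, when the
// executor-facing event loop is configured, forwards
// SchedulerServerEvent::JobSubmitted(job_id) to it.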