diff --git a/ballista/rust/core/src/event_loop.rs b/ballista/rust/core/src/event_loop.rs
index 225bfce532f9..af18ddcbd966 100644
--- a/ballista/rust/core/src/event_loop.rs
+++ b/ballista/rust/core/src/event_loop.rs
@@ -37,9 +37,9 @@ pub trait EventAction<E>: Send + Sync {
 
 #[derive(Clone)]
 pub struct EventLoop<E> {
-    name: String,
+    pub name: String,
+    pub buffer_size: usize,
     stopped: Arc<AtomicBool>,
-    buffer_size: usize,
     action: Arc<dyn EventAction<E>>,
     tx_event: Option<mpsc::Sender<E>>,
 }
@@ -52,8 +52,8 @@ impl<E: Send + 'static> EventLoop<E> {
     ) -> Self {
         Self {
             name,
-            stopped: Arc::new(AtomicBool::new(false)),
             buffer_size,
+            stopped: Arc::new(AtomicBool::new(false)),
             action,
             tx_event: None,
         }
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index 5dd2a8a0ba4c..fe4eb36a0073 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -17,7 +17,6 @@
 
 use anyhow::Context;
 use ballista_core::config::TaskSchedulingPolicy;
-use ballista_core::error::BallistaError;
 use ballista_core::execution_plans::ShuffleWriterExec;
 use ballista_core::serde::protobuf::execute_query_params::Query;
 use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient;
@@ -26,10 +25,9 @@ use ballista_core::serde::protobuf::scheduler_grpc_server::SchedulerGrpc;
 use ballista_core::serde::protobuf::{
     job_status, ExecuteQueryParams, ExecuteQueryResult, ExecutorHeartbeat, FailedJob,
     FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams,
-    GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PartitionId,
-    PollWorkParams, PollWorkResult, QueuedJob, RegisterExecutorParams,
-    RegisterExecutorResult, RunningJob, TaskDefinition, TaskStatus,
-    UpdateTaskStatusParams, UpdateTaskStatusResult,
+    GetJobStatusResult, HeartBeatParams, HeartBeatResult, JobStatus, PollWorkParams,
+    PollWorkResult, QueuedJob, RegisterExecutorParams, RegisterExecutorResult,
+    TaskDefinition, UpdateTaskStatusParams, UpdateTaskStatusResult,
 };
 use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
 use ballista_core::serde::scheduler::{ExecutorData, ExecutorMetadata};
@@ -37,19 +35,17 @@ use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
 use datafusion::datasource::file_format::parquet::ParquetFormat;
 use datafusion::datasource::file_format::FileFormat;
 use datafusion::datasource::object_store::{local::LocalFileSystem, ObjectStore};
-use datafusion::logical_plan::LogicalPlan;
-use datafusion::physical_plan::ExecutionPlan;
 use futures::StreamExt;
 use log::{debug, error, info, trace, warn};
 use rand::{distributions::Alphanumeric, thread_rng, Rng};
 use std::collections::HashSet;
 use std::convert::TryInto;
 use std::sync::Arc;
-use std::time::{Instant, SystemTime, UNIX_EPOCH};
+use std::time::{SystemTime, UNIX_EPOCH};
 use tonic::{Request, Response, Status};
 
-use crate::planner::DistributedPlanner;
 use crate::scheduler_server::event_loop::SchedulerServerEvent;
+use crate::scheduler_server::query_stage_scheduler::QueryStageSchedulerEvent;
 use crate::scheduler_server::SchedulerServer;
 
 #[tonic::async_trait]
@@ -422,9 +418,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
                     tonic::Status::internal(format!("Could not save job metadata: {}", e))
                 })?;
 
-            // Create job details for the plan, like stages, tasks, etc
-            // TODO To achieve more throughput, maybe change it to be event-based processing in the future
-            match create_job(self, job_id.clone(), plan).await {
+            match self
+                .post_event(QueryStageSchedulerEvent::JobSubmitted(
+                    job_id.clone(),
+                    Box::new(plan),
+                ))
+                .await
+            {
                 Err(error) => {
                     let msg = format!("Job {} failed due to {}", job_id, error);
                     warn!("{}", msg);
@@ -470,112 +470,6 @@ fn generate_job_id() -> String {
         .collect()
 }
 
-async fn create_job<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
-    scheduler_server: &SchedulerServer<T, U>,
-    job_id: String,
-    plan: LogicalPlan,
-) -> Result<(), BallistaError> {
-    // create physical plan using DataFusion
-    let plan = async move {
-        let start = Instant::now();
-
-        let ctx = scheduler_server.ctx.read().await.clone();
-        let optimized_plan = ctx.optimize(&plan).map_err(|e| {
-            let msg = format!("Could not create optimized logical plan: {}", e);
-            error!("{}", msg);
-            BallistaError::General(msg)
-        })?;
-
-        debug!("Calculated optimized plan: {:?}", optimized_plan);
-
-        let plan = ctx
-            .create_physical_plan(&optimized_plan)
-            .await
-            .map_err(|e| {
-                let msg = format!("Could not create physical plan: {}", e);
-                error!("{}", msg);
-                BallistaError::General(msg)
-            });
-
-        info!(
-            "DataFusion created physical plan in {} milliseconds",
-            start.elapsed().as_millis()
-        );
-
-        plan
-    }
-    .await?;
-
-    scheduler_server
-        .state
-        .save_job_metadata(
-            &job_id,
-            &JobStatus {
-                status: Some(job_status::Status::Running(RunningJob {})),
-            },
-        )
-        .await
-        .map_err(|e| {
-            warn!("Could not update job {} status to running: {}", job_id, e);
-            e
-        })?;
-
-    // create distributed physical plan using Ballista
-    let mut planner = DistributedPlanner::new();
-    let stages = planner
-        .plan_query_stages(&job_id, plan)
-        .await
-        .map_err(|e| {
-            let msg = format!("Could not plan query stages: {}", e);
-            error!("{}", msg);
-            BallistaError::General(msg)
-        })?;
-
-    // save stages into state
-    for shuffle_writer in stages {
-        scheduler_server
-            .state
-            .save_stage_plan(&job_id, shuffle_writer.stage_id(), shuffle_writer.clone())
-            .await
-            .map_err(|e| {
-                let msg = format!("Could not save stage plan: {}", e);
-                error!("{}", msg);
-                BallistaError::General(msg)
-            })?;
-        let num_partitions = shuffle_writer.output_partitioning().partition_count();
-        for partition_id in 0..num_partitions {
-            let pending_status = TaskStatus {
-                task_id: Some(PartitionId {
-                    job_id: job_id.clone(),
-                    stage_id: shuffle_writer.stage_id() as u32,
-                    partition_id: partition_id as u32,
-                }),
-                status: None,
-            };
-            scheduler_server
-                .state
-                .save_task_status(&pending_status)
-                .await
-                .map_err(|e| {
-                    let msg = format!("Could not save task status: {}", e);
-                    error!("{}", msg);
-                    BallistaError::General(msg)
-                })?;
-        }
-    }
-
-    if let Some(event_loop) = scheduler_server.event_loop.as_ref() {
-        // Send job_id to the scheduler channel
-        event_loop
-            .get_sender()?
-            .post_event(SchedulerServerEvent::JobSubmitted(job_id))
-            .await
-            .unwrap();
-    };
-
-    Ok(())
-}
-
 #[cfg(all(test, feature = "sled"))]
 mod test {
     use std::sync::Arc;
diff --git a/ballista/rust/scheduler/src/scheduler_server/mod.rs b/ballista/rust/scheduler/src/scheduler_server/mod.rs
index fdcd375fbb33..9106df7f7aee 100644
--- a/ballista/rust/scheduler/src/scheduler_server/mod.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/mod.rs
@@ -33,6 +33,9 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};
 use crate::scheduler_server::event_loop::{
     SchedulerServerEvent, SchedulerServerEventAction,
 };
+use crate::scheduler_server::query_stage_scheduler::{
+    QueryStageScheduler, QueryStageSchedulerEvent,
+};
 use crate::state::backend::StateBackendClient;
 use crate::state::SchedulerState;
 
@@ -45,6 +48,7 @@ pub mod externalscaler {
 mod event_loop;
 mod external_scaler;
 mod grpc;
+mod query_stage_scheduler;
 
 type ExecutorsClient = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
@@ -55,6 +59,7 @@ pub struct SchedulerServer<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
     executors_client: Option<ExecutorsClient>,
     event_loop: Option<EventLoop<SchedulerServerEvent>>,
+    query_stage_event_loop: EventLoop<QueryStageSchedulerEvent>,
     ctx: Arc<RwLock<ExecutionContext>>,
     codec: BallistaCodec<T, U>,
 }
@@ -98,6 +103,10 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T, U> {
+    pub(crate) async fn post_event(&self, event: QueryStageSchedulerEvent) -> Result<()> {
+        self.query_stage_event_loop
+            .get_sender()?
+            .post_event(event)
+            .await
+    }
 }
 
 /// Create a DataFusion context that is compatible with Ballista
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
new file mode 100644
index 000000000000..5a2800c904d6
--- /dev/null
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::time::Instant;
+
+use async_trait::async_trait;
+use log::{debug, error, info};
+use tokio::sync::RwLock;
+
+use ballista_core::error::{BallistaError, Result};
+use ballista_core::event_loop::{EventAction, EventSender};
+use ballista_core::serde::protobuf::{PartitionId, TaskStatus};
+use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan};
+use datafusion::logical_plan::LogicalPlan;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::ExecutionContext;
+
+use crate::planner::DistributedPlanner;
+use crate::scheduler_server::event_loop::SchedulerServerEvent;
+use crate::state::SchedulerState;
+
+#[derive(Clone)]
+pub enum QueryStageSchedulerEvent {
+    JobSubmitted(String, Box<LogicalPlan>),
+}
+
+pub(crate) struct QueryStageScheduler<
+    T: 'static + AsLogicalPlan,
+    U: 'static + AsExecutionPlan,
+> {
+    ctx: Arc<RwLock<ExecutionContext>>,
+    state: Arc<SchedulerState<T, U>>,
+    event_sender: Option<EventSender<SchedulerServerEvent>>,
+}
+
+impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageScheduler<T, U> {
+    pub(crate) fn new(
+        ctx: Arc<RwLock<ExecutionContext>>,
+        state: Arc<SchedulerState<T, U>>,
+        event_sender: Option<EventSender<SchedulerServerEvent>>,
+    ) -> Self {
+        Self {
+            ctx,
+            state,
+            event_sender,
+        }
+    }
+
+    async fn create_physical_plan(
+        &self,
+        plan: Box<LogicalPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let start = Instant::now();
+
+        let ctx = self.ctx.read().await.clone();
+        let optimized_plan = ctx.optimize(plan.as_ref()).map_err(|e| {
+            let msg = format!("Could not create optimized logical plan: {}", e);
+            error!("{}", msg);
+            BallistaError::General(msg)
+        })?;
+
+        debug!("Calculated optimized plan: {:?}", optimized_plan);
+
+        let plan = ctx
+            .create_physical_plan(&optimized_plan)
+            .await
+            .map_err(|e| {
+                let msg = format!("Could not create physical plan: {}", e);
+                error!("{}", msg);
+                BallistaError::General(msg)
+            });
+
+        info!(
+            "DataFusion created physical plan in {} milliseconds",
+            start.elapsed().as_millis()
+        );
+
+        plan
+    }
+
+    async fn generate_stages(
+        &self,
+        job_id: &str,
+        plan: Arc<dyn ExecutionPlan>,
+    ) -> Result<()> {
+        let mut planner = DistributedPlanner::new();
+        let stages = planner.plan_query_stages(job_id, plan).await.map_err(|e| {
+            let msg = format!("Could not plan query stages: {}", e);
+            error!("{}", msg);
+            BallistaError::General(msg)
+        })?;
+
+        // save stages into state
+        for shuffle_writer in stages {
+            self.state
+                .save_stage_plan(
+                    job_id,
+                    shuffle_writer.stage_id(),
+                    shuffle_writer.clone(),
+                )
+                .await
+                .map_err(|e| {
+                    let msg = format!("Could not save stage plan: {}", e);
+                    error!("{}", msg);
+                    BallistaError::General(msg)
+                })?;
+            let num_partitions = shuffle_writer.output_partitioning().partition_count();
+            for partition_id in 0..num_partitions {
+                let pending_status = TaskStatus {
+                    task_id: Some(PartitionId {
+                        job_id: job_id.to_owned(),
+                        stage_id: shuffle_writer.stage_id() as u32,
+                        partition_id: partition_id as u32,
+                    }),
+                    status: None,
+                };
+                self.state
+                    .save_task_status(&pending_status)
+                    .await
+                    .map_err(|e| {
+                        let msg = format!("Could not save task status: {}", e);
+                        error!("{}", msg);
+                        BallistaError::General(msg)
+                    })?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
+    EventAction<QueryStageSchedulerEvent> for QueryStageScheduler<T, U>
+{
+    // TODO
+    fn on_start(&self) {}
+
+    // TODO
+    fn on_stop(&self) {}
+
+    async fn on_receive(
+        &self,
+        event: QueryStageSchedulerEvent,
+    ) -> Result<Option<QueryStageSchedulerEvent>> {
+        match event {
+            QueryStageSchedulerEvent::JobSubmitted(job_id, plan) => {
+                let plan = self.create_physical_plan(plan).await?;
+                self.generate_stages(&job_id, plan).await?;
+
+                if let Some(event_sender) = self.event_sender.as_ref() {
+                    // Send job_id to the scheduler channel
+                    event_sender
+                        .post_event(SchedulerServerEvent::JobSubmitted(job_id))
+                        .await?;
+                };
+            }
+        }
+        Ok(None)
+    }
+
+    // TODO
+    fn on_error(&self, _error: BallistaError) {}
+}
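
A note on the pattern this change adopts: the `ExecuteQuery` handler no longer plans the job inline. It posts a `QueryStageSchedulerEvent::JobSubmitted` into a dedicated event loop, whose `EventAction` implementation (`QueryStageScheduler`) creates the physical plan, persists stages and pending tasks, and then forwards `SchedulerServerEvent::JobSubmitted` to the task-scheduling loop. The sketch below is a minimal, self-contained illustration of that chained event-loop idea built on plain tokio channels; the names `QueryStageEvent`, `SchedulerEvent`, `StageEventHandler`, and `QueryStagePlanner` are hypothetical stand-ins, not the real `ballista_core::event_loop` API.

```rust
// Standalone sketch of the chained event-loop pattern (needs the `tokio` and
// `async-trait` crates); types here are illustrative, not the Ballista code.
use async_trait::async_trait;
use tokio::sync::mpsc;

#[derive(Debug)]
enum QueryStageEvent {
    // The real event also carries the boxed logical plan.
    JobSubmitted(String),
}

#[derive(Debug)]
enum SchedulerEvent {
    JobSubmitted(String),
}

// Mirrors the shape of EventAction::on_receive: handle one event and
// optionally emit a follow-up event for the next loop.
#[async_trait]
trait StageEventHandler: Send + Sync {
    async fn on_receive(&self, event: QueryStageEvent) -> Option<SchedulerEvent>;
}

struct QueryStagePlanner;

#[async_trait]
impl StageEventHandler for QueryStagePlanner {
    async fn on_receive(&self, event: QueryStageEvent) -> Option<SchedulerEvent> {
        match event {
            QueryStageEvent::JobSubmitted(job_id) => {
                // The real handler optimizes the plan, builds the physical plan,
                // and saves stage plans and pending task statuses here.
                println!("planning stages for job {}", job_id);
                Some(SchedulerEvent::JobSubmitted(job_id))
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let buffer_size = 16; // plays the role of EventLoop::buffer_size
    let (stage_tx, mut stage_rx) = mpsc::channel::<QueryStageEvent>(buffer_size);
    let (sched_tx, mut sched_rx) = mpsc::channel::<SchedulerEvent>(buffer_size);

    // Query-stage loop: consume events and forward follow-ups.
    let handler = QueryStagePlanner;
    tokio::spawn(async move {
        while let Some(event) = stage_rx.recv().await {
            if let Some(next) = handler.on_receive(event).await {
                let _ = sched_tx.send(next).await;
            }
        }
    });

    // Task-scheduler loop: in this sketch it only logs what arrives.
    let scheduler_loop = tokio::spawn(async move {
        while let Some(event) = sched_rx.recv().await {
            println!("scheduler loop received {:?}", event);
        }
    });

    // Rough equivalent of SchedulerServer::post_event in the gRPC handler.
    stage_tx
        .send(QueryStageEvent::JobSubmitted("job-42".to_owned()))
        .await
        .expect("query-stage loop stopped");

    drop(stage_tx); // closing the channel lets both loops drain and exit
    scheduler_loop.await.unwrap();
}
```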
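The removed TODO in `ExecuteQuery` ("maybe change it to be event-based processing in the future") is what this patch delivers: the gRPC handler now returns as soon as the job is recorded and the event is posted, while optimization, physical planning, and stage/task persistence run on the query-stage loop, and only the final `SchedulerServerEvent::JobSubmitted` reaches the task-scheduling loop. The `buffer_size` field made public on `EventLoop` presumably bounds the underlying channel, so a burst of submissions backpressures callers of `post_event` instead of growing memory without limit.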