Skip to content

Commit 1bedaf3

Browse files
Abstract over logical and physical plan representations in Ballista (#1677)
* Abstract over logical plan representation in Ballista * Linter fix * Create trait for ExecutionPlan serializable representation and make the ExecutionContext global to the server * Make FileScanConfig decoding context aware * Clippy fix * More clippy issues and use global context everywehere * Fix various cases in which we were still using the old codec * Remove unused code and add test case for logical plan round trip with custom object store * clippy is a cruel tyrant * Decode execution plan from raw bytes in protobuf message * clippy fixes * Remove commented code * Add LogicalExtensionCodec design * Implement extension codecs for serializing user-defined logical and physical plans * appease clippy * remove commented code
1 parent 1431ef3 commit 1bedaf3

File tree

25 files changed

+2971
-1580
lines changed

25 files changed

+2971
-1580
lines changed

ballista/rust/client/src/context.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::path::PathBuf;
2525
use std::sync::Arc;
2626

2727
use ballista_core::config::BallistaConfig;
28+
use ballista_core::serde::protobuf::LogicalPlanNode;
2829
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;
2930

3031
use datafusion::catalog::TableReference;
@@ -144,7 +145,7 @@ impl BallistaContext {
144145
// use local DataFusion context for now but later this might call the scheduler
145146
let mut ctx = {
146147
let guard = self.state.lock();
147-
create_df_ctx_with_ballista_query_planner(
148+
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
148149
&guard.scheduler_host,
149150
guard.scheduler_port,
150151
guard.config(),
@@ -164,7 +165,7 @@ impl BallistaContext {
164165
// use local DataFusion context for now but later this might call the scheduler
165166
let mut ctx = {
166167
let guard = self.state.lock();
167-
create_df_ctx_with_ballista_query_planner(
168+
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
168169
&guard.scheduler_host,
169170
guard.scheduler_port,
170171
guard.config(),
@@ -188,7 +189,7 @@ impl BallistaContext {
188189
// use local DataFusion context for now but later this might call the scheduler
189190
let mut ctx = {
190191
let guard = self.state.lock();
191-
create_df_ctx_with_ballista_query_planner(
192+
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
192193
&guard.scheduler_host,
193194
guard.scheduler_port,
194195
guard.config(),
@@ -282,7 +283,7 @@ impl BallistaContext {
282283
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
283284
let mut ctx = {
284285
let state = self.state.lock();
285-
create_df_ctx_with_ballista_query_planner(
286+
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
286287
&state.scheduler_host,
287288
state.scheduler_port,
288289
state.config(),

ballista/rust/core/proto/ballista.proto

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -266,9 +266,15 @@ message LogicalPlanNode {
266266
AnalyzeNode analyze = 14;
267267
CrossJoinNode cross_join = 15;
268268
ValuesNode values = 16;
269+
LogicalExtensionNode extension = 17;
269270
}
270271
}
271272

273+
message LogicalExtensionNode {
274+
bytes node = 1;
275+
repeated LogicalPlanNode inputs = 2;
276+
}
277+
272278
message ProjectionColumns {
273279
repeated string columns = 1;
274280
}
@@ -488,9 +494,15 @@ message PhysicalPlanNode {
488494
ShuffleWriterExecNode shuffle_writer = 18;
489495
CrossJoinExecNode cross_join = 19;
490496
AvroScanExecNode avro_scan = 20;
497+
PhysicalExtensionNode extension = 21;
491498
}
492499
}
493500

501+
message PhysicalExtensionNode {
502+
bytes node = 1;
503+
repeated PhysicalPlanNode inputs = 2;
504+
}
505+
494506
// physical expressions
495507
message PhysicalExprNode {
496508
oneof ExprType {
@@ -934,7 +946,7 @@ message PollWorkParams {
934946

935947
message TaskDefinition {
936948
PartitionId task_id = 1;
937-
PhysicalPlanNode plan = 2;
949+
bytes plan = 2;
938950
// Output partition for shuffle writer
939951
PhysicalHashRepartition output_partitioning = 3;
940952
}
@@ -980,7 +992,7 @@ message UpdateTaskStatusResult {
980992

981993
message ExecuteQueryParams {
982994
oneof query {
983-
LogicalPlanNode logical_plan = 1;
995+
bytes logical_plan = 1;
984996
string sql = 2;
985997
}
986998
repeated KeyValuePair settings = 3;

ballista/rust/core/src/execution_plans/distributed_query.rs

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
use std::any::Any;
1919
use std::convert::TryInto;
20+
use std::fmt::Debug;
21+
use std::marker::PhantomData;
22+
use std::marker::Send;
2023
use std::pin::Pin;
2124
use std::sync::Arc;
2225
use std::time::Duration;
@@ -39,6 +42,7 @@ use datafusion::physical_plan::{
3942
SendableRecordBatchStream, Statistics,
4043
};
4144

45+
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
4246
use async_trait::async_trait;
4347
use datafusion::execution::runtime_env::RuntimeEnv;
4448
use futures::future;
@@ -50,27 +54,64 @@ use log::{error, info};
5054
/// batches directly from the executors that hold the results from the final
5155
/// query stage.
5256
#[derive(Debug, Clone)]
53-
pub struct DistributedQueryExec {
57+
pub struct DistributedQueryExec<T: 'static + AsLogicalPlan> {
5458
/// Ballista scheduler URL
5559
scheduler_url: String,
5660
/// Ballista configuration
5761
config: BallistaConfig,
5862
/// Logical plan to execute
5963
plan: LogicalPlan,
64+
/// Codec for LogicalPlan extensions
65+
extension_codec: Arc<dyn LogicalExtensionCodec>,
66+
/// Phantom data for serializable plan message
67+
plan_repr: PhantomData<T>,
6068
}
6169

62-
impl DistributedQueryExec {
70+
impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
6371
pub fn new(scheduler_url: String, config: BallistaConfig, plan: LogicalPlan) -> Self {
6472
Self {
6573
scheduler_url,
6674
config,
6775
plan,
76+
extension_codec: Arc::new(DefaultLogicalExtensionCodec {}),
77+
plan_repr: PhantomData,
78+
}
79+
}
80+
81+
pub fn with_extension(
82+
scheduler_url: String,
83+
config: BallistaConfig,
84+
plan: LogicalPlan,
85+
extension_codec: Arc<dyn LogicalExtensionCodec>,
86+
) -> Self {
87+
Self {
88+
scheduler_url,
89+
config,
90+
plan,
91+
extension_codec,
92+
plan_repr: PhantomData,
93+
}
94+
}
95+
96+
pub fn with_repr(
97+
scheduler_url: String,
98+
config: BallistaConfig,
99+
plan: LogicalPlan,
100+
extension_codec: Arc<dyn LogicalExtensionCodec>,
101+
plan_repr: PhantomData<T>,
102+
) -> Self {
103+
Self {
104+
scheduler_url,
105+
config,
106+
plan,
107+
extension_codec,
108+
plan_repr,
68109
}
69110
}
70111
}
71112

72113
#[async_trait]
73-
impl ExecutionPlan for DistributedQueryExec {
114+
impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
74115
fn as_any(&self) -> &dyn Any {
75116
self
76117
}
@@ -99,11 +140,13 @@ impl ExecutionPlan for DistributedQueryExec {
99140
&self,
100141
_children: Vec<Arc<dyn ExecutionPlan>>,
101142
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
102-
Ok(Arc::new(DistributedQueryExec::new(
103-
self.scheduler_url.clone(),
104-
self.config.clone(),
105-
self.plan.clone(),
106-
)))
143+
Ok(Arc::new(DistributedQueryExec {
144+
scheduler_url: self.scheduler_url.clone(),
145+
config: self.config.clone(),
146+
plan: self.plan.clone(),
147+
extension_codec: self.extension_codec.clone(),
148+
plan_repr: self.plan_repr,
149+
}))
107150
}
108151

109152
async fn execute(
@@ -121,13 +164,23 @@ impl ExecutionPlan for DistributedQueryExec {
121164

122165
let schema: Schema = self.plan.schema().as_ref().clone().into();
123166

167+
let mut buf: Vec<u8> = vec![];
168+
let plan_message =
169+
T::try_from_logical_plan(&self.plan, self.extension_codec.as_ref()).map_err(
170+
|e| {
171+
DataFusionError::Internal(format!(
172+
"failed to serialize logical plan: {:?}",
173+
e
174+
))
175+
},
176+
)?;
177+
plan_message.try_encode(&mut buf).map_err(|e| {
178+
DataFusionError::Execution(format!("failed to encode logical plan: {:?}", e))
179+
})?;
180+
124181
let job_id = scheduler
125182
.execute_query(ExecuteQueryParams {
126-
query: Some(Query::LogicalPlan(
127-
(&self.plan)
128-
.try_into()
129-
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
130-
)),
183+
query: Some(Query::LogicalPlan(buf)),
131184
settings: self
132185
.config
133186
.settings()

0 commit comments

Comments
 (0)