Skip to content

Commit fa9e016

Browse files
authored
Implement fast path of with_new_children() in ExecutionPlan (#2168)
* Implement fast path of with_new_children() in ExecutionPlan * resolve review comments * refine comments
1 parent 6504d2a commit fa9e016

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+258
-330
lines changed

ballista/rust/core/src/execution_plans/distributed_query.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
148148
}
149149

150150
fn with_new_children(
151-
&self,
151+
self: Arc<Self>,
152152
_children: Vec<Arc<dyn ExecutionPlan>>,
153153
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
154154
Ok(Arc::new(DistributedQueryExec {

ballista/rust/core/src/execution_plans/shuffle_reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ impl ExecutionPlan for ShuffleReaderExec {
9595
}
9696

9797
fn with_new_children(
98-
&self,
98+
self: Arc<Self>,
9999
_children: Vec<Arc<dyn ExecutionPlan>>,
100100
) -> Result<Arc<dyn ExecutionPlan>> {
101101
Err(DataFusionError::Plan(

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -342,10 +342,9 @@ impl ExecutionPlan for ShuffleWriterExec {
342342
}
343343

344344
fn with_new_children(
345-
&self,
345+
self: Arc<Self>,
346346
children: Vec<Arc<dyn ExecutionPlan>>,
347347
) -> Result<Arc<dyn ExecutionPlan>> {
348-
assert!(children.len() == 1);
349348
Ok(Arc::new(ShuffleWriterExec::try_new(
350349
self.job_id.clone(),
351350
self.stage_id,

ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
9292
}
9393

9494
fn with_new_children(
95-
&self,
95+
self: Arc<Self>,
9696
_children: Vec<Arc<dyn ExecutionPlan>>,
9797
) -> Result<Arc<dyn ExecutionPlan>> {
9898
Err(DataFusionError::Plan(

ballista/rust/core/src/serde/mod.rs

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -505,18 +505,13 @@ mod tests {
505505
}
506506

507507
fn with_new_children(
508-
&self,
508+
self: Arc<Self>,
509509
children: Vec<Arc<dyn ExecutionPlan>>,
510510
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
511-
match children.len() {
512-
1 => Ok(Arc::new(TopKExec {
513-
input: children[0].clone(),
514-
k: self.k,
515-
})),
516-
_ => Err(DataFusionError::Internal(
517-
"TopKExec wrong number of children".to_string(),
518-
)),
519-
}
511+
Ok(Arc::new(TopKExec {
512+
input: children[0].clone(),
513+
k: self.k,
514+
}))
520515
}
521516

522517
/// Execute one partition and return an iterator over RecordBatch

ballista/rust/executor/src/collect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl ExecutionPlan for CollectExec {
7272
}
7373

7474
fn with_new_children(
75-
&self,
75+
self: Arc<Self>,
7676
_children: Vec<Arc<dyn ExecutionPlan>>,
7777
) -> Result<Arc<dyn ExecutionPlan>> {
7878
unimplemented!()

ballista/rust/scheduler/src/planner.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ use ballista_core::{
3030
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3131
use datafusion::physical_plan::repartition::RepartitionExec;
3232
use datafusion::physical_plan::windows::WindowAggExec;
33-
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
33+
use datafusion::physical_plan::{
34+
with_new_children_if_necessary, ExecutionPlan, Partitioning,
35+
};
3436
use futures::future::BoxFuture;
3537
use futures::FutureExt;
3638
use log::info;
@@ -99,7 +101,7 @@ impl DistributedPlanner {
99101
stages.append(&mut child_stages);
100102
}
101103

102-
if let Some(coalesce) = execution_plan
104+
if let Some(_coalesce) = execution_plan
103105
.as_any()
104106
.downcast_ref::<CoalescePartitionsExec>()
105107
{
@@ -122,7 +124,10 @@ impl DistributedPlanner {
122124
));
123125
stages.push(shuffle_writer);
124126
Ok((
125-
coalesce.with_new_children(vec![unresolved_shuffle])?,
127+
with_new_children_if_necessary(
128+
execution_plan,
129+
vec![unresolved_shuffle],
130+
)?,
126131
stages,
127132
))
128133
} else if let Some(repart) =
@@ -163,7 +168,10 @@ impl DistributedPlanner {
163168
window
164169
)))
165170
} else {
166-
Ok((execution_plan.with_new_children(children)?, stages))
171+
Ok((
172+
with_new_children_if_necessary(execution_plan, children)?,
173+
stages,
174+
))
167175
}
168176
}
169177
.boxed()
@@ -197,7 +205,7 @@ pub fn find_unresolved_shuffles(
197205
}
198206

199207
pub fn remove_unresolved_shuffles(
200-
stage: &dyn ExecutionPlan,
208+
stage: Arc<dyn ExecutionPlan>,
201209
partition_locations: &HashMap<usize, HashMap<usize, Vec<PartitionLocation>>>,
202210
) -> Result<Arc<dyn ExecutionPlan>> {
203211
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
@@ -240,13 +248,10 @@ pub fn remove_unresolved_shuffles(
240248
unresolved_shuffle.schema().clone(),
241249
)?))
242250
} else {
243-
new_children.push(remove_unresolved_shuffles(
244-
child.as_ref(),
245-
partition_locations,
246-
)?);
251+
new_children.push(remove_unresolved_shuffles(child, partition_locations)?);
247252
}
248253
}
249-
Ok(stage.with_new_children(new_children)?)
254+
Ok(with_new_children_if_necessary(stage, new_children)?)
250255
}
251256

252257
fn create_shuffle_writer(

ballista/rust/scheduler/src/scheduler_server/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ impl Default for SessionContextRegistry {
241241
}
242242

243243
impl SessionContextRegistry {
244-
/// Create the registry that object stores can registered into.
244+
/// Create the registry that session contexts can registered into.
245245
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
246246
pub fn new() -> Self {
247247
Self {

ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,8 +304,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> QueryStageSchedul
304304
}
305305
}
306306

307-
let plan =
308-
remove_unresolved_shuffles(stage_plan.as_ref(), &partition_locations)?;
307+
let plan = remove_unresolved_shuffles(stage_plan, &partition_locations)?;
309308
self.state.save_stage_plan(job_id, stage_id, plan).await?;
310309
}
311310

datafusion-examples/examples/custom_datasource.rs

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2121
use datafusion::arrow::record_batch::RecordBatch;
2222
use datafusion::dataframe::DataFrame;
2323
use datafusion::datasource::TableProvider;
24-
use datafusion::error::{DataFusionError, Result};
24+
use datafusion::error::Result;
2525
use datafusion::execution::context::TaskContext;
2626
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
2727
use datafusion::physical_plan::expressions::PhysicalSortExpr;
@@ -219,17 +219,10 @@ impl ExecutionPlan for CustomExec {
219219
}
220220

221221
fn with_new_children(
222-
&self,
223-
children: Vec<Arc<dyn ExecutionPlan>>,
222+
self: Arc<Self>,
223+
_: Vec<Arc<dyn ExecutionPlan>>,
224224
) -> Result<Arc<dyn ExecutionPlan>> {
225-
if children.is_empty() {
226-
Ok(Arc::new(self.clone()))
227-
} else {
228-
Err(DataFusionError::Internal(format!(
229-
"Children cannot be replaced in {:?}",
230-
self
231-
)))
232-
}
225+
Ok(self)
233226
}
234227

235228
async fn execute(

datafusion/core/src/physical_optimizer/coalesce_batches.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! in bigger batches to avoid overhead with small batches
2020
2121
use super::optimizer::PhysicalOptimizerRule;
22+
use crate::physical_plan::with_new_children_if_necessary;
2223
use crate::{
2324
error::Result,
2425
physical_plan::{
@@ -69,7 +70,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
6970
// leaf node, children cannot be replaced
7071
Ok(plan.clone())
7172
} else {
72-
let plan = plan.with_new_children(children)?;
73+
let plan = with_new_children_if_necessary(plan, children)?;
7374
Ok(if wrap_in_coalesce {
7475
// TODO we should add specific configuration settings for coalescing batches and
7576
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is

datafusion/core/src/physical_optimizer/merge_exec.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
//! with more than one partition, to coalesce them into one partition
2020
//! when the node needs a single partition
2121
use super::optimizer::PhysicalOptimizerRule;
22+
use crate::physical_plan::with_new_children_if_necessary;
2223
use crate::{
2324
error::Result,
2425
physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
@@ -52,9 +53,14 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
5253
.map(|child| self.optimize(child.clone(), config))
5354
.collect::<Result<Vec<_>>>()?;
5455
match plan.required_child_distribution() {
55-
Distribution::UnspecifiedDistribution => plan.with_new_children(children),
56-
Distribution::HashPartitioned(_) => plan.with_new_children(children),
57-
Distribution::SinglePartition => plan.with_new_children(
56+
Distribution::UnspecifiedDistribution => {
57+
with_new_children_if_necessary(plan, children)
58+
}
59+
Distribution::HashPartitioned(_) => {
60+
with_new_children_if_necessary(plan, children)
61+
}
62+
Distribution::SinglePartition => with_new_children_if_necessary(
63+
plan,
5864
children
5965
.iter()
6066
.map(|child| {

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ use std::sync::Arc;
2020

2121
use super::optimizer::PhysicalOptimizerRule;
2222
use crate::physical_plan::Partitioning::*;
23-
use crate::physical_plan::{repartition::RepartitionExec, ExecutionPlan};
23+
use crate::physical_plan::{
24+
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
25+
};
2426
use crate::{error::Result, execution::context::SessionConfig};
2527

2628
/// Optimizer that introduces repartition to introduce more
@@ -191,7 +193,7 @@ fn optimize_partitions(
191193
)
192194
})
193195
.collect::<Result<_>>()?;
194-
plan.with_new_children(children)?
196+
with_new_children_if_necessary(plan, children)?
195197
};
196198

197199
// decide if we should bother trying to repartition the output of this plan

datafusion/core/src/physical_optimizer/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use super::optimizer::PhysicalOptimizerRule;
2121
use crate::execution::context::SessionConfig;
2222

2323
use crate::error::Result;
24-
use crate::physical_plan::ExecutionPlan;
24+
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
2525
use std::sync::Arc;
2626

2727
/// Convenience rule for writing optimizers: recursively invoke
@@ -42,6 +42,6 @@ pub fn optimize_children(
4242
if children.is_empty() {
4343
Ok(Arc::clone(&plan))
4444
} else {
45-
plan.with_new_children(children)
45+
with_new_children_if_necessary(plan, children)
4646
}
4747
}

datafusion/core/src/physical_plan/analyze.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -92,21 +92,14 @@ impl ExecutionPlan for AnalyzeExec {
9292
}
9393

9494
fn with_new_children(
95-
&self,
95+
self: Arc<Self>,
9696
mut children: Vec<Arc<dyn ExecutionPlan>>,
9797
) -> Result<Arc<dyn ExecutionPlan>> {
98-
if children.len() == 1 {
99-
Ok(Arc::new(Self::new(
100-
self.verbose,
101-
children.pop().unwrap(),
102-
self.schema.clone(),
103-
)))
104-
} else {
105-
Err(DataFusionError::Internal(format!(
106-
"Invalid child count for AnalyzeExec. Expected 1 got {}",
107-
children.len()
108-
)))
109-
}
98+
Ok(Arc::new(Self::new(
99+
self.verbose,
100+
children.pop().unwrap(),
101+
self.schema.clone(),
102+
)))
110103
}
111104

112105
async fn execute(

datafusion/core/src/physical_plan/coalesce_batches.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::pin::Pin;
2323
use std::sync::Arc;
2424
use std::task::{Context, Poll};
2525

26-
use crate::error::{DataFusionError, Result};
26+
use crate::error::Result;
2727
use crate::physical_plan::{
2828
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
2929
SendableRecordBatchStream,
@@ -107,18 +107,13 @@ impl ExecutionPlan for CoalesceBatchesExec {
107107
}
108108

109109
fn with_new_children(
110-
&self,
110+
self: Arc<Self>,
111111
children: Vec<Arc<dyn ExecutionPlan>>,
112112
) -> Result<Arc<dyn ExecutionPlan>> {
113-
match children.len() {
114-
1 => Ok(Arc::new(CoalesceBatchesExec::new(
115-
children[0].clone(),
116-
self.target_batch_size,
117-
))),
118-
_ => Err(DataFusionError::Internal(
119-
"CoalesceBatchesExec wrong number of children".to_string(),
120-
)),
121-
}
113+
Ok(Arc::new(CoalesceBatchesExec::new(
114+
children[0].clone(),
115+
self.target_batch_size,
116+
)))
122117
}
123118

124119
async fn execute(

datafusion/core/src/physical_plan/coalesce_partitions.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,10 @@ impl ExecutionPlan for CoalescePartitionsExec {
9696
}
9797

9898
fn with_new_children(
99-
&self,
99+
self: Arc<Self>,
100100
children: Vec<Arc<dyn ExecutionPlan>>,
101101
) -> Result<Arc<dyn ExecutionPlan>> {
102-
match children.len() {
103-
1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
104-
_ => Err(DataFusionError::Internal(
105-
"CoalescePartitionsExec wrong number of children".to_string(),
106-
)),
107-
}
102+
Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone())))
108103
}
109104

110105
async fn execute(

datafusion/core/src/physical_plan/cross_join.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ use super::{
3232
coalesce_partitions::CoalescePartitionsExec, join_utils::check_join_is_valid,
3333
ColumnStatistics, Statistics,
3434
};
35-
use crate::{
36-
error::{DataFusionError, Result},
37-
scalar::ScalarValue,
38-
};
35+
use crate::{error::Result, scalar::ScalarValue};
3936
use async_trait::async_trait;
4037
use std::time::Instant;
4138

@@ -120,18 +117,13 @@ impl ExecutionPlan for CrossJoinExec {
120117
}
121118

122119
fn with_new_children(
123-
&self,
120+
self: Arc<Self>,
124121
children: Vec<Arc<dyn ExecutionPlan>>,
125122
) -> Result<Arc<dyn ExecutionPlan>> {
126-
match children.len() {
127-
2 => Ok(Arc::new(CrossJoinExec::try_new(
128-
children[0].clone(),
129-
children[1].clone(),
130-
)?)),
131-
_ => Err(DataFusionError::Internal(
132-
"CrossJoinExec wrong number of children".to_string(),
133-
)),
134-
}
123+
Ok(Arc::new(CrossJoinExec::try_new(
124+
children[0].clone(),
125+
children[1].clone(),
126+
)?))
135127
}
136128

137129
fn output_partitioning(&self) -> Partitioning {

0 commit comments

Comments
 (0)