Skip to content

Commit 071f14a

Browse files
alamb and xudong963 authored
Update ExecutionPlan to know about sortedness and repartitioning optimizer pass respect the invariants (#1776)
* Do not repartition sorted inputs `SortPreservingMerge` * Add notion of sortedness to `ExecutionPlan`, use to avoid repartitioning when that would result in incorrect behavior * fix: fix ballista * Update datafusion/src/physical_optimizer/repartition.rs Co-authored-by: xudong.w <[email protected]> * Update datafusion/src/physical_optimizer/repartition.rs Co-authored-by: xudong.w <[email protected]> * Add more comments * Remove special `EmptyExec` case * restore default for benefits_from_input_partitioning * avoid unnecessary check * default relies_on_input_order to true * fix test Co-authored-by: xudong.w <[email protected]>
1 parent f6df759 commit 071f14a

35 files changed

+742
-71
lines changed

ballista/rust/core/src/execution_plans/distributed_query.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use crate::utils::WrappedStream;
3333
use datafusion::arrow::datatypes::{Schema, SchemaRef};
3434
use datafusion::error::{DataFusionError, Result};
3535
use datafusion::logical_plan::LogicalPlan;
36+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3637
use datafusion::physical_plan::{
3738
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
3839
SendableRecordBatchStream, Statistics,
@@ -82,6 +83,14 @@ impl ExecutionPlan for DistributedQueryExec {
8283
Partitioning::UnknownPartitioning(1)
8384
}
8485

86+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
87+
None
88+
}
89+
90+
fn relies_on_input_order(&self) -> bool {
91+
false
92+
}
93+
8594
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
8695
vec![]
8796
}

ballista/rust/core/src/execution_plans/shuffle_reader.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use datafusion::arrow::datatypes::SchemaRef;
2828
use datafusion::arrow::error::Result as ArrowResult;
2929
use datafusion::arrow::record_batch::RecordBatch;
3030
use datafusion::execution::runtime_env::RuntimeEnv;
31+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3132
use datafusion::physical_plan::metrics::{
3233
ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
3334
};
@@ -85,6 +86,14 @@ impl ExecutionPlan for ShuffleReaderExec {
8586
Partitioning::UnknownPartitioning(self.partition.len())
8687
}
8788

89+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
90+
None
91+
}
92+
93+
fn relies_on_input_order(&self) -> bool {
94+
false
95+
}
96+
8897
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
8998
vec![]
9099
}

ballista/rust/core/src/execution_plans/shuffle_writer.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
2121
//! will use the ShuffleReaderExec to read these results.
2222
23+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
2324
use parking_lot::Mutex;
2425
use std::fs::File;
2526
use std::iter::Iterator;
@@ -334,6 +335,14 @@ impl ExecutionPlan for ShuffleWriterExec {
334335
self.plan.output_partitioning()
335336
}
336337

338+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
339+
None
340+
}
341+
342+
fn relies_on_input_order(&self) -> bool {
343+
false
344+
}
345+
337346
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
338347
vec![self.plan.clone()]
339348
}

ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::serde::scheduler::PartitionLocation;
2323
use async_trait::async_trait;
2424
use datafusion::arrow::datatypes::SchemaRef;
2525
use datafusion::execution::runtime_env::RuntimeEnv;
26+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
2627
use datafusion::physical_plan::{
2728
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
2829
};
@@ -85,6 +86,14 @@ impl ExecutionPlan for UnresolvedShuffleExec {
8586
Partitioning::UnknownPartitioning(self.output_partition_count)
8687
}
8788

89+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
90+
None
91+
}
92+
93+
fn relies_on_input_order(&self) -> bool {
94+
false
95+
}
96+
8897
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
8998
vec![]
9099
}

ballista/rust/executor/src/collect.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use datafusion::arrow::{
2828
};
2929
use datafusion::error::DataFusionError;
3030
use datafusion::execution::runtime_env::RuntimeEnv;
31+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3132
use datafusion::physical_plan::{
3233
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
3334
};
@@ -62,6 +63,10 @@ impl ExecutionPlan for CollectExec {
6263
Partitioning::UnknownPartitioning(1)
6364
}
6465

66+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
67+
None
68+
}
69+
6570
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
6671
vec![self.plan.clone()]
6772
}

0 commit comments

Comments
 (0)