Skip to content

Commit 2f28327

Browse files
authored
Interface for physical plan invariant checking. (#13986)
* feat(13652): provide interfaces for checking physical plan invariants, and perform check as part of the default physical planner * feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the of the output plan * feat(13652): perform invariant checking on the execution plan, conditionally based upon the expected/stated behavior of the optimizer rule * test: update tests to reflect updated invariant checking paradigm * Revert "feat(13652): define PhysicalOptimizerRule::executable_check interface which allows each optimizer rule to state the of the output plan" This reverts commit 5760792. * Revert "test: update tests to reflect updated invariant checking paradigm" This reverts commit ad15c85. * refactor: remove vestiges of sanity_check from the InvariantChecker * refactor: introduce Invariant levels, and make explicit how the post-optimization checker should be run * feat: provide invariant for UnionExec * chore: update docs and error messages
1 parent 163314d commit 2f28327

File tree

3 files changed

+367
-15
lines changed

3 files changed

+367
-15
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 330 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use arrow::datatypes::{Schema, SchemaRef};
6565
use arrow_array::builder::StringBuilder;
6666
use arrow_array::RecordBatch;
6767
use datafusion_common::display::ToStringifiedPlan;
68+
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6869
use datafusion_common::{
6970
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
7071
ScalarValue,
@@ -82,6 +83,7 @@ use datafusion_expr::{
8283
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8384
use datafusion_physical_expr::expressions::Literal;
8485
use datafusion_physical_expr::LexOrdering;
86+
use datafusion_physical_plan::execution_plan::InvariantLevel;
8587
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
8688
use datafusion_physical_plan::unnest::ListUnnest;
8789
use datafusion_sql::utils::window_expr_common_partition_keys;
@@ -1874,33 +1876,35 @@ impl DefaultPhysicalPlanner {
18741876
displayable(plan.as_ref()).indent(true)
18751877
);
18761878

1877-
let mut new_plan = plan;
1879+
// This runs once before any optimization,
1880+
// to verify that the plan fulfills the base requirements.
1881+
InvariantChecker(InvariantLevel::Always).check(&plan)?;
1882+
1883+
let mut new_plan = Arc::clone(&plan);
18781884
for optimizer in optimizers {
18791885
let before_schema = new_plan.schema();
18801886
new_plan = optimizer
18811887
.optimize(new_plan, session_state.config_options())
18821888
.map_err(|e| {
18831889
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
18841890
})?;
1885-
if optimizer.schema_check() && new_plan.schema() != before_schema {
1886-
let e = DataFusionError::Internal(format!(
1887-
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
1888-
optimizer.name(),
1889-
before_schema,
1890-
new_plan.schema()
1891-
));
1892-
return Err(DataFusionError::Context(
1893-
optimizer.name().to_string(),
1894-
Box::new(e),
1895-
));
1896-
}
1891+
1892+
// This only checks the schema in release build, and performs additional checks in debug mode.
1893+
OptimizationInvariantChecker::new(optimizer)
1894+
.check(&new_plan, before_schema)?;
1895+
18971896
trace!(
18981897
"Optimized physical plan by {}:\n{}\n",
18991898
optimizer.name(),
19001899
displayable(new_plan.as_ref()).indent(false)
19011900
);
19021901
observer(new_plan.as_ref(), optimizer.as_ref())
19031902
}
1903+
1904+
// This runs once after all optimizer runs are complete,
1905+
// to verify that the plan is executable.
1906+
InvariantChecker(InvariantLevel::Executable).check(&new_plan)?;
1907+
19041908
debug!(
19051909
"Optimized physical plan:\n{}\n",
19061910
displayable(new_plan.as_ref()).indent(false)
@@ -2008,6 +2012,83 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
20082012
}
20092013
}
20102014

2015+
struct OptimizationInvariantChecker<'a> {
2016+
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
2017+
}
2018+
2019+
impl<'a> OptimizationInvariantChecker<'a> {
2020+
/// Create an [`OptimizationInvariantChecker`] that performs checking per tule.
2021+
pub fn new(rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>) -> Self {
2022+
Self { rule }
2023+
}
2024+
2025+
/// Checks that the plan change is permitted, returning an Error if not.
2026+
///
2027+
/// Conditionally performs schema checks per [PhysicalOptimizerRule::schema_check].
2028+
/// In debug mode, this recursively walks the entire physical plan
2029+
/// and performs [`ExecutionPlan::check_invariants`].
2030+
pub fn check(
2031+
&mut self,
2032+
plan: &Arc<dyn ExecutionPlan>,
2033+
previous_schema: Arc<Schema>,
2034+
) -> Result<()> {
2035+
// if the rule is not permitted to change the schema, confirm that it did not change.
2036+
if self.rule.schema_check() && plan.schema() != previous_schema {
2037+
internal_err!("PhysicalOptimizer rule '{}' failed. Schema mismatch. Expected original schema: {:?}, got new schema: {:?}",
2038+
self.rule.name(),
2039+
previous_schema,
2040+
plan.schema()
2041+
)?
2042+
}
2043+
2044+
// check invariants per each ExecutionPlan node
2045+
#[cfg(debug_assertions)]
2046+
plan.visit(self)?;
2047+
2048+
Ok(())
2049+
}
2050+
}
2051+
2052+
impl<'n> TreeNodeVisitor<'n> for OptimizationInvariantChecker<'_> {
2053+
type Node = Arc<dyn ExecutionPlan>;
2054+
2055+
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
2056+
// Checks for the more permissive `InvariantLevel::Always`.
2057+
// Plans are not guarenteed to be executable after each physical optimizer run.
2058+
node.check_invariants(InvariantLevel::Always).map_err(|e|
2059+
e.context(format!("Invariant for ExecutionPlan node '{}' failed for PhysicalOptimizer rule '{}'", node.name(), self.rule.name()))
2060+
)?;
2061+
Ok(TreeNodeRecursion::Continue)
2062+
}
2063+
}
2064+
2065+
/// Check [`ExecutionPlan`] invariants per [`InvariantLevel`].
2066+
struct InvariantChecker(InvariantLevel);
2067+
2068+
impl InvariantChecker {
2069+
/// Checks that the plan is executable, returning an Error if not.
2070+
pub fn check(&mut self, plan: &Arc<dyn ExecutionPlan>) -> Result<()> {
2071+
// check invariants per each ExecutionPlan node
2072+
plan.visit(self)?;
2073+
2074+
Ok(())
2075+
}
2076+
}
2077+
2078+
impl<'n> TreeNodeVisitor<'n> for InvariantChecker {
2079+
type Node = Arc<dyn ExecutionPlan>;
2080+
2081+
fn f_down(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
2082+
node.check_invariants(self.0).map_err(|e| {
2083+
e.context(format!(
2084+
"Invariant for ExecutionPlan node '{}' failed",
2085+
node.name()
2086+
))
2087+
})?;
2088+
Ok(TreeNodeRecursion::Continue)
2089+
}
2090+
}
2091+
20112092
#[cfg(test)]
20122093
mod tests {
20132094
use std::any::Any;
@@ -2028,6 +2109,7 @@ mod tests {
20282109
use crate::execution::session_state::SessionStateBuilder;
20292110
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
20302111
use arrow::datatypes::{DataType, Field, Int32Type};
2112+
use datafusion_common::config::ConfigOptions;
20312113
use datafusion_common::{assert_contains, DFSchemaRef, TableReference};
20322114
use datafusion_execution::runtime_env::RuntimeEnv;
20332115
use datafusion_execution::TaskContext;
@@ -2782,4 +2864,239 @@ digraph {
27822864

27832865
assert_contains!(generated_graph, expected_tooltip);
27842866
}
2867+
2868+
/// Extension Node which passes invariant checks
2869+
#[derive(Debug)]
2870+
struct OkExtensionNode(Vec<Arc<dyn ExecutionPlan>>);
2871+
impl ExecutionPlan for OkExtensionNode {
2872+
fn name(&self) -> &str {
2873+
"always ok"
2874+
}
2875+
fn with_new_children(
2876+
self: Arc<Self>,
2877+
children: Vec<Arc<dyn ExecutionPlan>>,
2878+
) -> Result<Arc<dyn ExecutionPlan>> {
2879+
Ok(Arc::new(Self(children)))
2880+
}
2881+
fn schema(&self) -> SchemaRef {
2882+
Arc::new(Schema::empty())
2883+
}
2884+
fn as_any(&self) -> &dyn Any {
2885+
unimplemented!()
2886+
}
2887+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2888+
self.0.iter().collect::<Vec<_>>()
2889+
}
2890+
fn properties(&self) -> &PlanProperties {
2891+
unimplemented!()
2892+
}
2893+
fn execute(
2894+
&self,
2895+
_partition: usize,
2896+
_context: Arc<TaskContext>,
2897+
) -> Result<SendableRecordBatchStream> {
2898+
unimplemented!()
2899+
}
2900+
}
2901+
impl DisplayAs for OkExtensionNode {
2902+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
2903+
write!(f, "{}", self.name())
2904+
}
2905+
}
2906+
2907+
/// Extension Node which fails the [`OptimizationInvariantChecker`].
2908+
#[derive(Debug)]
2909+
struct InvariantFailsExtensionNode;
2910+
impl ExecutionPlan for InvariantFailsExtensionNode {
2911+
fn name(&self) -> &str {
2912+
"InvariantFailsExtensionNode"
2913+
}
2914+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
2915+
match check {
2916+
InvariantLevel::Always => plan_err!("extension node failed it's user-defined always-invariant check"),
2917+
InvariantLevel::Executable => panic!("the OptimizationInvariantChecker should not be checking for executableness"),
2918+
}
2919+
}
2920+
fn schema(&self) -> SchemaRef {
2921+
Arc::new(Schema::empty())
2922+
}
2923+
fn with_new_children(
2924+
self: Arc<Self>,
2925+
_children: Vec<Arc<dyn ExecutionPlan>>,
2926+
) -> Result<Arc<dyn ExecutionPlan>> {
2927+
unimplemented!()
2928+
}
2929+
fn as_any(&self) -> &dyn Any {
2930+
unimplemented!()
2931+
}
2932+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2933+
unimplemented!()
2934+
}
2935+
fn properties(&self) -> &PlanProperties {
2936+
unimplemented!()
2937+
}
2938+
fn execute(
2939+
&self,
2940+
_partition: usize,
2941+
_context: Arc<TaskContext>,
2942+
) -> Result<SendableRecordBatchStream> {
2943+
unimplemented!()
2944+
}
2945+
}
2946+
impl DisplayAs for InvariantFailsExtensionNode {
2947+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
2948+
write!(f, "{}", self.name())
2949+
}
2950+
}
2951+
2952+
/// Extension Optimizer rule that requires the schema check
2953+
#[derive(Debug)]
2954+
struct OptimizerRuleWithSchemaCheck;
2955+
impl PhysicalOptimizerRule for OptimizerRuleWithSchemaCheck {
2956+
fn optimize(
2957+
&self,
2958+
plan: Arc<dyn ExecutionPlan>,
2959+
_config: &ConfigOptions,
2960+
) -> Result<Arc<dyn ExecutionPlan>> {
2961+
Ok(plan)
2962+
}
2963+
fn name(&self) -> &str {
2964+
"OptimizerRuleWithSchemaCheck"
2965+
}
2966+
fn schema_check(&self) -> bool {
2967+
true
2968+
}
2969+
}
2970+
2971+
#[test]
2972+
fn test_optimization_invariant_checker() -> Result<()> {
2973+
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
2974+
Arc::new(OptimizerRuleWithSchemaCheck);
2975+
2976+
// ok plan
2977+
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
2978+
let child = Arc::clone(&ok_node);
2979+
let ok_plan = Arc::clone(&ok_node).with_new_children(vec![
2980+
Arc::clone(&child).with_new_children(vec![Arc::clone(&child)])?,
2981+
Arc::clone(&child),
2982+
])?;
2983+
2984+
// Test: check should pass with same schema
2985+
let equal_schema = ok_plan.schema();
2986+
OptimizationInvariantChecker::new(&rule).check(&ok_plan, equal_schema)?;
2987+
2988+
// Test: should fail with schema changed
2989+
let different_schema =
2990+
Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
2991+
let expected_err = OptimizationInvariantChecker::new(&rule)
2992+
.check(&ok_plan, different_schema)
2993+
.unwrap_err();
2994+
assert!(expected_err.to_string().contains("PhysicalOptimizer rule 'OptimizerRuleWithSchemaCheck' failed. Schema mismatch. Expected original schema"));
2995+
2996+
// Test: should fail when extension node fails it's own invariant check
2997+
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
2998+
let expected_err = OptimizationInvariantChecker::new(&rule)
2999+
.check(&failing_node, ok_plan.schema())
3000+
.unwrap_err();
3001+
assert!(expected_err
3002+
.to_string()
3003+
.contains("extension node failed it's user-defined always-invariant check"));
3004+
3005+
// Test: should fail when descendent extension node fails
3006+
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(InvariantFailsExtensionNode);
3007+
let invalid_plan = ok_node.with_new_children(vec![
3008+
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
3009+
Arc::clone(&child),
3010+
])?;
3011+
let expected_err = OptimizationInvariantChecker::new(&rule)
3012+
.check(&invalid_plan, ok_plan.schema())
3013+
.unwrap_err();
3014+
assert!(expected_err
3015+
.to_string()
3016+
.contains("extension node failed it's user-defined always-invariant check"));
3017+
3018+
Ok(())
3019+
}
3020+
3021+
/// Extension Node which fails the [`InvariantChecker`]
3022+
/// if, and only if, [`InvariantLevel::Executable`]
3023+
#[derive(Debug)]
3024+
struct ExecutableInvariantFails;
3025+
impl ExecutionPlan for ExecutableInvariantFails {
3026+
fn name(&self) -> &str {
3027+
"ExecutableInvariantFails"
3028+
}
3029+
fn check_invariants(&self, check: InvariantLevel) -> Result<()> {
3030+
match check {
3031+
InvariantLevel::Always => Ok(()),
3032+
InvariantLevel::Executable => plan_err!(
3033+
"extension node failed it's user-defined executable-invariant check"
3034+
),
3035+
}
3036+
}
3037+
fn schema(&self) -> SchemaRef {
3038+
Arc::new(Schema::empty())
3039+
}
3040+
fn with_new_children(
3041+
self: Arc<Self>,
3042+
_children: Vec<Arc<dyn ExecutionPlan>>,
3043+
) -> Result<Arc<dyn ExecutionPlan>> {
3044+
unimplemented!()
3045+
}
3046+
fn as_any(&self) -> &dyn Any {
3047+
unimplemented!()
3048+
}
3049+
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3050+
vec![]
3051+
}
3052+
fn properties(&self) -> &PlanProperties {
3053+
unimplemented!()
3054+
}
3055+
fn execute(
3056+
&self,
3057+
_partition: usize,
3058+
_context: Arc<TaskContext>,
3059+
) -> Result<SendableRecordBatchStream> {
3060+
unimplemented!()
3061+
}
3062+
}
3063+
impl DisplayAs for ExecutableInvariantFails {
3064+
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
3065+
write!(f, "{}", self.name())
3066+
}
3067+
}
3068+
3069+
#[test]
3070+
fn test_invariant_checker_levels() -> Result<()> {
3071+
// plan that passes the always-invariant, but fails the executable check
3072+
let plan: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
3073+
3074+
// Test: check should pass with less stringent Always check
3075+
InvariantChecker(InvariantLevel::Always).check(&plan)?;
3076+
3077+
// Test: should fail the executable check
3078+
let expected_err = InvariantChecker(InvariantLevel::Executable)
3079+
.check(&plan)
3080+
.unwrap_err();
3081+
assert!(expected_err.to_string().contains(
3082+
"extension node failed it's user-defined executable-invariant check"
3083+
));
3084+
3085+
// Test: should fail when descendent extension node fails
3086+
let failing_node: Arc<dyn ExecutionPlan> = Arc::new(ExecutableInvariantFails);
3087+
let ok_node: Arc<dyn ExecutionPlan> = Arc::new(OkExtensionNode(vec![]));
3088+
let child = Arc::clone(&ok_node);
3089+
let plan = ok_node.with_new_children(vec![
3090+
Arc::clone(&child).with_new_children(vec![Arc::clone(&failing_node)])?,
3091+
Arc::clone(&child),
3092+
])?;
3093+
let expected_err = InvariantChecker(InvariantLevel::Executable)
3094+
.check(&plan)
3095+
.unwrap_err();
3096+
assert!(expected_err.to_string().contains(
3097+
"extension node failed it's user-defined executable-invariant check"
3098+
));
3099+
3100+
Ok(())
3101+
}
27853102
}

0 commit comments

Comments
 (0)