From ed6114383e54c971f299054f8e65ec6e7c18d9bb Mon Sep 17 00:00:00 2001 From: Matthew Cramerus <8771538+suremarc@users.noreply.github.com> Date: Fri, 3 Jan 2025 14:55:20 -0600 Subject: [PATCH] feat: view exploitation (#19) * feat: view exploitation * add PruneCandidates physical optimizer --- Cargo.toml | 1 + src/materialized/file_metadata.rs | 2 +- src/materialized/row_metadata.rs | 4 + src/rewrite.rs | 20 ++ src/rewrite/exploitation.rs | 495 ++++++++++++++++++++++++++++++ src/rewrite/normal_form.rs | 8 +- src/rewrite/util.rs | 83 +++++ 7 files changed, 608 insertions(+), 5 deletions(-) create mode 100644 src/rewrite/exploitation.rs create mode 100644 src/rewrite/util.rs diff --git a/Cargo.toml b/Cargo.toml index 00ad412..7b3c52f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ futures = "0.3" itertools = "0.13" log = "0.4" object_store = "0.11" +ordered-float = "4.6.0" [dev-dependencies] anyhow = "1.0.95" diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index fd57cc0..68499d5 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -412,7 +412,7 @@ impl std::fmt::Debug for FileMetadataExec { .field("plan_properties", &self.plan_properties) .field("filters", &self.filters) .field("limit", &self.limit) - .finish() + .finish_non_exhaustive() } } diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index 0bd0d7a..d51927c 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -43,6 +43,10 @@ impl std::fmt::Debug for RowMetadataRegistry { .map(|r| (r.key().clone(), r.value().name().to_string())) .collect::>(), ) + .field( + "default_source", + &self.default_source.as_ref().map(|s| s.name()), + ) .finish() } } diff --git a/src/rewrite.rs b/src/rewrite.rs index abcca8f..a81a9f1 100644 --- a/src/rewrite.rs +++ b/src/rewrite.rs @@ -15,4 +15,24 @@ // specific language governing permissions and limitations // under the License. +use datafusion::{common::extensions_options, config::ConfigExtension}; + +/// Implements a query rewriting optimizer, also known as "view exploitation" +/// in some academic sources. +pub mod exploitation; + pub mod normal_form; + +mod util; + +extensions_options! { + /// Options for the query rewriting optimizer + pub struct QueryRewriteOptions { + /// Toggle query rewriting on or off + pub enabled: bool, default = true + } +} + +impl ConfigExtension for QueryRewriteOptions { + const PREFIX: &'static str = "QueryRewrite"; +} diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs new file mode 100644 index 0000000..c029471 --- /dev/null +++ b/src/rewrite/exploitation.rs @@ -0,0 +1,495 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::{collections::HashSet, sync::Arc}; + +use async_trait::async_trait; +use datafusion::catalog::TableProvider; +use datafusion::datasource::provider_as_source; +use datafusion::execution::context::SessionState; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{LexRequirement, PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_expr_common::sort_expr::format_physical_sort_requirement_list; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; +use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter}; +use datafusion_common::{DataFusionError, Result, TableReference}; +use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; +use datafusion_optimizer::OptimizerRule; +use itertools::Itertools; +use ordered_float::OrderedFloat; + +use crate::materialized::cast_to_materialized; + +use super::normal_form::SpjNormalForm; +use super::QueryRewriteOptions; + +/// A cost function. Used to evaluate the best physical plan among multiple equivalent choices. +pub type CostFn = Arc f64 + Send + Sync>; + +/// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes. +#[derive(Debug)] +pub struct ViewMatcher { + mv_plans: HashMap, SpjNormalForm)>, +} + +impl ViewMatcher { + /// Loads table information and processes all views, returning a new optimizer rule. + pub async fn try_new_from_state(session_state: &SessionState) -> Result { + let mut mv_plans: HashMap = HashMap::default(); + for (resolved_table_ref, table) in + super::util::list_tables(session_state.catalog_list().as_ref()).await? + { + let Some(mv) = cast_to_materialized(table.as_ref()) else { + continue; + }; + + // Analyze the plan to normalize things such as wildcard expressions + let analyzed_plan = session_state.analyzer().execute_and_check( + mv.query(), + session_state.config_options(), + |_, _| {}, + )?; + + match SpjNormalForm::new(&analyzed_plan) { + Err(e) => { + log::trace!("can't support view matching for {resolved_table_ref}: {e}") + } + Ok(normal_form) => { + mv_plans.insert(resolved_table_ref.clone().into(), (table, normal_form)); + } + } + } + + Ok(ViewMatcher { mv_plans }) + } +} + +impl OptimizerRule for ViewMatcher { + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn datafusion_optimizer::OptimizerConfig, + ) -> Result> { + if !config + .options() + .extensions + .get::() + .cloned() + .unwrap_or_default() + .enabled + { + return Ok(Transformed::no(plan)); + } + + plan.rewrite(&mut ViewMatchingRewriter { parent: self }) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn name(&self) -> &str { + "view_matcher" + } +} + +/// A logical plan rewriter that looks for opportunities for substitution. +struct ViewMatchingRewriter<'a> { + parent: &'a ViewMatcher, +} + +impl TreeNodeRewriter for ViewMatchingRewriter<'_> { + type Node = LogicalPlan; + + fn f_down(&mut self, node: Self::Node) -> Result> { + if matches!(&node, LogicalPlan::Extension(ext) if ext.node.as_any().is::()) { + return Ok(Transformed::new(node, false, TreeNodeRecursion::Jump)); + } + + let table_reference = match locate_single_table_scan(&node)? { + None => return Ok(Transformed::no(node)), + Some(table_reference) => table_reference, + }; + + log::trace!("rewriting node: {}", node.display()); + + let form = match SpjNormalForm::new(&node) { + Err(e) => { + log::trace!( + "couldn't generate rewrites: for {table_reference}, recursing deeper: {e}" + ); + return Ok(Transformed::no(node)); + } + Ok(form) => form, + }; + + // Generate candidate substitutions + let candidates = self + .parent + .mv_plans + .iter() + .filter_map(|(table_ref, (table, plan))| { + // Only attempt rewrite if the view references our table in the first place + plan.referenced_tables() + .contains(&table_reference) + .then(|| { + form.rewrite_from( + plan, + table_ref.clone(), + provider_as_source(Arc::clone(table)), + ) + .transpose() + }) + .flatten() + }) + .flat_map(|res| match res { + Err(e) => { + log::trace!("error rewriting: {e}"); + None + } + Ok(plan) => Some(plan), + }) + .collect::>(); + + if candidates.is_empty() { + log::trace!("no candidates"); + Ok(Transformed::no(node)) + } else { + Ok(Transformed::new( + LogicalPlan::Extension(Extension { + node: Arc::new(OneOf { + branches: Some(node).into_iter().chain(candidates).collect_vec(), + }), + }), + true, + TreeNodeRecursion::Jump, + )) + } + } +} + +fn locate_single_table_scan(node: &LogicalPlan) -> Result> { + let mut table_reference = None; + node.apply(|plan| { + if let LogicalPlan::TableScan(scan) = plan { + match table_reference { + Some(_) => { + // A table reference was already found, but there is another. + // Erase the original and stop, return None. + table_reference = None; + return Ok(TreeNodeRecursion::Stop); + } + None => table_reference = Some(scan.table_name.clone()), + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + // Either there are multiple or there are no table scans. + Ok(table_reference) +} + +/// [`ExtensionPlanner`]` that chooses the best plan from a `OneOf` node. +pub struct ViewExploitationPlanner { + cost: CostFn, +} + +impl ViewExploitationPlanner { + /// Initialize this ViewExploitationPlanner with a given cost function. + pub fn new(cost: CostFn) -> Self { + Self { cost } + } +} + +#[async_trait] +impl ExtensionPlanner for ViewExploitationPlanner { + /// Choose the best candidate and use it for the physical plan. + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + if node.as_any().downcast_ref::().is_none() { + return Ok(None); + } + + if logical_inputs + .iter() + .map(|plan| plan.schema()) + .any(|schema| schema != logical_inputs[0].schema()) + { + return Err(DataFusionError::Plan( + "candidate logical plans should have the same schema".to_string(), + )); + } + + if physical_inputs + .iter() + .map(|plan| plan.schema()) + .any(|schema| schema != physical_inputs[0].schema()) + { + return Err(DataFusionError::Plan( + "candidate physical plans should have the same schema".to_string(), + )); + } + + Ok(Some(Arc::new(OneOfExec::try_new( + physical_inputs.to_vec(), + None, + Arc::clone(&self.cost), + )?))) + } +} + +/// A custom logical plan node that denotes multiple equivalent sub-expressions in a logical plan. +/// Used for rewriting queries to use available materialized views, where beneficial. +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] +pub struct OneOf { + branches: Vec, +} + +impl UserDefinedLogicalNodeCore for OneOf { + fn name(&self) -> &str { + "OneOf" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + self.branches.iter().collect_vec() + } + + fn schema(&self) -> &datafusion_common::DFSchemaRef { + self.branches[0].schema() + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn prevent_predicate_push_down_columns(&self) -> std::collections::HashSet { + HashSet::new() // all predicates can be pushed down + } + + fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "OneOf") + } + + fn from_template(&self, _exprs: &[datafusion::prelude::Expr], inputs: &[LogicalPlan]) -> Self { + Self { + branches: inputs.to_vec(), + } + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + inputs: Vec, + ) -> Result { + Ok(Self { branches: inputs }) + } +} + +/// A physical plan that represents a unique selection between multiple logically equivalent candidate physical plans. +#[derive(Clone)] +pub struct OneOfExec { + candidates: Vec>, + // Optionally declare a required input ordering + // This will inform DataFusion to add sorts to children, + // which will improve cost estimation of candidates + required_input_ordering: Option, + // Index of the candidate with the best cost + best: usize, + // Cost function to use in optimization + cost: CostFn, +} + +impl std::fmt::Debug for OneOfExec { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OneOfExec") + .field("candidates", &self.candidates) + .field("required_input_ordering", &self.required_input_ordering) + .field("best", &self.best) + .finish_non_exhaustive() + } +} + +impl OneOfExec { + /// Create a new `OneOfExec` + pub fn try_new( + candidates: Vec>, + required_input_ordering: Option, + cost: CostFn, + ) -> Result { + if candidates.is_empty() { + return Err(DataFusionError::Plan( + "can't create OneOfExec with empty children".to_string(), + )); + } + let best = candidates + .iter() + .position_min_by_key(|candidate| OrderedFloat(cost(candidate.as_ref()))) + .unwrap(); + + Ok(Self { + candidates, + required_input_ordering, + best, + cost, + }) + } + + /// Return the best of this `OneOfExec`'s children, using the cost function provided to + /// this plan at initialization timee + pub fn best(&self) -> Arc { + Arc::clone(&self.candidates[self.best]) + } + + /// Modify this plan's required input ordering. + /// Used for sort pushdown + pub fn with_required_input_ordering(self, requirement: Option) -> Self { + Self { + required_input_ordering: requirement, + ..self + } + } +} + +impl ExecutionPlan for OneOfExec { + fn name(&self) -> &str { + "OneOfExec" + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.candidates[self.best].properties() + } + + fn required_input_ordering(&self) -> Vec> { + vec![self.required_input_ordering.clone(); self.children().len()] + } + + fn maintains_input_order(&self) -> Vec { + vec![true; self.candidates.len()] + } + + fn benefits_from_input_partitioning(&self) -> Vec { + vec![false; self.candidates.len()] + } + + fn children(&self) -> Vec<&Arc> { + self.candidates.iter().collect_vec() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() == 1 { + return Ok(Arc::clone(&children[0])); + } + + Ok(Arc::new(Self::try_new( + children, + self.required_input_ordering.clone(), + Arc::clone(&self.cost), + )?)) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + self.candidates[self.best].execute(partition, context) + } + + fn statistics(&self) -> Result { + self.candidates[self.best].statistics() + } +} + +impl DisplayAs for OneOfExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let costs = self + .children() + .iter() + .map(|c| (self.cost)(c.as_ref())) + .collect_vec(); + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "OneOfExec(best={}), costs=[{}], required_input_ordering=[{}]", + self.best, + costs.into_iter().join(","), + format_physical_sort_requirement_list( + &self + .required_input_ordering + .clone() + .unwrap_or_default() + .into_iter() + .map(PhysicalSortExpr::from) + .map(PhysicalSortRequirement::from) + .collect_vec() + ) + ) + } + } + } +} + +/// Finalize selection of best candidate plan from a OneOfExec. +#[derive(Debug, Clone, Default)] +pub struct PruneCandidates; + +impl PhysicalOptimizerRule for PruneCandidates { + fn optimize( + &self, + plan: Arc, + _config: &datafusion::config::ConfigOptions, + ) -> Result> { + // Search for any OneOfExec nodes. + plan.transform(&|plan: Arc| { + if let Some(one_of_exec) = plan.as_any().downcast_ref::() { + Ok(Transformed::new( + one_of_exec.best(), + true, + TreeNodeRecursion::Jump, + )) + } else { + Ok(Transformed::no(plan)) + } + }) + .map(|t| t.data) + } + + fn name(&self) -> &str { + "PruneCandidates" + } + + fn schema_check(&self) -> bool { + true + } +} diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index b1f7fe1..812367e 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -744,7 +744,7 @@ impl Predicate { ) } - /// Rewrite all expressions in terms of their Normal representatives + /// Rewrite all expressions in terms of their normal representatives /// with respect to this predicate's equivalence classes. fn normalize_expr(&self, e: Expr) -> Expr { e.transform(&|e| { @@ -769,12 +769,12 @@ impl Predicate { } /// A collection of columns that are all considered to be equivalent. -/// In some cases we normalize expressions so that they use the "Normal" representative +/// In some cases we normalize expressions so that they use the "normal" representative /// in place of any other columns in the class. -/// This Normal representative is chosen arbitrarily. +/// This normal representative is chosen arbitrarily. #[derive(Debug, Clone, Default)] struct ColumnEquivalenceClass { - // first element is the Normal representative of the equivalence class + // first element is the normal representative of the equivalence class columns: BTreeSet, } diff --git a/src/rewrite/util.rs b/src/rewrite/util.rs new file mode 100644 index 0000000..a15439f --- /dev/null +++ b/src/rewrite/util.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion::catalog::{CatalogProviderList, TableProvider}; +use datafusion_common::DataFusionError; +use datafusion_sql::ResolvedTableReference; + +/// List every table in the catalog list. +/// Always returns a fully qualified table reference. +pub async fn list_tables( + catalog_list: &dyn CatalogProviderList, +) -> Result)>, DataFusionError> { + use datafusion_common::Result; + use futures::stream::{self, StreamExt, TryStreamExt}; + + let catalogs_by_name = catalog_list + .catalog_names() + .into_iter() + .map(|catalog_name| { + catalog_list + .catalog(&catalog_name) + .ok_or(DataFusionError::Internal(format!( + "could not find named catalog: {catalog_name}" + ))) + .map(|catalog| (catalog, catalog_name)) + }) + .collect::>>()?; + + let schemas_by_catalog_and_schema_name = catalogs_by_name + .into_iter() + .flat_map(|(catalog, catalog_name)| { + catalog.schema_names().into_iter().map(move |schema_name| { + catalog + .schema(&schema_name) + .ok_or(DataFusionError::Internal(format!( + "could not find named schema: {catalog_name}.{schema_name}" + ))) + .map(|schema| (schema, catalog_name.clone(), schema_name)) + }) + }) + .collect::>>()?; + + stream::iter(schemas_by_catalog_and_schema_name) + .flat_map(|(schema, catalog_name, schema_name)| { + stream::iter(schema.table_names().into_iter().map(move |table_name| { + ( + schema.clone(), + ResolvedTableReference { + catalog: catalog_name.clone().into(), + schema: schema_name.clone().into(), + table: table_name.into(), + }, + ) + })) + }) + .then(|(schema, table_ref)| async move { + schema + .table(&table_ref.table) + .await? + .ok_or(DataFusionError::Internal(format!( + "could not find named table: {table_ref}" + ))) + .map(|table| (table_ref, table)) + }) + .try_collect() + .await +}