From 23eb6b1b3d011165dc50b9f4c0e8291b6dd414ba Mon Sep 17 00:00:00 2001 From: "xudong.w" Date: Mon, 18 May 2026 11:43:07 +0800 Subject: [PATCH] Upgrade materialized views to DataFusion 53 --- Cargo.toml | 26 +-- src/materialized/dependencies.rs | 24 ++- src/materialized/file_metadata.rs | 8 +- src/rewrite/exploitation.rs | 275 +++++++++++++++++++++++++++--- 4 files changed, 287 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 041add6..a1dcb9f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,27 +25,27 @@ authors = ["Matthew Cramerus "] license = "Apache-2.0" description = "Materialized Views & Query Rewriting in DataFusion" keywords = ["arrow", "arrow-rs", "datafusion"] -rust-version = "1.85.1" +rust-version = "1.88.0" [dependencies] aquamarine = "0.6.0" -arrow = "57.1.0" -arrow-schema = "57.1.0" +arrow = "58.0.0" +arrow-schema = "58.0.0" async-trait = "0.1.89" dashmap = "6" -datafusion = "52" -datafusion-common = "52" -datafusion-expr = "52" -datafusion-functions = "52" -datafusion-functions-aggregate = "52" -datafusion-optimizer = "52" -datafusion-physical-expr = "52" -datafusion-physical-plan = "52" -datafusion-sql = "52" +datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } +datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "f069fa60c2ff13411922fca07f1e4980571e80fb" } futures = "0.3" itertools = "0.14" log = "0.4" -object_store = "0.12.4" +object_store = "0.13.1" ordered-float = "5.0.0" [dev-dependencies] diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 7295b36..f188c0d 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -42,13 +42,13 @@ use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, datasource::{provider_as_source, TableProvider, ViewTable}, - prelude::{flatten, get_field, make_array}, + prelude::{flatten, make_array}, }; use datafusion_common::{ alias::AliasGenerator, internal_err, tree_node::{Transformed, TreeNode}, - DFSchema, DataFusionError, Result, ScalarValue, + Column as DFColumn, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, @@ -400,17 +400,25 @@ pub fn mv_dependencies_plan( .into_iter() .find(|c| c.name.starts_with(META_COLUMN)) .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?; - let files_col = Expr::Column(files.clone()); + let meta_table_catalog = + Expr::Column(DFColumn::from_name(format!("{}.table_catalog", files.name))); + let meta_table_schema = + Expr::Column(DFColumn::from_name(format!("{}.table_schema", files.name))); + let meta_table_name = Expr::Column(DFColumn::from_name(format!("{}.table_name", files.name))); + let meta_source_uri = Expr::Column(DFColumn::from_name(format!("{}.source_uri", files.name))); + let meta_last_modified = + Expr::Column(DFColumn::from_name(format!("{}.last_modified", files.name))); LogicalPlanBuilder::from(pruned_plan_with_source_files) + .unnest_column(files.clone())? .unnest_column(files)? .project(vec![ construct_target_path_from_static_partition_columns(materialized_view).alias("target"), - get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), - get_field(files_col.clone(), "table_schema").alias("source_table_schema"), - get_field(files_col.clone(), "table_name").alias("source_table_name"), - get_field(files_col.clone(), "source_uri").alias("source_uri"), - get_field(files_col.clone(), "last_modified").alias("source_last_modified"), + meta_table_catalog.alias("source_table_catalog"), + meta_table_schema.alias("source_table_schema"), + meta_table_name.alias("source_table_name"), + meta_source_uri.alias("source_uri"), + meta_last_modified.alias("source_last_modified"), ])? .distinct()? .build() diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index bca69ea..baf4cab 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -43,7 +43,7 @@ use futures::stream::{self, BoxStream}; use futures::{future, Future, FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use std::any::Any; use std::sync::Arc; @@ -137,7 +137,7 @@ impl TableProvider for FileMetadata { /// An [`ExecutionPlan`] that scans object store metadata. pub struct FileMetadataExec { table_schema: SchemaRef, - plan_properties: PlanProperties, + plan_properties: Arc, projection: Option>, filters: Vec>, limit: Option, @@ -170,7 +170,7 @@ impl FileMetadataExec { let exec = Self { table_schema, - plan_properties, + plan_properties: Arc::new(plan_properties), projection, filters, limit, @@ -192,7 +192,7 @@ impl ExecutionPlan for FileMetadataExec { "FileMetadataExec" } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 997d28a..efa2b4b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -69,10 +69,55 @@ use crate::materialized::cast_to_materialized; use super::normal_form::SpjNormalForm; use super::QueryRewriteOptions; +/// Logical rewrite metadata propagated alongside equivalent candidate plans. +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Hash)] +pub struct RewriteContext { + root_table_refs: Vec, +} + +impl RewriteContext { + /// Create a new rewrite context from the root table refs visible during rewrite. + pub fn new(root_table_refs: Vec) -> Self { + Self { root_table_refs } + } + + /// Returns the root table refs that produced this rewrite opportunity. + pub fn root_table_refs(&self) -> &[String] { + &self.root_table_refs + } +} + +/// Inputs provided to a cost function when selecting the best candidate plan. +pub struct CostContext<'a> { + candidate_plans: Box + 'a>, + rewrite_context: &'a RewriteContext, +} + +impl<'a> CostContext<'a> { + /// Create a new cost context. + pub fn new( + candidate_plans: Box + 'a>, + rewrite_context: &'a RewriteContext, + ) -> Self { + Self { + candidate_plans, + rewrite_context, + } + } + + /// Consume the context and return the candidate plans iterator. + pub fn into_candidate_plans(self) -> Box + 'a> { + self.candidate_plans + } + + /// Returns rewrite metadata for the current candidate set. + pub fn rewrite_context(&self) -> &RewriteContext { + self.rewrite_context + } +} + /// A cost function. Used to evaluate the best physical plan among multiple equivalent choices. -pub type CostFn = Arc< - dyn for<'a> Fn(Box + 'a>) -> Vec + Send + Sync, ->; +pub type CostFn = Arc Fn(CostContext<'a>) -> Vec + Send + Sync>; /// A logical optimizer that generates candidate logical plans in the form of [`OneOf`] nodes. #[derive(Debug)] @@ -251,9 +296,10 @@ impl TreeNodeRewriter for ViewMatchingRewriter<'_> { } else { Ok(Transformed::new( LogicalPlan::Extension(Extension { - node: Arc::new(OneOf { - branches: Some(node).into_iter().chain(candidates).collect_vec(), - }), + node: Arc::new(OneOf::with_rewrite_context( + Some(node).into_iter().chain(candidates).collect_vec(), + RewriteContext::new(vec![table_reference.to_string()]), + )), }), true, TreeNodeRecursion::Jump, @@ -306,14 +352,21 @@ impl ExtensionPlanner for ViewExploitationPlanner { physical_inputs: &[Arc], _session_state: &SessionState, ) -> Result>> { - if node.as_any().downcast_ref::().is_none() { + let Some(one_of) = node.as_any().downcast_ref::() else { return Ok(None); - } + }; + // Different table providers may expose equivalent fields with different + // nullability. Names and data types still have to match. if logical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != logical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability( + schema.as_arrow(), + logical_inputs[0].schema().as_arrow(), + ) + }) { return Err(DataFusionError::Plan( "candidate logical plans should have the same schema".to_string(), @@ -323,7 +376,9 @@ impl ExtensionPlanner for ViewExploitationPlanner { if physical_inputs .iter() .map(|plan| plan.schema()) - .any(|schema| schema != physical_inputs[0].schema()) + .any(|schema| { + !schemas_equal_ignoring_nullability(&schema, &physical_inputs[0].schema()) + }) { return Err(DataFusionError::Plan( "candidate physical plans should have the same schema".to_string(), @@ -334,6 +389,7 @@ impl ExtensionPlanner for ViewExploitationPlanner { physical_inputs.to_vec(), None, Arc::clone(&self.cost), + one_of.rewrite_context().clone(), )?))) } } @@ -343,6 +399,30 @@ impl ExtensionPlanner for ViewExploitationPlanner { #[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Hash)] pub struct OneOf { branches: Vec, + rewrite_context: RewriteContext, +} + +impl OneOf { + /// Create a new OneOf node with the given branches. + pub fn new(branches: Vec) -> Self { + Self::with_rewrite_context(branches, RewriteContext::default()) + } + + /// Create a new OneOf node with the given branches and rewrite context. + pub fn with_rewrite_context( + branches: Vec, + rewrite_context: RewriteContext, + ) -> Self { + Self { + branches, + rewrite_context, + } + } + + /// Returns logical rewrite metadata for this candidate set. + pub fn rewrite_context(&self) -> &RewriteContext { + &self.rewrite_context + } } impl UserDefinedLogicalNodeCore for OneOf { @@ -378,7 +458,10 @@ impl UserDefinedLogicalNodeCore for OneOf { _exprs: Vec, inputs: Vec, ) -> Result { - Ok(Self { branches: inputs }) + Ok(Self { + branches: inputs, + rewrite_context: self.rewrite_context.clone(), + }) } } @@ -394,6 +477,7 @@ pub struct OneOfExec { best: usize, // Cost function to use in optimization cost: CostFn, + rewrite_context: RewriteContext, } impl std::fmt::Debug for OneOfExec { @@ -402,6 +486,7 @@ impl std::fmt::Debug for OneOfExec { .field("candidates", &self.candidates) .field("required_input_ordering", &self.required_input_ordering) .field("best", &self.best) + .field("rewrite_context", &self.rewrite_context) .finish_non_exhaustive() } } @@ -412,6 +497,7 @@ impl OneOfExec { candidates: Vec>, required_input_ordering: Option, cost: CostFn, + rewrite_context: RewriteContext, ) -> Result { if candidates.is_empty() { return Err(DataFusionError::Plan( @@ -419,16 +505,20 @@ impl OneOfExec { )); } - let best = cost(Box::new(candidates.iter().map(|c| c.as_ref()))) - .iter() - .position_min_by_key(|&cost| OrderedFloat(*cost)) - .unwrap(); + let best = cost(CostContext::new( + Box::new(candidates.iter().map(|c| c.as_ref())), + &rewrite_context, + )) + .iter() + .position_min_by_key(|&cost| OrderedFloat(*cost)) + .unwrap(); Ok(Self { candidates, required_input_ordering, best, cost, + rewrite_context, }) } @@ -438,6 +528,11 @@ impl OneOfExec { Arc::clone(&self.candidates[self.best]) } + /// Returns rewrite metadata for this candidate set. + pub fn rewrite_context(&self) -> &RewriteContext { + &self.rewrite_context + } + /// Modify this plan's required input ordering. /// Used for sort pushdown pub fn with_required_input_ordering(self, requirement: Option) -> Self { @@ -457,7 +552,7 @@ impl ExecutionPlan for OneOfExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.candidates[self.best].properties() } @@ -489,6 +584,7 @@ impl ExecutionPlan for OneOfExec { children, self.required_input_ordering.clone(), Arc::clone(&self.cost), + self.rewrite_context.clone(), )?)) } @@ -504,10 +600,6 @@ impl ExecutionPlan for OneOfExec { self.candidates[self.best].execute(partition, context) } - fn statistics(&self) -> Result { - self.candidates[self.best].partition_statistics(None) - } - fn partition_statistics( &self, partition: Option, @@ -518,7 +610,10 @@ impl ExecutionPlan for OneOfExec { impl DisplayAs for OneOfExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let costs = (self.cost)(Box::new(self.children().iter().map(|arc| arc.as_ref()))); + let costs = (self.cost)(CostContext::new( + Box::new(self.children().iter().map(|arc| arc.as_ref())), + &self.rewrite_context, + )); match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( @@ -580,3 +675,141 @@ impl PhysicalOptimizerRule for PruneCandidates { true } } + +/// Compare two Arrow schemas ignoring field nullability. +fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool { + a.fields().len() == b.fields().len() + && a.fields() + .iter() + .zip(b.fields().iter()) + .all(|(f1, f2)| f1.name() == f2.name() && f1.data_type() == f2.data_type()) +} + +#[cfg(test)] +mod tests_nullability { + use super::*; + use arrow_schema::{DataType, Field, Schema}; + + #[test] + fn schemas_equal_when_only_nullability_differs() { + let a = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, false), + Field::new("date", DataType::Utf8, false), + Field::new("price", DataType::Float64, false), + ]); + let b = Schema::new(vec![ + Field::new("ticker", DataType::Utf8, true), + Field::new("date", DataType::Utf8, true), + Field::new("price", DataType::Float64, true), + ]); + assert!(schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_types_differ() { + let a = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + let b = Schema::new(vec![Field::new("x", DataType::Int64, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_names_differ() { + let a = Schema::new(vec![Field::new("ticker", DataType::Utf8, true)]); + let b = Schema::new(vec![Field::new("symbol", DataType::Utf8, true)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } + + #[test] + fn schemas_not_equal_when_field_count_differs() { + let a = Schema::new(vec![ + Field::new("x", DataType::Int32, false), + Field::new("y", DataType::Int32, false), + ]); + let b = Schema::new(vec![Field::new("x", DataType::Int32, false)]); + assert!(!schemas_equal_ignoring_nullability(&a, &b)); + } +} + +#[cfg(test)] +mod tests_rewrite_context { + use super::*; + use arrow_schema::Schema; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion_expr::LogicalPlanBuilder; + use std::sync::Mutex; + + #[test] + fn one_of_preserves_rewrite_context_when_rebuilt() { + let plan = LogicalPlanBuilder::empty(false) + .build() + .expect("empty plan"); + let one_of = OneOf::with_rewrite_context( + vec![plan.clone()], + RewriteContext::new(vec!["catalog.schema.root_table".to_string()]), + ); + + let rebuilt = + UserDefinedLogicalNodeCore::with_exprs_and_inputs(&one_of, vec![], vec![plan]) + .expect("rebuild one_of"); + + assert_eq!( + rebuilt.rewrite_context().root_table_refs(), + ["catalog.schema.root_table".to_string()] + ); + } + + #[test] + fn one_of_exec_passes_rewrite_context_to_cost_function() { + let seen = Arc::new(Mutex::new(Vec::::new())); + let seen_clone = Arc::clone(&seen); + let cost: CostFn = Arc::new(move |ctx| { + *seen_clone.lock().expect("lock seen") = + ctx.rewrite_context().root_table_refs().to_vec(); + ctx.into_candidate_plans().map(|_| 1.0).collect() + }); + let context = RewriteContext::new(vec!["catalog.schema.root_table".to_string()]); + let schema = Arc::new(Schema::empty()); + let candidates = vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(schema)) as Arc, + ]; + + let exec = + OneOfExec::try_new(candidates, None, cost, context.clone()).expect("one_of exec"); + + assert_eq!(exec.rewrite_context(), &context); + assert_eq!(*seen.lock().expect("lock seen"), context.root_table_refs()); + } + + #[test] + fn one_of_exec_with_new_children_preserves_rewrite_context() { + let cost: CostFn = Arc::new(|ctx| ctx.into_candidate_plans().map(|_| 1.0).collect()); + let context = RewriteContext::new(vec!["catalog.schema.root_table".to_string()]); + let schema = Arc::new(Schema::empty()); + let exec = Arc::new( + OneOfExec::try_new( + vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + ], + None, + cost, + context.clone(), + ) + .expect("one_of exec"), + ); + + let rebuilt = exec + .with_new_children(vec![ + Arc::new(EmptyExec::new(Arc::clone(&schema))) as Arc, + Arc::new(EmptyExec::new(schema)) as Arc, + ]) + .expect("rebuild exec"); + let rebuilt = rebuilt + .as_any() + .downcast_ref::() + .expect("expected OneOfExec"); + + assert_eq!(rebuilt.rewrite_context(), &context); + } +}