diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 177fcb8..43ec6b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,8 +35,6 @@ jobs: steps: - uses: actions/checkout@v3 - uses: crate-ci/typos@v1.13.10 - with: - config: .typos.toml check: name: Check diff --git a/.typos.toml b/.typos.toml deleted file mode 100644 index 0c8feb8..0000000 --- a/.typos.toml +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# typos configuration file -# Place this file in the project root (same level as Cargo.toml) -[files] -extend-exclude = ["Cargo.toml", "**/Cargo.toml"] diff --git a/CHANGELOG.md b/CHANGELOG.md index b4c0114..ef55a1d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,47 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [Unreleased] +## [0.2.0](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.1...v0.2.0) - 2025-10-24 + +### Added +- `Decorator` trait ([#26](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/26)) (by @suremarc) - #26 + +### Other +- remove useless lines in changelog ([#97](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/97)) (by @xudong963) - #97 +- Improve the doc ([#95](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/95)) (by @xudong963) - #95 +- Support limit pushdown for OneOfExec ([#94](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/94)) (by @xudong963) - #94 +- Improved documentation on IVM algorithm ([#90](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/90)) (by @suremarc) - #90 +- Support static partition columns for MV ([#89](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/89)) (by @suremarc) - #89 +- upgrade to DF50 ([#87](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/87)) (by @xudong963) - #87 +- Fix empty unnest columns handling when pushdown_projection_inexact ([#88](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/88)) (by @zhuqi-lucas) - #88 +- make cost fn accept candidates ([#83](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/83)) (by @xudong963) - #83 +- Upgrade DF to 49.0.2 ([#86](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/86)) (by @zhuqi-lucas) - #86 +- Upgrade to DF49 ([#75](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/75)) (by @xudong963) - #75 +- Upgrade DataFusion 48.0.0 ([#61](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/61)) (by @xudong963) - #61 +- Allow customization of `list_all_files` function. ([#69](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/69)) (by @jared-m-combs) - #69 +- Allow for 'special' partitions that are omitted in the staleness check. ([#68](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/68)) (by @jared-m-combs) - #68 +- don't panic if eq class is not found ([#60](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/60)) (by @suremarc) - #60 +- Handle table scan filters that reference dropped columns ([#59](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/59)) (by @suremarc) - #59 +- exclude some materialized views from query rewriting ([#57](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/57)) (by @suremarc) - #57 +- Optimize performance bottleneck if projection is large ([#56](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/56)) (by @xudong963) - #56 +- Upgrade df47 ([#55](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/55)) (by @xudong963) - #55 +- Update itertools requirement from 0.13 to 0.14 ([#32](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/32)) (by @dependabot[bot]) - #32 +- Update ordered-float requirement from 4.6.0 to 5.0.0 ([#49](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/49)) (by @dependabot[bot]) - #49 +- Upgrade DF46 ([#48](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/48)) (by @xudong963) - #48 +- Update extension ([#45](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/45)) (by @matthewmturner) - #45 +- make explain output stable ([#44](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/44)) (by @suremarc) - #44 +- Add alternate analysis for MVs with no partition columns ([#39](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/39)) (by @suremarc) - #39 +- upgrade to datafusion 45 ([#38](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/38)) (by @suremarc) - #38 +- use nanosecond timestamps in file metadata ([#28](https://github.com/datafusion-contrib/datafusion-materialized-views/pull/28)) (by @suremarc) - #28 + +### Contributors + +* @xudong963 +* @suremarc +* @zhuqi-lucas +* @jared-m-combs +* @dependabot[bot] +* @matthewmturner ## [0.1.1](https://github.com/datafusion-contrib/datafusion-materialized-views/compare/v0.1.0...v0.1.1) - 2025-01-07 diff --git a/Cargo.toml b/Cargo.toml index 1903327..2323c3c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "datafusion-materialized-views" -version = "0.1.1" +version = "0.2.0" edition = "2021" homepage = "https://github.com/datafusion-contrib/datafusion-materialized-views" repository = "https://github.com/datafusion-contrib/datafusion-materialized-views" @@ -29,23 +29,23 @@ rust-version = "1.88.0" [dependencies] aquamarine = "0.6.0" -arrow = "57.0.0" -arrow-schema = "57.0.0" +arrow = "58.0.0" +arrow-schema = "58.0.0" async-trait = "0.1.89" dashmap = "6" -datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } -datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "05a6c45" } +datafusion = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-common = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-functions = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-functions-aggregate = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-optimizer = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-physical-expr = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-physical-plan = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } +datafusion-sql = { git = "https://github.com/massive-com/arrow-datafusion", rev = "6195a0cb0beaf638ae48de1eef6a9e65a9443cdf" } futures = "0.3" itertools = "0.14" log = "0.4" -object_store = "0.12" +object_store = "0.13.1" ordered-float = "5.0.0" [dev-dependencies] diff --git a/rust-toolchain.toml b/rust-toolchain.toml deleted file mode 100644 index 8cd75dd..0000000 --- a/rust-toolchain.toml +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[toolchain] -channel = "1.91.0" -components = ["rust-analyzer", "rustfmt", "clippy"] \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 8b26d85..fd6dd8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,37 @@ #![deny(missing_docs)] -//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion. +//! # datafusion-materialized-views +//! +//! `datafusion-materialized-views` provides robust algorithms and core functionality for working with materialized views in [DataFusion](https://arrow.apache.org/datafusion/). +//! +//! ## Key Features +//! +//! - **Incremental View Maintenance**: Efficiently tracks dependencies between Hive-partitioned tables and their materialized views, allowing users to determine which partitions need to be refreshed when source data changes. This is achieved via UDTFs such as `mv_dependencies` and `stale_files`. +//! - **Query Rewriting**: Implements a view matching optimizer that rewrites queries to automatically leverage materialized views when beneficial, based on the techniques described in the [paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf). +//! - **Pluggable Metadata Sources**: Supports custom metadata sources for incremental view maintenance, with default support for object store metadata via the `FileMetadata` and `RowMetadataRegistry` components. +//! - **Extensible Table Abstractions**: Defines traits such as `ListingTableLike` and `Materialized` to abstract over Hive-partitioned tables and materialized views, enabling custom implementations and easy registration for use in the maintenance and rewriting logic. +//! +//! ## Typical Workflow +//! +//! 1. **Define and Register Views**: Implement a custom table type that implements the `Materialized` trait, and register it using `register_materialized`. +//! 2. **Metadata Initialization**: Set up `FileMetadata` and `RowMetadataRegistry` to track file-level and row-level metadata. +//! 3. **Dependency Tracking**: Use the `mv_dependencies` UDTF to generate build graphs for materialized views, and `stale_files` to identify partitions that require recomputation. +//! 4. **Query Optimization**: Enable the query rewriting optimizer to transparently rewrite queries to use materialized views where possible. +//! +//! ## Example +//! +//! See the README and integration tests for a full walkthrough of setting up and maintaining a materialized view, including dependency tracking and query rewriting. +//! +//! ## Limitations +//! +//! - Currently supports only Hive-partitioned tables in object storage, with the smallest update unit being a file. +//! - Future work may generalize to other storage backends and partitioning schemes. +//! +//! ## References +//! +//! - [Optimizing Queries Using Materialized Views: A Practical, Scalable Solution](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) +//! - [DataFusion documentation](https://datafusion.apache.org/) /// Code for incremental view maintenance against Hive-partitioned tables. /// @@ -42,6 +72,9 @@ pub mod materialized; /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views. +/// +/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), +/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*. pub mod rewrite; /// Configuration options for materialized view related features. diff --git a/src/materialized.rs b/src/materialized.rs index 8e093c0..c3cf30e 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -/// Track dependencies of materialized data in object storage pub mod dependencies; /// Pluggable metadata sources for incremental view maintenance diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 07f7f7e..f188c0d 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -15,17 +15,40 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait. +Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed +into the [`RowMetadataRegistry`]. However, materialized views themself must implement `ListingTableLike`, as is +implied by the type bound `Materialized: ListingTableLike`. + +The dependency analysis in a nutshell involves analyzing the fragment of the materialized view's logical plan corresponding to +partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions +of the materialized view and its source tables. This gives rise to two natural phases of the algorithm: +1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to only include partition columns (or row metadata columns more generally) of the materialized view and its sources. + This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order, + or even set equality of the original query. + * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. + * This means that in the final output, we may have dependencies that do not exist in the original query. However, we will never miss any dependencies. +2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources. + After step 1, every table scan only contains row metadata columns, so we replace the table scan with an equivalent scan to a [`RowMetadataSource`](super::row_metadata::RowMetadataSource) + This operation also is not duplicate or order preserving. Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row. + The output rows are then transformed into object storage paths to generate the final graph. + +The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`]. + */ + use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, datasource::{provider_as_source, TableProvider, ViewTable}, - prelude::{flatten, get_field, make_array}, + prelude::{flatten, make_array}, }; use datafusion_common::{ alias::AliasGenerator, internal_err, tree_node::{Transformed, TreeNode}, - DFSchema, DataFusionError, Result, ScalarValue, + Column as DFColumn, DFSchema, DataFusionError, Result, ScalarValue, }; use datafusion_expr::{ col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan, @@ -39,7 +62,8 @@ use crate::materialized::META_COLUMN; use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized}; -/// A table function that shows build targets and dependencies for a materialized view: +/// A table function that, for a given materialized view, lists all the output data objects (build targets) +/// generated during its construction or refresh, as well as all the source data objects (dependencies) it relies on. /// /// ```ignore /// fn mv_dependencies(table_ref: Utf8) -> Table @@ -252,8 +276,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> { } } +#[cfg_attr(doc, aquamarine::aquamarine)] /// Returns a logical plan that, when executed, lists expected build targets /// for this materialized view, together with the dependencies for each target. +/// +/// See the [module documentation](super) for an overview of the algorithm. +/// +/// # Example +/// +/// We explain in detail how the dependency analysis works in an example. Consider the following SQL query, which computes daily +/// close prices of a stock from its trades, together with the settlement price from a daily statistics table: +/// +/// ```sql +/// SELECT +/// ticker, +/// LAST_VALUE(trades.price) AS close, +/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, +/// trades.date AS date +/// FROM trades +/// JOIN daily_statistics ON +/// trades.ticker = daily_statistics.ticker AND +/// trades.date = daily_statistics.reference_date AND +/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS +/// GROUP BY ticker, date +/// ``` +/// +/// Assume that both tables are partitioned by `date` only. We desired a materialized view partitioned by `date` and stored at `s3://daily_close/`. +/// This query gives us the following logical plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection:
ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, trades.date AS date"] +/// A --> B["Aggregate:
expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)]
groupby=[ticker, trades.date]"] +/// B --> C["Inner Join:
trades.ticker = daily_statistics.ticker AND
trades.date = daily_statistics.reference_date AND
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades
projection=[ticker, price, date]"] +/// C --> E["TableScan: daily_statistics
projection=[ticker, settlement_price, reference_date, date]"] +/// ``` +/// +/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: trades.date AS date"] +/// A --> B["Projection: trades.date"] +/// B --> C["Inner Join:
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades (projection=[date])"] +/// C --> E["TableScan: daily_statistics (projection=[date])"] +/// ``` +/// +/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that +/// we cannot partition the materialized view on aggregate expressions. +/// +/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan, +/// together with the target path constructed from the (static) partition columns. This gives us the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"] +/// A --> B["Projection: __meta, trades.date AS date"] +/// B --> C["Projection:
concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"] +/// C --> D["Inner Join:
daily_statistics_meta.date BETWEEN trades_meta.date AND trades_meta.date + INTERVAL 2 WEEKS"] +/// D --> E["TableScan: trades_meta (projection=[__meta, date])"] +/// D --> F["TableScan: daily_statistics_meta (projection=[__meta, date])"] +/// ``` +/// +/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column +/// unnested into its components. The final output looks roughly like this: +/// +/// ```text +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// ``` pub fn mv_dependencies_plan( materialized_view: &dyn Materialized, row_metadata_registry: &RowMetadataRegistry, @@ -298,17 +400,25 @@ pub fn mv_dependencies_plan( .into_iter() .find(|c| c.name.starts_with(META_COLUMN)) .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?; - let files_col = Expr::Column(files.clone()); + let meta_table_catalog = + Expr::Column(DFColumn::from_name(format!("{}.table_catalog", files.name))); + let meta_table_schema = + Expr::Column(DFColumn::from_name(format!("{}.table_schema", files.name))); + let meta_table_name = Expr::Column(DFColumn::from_name(format!("{}.table_name", files.name))); + let meta_source_uri = Expr::Column(DFColumn::from_name(format!("{}.source_uri", files.name))); + let meta_last_modified = + Expr::Column(DFColumn::from_name(format!("{}.last_modified", files.name))); LogicalPlanBuilder::from(pruned_plan_with_source_files) + .unnest_column(files.clone())? .unnest_column(files)? .project(vec![ construct_target_path_from_static_partition_columns(materialized_view).alias("target"), - get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"), - get_field(files_col.clone(), "table_schema").alias("source_table_schema"), - get_field(files_col.clone(), "table_name").alias("source_table_name"), - get_field(files_col.clone(), "source_uri").alias("source_uri"), - get_field(files_col.clone(), "last_modified").alias("source_last_modified"), + meta_table_catalog.alias("source_table_catalog"), + meta_table_schema.alias("source_table_schema"), + meta_table_name.alias("source_table_name"), + meta_source_uri.alias("source_uri"), + meta_last_modified.alias("source_last_modified"), ])? .distinct()? .build() @@ -537,6 +647,13 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet) -> R _ => unreachable!(), }; + if new_projection.is_empty() { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: true, + schema: Arc::new(DFSchema::empty()), + })); + } + TableScan::try_new( scan.table_name, scan.source, @@ -751,7 +868,7 @@ fn project_dfschema(schema: &DFSchema, indices: &HashSet) -> Result Result<()> { + let context = setup().await?; + + // Test case: Cross join where only columns from left table (t1) are selected + // The cross join with t3 affects cardinality but we don't select any t3 columns + // Expected: Only files from t1 should be in dependencies, not from t3 + // BUG: Currently t3 files are incorrectly included in dependencies + let query = "SELECT t1.column1, t1.column2 FROM t1 CROSS JOIN t3"; + + let plan = context.sql(query).await?.into_optimized_plan()?; + + println!("Original plan:\n{}", plan.display_indent()); + + // We're partitioning on column1 which only comes from t1 + let partition_col_indices: HashSet = [0].into_iter().collect(); // column1 is at index 0 + + let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; + println!("After pushdown:\n{}", analyzed.display_indent()); + + // Register materialized view + context.register_table( + "mv_cross_join", + Arc::new(MockMaterializedView { + table_path: ListingTableUrl::parse("s3://mv_cross_join/").unwrap(), + partition_columns: vec!["column1".to_string()], + static_partition_columns: None, + query: plan, + file_ext: ".parquet", + }), + )?; + + // Add file metadata for the MV + context.sql( + "INSERT INTO file_metadata VALUES + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2022/data.01.parquet', '2023-07-12T16:00:00Z', 0), + ('datafusion', 'test', 'mv_cross_join', 's3://mv_cross_join/column1=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)" + ).await?.collect().await?; + + // Get dependencies + let df = context + .sql("SELECT * FROM mv_dependencies('mv_cross_join', 'v2')") + .await?; + let batches = df.collect().await?; + + // Print the actual dependencies for debugging + println!("Actual dependencies:"); + println!("{}", pretty_format_batches(&batches)?); + + // Expected: Only t1 files should be in dependencies, NOT t3 files + // This test currently FAILS because t3 files are incorrectly included + let expected = [ + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + "| s3://mv_cross_join/column1=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |", + "| s3://mv_cross_join/column1=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |", + "| s3://mv_cross_join/column1=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |", + "+----------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+", + ]; + + assert_batches_sorted_eq!(expected, &batches); + + Ok(()) + } + #[test] fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> { // This test simulates a simplified MV scenario: diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 85e5838..baf4cab 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -43,7 +43,7 @@ use futures::stream::{self, BoxStream}; use futures::{future, Future, FutureExt, StreamExt, TryStreamExt}; use itertools::Itertools; use log::debug; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{ObjectMeta, ObjectStore, ObjectStoreExt}; use std::any::Any; use std::sync::Arc; @@ -137,7 +137,7 @@ impl TableProvider for FileMetadata { /// An [`ExecutionPlan`] that scans object store metadata. pub struct FileMetadataExec { table_schema: SchemaRef, - plan_properties: PlanProperties, + plan_properties: Arc, projection: Option>, filters: Vec>, limit: Option, @@ -170,7 +170,7 @@ impl FileMetadataExec { let exec = Self { table_schema, - plan_properties, + plan_properties: Arc::new(plan_properties), projection, filters, limit, @@ -192,7 +192,7 @@ impl ExecutionPlan for FileMetadataExec { "FileMetadataExec" } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { &self.plan_properties } @@ -722,7 +722,7 @@ impl FileMetadataBuilder { } } -/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider. +/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider. #[async_trait] pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync { /// List all files in the store for the given `url` prefix. diff --git a/src/materialized/util.rs b/src/materialized/util.rs index 7977f8d..cb4afad 100644 --- a/src/materialized/util.rs +++ b/src/materialized/util.rs @@ -21,6 +21,7 @@ use datafusion::catalog::{CatalogProviderList, TableProvider}; use datafusion_common::{DataFusionError, Result}; use datafusion_sql::ResolvedTableReference; +/// Retrieves a table from the catalog list given a resolved table reference. pub fn get_table( catalog_list: &dyn CatalogProviderList, table_ref: &ResolvedTableReference, @@ -35,6 +36,7 @@ pub fn get_table( // NOTE: this is bad, we are calling async code in a sync context. // We should file an issue about async in UDTFs. + // See: https://github.com/apache/datafusion/issues/17663 futures::executor::block_on(schema.table(table_ref.table.as_ref())) .map_err(|e| e.context(format!("couldn't get table '{}'", table_ref.table)))? .ok_or_else(|| DataFusionError::Plan(format!("no such table {}", table_ref.schema))) diff --git a/src/rewrite.rs b/src/rewrite.rs index 170c88f..da24cc5 100644 --- a/src/rewrite.rs +++ b/src/rewrite.rs @@ -17,8 +17,6 @@ use datafusion::{common::extensions_options, config::ConfigExtension}; -/// Implements a query rewriting optimizer, also known as "view exploitation" -/// in some academic sources. pub mod exploitation; pub mod normal_form; diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 7310d93..efa2b4b 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -15,6 +15,34 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a query rewriting optimizer, also known as "view exploitation" +in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code, +which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views. + +The query rewriting process spans both the logical and physical planning phases and can be described as follows: + +1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views + and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms. + If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression + and one or more candidate rewrites using materialized views. +2. During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`] + physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node. +3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions + to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important as these can affect cost + estimations in the next phase. +4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node. + The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node + with its selected best candidate plan. + +In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention +that the database's builtin cost optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization. +While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner +to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes. + +*/ + use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; @@ -136,6 +164,43 @@ impl ViewMatcher { pub fn mv_plans(&self) -> &HashMap, SpjNormalForm)> { &self.mv_plans } + + /// Returns materialized views that potentially reference the given table. + /// + /// This is a preliminary filter - it only checks if the MV references the table + /// but does NOT guarantee that the MV can actually be used to rewrite queries + /// involving that table. + /// + /// # Arguments + /// + /// * `table_reference` - The table reference to find candidates for + /// + /// # Returns + /// + /// A vector of tuples containing: + /// - The materialized view's table reference + /// - The materialized view's table provider + /// - The materialized view's SPJ normal form + pub fn get_potential_mv_candidates_for_table( + &self, + table_reference: &TableReference, + ) -> Vec<(TableReference, Arc, &SpjNormalForm)> { + self.mv_plans + .iter() + .filter_map(|(mv_table_ref, (mv_provider, mv_normal_form))| { + // Check if this MV references the target table + if mv_normal_form.referenced_tables().contains(table_reference) { + Some(( + mv_table_ref.clone(), + Arc::clone(mv_provider), + mv_normal_form, + )) + } else { + None + } + }) + .collect() + } } impl OptimizerRule for ViewMatcher { @@ -291,12 +356,8 @@ impl ExtensionPlanner for ViewExploitationPlanner { return Ok(None); }; - // Compare schemas ignoring nullability differences. - // Different table types (FileScanTable, LiveTable, MV) may expose - // different nullability for the same column. For example, a partition - // column in one table is non-nullable, but the same column is a file - // column in a rollup MV (forced nullable for DF 52 RecordBatch - // validation compatibility). Field names and data types must match. + // Different table providers may expose equivalent fields with different + // nullability. Names and data types still have to match. if logical_inputs .iter() .map(|plan| plan.schema()) @@ -491,7 +552,7 @@ impl ExecutionPlan for OneOfExec { self } - fn properties(&self) -> &PlanProperties { + fn properties(&self) -> &Arc { self.candidates[self.best].properties() } @@ -527,6 +588,10 @@ impl ExecutionPlan for OneOfExec { )?)) } + fn supports_limit_pushdown(&self) -> bool { + true + } + fn execute( &self, partition: usize, @@ -535,20 +600,12 @@ impl ExecutionPlan for OneOfExec { self.candidates[self.best].execute(partition, context) } - fn statistics(&self) -> Result { - self.candidates[self.best].partition_statistics(None) - } - fn partition_statistics( &self, partition: Option, ) -> Result { self.candidates[self.best].partition_statistics(partition) } - - fn supports_limit_pushdown(&self) -> bool { - true - } } impl DisplayAs for OneOfExec { @@ -620,12 +677,6 @@ impl PhysicalOptimizerRule for PruneCandidates { } /// Compare two Arrow schemas ignoring field nullability. -/// -/// Returns true if field count, field names, and data types all match. -/// Nullability differences are ignored because different table types -/// (e.g. FileScanTable with forced-nullable file columns vs a table -/// where the same column is a non-nullable partition column) can -/// legitimately differ in nullability while being semantically equivalent. fn schemas_equal_ignoring_nullability(a: &arrow_schema::Schema, b: &arrow_schema::Schema) -> bool { a.fields().len() == b.fields().len() && a.fields() diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index f0c946f..b5f2343 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -17,7 +17,7 @@ /*! -This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf), +This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query. The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: @@ -856,7 +856,11 @@ impl Predicate { /// Rewrite all expressions in terms of their normal representatives /// with respect to this predicate's equivalence classes. fn normalize_expr(&self, e: Expr) -> Expr { - // Fast path: if it's a simple Column, avoid full transform traversal + // Fast path: if it's a simple Column, avoid full transform traversal. + // Even though transform() handles Column efficiently, the machinery setup + // (closures, iterators, Transformed wrappers) has overhead that adds up + // when called thousands of times (e.g., 41 columns × 5-7 MVs × every query). + // Direct HashMap lookup + clone is significantly faster. if let Expr::Column(ref c) = e { return Expr::Column(self.normalize_column(c)); } @@ -1390,6 +1394,77 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_normalize_column_fast_path() -> Result<()> { + let ctx = SessionContext::new(); + + ctx.sql("CREATE TABLE t (a INT, b INT, c INT)") + .await? + .collect() + .await?; + + // Query with column equivalence: a = b + let plan = ctx + .sql("SELECT a, b, c FROM t WHERE a = b") + .await? + .into_optimized_plan()?; + + let normal_form = SpjNormalForm::new(&plan)?; + + // Verify that columns are normalized correctly + // a and b should be in the same equivalence class + assert_eq!(normal_form.output_exprs().len(), 3); + + Ok(()) + } + + #[tokio::test] + async fn test_rewrite_from_with_many_columns() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a wide table to test the columns() caching optimization + ctx.sql( + "CREATE TABLE wide_table ( + c0 INT, c1 INT, c2 INT, c3 INT, c4 INT, + c5 INT, c6 INT, c7 INT, c8 INT, c9 INT + )", + ) + .await? + .collect() + .await?; + + let base_plan = ctx + .sql("SELECT * FROM wide_table WHERE c0 >= 0") + .await? + .into_optimized_plan()?; + + let query_plan = ctx + .sql("SELECT c0, c1, c2 FROM wide_table WHERE c0 >= 10") + .await? + .into_optimized_plan()?; + + let base_nf = SpjNormalForm::new(&base_plan)?; + let query_nf = SpjNormalForm::new(&query_plan)?; + + // Create MV table + ctx.sql("CREATE TABLE mv AS SELECT * FROM wide_table WHERE c0 >= 0") + .await? + .collect() + .await?; + + let table_ref = TableReference::bare("mv"); + let provider = ctx.table_provider(table_ref.clone()).await?; + + // Test that rewrite_from works correctly with cached columns + let result = query_nf.rewrite_from(&base_nf, table_ref, provider_as_source(provider))?; + + assert!(result.is_some()); + let rewritten = result.unwrap(); + assert_eq!(rewritten.schema().fields().len(), 3); + + Ok(()) + } + #[tokio::test] async fn test_boolean_expression_normalization() -> Result<()> { let _ = env_logger::builder().is_test(true).try_init();