diff --git a/.github/actions/setup-rust-runtime/action.yaml b/.github/actions/setup-rust-runtime/action.yaml
index cd18be989031..b6fb2c898bf2 100644
--- a/.github/actions/setup-rust-runtime/action.yaml
+++ b/.github/actions/setup-rust-runtime/action.yaml
@@ -20,8 +20,10 @@ description: 'Setup Rust Runtime Environment'
 runs:
   using: "composite"
   steps:
-    - name: Run sccache-cache
-      uses: mozilla-actions/sccache-action@v0.0.4
+    # https://github.com/apache/datafusion/issues/15535
+    # disabled because neither version nor git hash works with apache github policy
+    #- name: Run sccache-cache
+    #  uses: mozilla-actions/sccache-action@65101d47ea8028ed0c98a1cdea8dd9182e9b5133 # v0.0.8
     - name: Configure runtime env
       shell: bash
       # do not produce debug symbols to keep memory usage down
@@ -30,9 +32,11 @@ runs:
       #
       # Set debuginfo=line-tables-only as debuginfo=0 causes immensely slow build
       # See for more details: https://github.com/rust-lang/rust/issues/119560
+      #
+      # re-add the following to the run below once sccache-cache is re-enabled
+      # echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
+      # echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
       run: |
-        echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
-        echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
        echo "RUST_BACKTRACE=1" >> $GITHUB_ENV
        echo "RUSTFLAGS=-C debuginfo=line-tables-only -C incremental=false" >> $GITHUB_ENV
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 0be709a4a785..d102cb8f8d51 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -290,7 +290,7 @@ jobs:
         run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
       - name: Build with wasm-pack
         working-directory: ./datafusion/wasmtest
-        run: wasm-pack build --dev
+        run: RUSTFLAGS='--cfg getrandom_backend="wasm_js"' wasm-pack build --dev
 
   # verify that the benchmark queries return the correct results
   verify-benchmark-results:
diff --git a/Cargo.toml b/Cargo.toml
index 55855d09d50e..bedda78a30ce 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -65,7 +65,7 @@ homepage = "https://datafusion.apache.org"
 license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/apache/datafusion"
-rust-version = "1.81.0"
+rust-version = "1.82.0"
 version = "45.0.0"
 
 [workspace.dependencies]
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index a5cf71426607..01a470d94774 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -4276,6 +4276,7 @@ checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b"
 dependencies = [
  "getrandom 0.2.15",
  "serde",
+ "wasm-bindgen",
 ]
 
 [[package]]
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 32b7213d952f..3a3ed07ac9ed 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -305,7 +305,7 @@ config_namespace! {
         ///
         /// This is used to workaround bugs in the planner that are now caught by
         /// the new schema verification step.
-        pub skip_physical_aggregate_schema_check: bool, default = false
+        pub skip_physical_aggregate_schema_check: bool, default = true
 
         /// Specifies the reserved memory for each spillable sort operation to
         /// facilitate an in-memory merge.
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index f7c247aaf288..66330f91cbd8 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -57,7 +57,7 @@ pub enum DataFusionError {
     ParquetError(ParquetError),
     /// Error when reading Avro data.
#[cfg(feature = "avro")] - AvroError(AvroError), + AvroError(Box), /// Error when reading / writing to / from an object_store (e.g. S3 or LocalFile) #[cfg(feature = "object_store")] ObjectStore(object_store::Error), @@ -267,7 +267,7 @@ impl From for DataFusionError { #[cfg(feature = "avro")] impl From for DataFusionError { fn from(e: AvroError) -> Self { - DataFusionError::AvroError(e) + DataFusionError::AvroError(Box::new(e)) } } diff --git a/datafusion/common/src/table_reference.rs b/datafusion/common/src/table_reference.rs index bb53a30dcb23..9b6f9696c00b 100644 --- a/datafusion/common/src/table_reference.rs +++ b/datafusion/common/src/table_reference.rs @@ -193,8 +193,7 @@ impl TableReference { match self { TableReference::Bare { table } => **table == *other.table(), TableReference::Partial { schema, table } => { - **table == *other.table() - && other.schema().map_or(true, |s| *s == **schema) + **table == *other.table() && other.schema().is_none_or(|s| *s == **schema) } TableReference::Full { catalog, @@ -202,8 +201,8 @@ impl TableReference { table, } => { **table == *other.table() - && other.schema().map_or(true, |s| *s == **schema) - && other.catalog().map_or(true, |c| *c == **catalog) + && other.schema().is_none_or(|s| *s == **schema) + && other.catalog().is_none_or(|c| *c == **catalog) } } } diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index c9b059ad0f40..ab43b3a27a26 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -131,7 +131,7 @@ tempfile = { workspace = true } tokio = { workspace = true } tokio-util = { version = "0.7.4", features = ["io"], optional = true } url = { workspace = true } -uuid = { version = "1.7", features = ["v4"] } +uuid = { version = "1.7", features = ["v4", "js"] } xz2 = { version = "0.1", optional = true, features = ["static"] } zstd = { version = "0.13", optional = true, default-features = false } diff --git a/datafusion/core/src/datasource/avro_to_arrow/schema.rs b/datafusion/core/src/datasource/avro_to_arrow/schema.rs index 991f648e58bd..b84fbe063a07 100644 --- a/datafusion/core/src/datasource/avro_to_arrow/schema.rs +++ b/datafusion/core/src/datasource/avro_to_arrow/schema.rs @@ -16,7 +16,7 @@ // under the License. use crate::arrow::datatypes::{DataType, IntervalUnit, Schema, TimeUnit, UnionMode}; -use crate::error::{DataFusionError, Result}; +use crate::error::Result; use apache_avro::schema::{ Alias, DecimalSchema, EnumSchema, FixedSchema, Name, RecordSchema, }; @@ -107,9 +107,7 @@ fn schema_to_field_with_props( .data_type() .clone() } else { - return Err(DataFusionError::AvroError( - apache_avro::Error::GetUnionDuplicate, - )); + return Err(apache_avro::Error::GetUnionDuplicate.into()); } } else { let fields = sub_schemas diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs index f47e2107ade6..e2dfc209e9b6 100644 --- a/datafusion/core/src/datasource/file_format/mod.rs +++ b/datafusion/core/src/datasource/file_format/mod.rs @@ -424,6 +424,7 @@ pub fn transform_schema_to_view(schema: &Schema) -> Schema { Schema::new_with_metadata(transformed_fields, schema.metadata.clone()) } +#[cfg(not(target_arch = "wasm32"))] /// Coerces the file schema if the table schema uses a view type. 
 pub(crate) fn coerce_file_schema_to_view_type(
     table_schema: &Schema,
@@ -486,6 +487,7 @@ pub fn transform_binary_to_string(schema: &Schema) -> Schema {
     Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
 }
 
+#[cfg(not(target_arch = "wasm32"))]
 /// If the table schema uses a string type, coerce the file schema to use a string type.
 ///
 /// See [parquet::ParquetFormat::binary_as_string] for details
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 9b9bcd22c464..5e56989331d5 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -2179,7 +2179,7 @@ mod tests {
 
         // create table
         let tmp_dir = TempDir::new()?;
-        let tmp_path = tmp_dir.into_path();
+        let tmp_path = tmp_dir.keep();
         let str_path = tmp_path.to_str().expect("Temp path should convert to &str");
         session_ctx
             .sql(&format!(
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 6fb536ca2f05..c6c4c7cece76 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -325,8 +325,10 @@ impl std::fmt::Display for ListingTableUrl {
     }
 }
 
+#[cfg(not(target_arch = "wasm32"))]
 const GLOB_START_CHARS: [char; 3] = ['?', '*', '['];
 
+#[cfg(not(target_arch = "wasm32"))]
 /// Splits `path` at the first path segment containing a glob expression, returning
 /// `None` if no glob expression found.
 ///
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 5f98ba4efcf0..9611c6940d62 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -693,6 +693,9 @@ impl DefaultPhysicalPlanner {
                     differences.push(format!("field nullability at index {} [{}]: (physical) {} vs (logical) {}", i, physical_field.name(), physical_field.is_nullable(), logical_field.is_nullable()));
                 }
             }
+
+            log::warn!("Physical input schema should be the same as the one converted from logical input schema, but did not match for logical plan:\n{}", input.display_indent());
+
             return internal_err!("Physical input schema should be the same as the one converted from logical input schema. Differences: {}", differences
                 .iter()
                 .map(|s| format!("\n\t- {}", s))
diff --git a/datafusion/core/tests/execution/logical_plan.rs b/datafusion/core/tests/execution/logical_plan.rs
index 168bf484e541..e16f7084ce8e 100644
--- a/datafusion/core/tests/execution/logical_plan.rs
+++ b/datafusion/core/tests/execution/logical_plan.rs
@@ -30,8 +30,8 @@ use std::fmt::Debug;
 use std::ops::Deref;
 use std::sync::Arc;
 
-///! Logical plans need to provide stable semantics, as downstream projects
-///! create them and depend on them. Test executable semantics of logical plans.
+// Logical plans need to provide stable semantics, as downstream projects
+// create them and depend on them. Test executable semantics of logical plans.
 #[tokio::test]
 async fn count_only_nulls() -> Result<()> {
diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs
index 212ffdaaa2a5..c64f052691ec 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -347,7 +347,7 @@ async fn oom_recursive_cte() {
 #[tokio::test]
 async fn oom_parquet_sink() {
     let dir = tempfile::tempdir().unwrap();
-    let path = dir.into_path().join("test.parquet");
+    let path = dir.keep().join("test.parquet");
     let _ = File::create(path.clone()).await.unwrap();
 
     TestCase::new()
@@ -371,7 +371,7 @@ async fn oom_parquet_sink() {
 #[tokio::test]
 async fn oom_with_tracked_consumer_pool() {
     let dir = tempfile::tempdir().unwrap();
-    let path = dir.into_path().join("test.parquet");
+    let path = dir.keep().join("test.parquet");
     let _ = File::create(path.clone()).await.unwrap();
 
     TestCase::new()
diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs
index 4b5d22bfa71f..ff89a28665f3 100644
--- a/datafusion/core/tests/parquet/file_statistics.rs
+++ b/datafusion/core/tests/parquet/file_statistics.rs
@@ -117,7 +117,7 @@ async fn list_files_with_session_level_cache() {
 
     let temp_path1 = tempdir()
         .unwrap()
-        .into_path()
+        .keep()
         .into_os_string()
         .into_string()
         .unwrap();
@@ -125,7 +125,7 @@
 
     let temp_path2 = tempdir()
         .unwrap()
-        .into_path()
+        .keep()
         .into_os_string()
         .into_string()
         .unwrap();
diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index 856f7dc8e8a9..5c3eacf97f64 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -32,9 +32,12 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{CsvExec, FileScanConfig, ParquetExec};
 use datafusion_common::error::Result;
+use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::ScalarValue;
-use datafusion_expr::{JoinType, Operator};
+use datafusion_common::{ColumnStatistics, ScalarValue};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::{AggregateUDF, JoinType, Operator};
+use datafusion_physical_expr::aggregate::AggregateExprBuilder;
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr::{
@@ -44,6 +47,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_optimizer::enforce_distribution::*;
 use datafusion_physical_optimizer::enforce_sorting::EnforceSorting;
 use datafusion_physical_optimizer::output_requirements::OutputRequirements;
+use datafusion_physical_optimizer::sanity_checker::check_plan_sanity;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
@@ -150,8 +154,8 @@ impl ExecutionPlan for SortRequiredExec {
     fn execute(
         &self,
         _partition: usize,
-        _context: Arc<datafusion_execution::TaskContext>,
-    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+        _context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
         unreachable!();
     }
 
@@ -164,6 +168,40 @@ fn parquet_exec() -> Arc<ParquetExec> {
     parquet_exec_with_sort(vec![])
 }
 
+fn int64_stats() -> ColumnStatistics {
+    ColumnStatistics {
+        null_count: Precision::Absent,
+        max_value: Precision::Exact(1_000_000.into()),
+        min_value: Precision::Exact(0.into()),
+        distinct_count: Precision::Absent,
+        sum_value: Precision::Absent,
+    }
+}
+
+fn column_stats() -> Vec<ColumnStatistics> {
+    vec![
+        int64_stats(), // a
+        int64_stats(), // b
+        int64_stats(), // c
+        ColumnStatistics::default(),
+        ColumnStatistics::default(),
+    ]
+}
+
+pub(crate) fn parquet_exec_with_stats() -> Arc<ParquetExec> {
+    let mut statistics = Statistics::new_unknown(&schema());
+    statistics.num_rows = Precision::Inexact(10);
+    statistics.column_statistics = column_stats();
+
+    let config =
+        FileScanConfig::new(ObjectStoreUrl::parse("test:///").unwrap(), schema())
+            .with_file(PartitionedFile::new("x".to_string(), 10000))
+            .with_statistics(statistics);
+    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
+
+    ParquetExec::builder(config).build_arc()
+}
+
 fn parquet_exec_multiple() -> Arc<ParquetExec> {
     parquet_exec_multiple_sorted(vec![])
 }
@@ -229,7 +267,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<CsvExec> {
     )
 }
 
-fn projection_exec_with_alias(
+pub(crate) fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
@@ -243,6 +281,15 @@ fn aggregate_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    aggregate_exec_with_aggr_expr_and_alias(input, vec![], alias_pairs)
+}
+
+#[expect(clippy::type_complexity)]
+fn aggregate_exec_with_aggr_expr_and_alias(
+    input: Arc<dyn ExecutionPlan>,
+    aggr_expr: Vec<(Arc<AggregateUDF>, Vec<Arc<dyn PhysicalExpr>>)>,
+    alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
     let schema = schema();
     let mut group_by_expr: Vec<(Arc<dyn PhysicalExpr>, String)> = vec![];
@@ -263,18 +310,33 @@ fn aggregate_exec_with_alias(
         .collect::<Vec<_>>();
     let final_grouping = PhysicalGroupBy::new_single(final_group_by_expr);
 
+    let aggr_expr = aggr_expr
+        .into_iter()
+        .map(|(udaf, exprs)| {
+            AggregateExprBuilder::new(udaf.clone(), exprs)
+                .alias(udaf.name())
+                .schema(Arc::clone(&schema))
+                .build()
+                .map(Arc::new)
+                .unwrap()
+        })
+        .collect::<Vec<_>>();
+    let filter_exprs = std::iter::repeat(None)
+        .take(aggr_expr.len())
+        .collect::<Vec<_>>();
+
     Arc::new(
         AggregateExec::try_new(
             AggregateMode::FinalPartitioned,
             final_grouping,
-            vec![],
-            vec![],
+            aggr_expr.clone(),
+            filter_exprs.clone(),
             Arc::new(
                 AggregateExec::try_new(
                     AggregateMode::Partial,
                     group_by,
-                    vec![],
-                    vec![],
+                    aggr_expr,
+                    filter_exprs,
                     input,
                     schema.clone(),
                 )
@@ -402,6 +464,11 @@ macro_rules! assert_optimized {
     };
 
     ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => {
+        // Use a small batch size, to trigger RoundRobin in tests
+        assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, $PREFER_EXISTING_UNION, 1);
+    };
+
+    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr, $BATCH_SIZE: expr) => {
         let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
 
         let mut config = ConfigOptions::new();
@@ -410,8 +477,12 @@ macro_rules! assert_optimized {
         config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE;
         config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT;
         config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION;
-        // Use a small batch size, to trigger RoundRobin in tests
-        config.execution.batch_size = 1;
+        config.execution.batch_size = $BATCH_SIZE;
+
+        // This triggers the use of column statistics estimates in the repartition calculation.
+        // Without this setting, the testing of `get_repartition_requirement_status` misses
+        // several branches.
+        config.execution.use_row_number_estimates_to_optimize_partitioning = true;
 
         // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade
         // because they were written prior to the separation of `BasicEnforcement` into
@@ -497,6 +568,22 @@ macro_rules! assert_optimized {
     };
 }
 
+macro_rules! assert_optimized_without_forced_roundrobin {
+    ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => {
+        assert_optimized!(
+            $EXPECTED_LINES,
+            $PLAN,
+            $FIRST_ENFORCE_DIST,
+            false,
+            10,
+            false,
+            1024,
+            false,
+            100
+        );
+    };
+}
+
 macro_rules! assert_plan_txt {
     ($EXPECTED_LINES: expr, $PLAN: expr) => {
         let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();
@@ -1959,6 +2046,264 @@ fn repartition_ignores_union() -> Result<()> {
     Ok(())
 }
 
+fn aggregate_over_union(input: Vec<Arc<dyn ExecutionPlan>>) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let plan =
+        aggregate_exec_with_alias(union, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ],
+    );
+
+    plan
+}
+
+// Aggregate over a union,
+// with current testing setup.
+//
+// It will repartition twice for an aggregate over a union.
+// * repartitions before the partial aggregate.
+// * repartitions before the final aggregation.
+#[test]
+fn repartitions_twice_for_aggregate_after_union() -> Result<()> {
+    let plan = aggregate_over_union(vec![parquet_exec(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+    assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
+
+    // Test: using the `assert_optimized` macro.
+    //
+    // Updated plan (post optimization) will have added RepartitionExecs (between union and aggregation).
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=10",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
+        "UnionExec",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized!(expected, plan.clone(), true);
+    assert_optimized!(expected, plan.clone(), false);
+
+    Ok(())
+}
+
+// Aggregate over a union,
+// but make the test setup more realistic.
+//
+// It will repartition once for an aggregate over a union.
+// * repartitions between partial & final aggregations.
+#[test]
+fn repartitions_once_for_aggregate_after_union() -> Result<()> {
+    // use parquet exec with stats
+    let plan: Arc<dyn ExecutionPlan> =
+        aggregate_over_union(vec![parquet_exec_with_stats(); 2]);
+
+    // We get a distribution error without repartitioning.
+    let err = check_plan_sanity(plan.clone(), &Default::default()).unwrap_err();
+    assert!(err.message().contains("ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]\"] does not satisfy distribution requirements: HashPartitioned[[a1@0]]). Child-0 output partitioning: UnknownPartitioning(2)"));
+
+    // Test: using the `assert_optimized_without_forced_roundrobin` macro.
+    // This removes the forced round-robin repartitioning,
+    // by no longer hard-coding batch_size=1.
+    //
+    // Updated plan (post optimization) will have added only 1 RepartitionExec.
+    let expected = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[]",
+        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[]",
+        "UnionExec",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized_without_forced_roundrobin!(expected, plan.clone(), true);
+    assert_optimized_without_forced_roundrobin!(expected, plan.clone(), false);
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_union`], but with a sort between the union and aggregation.
+fn aggregate_over_sorted_union(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ],
+    );
+
+    plan
+}
+
+/// Same as [`repartitions_once_for_aggregate_after_union`], but adds a sort between
+/// the union and the aggregate. This changes the outcome:
+///
+/// * we no longer get a distribution error.
+/// * but we still get repartitioning?
+#[test]
+fn repartitions_for_aggregate_after_sorted_union() -> Result<()> {
+    let plan = aggregate_over_sorted_union(vec![parquet_exec_with_stats(); 2]);
+
+    // With the sort, there is no distribution error.
+    let checker = check_plan_sanity(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // It does not repartition on the first run
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "SortPreservingMergeExec: [a@0 ASC]",
+        "UnionExec",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized_without_forced_roundrobin!(
+        expected_after_first_run,
+        plan.clone(),
+        true
+    );
+
+    // But does repartition on the second run.
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "RepartitionExec: partitioning=Hash([a1@0], 10), input_partitions=2",
+        "SortExec: expr=[a1@0 ASC NULLS LAST], preserve_partitioning=[true]",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "UnionExec",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized_without_forced_roundrobin!(
+        expected_after_second_run,
+        plan.clone(),
+        false
+    );
+
+    Ok(())
+}
+
+/// Same as [`aggregate_over_sorted_union`], but with a projection between the union
+/// and the sort.
+fn aggregate_over_sorted_union_projection(
+    input: Vec<Arc<dyn ExecutionPlan>>,
+) -> Arc<dyn ExecutionPlan> {
+    let union = union_exec(input);
+    let union_projection = projection_exec_with_alias(
+        union,
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let sort = sort_exec(sort_key, union_projection, false);
+    let plan = aggregate_exec_with_alias(sort, vec![("a".to_string(), "a1".to_string())]);
+
+    // Demonstrate starting plan.
+    // Notice the `ordering_mode=Sorted` on the aggregations.
+    let before = displayable(plan.as_ref()).indent(true).to_string();
+    let before = trim_plan_display(&before);
+    assert_eq!(
+        before,
+        vec![
+            "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+            "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "UnionExec",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ],
+    );
+
+    plan
+}
+
+/// Same as [`repartitions_for_aggregate_after_sorted_union`], but adds a projection
+/// as well between the union and aggregate. This changes the outcome:
+///
+/// * we no longer get repartitioning, and instead get coalescing.
+#[test]
+fn coalesces_for_aggregate_after_sorted_union_projection() -> Result<()> {
+    let plan = aggregate_over_sorted_union_projection(vec![parquet_exec_with_stats(); 2]);
+
+    // Same as `repartitions_for_aggregate_after_sorted_union`. No error.
+    let checker = check_plan_sanity(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // It no longer does a repartition on the first run.
+    // Instead adds a SPM.
+    let expected_after_first_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "SortPreservingMergeExec: [a@0 ASC]",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
+        "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "UnionExec",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized_without_forced_roundrobin!(
+        expected_after_first_run,
+        plan.clone(),
+        true
+    );
+
+    // Then it removes the SPM, and inserts a coalesce on the second run.
+    let expected_after_second_run = &[
+        "AggregateExec: mode=FinalPartitioned, gby=[a1@0 as a1], aggr=[], ordering_mode=Sorted",
+        "AggregateExec: mode=Partial, gby=[a@0 as a1], aggr=[], ordering_mode=Sorted",
+        "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+        "CoalescePartitionsExec",
+        "ProjectionExec: expr=[a@0 as a, b@1 as value]",
+        "UnionExec",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+    ];
+    assert_optimized_without_forced_roundrobin!(expected_after_second_run, plan, false);
+
+    Ok(())
+}
+
 #[test]
 fn repartition_through_sort_preserving_merge() -> Result<()> {
     // sort preserving merge with non-sorted input
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index ab90eab74d3f..cfc32e55b821 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,15 +17,18 @@
 
 use std::sync::Arc;
 
+use crate::physical_optimizer::enforce_distribution::{
+    parquet_exec_with_stats, projection_exec_with_alias,
+};
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, bounded_window_exec_non_set_monotonic,
     bounded_window_exec_with_partition, check_integrity, coalesce_batches_exec,
     coalesce_partitions_exec, create_test_schema, create_test_schema2,
     create_test_schema3, create_test_schema4, filter_exec, global_limit_exec,
     hash_join_exec, limit_exec, local_limit_exec, memory_exec, parquet_exec,
-    repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
-    sort_preserving_merge_exec, spr_repartition_exec, stream_exec_ordered, union_exec,
-    RequirementsTestExec,
+    repartition_exec, schema, sort_exec, sort_expr, sort_expr_options,
+    sort_merge_join_exec, sort_preserving_merge_exec, spr_repartition_exec,
+    stream_exec_ordered, union_exec, RequirementsTestExec,
 };
 
 use datafusion_physical_plan::displayable;
@@ -35,11 +38,15 @@ use datafusion_common::Result;
 use datafusion_expr::JoinType;
 use datafusion_physical_expr::expressions::{col, Column, NotExpr};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use datafusion_physical_expr::Partitioning;
+use datafusion_physical_expr::{Partitioning};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::enforce_sorting::{EnforceSorting,PlanWithCorrespondingCoalescePartitions,PlanWithCorrespondingSort,parallelize_sorts,ensure_sorting};
 use datafusion_physical_optimizer::enforce_sorting::replace_with_order_preserving_variants::{replace_with_order_preserving_variants,OrderPreservationContext};
 use datafusion_physical_optimizer::enforce_sorting::sort_pushdown::{SortPushDown, assign_initial_requirements, pushdown_sorts};
+use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan;
+use datafusion_physical_plan::aggregates::{
+    AggregateExec, AggregateMode, PhysicalGroupBy,
+};
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -2099,6 +2106,92 @@ async fn test_commutativity() -> Result<()> {
     Ok(())
 }
 
+fn single_partition_aggregate(
+    input: Arc<dyn ExecutionPlan>,
+    alias_pairs: Vec<(String, String)>,
+) -> Arc<dyn ExecutionPlan> {
+    let schema = schema();
+    let group_by = alias_pairs
+        .iter()
+        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
+        .collect::<Vec<_>>();
+    let group_by = PhysicalGroupBy::new_single(group_by);
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::SinglePartitioned,
+            group_by,
+            vec![],
+            vec![],
+            input,
+            schema,
+        )
+        .unwrap(),
+    )
+}
+
+#[tokio::test]
+async fn test_preserve_needed_coalesce() -> Result<()> {
+    // Input to EnforceSorting, from our test case.
+    let plan = projection_exec_with_alias(
+        union_exec(vec![parquet_exec_with_stats(); 2]),
+        vec![
+            ("a".to_string(), "a".to_string()),
+            ("b".to_string(), "value".to_string()),
+        ],
+    );
+    let plan = Arc::new(CoalescePartitionsExec::new(plan));
+    let schema = schema();
+    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
+        expr: col("a", &schema).unwrap(),
+        options: SortOptions::default(),
+    }]);
+    let plan: Arc<dyn ExecutionPlan> =
+        single_partition_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
+    let plan = sort_exec(sort_key, plan);
+
+    // Starting plan: as in our test case.
+    assert_eq!(
+        get_plan_string(&plan),
+        vec![
+            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "    CoalescePartitionsExec",
+            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "        UnionExec",
+            "          ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "          ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ],
+    );
+
+    let checker = SanityCheckPlan::new().optimize(plan.clone(), &Default::default());
+    assert!(checker.is_ok());
+
+    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
+    let optimizer = EnforceSorting::new();
+    let optimized = optimizer.optimize(plan, &Default::default())?;
+    assert_eq!(
+        get_plan_string(&optimized),
+        vec![
+            "SortPreservingMergeExec: [a@0 ASC]",
+            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
+            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
+            "      CoalescePartitionsExec",
+            "        ProjectionExec: expr=[a@0 as a, b@1 as value]",
+            "          UnionExec",
+            "            ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+            "            ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]",
+        ],
+    );
+
+    // Plan is valid.
+    let checker = SanityCheckPlan::new();
+    let checker = checker.optimize(optimized, &Default::default());
+    assert!(checker.is_ok());
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn test_coalesce_propagate() -> Result<()> {
     let schema = create_test_schema()?;
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 2b08b7ff9e88..95f14f485792 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -27,7 +27,7 @@ use crate::{
 };
 
 use crate::cache::cache_manager::{CacheManager, CacheManagerConfig};
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::Result;
 use object_store::ObjectStore;
 use std::path::PathBuf;
 use std::sync::Arc;
@@ -150,9 +150,7 @@ impl RuntimeEnv {
     /// registry. See [`ObjectStoreRegistry::get_store`] for more
     /// details.
     pub fn object_store(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
-        self.object_store_registry
-            .get_store(url.as_ref())
-            .map_err(DataFusionError::from)
+        self.object_store_registry.get_store(url.as_ref())
     }
 }
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index 049926fb0bcd..c4aa894aa52b 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -832,7 +832,7 @@ pub fn exprlist_len(
         .enumerate()
         .filter_map(|(idx, field)| {
             let (maybe_table_ref, _) = schema.qualified_field(idx);
-            if maybe_table_ref.map_or(true, |q| q == qualifier) {
+            if maybe_table_ref.is_none_or(|q| q == qualifier) {
                 Some((maybe_table_ref.cloned(), Arc::clone(field)))
             } else {
                 None
diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
index e2b8a966cb92..4964ebd55517 100644
--- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
+++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs
@@ -25,6 +25,7 @@ use crate::optimizer::ApplyOrder;
 use crate::{OptimizerConfig, OptimizerRule};
 use crate::utils::NamePreserver;
+#[allow(deprecated)]
 use arrow::datatypes::{
     DataType, TimeUnit, MAX_DECIMAL128_FOR_EACH_PRECISION,
     MIN_DECIMAL128_FOR_EACH_PRECISION,
diff --git a/datafusion/physical-expr-common/src/binary_map.rs b/datafusion/physical-expr-common/src/binary_map.rs
index bdff494518da..28b25fa58806 100644
--- a/datafusion/physical-expr-common/src/binary_map.rs
+++ b/datafusion/physical-expr-common/src/binary_map.rs
@@ -384,7 +384,7 @@ where
         // value is "small"
         let payload = if value.len() <= SHORT_VALUE_LEN {
-            let inline = value.iter().fold(0usize, |acc, &x| acc << 8 | x as usize);
+            let inline = value.iter().fold(0usize, |acc, &x| (acc << 8) | x as usize);
 
             // is value is already present in the set?
             let entry = self.map.find_mut(hash, |header| {
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs b/datafusion/physical-expr-common/src/sort_expr.rs
index b150d3dc9bd3..601b2a23d09d 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -172,13 +172,11 @@ impl PhysicalSortExpr {
         let nullable = self.expr.nullable(schema).unwrap_or(true);
         self.expr.eq(&requirement.expr)
             && if nullable {
-                requirement
-                    .options
-                    .map_or(true, |opts| self.options == opts)
+                requirement.options.is_none_or(|opts| self.options == opts)
             } else {
                 requirement
                     .options
-                    .map_or(true, |opts| self.options.descending == opts.descending)
+                    .is_none_or(|opts| self.options.descending == opts.descending)
             }
     }
 }
@@ -293,7 +291,7 @@ impl PhysicalSortRequirement {
         self.expr.eq(&other.expr)
             && other
                 .options
-                .map_or(true, |other_opts| self.options == Some(other_opts))
+                .is_none_or(|other_opts| self.options == Some(other_opts))
     }
 
     #[deprecated(since = "43.0.0", note = "use LexRequirement::from_lex_ordering")]
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index a6417044a061..98bdef7225e7 100755
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -639,7 +639,7 @@ impl EquivalenceProperties {
                     req.expr.eq(&existing.expr)
                         && req
                             .options
-                            .map_or(true, |req_opts| req_opts == existing.options)
+                            .is_none_or(|req_opts| req_opts == existing.options)
                 },
             )
         })
@@ -2148,17 +2169,38 @@ fn calculate_union_binary(
         })
         .collect::<Vec<_>>();
 
+    // TEMP HACK WORKAROUND
+    // Revert code from https://github.com/apache/datafusion/pull/12562
+    // Context: https://github.com/apache/datafusion/issues/13748
+    // Context: https://github.com/influxdata/influxdb_iox/issues/13038
+
     // Next, calculate valid orderings for the union by searching for prefixes
     // in both sides.
-    let mut orderings = UnionEquivalentOrderingBuilder::new();
-    orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs);
-    orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs);
-    let orderings = orderings.build();
-
-    let mut eq_properties =
-        EquivalenceProperties::new(lhs.schema).with_constants(constants);
-
+    let mut orderings = vec![];
+    for mut ordering in lhs.normalized_oeq_class().into_iter() {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !rhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    for mut ordering in rhs.normalized_oeq_class().into_iter() {
+        // Progressively shorten the ordering to search for a satisfied prefix:
+        while !lhs.ordering_satisfy(&ordering) {
+            ordering.pop();
+        }
+        // There is a non-trivial satisfied prefix, add it as a valid ordering:
+        if !ordering.is_empty() {
+            orderings.push(ordering);
+        }
+    }
+    let mut eq_properties = EquivalenceProperties::new(lhs.schema);
+    eq_properties.constants = constants;
     eq_properties.add_new_orderings(orderings);
+
     Ok(eq_properties)
 }
 
@@ -2204,6 +2225,7 @@ struct UnionEquivalentOrderingBuilder {
     orderings: Vec<LexOrdering>,
 }
 
+#[expect(unused)]
 impl UnionEquivalentOrderingBuilder {
     fn new() -> Self {
         Self { orderings: vec![] }
     }
@@ -3552,134 +3574,6 @@ mod tests {
         .run()
     }
 
-    #[test]
-    fn test_union_equivalence_properties_constants_fill_gaps() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // First child orderings: [a ASC, c ASC], const [b]
-                vec![vec!["a", "c"]],
-                vec!["b"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child orderings: [b ASC, c ASC], const [a]
-                vec![vec!["b", "c"]],
-                vec!["a"],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings: [
-                //   [a ASC, b ASC, c ASC],
-                //   [b ASC, a ASC, c ASC]
-                // ], const []
-                vec![vec!["a", "b", "c"], vec!["b", "a", "c"]],
-                vec![],
-            )
-            .run()
-    }
-
-    #[test]
-    fn test_union_equivalence_properties_constants_no_fill_gaps() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // First child orderings: [a ASC, c ASC], const [d] // some other constant
-                vec![vec!["a", "c"]],
-                vec!["d"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child orderings: [b ASC, c ASC], const [a]
-                vec![vec!["b", "c"]],
-                vec!["a"],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings: [[a]] (only a is constant)
-                vec![vec!["a"]],
-                vec![],
-            )
-            .run()
-    }
-
-    #[test]
-    fn test_union_equivalence_properties_constants_fill_some_gaps() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // First child orderings: [c ASC], const [a, b] // some other constant
-                vec![vec!["c"]],
-                vec!["a", "b"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child orderings: [a DESC, b], const []
-                vec![vec!["a DESC", "b"]],
-                vec![],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings: [[a, b]] (can fill in the a/b with constants)
-                vec![vec!["a DESC", "b"]],
-                vec![],
-            )
-            .run()
-    }
-
-    #[test]
-    fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // First child orderings: [a ASC, c ASC], const [b]
-                vec![vec!["a", "c"]],
-                vec!["b"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child orderings: [b ASC, c ASC], const [a]
-                vec![vec!["b DESC", "c"]],
-                vec!["a"],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings: [
-                //   [a ASC, b ASC, c ASC],
-                //   [b ASC, a ASC, c ASC]
-                // ], const []
-                vec![vec!["a", "b DESC", "c"], vec!["b DESC", "a", "c"]],
-                vec![],
-            )
-            .run()
-    }
-
-    #[test]
-    fn test_union_equivalence_properties_constants_gap_fill_symmetric() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // First child: [a ASC, b ASC, d ASC], const [c]
-                vec![vec!["a", "b", "d"]],
-                vec!["c"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child: [a ASC, c ASC, d ASC], const [b]
-                vec![vec!["a", "c", "d"]],
-                vec!["b"],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings:
-                //   [a, b, c, d]
-                //   [a, c, b, d]
-                vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]],
-                vec![],
-            )
-            .run()
-    }
-
     #[test]
     fn test_union_equivalence_properties_constants_gap_fill_and_common() {
         let schema = create_test_schema().unwrap();
@@ -3705,34 +3599,6 @@ mod tests {
         .run()
     }
 
-    #[test]
-    fn test_union_equivalence_properties_constants_middle_desc() {
-        let schema = create_test_schema().unwrap();
-        UnionEquivalenceTest::new(&schema)
-            .with_child_sort_and_const_exprs(
-                // NB `b DESC` in the first child
-                //
-                // First child: [a ASC, b DESC, d ASC], const [c]
-                vec![vec!["a", "b DESC", "d"]],
-                vec!["c"],
-                &schema,
-            )
-            .with_child_sort_and_const_exprs(
-                // Second child: [a ASC, c ASC, d ASC], const [b]
-                vec![vec!["a", "c", "d"]],
-                vec!["b"],
-                &schema,
-            )
-            .with_expected_sort_and_const_exprs(
-                // Union orderings:
-                //   [a, b, d] (c constant)
-                //   [a, c, d] (b constant)
-                vec![vec!["a", "c", "b DESC", "d"], vec!["a", "b DESC", "c", "d"]],
-                vec![],
-            )
-            .run()
-    }
-
     // TODO tests with multiple constants
 
     #[derive(Debug)]
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index 2d23894d6b5e..7c979d0a5bc9 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -47,8 +47,8 @@ use crate::enforce_sorting::sort_pushdown::{
     assign_initial_requirements, pushdown_sorts, SortPushDown,
 };
 use crate::utils::{
-    add_sort_above, add_sort_above_with_check, is_coalesce_partitions, is_limit,
-    is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
+    add_sort_above, add_sort_above_with_check, is_aggregation, is_coalesce_partitions,
+    is_limit, is_repartition, is_sort, is_sort_preserving_merge, is_union, is_window,
 };
 use crate::PhysicalOptimizerRule;
 
@@ -518,7 +518,7 @@ fn remove_bottleneck_in_subplan(
 ) -> Result<PlanWithCorrespondingCoalescePartitions> {
     let plan = &requirements.plan;
     let children = &mut requirements.children;
-    if is_coalesce_partitions(&children[0].plan) {
+    if is_coalesce_partitions(&children[0].plan) && !is_aggregation(plan) {
         // We can safely use the 0th index since we have a `CoalescePartitionsExec`.
         let mut new_child_node = children[0].children.swap_remove(0);
         while new_child_node.plan.output_partitioning() == plan.output_partitioning()
diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs
index 8edbb0f09114..ad01a0047eaa 100644
--- a/datafusion/physical-optimizer/src/sanity_checker.rs
+++ b/datafusion/physical-optimizer/src/sanity_checker.rs
@@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::union::UnionExec;
 use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
 
 use crate::PhysicalOptimizerRule;
@@ -135,6 +137,14 @@ pub fn check_plan_sanity(
         plan.required_input_ordering(),
         plan.required_input_distribution(),
     ) {
+        // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492
+        if child.as_any().downcast_ref::<SortExec>().is_some() {
+            continue;
+        }
+        if child.as_any().downcast_ref::<UnionExec>().is_some() {
+            continue;
+        }
+
         let child_eq_props = child.equivalence_properties();
         if let Some(sort_req) = sort_req {
             if !child_eq_props.ordering_satisfy_requirement(&sort_req) {
diff --git a/datafusion/physical-optimizer/src/utils.rs b/datafusion/physical-optimizer/src/utils.rs
index 636e78a06ce7..a3a1838c64be 100644
--- a/datafusion/physical-optimizer/src/utils.rs
+++ b/datafusion/physical-optimizer/src/utils.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use datafusion_physical_expr::LexRequirement;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use datafusion_physical_plan::aggregates::AggregateExec;
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -105,3 +106,8 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
 }
+
+// Checks whether the given operator is an [`AggregateExec`].
+pub fn is_aggregation(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    plan.as_any().is::<AggregateExec>()
+}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index a41336ea6eb7..63696ac0d094 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -499,31 +499,42 @@ pub fn can_interleave<T: Borrow<Arc<dyn ExecutionPlan>>>(
 }
 
 fn union_schema(inputs: &[Arc<dyn ExecutionPlan>]) -> SchemaRef {
-    let first_schema = inputs[0].schema();
+    // needs to handle n children, including children which have an empty projection or a different number of fields
+    let num_fields = inputs.iter().fold(0, |acc, input| {
+        std::cmp::max(acc, input.schema().fields().len())
+    });
 
-    let fields = (0..first_schema.fields().len())
+    let fields: Vec<Field> = (0..num_fields)
         .map(|i| {
-            inputs
-                .iter()
-                .enumerate()
-                .map(|(input_idx, input)| {
-                    let field = input.schema().field(i).clone();
-                    let mut metadata = field.metadata().clone();
+            // collect fields for i
+            let field_options_for_i =
+                inputs.iter().enumerate().filter_map(|(input_idx, input)| {
+                    let field = if input.schema().fields().len() <= i {
+                        return None;
+                    } else {
+                        input.schema().field(i).clone()
+                    };
+                    // merge field metadata
+                    let mut metadata = field.metadata().clone();
 
                     let other_metadatas = inputs
                         .iter()
                         .enumerate()
-                        .filter(|(other_idx, _)| *other_idx != input_idx)
+                        .filter(|(other_idx, other_input)| {
+                            *other_idx != input_idx
+                                && other_input.schema().fields().len() > i
+                        })
                         .flat_map(|(_, other_input)| {
                             other_input.schema().field(i).metadata().clone().into_iter()
                         });
 
                     metadata.extend(other_metadatas);
-                    field.with_metadata(metadata)
-                })
+                    Some(field.with_metadata(metadata))
+                });
+
+            // pick first nullable field (if exists)
+            field_options_for_i
                 .find_or_first(Field::is_nullable)
-                // We can unwrap this because if inputs was empty, this would've already panic'ed when we
-                // indexed into inputs[0].
+                // We can unwrap this because if inputs was empty, we would never have iterated with (0..num_fields)
                 .unwrap()
         })
         .collect::<Vec<_>>();
diff --git a/datafusion/sql/src/expr/value.rs b/datafusion/sql/src/expr/value.rs
index 847163c6d3b3..96408cfd3722 100644
--- a/datafusion/sql/src/expr/value.rs
+++ b/datafusion/sql/src/expr/value.rs
@@ -303,7 +303,7 @@ fn try_decode_hex_literal(s: &str) -> Option<Vec<u8>> {
     for i in (start_idx..hex_bytes.len()).step_by(2) {
         let high = try_decode_hex_char(hex_bytes[i])?;
         let low = try_decode_hex_char(hex_bytes[i + 1])?;
-        decoded_bytes.push(high << 4 | low);
+        decoded_bytes.push((high << 4) | low);
     }
 
     Some(decoded_bytes)
diff --git a/datafusion/sqllogictest/test_files/dates.slt b/datafusion/sqllogictest/test_files/dates.slt
index 4425eee33373..148f0dfe64bb 100644
--- a/datafusion/sqllogictest/test_files/dates.slt
+++ b/datafusion/sqllogictest/test_files/dates.slt
@@ -183,7 +183,7 @@ query error input contains invalid characters
 SELECT to_date('2020-09-08 12/00/00+00:00', '%c', '%+')
 
 # to_date with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_date('2020-09-08 12/00/00+00:00', '%q')
 
 statement ok
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt
index 4653df400080..08168caa03eb 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -223,7 +223,7 @@ datafusion.execution.parquet.writer_version 1.0
 datafusion.execution.planning_concurrency 13
 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8
 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000
-datafusion.execution.skip_physical_aggregate_schema_check false
+datafusion.execution.skip_physical_aggregate_schema_check true
 datafusion.execution.soft_max_rows_per_output_file 50000000
 datafusion.execution.sort_in_place_threshold_bytes 1048576
 datafusion.execution.sort_spill_reservation_bytes 10485760
@@ -317,7 +317,7 @@ datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer ve
 datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system
 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input
 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode
-datafusion.execution.skip_physical_aggregate_schema_check false When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
+datafusion.execution.skip_physical_aggregate_schema_check true When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step.
 datafusion.execution.soft_max_rows_per_output_file 50000000 Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max
 datafusion.execution.sort_in_place_threshold_bytes 1048576 When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged.
 datafusion.execution.sort_spill_reservation_bytes 10485760 Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured).
diff --git a/datafusion/sqllogictest/test_files/timestamps.slt b/datafusion/sqllogictest/test_files/timestamps.slt
index b713008d2c3b..a0c339c5fb0d 100644
--- a/datafusion/sqllogictest/test_files/timestamps.slt
+++ b/datafusion/sqllogictest/test_files/timestamps.slt
@@ -2241,23 +2241,23 @@ query error input contains invalid characters
 SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%c', '%+')
 
 # to_timestamp with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_timestamp('2020-09-08 12/00/00+00:00', '%q')
 
 # to_timestamp_nanos with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_timestamp_nanos('2020-09-08 12/00/00+00:00', '%q')
 
 # to_timestamp_millis with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_timestamp_millis('2020-09-08 12/00/00+00:00', '%q')
 
 # to_timestamp_micros with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_timestamp_micros('2020-09-08 12/00/00+00:00', '%q')
 
 # to_timestamp_seconds with broken formatting
-query error bad or unsupported format string
+query error DataFusion error: Execution error: Error parsing timestamp from '2020\-09\-08 12/00/00\+00:00' using format '%q': trailing input
 SELECT to_timestamp_seconds('2020-09-08 12/00/00+00:00', '%q')
 
 # Create string timestamp table with different formats
diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt
index cbd19bf3806f..fd85ff30c8ed 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -538,6 +538,9 @@ physical_plan
 
 # Clean up after the test
 ########
 
+statement ok
+drop table t
+
 statement ok
 drop table t1;
@@ -778,76 +781,36 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a
 [[-1]]
 [[1]]
 
+###
+# Test for https://github.com/apache/datafusion/issues/11492
+###
+
+# Input data is
+# a,b,c
+# 1,2,3
+
 statement ok
-CREATE EXTERNAL TABLE aggregate_test_100 (
-  c1  VARCHAR NOT NULL,
-  c2  TINYINT NOT NULL,
-  c3  SMALLINT NOT NULL,
-  c4  SMALLINT,
-  c5  INT,
-  c6  BIGINT NOT NULL,
-  c7  SMALLINT NOT NULL,
-  c8  INT NOT NULL,
-  c9  BIGINT UNSIGNED NOT NULL,
-  c10 VARCHAR NOT NULL,
-  c11 FLOAT NOT NULL,
-  c12 DOUBLE NOT NULL,
-  c13 VARCHAR NOT NULL
+CREATE EXTERNAL TABLE t (
+  a INT,
+  b INT,
+  c INT
 )
 STORED AS CSV
-LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+LOCATION '../core/tests/data/example.csv'
+WITH ORDER (a ASC)
 OPTIONS ('format.has_header' 'true');
 
-statement ok
-set datafusion.execution.batch_size = 2;
+query T
+SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a;
+----
+1
+bar
 
-# Constant value tracking across union
-query TT
-explain
-SELECT * FROM(
-(
-    SELECT * FROM aggregate_test_100 WHERE c1='a'
-)
-UNION ALL
-(
-    SELECT * FROM aggregate_test_100 WHERE c1='a'
-))
-ORDER BY c1
+query I
+SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a;
 ----
-logical_plan
-01)Sort: aggregate_test_100.c1 ASC NULLS LAST
-02)--Union
-03)----Filter: aggregate_test_100.c1 = Utf8("a")
-04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
-05)----Filter: aggregate_test_100.c1 = Utf8("a")
-06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")]
-physical_plan
-01)CoalescePartitionsExec
-02)--UnionExec
-03)----CoalesceBatchesExec: target_batch_size=2
-04)------FilterExec: c1@0 = a
-05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
-07)----CoalesceBatchesExec: target_batch_size=2
-08)------FilterExec: c1@0 = a
-09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
-10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
+1
+NULL
 
-# Clean up after the test
 statement ok
-drop table aggregate_test_100;
-
-# test for https://github.com/apache/datafusion/issues/14352
-query TB rowsort
-SELECT
-    a,
-    a IS NOT NULL
-FROM (
-    -- second column, even though it's not selected, was necessary to reproduce the bug linked above
-    SELECT 'foo' AS a, 3 AS b
-    UNION ALL
-    SELECT NULL AS a, 4 AS b
-)
-----
-NULL false
-foo true
+drop table t
diff --git a/datafusion/wasmtest/Cargo.toml b/datafusion/wasmtest/Cargo.toml
index aae66e6b9a97..8ffaffc29d22 100644
--- a/datafusion/wasmtest/Cargo.toml
+++ b/datafusion/wasmtest/Cargo.toml
@@ -59,7 +59,8 @@ datafusion-physical-expr-common = { workspace = true }
 datafusion-physical-plan = { workspace = true }
 datafusion-sql = { workspace = true }
 # getrandom must be compiled with js feature
["js"] } +getrandom = { version = "0.3.3", features = ["wasm_js"] } +getrandom_0_2 = { package = "getrandom", version = "0.2.15", features = ["js"] } parquet = { workspace = true } wasm-bindgen = "0.2.99" diff --git a/docs/source/contributor-guide/howtos.md b/docs/source/contributor-guide/howtos.md index e406804caa44..556242751ff4 100644 --- a/docs/source/contributor-guide/howtos.md +++ b/docs/source/contributor-guide/howtos.md @@ -19,6 +19,12 @@ # HOWTOs +## How to update the version of Rust used in CI tests + +- Make a PR to update the [rust-toolchain] file in the root of the repository: + +[rust-toolchain]: https://github.com/apache/datafusion/blob/main/rust-toolchain.toml + ## How to add a new scalar function Below is a checklist of what you need to do to add a new scalar function to DataFusion: diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 02c8f127d973..cb53036dabb4 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -80,7 +80,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. | | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system | -| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | +| datafusion.execution.skip_physical_aggregate_schema_check | true | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. | | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. 
diff --git a/docs/source/contributor-guide/howtos.md b/docs/source/contributor-guide/howtos.md
index e406804caa44..556242751ff4 100644
--- a/docs/source/contributor-guide/howtos.md
+++ b/docs/source/contributor-guide/howtos.md
@@ -19,6 +19,12 @@

 # HOWTOs

+## How to update the version of Rust used in CI tests
+
+- Make a PR to update the [rust-toolchain] file in the root of the repository:
+
+[rust-toolchain]: https://github.com/apache/datafusion/blob/main/rust-toolchain.toml
+
 ## How to add a new scalar function

 Below is a checklist of what you need to do to add a new scalar function to DataFusion:
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 02c8f127d973..cb53036dabb4 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -80,7 +80,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 | datafusion.execution.parquet.maximum_parallel_row_group_writers | 1 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.parquet.maximum_buffered_record_batches_per_stream | 2 | (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. |
 | datafusion.execution.planning_concurrency | 0 | Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system |
-| datafusion.execution.skip_physical_aggregate_schema_check | false | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
+| datafusion.execution.skip_physical_aggregate_schema_check | true | When set to true, skips verifying that the schema produced by planning the input of `LogicalPlan::Aggregate` exactly matches the schema of the input plan. When set to false, if the schema does not match exactly (including nullability and metadata), a planning error will be raised. This is used to workaround bugs in the planner that are now caught by the new schema verification step. |
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
 | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
new file mode 100644
index 000000000000..11f4fb798c37
--- /dev/null
+++ b/rust-toolchain.toml
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file specifies the default version of Rust used
+# to compile this workspace and run CI jobs.
+
+[toolchain]
+channel = "1.85.0"
+components = ["rustfmt", "clippy"]
diff --git a/test-utils/src/array_gen/string.rs b/test-utils/src/array_gen/string.rs
index a405cb76b1bd..e2a983612b8b 100644
--- a/test-utils/src/array_gen/string.rs
+++ b/test-utils/src/array_gen/string.rs
@@ -97,7 +97,6 @@ fn random_string(rng: &mut StdRng, max_len: usize) -> String {
             let len = rng.gen_range(1..=max_len);
             rng.sample_iter::<char, _>(rand::distributions::Standard)
                 .take(len)
-                .map(char::from)
                 .collect::<String>()
         }
     }
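A note on the last hunk: `sample_iter::<char, _>(Standard)` already yields `char`, so the removed `.map(char::from)` was the identity `From<char> for char` conversion, which newer clippy (presumably the 1.85 toolchain added above) flags as useless. A standalone sketch of the same pattern under rand 0.8 APIs, not taken from the PR:

```rust
use rand::{distributions::Standard, rngs::StdRng, Rng, SeedableRng};

// Same shape as the patched helper; assumes max_len >= 1 (the real code
// guards the zero case in a surrounding match).
fn random_string(rng: &mut StdRng, max_len: usize) -> String {
    let len = rng.gen_range(1..=max_len);
    // Standard over char samples uniformly from all valid Unicode scalar values.
    rng.sample_iter::<char, _>(Standard).take(len).collect()
}

fn main() {
    let mut rng = StdRng::seed_from_u64(42);
    println!("{:?}", random_string(&mut rng, 8));
}
```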