diff --git a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs index e2b8a966cb92..4964ebd55517 100644 --- a/datafusion/optimizer/src/unwrap_cast_in_comparison.rs +++ b/datafusion/optimizer/src/unwrap_cast_in_comparison.rs @@ -25,6 +25,7 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; use crate::utils::NamePreserver; +#[allow(deprecated)] use arrow::datatypes::{ DataType, TimeUnit, MAX_DECIMAL128_FOR_EACH_PRECISION, MIN_DECIMAL128_FOR_EACH_PRECISION, diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a6417044a061..e0edf5686035 100755 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -2148,17 +2148,38 @@ fn calculate_union_binary( }) .collect::>(); + // TEMP HACK WORKAROUND + // Revert code from https://github.com/apache/datafusion/pull/12562 + // Context: https://github.com/apache/datafusion/issues/13748 + // Context: https://github.com/influxdata/influxdb_iox/issues/13038 + // Next, calculate valid orderings for the union by searching for prefixes // in both sides. - let mut orderings = UnionEquivalentOrderingBuilder::new(); - orderings.add_satisfied_orderings(lhs.normalized_oeq_class(), lhs.constants(), &rhs); - orderings.add_satisfied_orderings(rhs.normalized_oeq_class(), rhs.constants(), &lhs); - let orderings = orderings.build(); - - let mut eq_properties = - EquivalenceProperties::new(lhs.schema).with_constants(constants); - + let mut orderings = vec![]; + for mut ordering in lhs.normalized_oeq_class().into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for mut ordering in rhs.normalized_oeq_class().into_iter() { + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + let mut eq_properties = EquivalenceProperties::new(lhs.schema); + eq_properties.constants = constants; eq_properties.add_new_orderings(orderings); + Ok(eq_properties) } @@ -2204,6 +2225,7 @@ struct UnionEquivalentOrderingBuilder { orderings: Vec, } +#[expect(unused)] impl UnionEquivalentOrderingBuilder { fn new() -> Self { Self { orderings: vec![] } @@ -3552,134 +3574,6 @@ mod tests { .run() } - #[test] - fn test_union_equivalence_properties_constants_fill_gaps() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [a ASC, c ASC], const [b] - vec![vec!["a", "c"]], - vec!["b"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [b ASC, c ASC], const [a] - vec![vec!["b", "c"]], - vec!["a"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [ - // [a ASC, b ASC, c ASC], - // [b ASC, a ASC, c ASC] - // ], const [] - vec![vec!["a", "b", "c"], vec!["b", "a", "c"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_no_fill_gaps() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [a ASC, c ASC], const [d] // some other constant - vec![vec!["a", "c"]], - vec!["d"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [b ASC, c ASC], const [a] - vec![vec!["b", "c"]], - vec!["a"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [[a]] (only a is constant) - vec![vec!["a"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_fill_some_gaps() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [c ASC], const [a, b] // some other constant - vec![vec!["c"]], - vec!["a", "b"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [a DESC, b], const [] - vec![vec!["a DESC", "b"]], - vec![], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [[a, b]] (can fill in the a/b with constants) - vec![vec!["a DESC", "b"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_fill_gaps_non_symmetric() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child orderings: [a ASC, c ASC], const [b] - vec![vec!["a", "c"]], - vec!["b"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child orderings: [b ASC, c ASC], const [a] - vec![vec!["b DESC", "c"]], - vec!["a"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: [ - // [a ASC, b ASC, c ASC], - // [b ASC, a ASC, c ASC] - // ], const [] - vec![vec!["a", "b DESC", "c"], vec!["b DESC", "a", "c"]], - vec![], - ) - .run() - } - - #[test] - fn test_union_equivalence_properties_constants_gap_fill_symmetric() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // First child: [a ASC, b ASC, d ASC], const [c] - vec![vec!["a", "b", "d"]], - vec!["c"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child: [a ASC, c ASC, d ASC], const [b] - vec![vec!["a", "c", "d"]], - vec!["b"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: - // [a, b, c, d] - // [a, c, b, d] - vec![vec!["a", "c", "b", "d"], vec!["a", "b", "c", "d"]], - vec![], - ) - .run() - } - #[test] fn test_union_equivalence_properties_constants_gap_fill_and_common() { let schema = create_test_schema().unwrap(); @@ -3705,34 +3599,6 @@ mod tests { .run() } - #[test] - fn test_union_equivalence_properties_constants_middle_desc() { - let schema = create_test_schema().unwrap(); - UnionEquivalenceTest::new(&schema) - .with_child_sort_and_const_exprs( - // NB `b DESC` in the first child - // - // First child: [a ASC, b DESC, d ASC], const [c] - vec![vec!["a", "b DESC", "d"]], - vec!["c"], - &schema, - ) - .with_child_sort_and_const_exprs( - // Second child: [a ASC, c ASC, d ASC], const [b] - vec![vec!["a", "c", "d"]], - vec!["b"], - &schema, - ) - .with_expected_sort_and_const_exprs( - // Union orderings: - // [a, b, d] (c constant) - // [a, c, d] (b constant) - vec![vec!["a", "c", "b DESC", "d"], vec!["a", "b DESC", "c", "d"]], - vec![], - ) - .run() - } - // TODO tests with multiple constants #[derive(Debug)] diff --git a/datafusion/physical-optimizer/src/sanity_checker.rs b/datafusion/physical-optimizer/src/sanity_checker.rs index 8edbb0f09114..ad01a0047eaa 100644 --- a/datafusion/physical-optimizer/src/sanity_checker.rs +++ b/datafusion/physical-optimizer/src/sanity_checker.rs @@ -32,6 +32,8 @@ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::union::UnionExec; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; use crate::PhysicalOptimizerRule; @@ -135,6 +137,14 @@ pub fn check_plan_sanity( plan.required_input_ordering(), plan.required_input_distribution(), ) { + // TEMP HACK WORKAROUND https://github.com/apache/datafusion/issues/11492 + if child.as_any().downcast_ref::().is_some() { + continue; + } + if child.as_any().downcast_ref::().is_some() { + continue; + } + let child_eq_props = child.equivalence_properties(); if let Some(sort_req) = sort_req { if !child_eq_props.ordering_satisfy_requirement(&sort_req) { diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index cbd19bf3806f..fd85ff30c8ed 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -538,6 +538,9 @@ physical_plan # Clean up after the test ######## +statement ok +drop table t + statement ok drop table t1; @@ -778,76 +781,36 @@ select make_array(make_array(1)) x UNION ALL SELECT make_array(arrow_cast(make_a [[-1]] [[1]] +### +# Test for https://github.com/apache/datafusion/issues/11492 +### + +# Input data is +# a,b,c +# 1,2,3 + statement ok -CREATE EXTERNAL TABLE aggregate_test_100 ( - c1 VARCHAR NOT NULL, - c2 TINYINT NOT NULL, - c3 SMALLINT NOT NULL, - c4 SMALLINT, - c5 INT, - c6 BIGINT NOT NULL, - c7 SMALLINT NOT NULL, - c8 INT NOT NULL, - c9 BIGINT UNSIGNED NOT NULL, - c10 VARCHAR NOT NULL, - c11 FLOAT NOT NULL, - c12 DOUBLE NOT NULL, - c13 VARCHAR NOT NULL +CREATE EXTERNAL TABLE t ( + a INT, + b INT, + c INT ) STORED AS CSV -LOCATION '../../testing/data/csv/aggregate_test_100.csv' +LOCATION '../core/tests/data/example.csv' +WITH ORDER (a ASC) OPTIONS ('format.has_header' 'true'); -statement ok -set datafusion.execution.batch_size = 2; +query T +SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT 'bar' as a from t) ORDER BY a; +---- +1 +bar -# Constant value tracking across union -query TT -explain -SELECT * FROM( -( - SELECT * FROM aggregate_test_100 WHERE c1='a' -) -UNION ALL -( - SELECT * FROM aggregate_test_100 WHERE c1='a' -)) -ORDER BY c1 +query I +SELECT (SELECT a from t ORDER BY a) UNION ALL (SELECT NULL as a from t) ORDER BY a; ---- -logical_plan -01)Sort: aggregate_test_100.c1 ASC NULLS LAST -02)--Union -03)----Filter: aggregate_test_100.c1 = Utf8("a") -04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")] -05)----Filter: aggregate_test_100.c1 = Utf8("a") -06)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")] -physical_plan -01)CoalescePartitionsExec -02)--UnionExec -03)----CoalesceBatchesExec: target_batch_size=2 -04)------FilterExec: c1@0 = a -05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true -07)----CoalesceBatchesExec: target_batch_size=2 -08)------FilterExec: c1@0 = a -09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true +1 +NULL -# Clean up after the test statement ok -drop table aggregate_test_100; - -# test for https://github.com/apache/datafusion/issues/14352 -query TB rowsort -SELECT - a, - a IS NOT NULL -FROM ( - -- second column, even though it's not selected, was necessary to reproduce the bug linked above - SELECT 'foo' AS a, 3 AS b - UNION ALL - SELECT NULL AS a, 4 AS b -) ----- -NULL false -foo true +drop table t