diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index b1e36e02925b..049926fb0bcd 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -35,8 +35,8 @@ use datafusion_common::tree_node::{ }; use datafusion_common::utils::get_at_indices; use datafusion_common::{ - internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, - DataFusionError, HashMap, Result, TableReference, + internal_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, HashMap, + Result, TableReference, }; use indexmap::IndexSet; @@ -379,14 +379,12 @@ fn get_exprs_except_skipped( } } -/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. -pub fn expand_wildcard( - schema: &DFSchema, - plan: &LogicalPlan, - wildcard_options: Option<&WildcardOptions>, -) -> Result> { +/// For each column specified in the USING JOIN condition, the JOIN plan outputs it twice +/// (once for each join side), but an unqualified wildcard should include it only once. +/// This function returns the columns that should be excluded. +fn exclude_using_columns(plan: &LogicalPlan) -> Result> { let using_columns = plan.using_columns()?; - let mut columns_to_skip = using_columns + let excluded = using_columns .into_iter() // For each USING JOIN condition, only expand to one of each join column in projection .flat_map(|cols| { @@ -395,18 +393,26 @@ pub fn expand_wildcard( // qualified column cols.sort(); let mut out_column_names: HashSet = HashSet::new(); - cols.into_iter() - .filter_map(|c| { - if out_column_names.contains(&c.name) { - Some(c) - } else { - out_column_names.insert(c.name); - None - } - }) - .collect::>() + cols.into_iter().filter_map(move |c| { + if out_column_names.contains(&c.name) { + Some(c) + } else { + out_column_names.insert(c.name); + None + } + }) }) .collect::>(); + Ok(excluded) +} + +/// Resolves an `Expr::Wildcard` to a collection of `Expr::Column`'s. +pub fn expand_wildcard( + schema: &DFSchema, + plan: &LogicalPlan, + wildcard_options: Option<&WildcardOptions>, +) -> Result> { + let mut columns_to_skip = exclude_using_columns(plan)?; let excluded_columns = if let Some(WildcardOptions { exclude: opt_exclude, except: opt_except, @@ -705,27 +711,20 @@ pub fn exprlist_to_fields<'a>( .map(|e| match e { Expr::Wildcard { qualifier, options } => match qualifier { None => { - let excluded: Vec = get_excluded_columns( + let mut excluded = exclude_using_columns(plan)?; + excluded.extend(get_excluded_columns( options.exclude.as_ref(), options.except.as_ref(), wildcard_schema, None, - )? - .into_iter() - .map(|c| c.flat_name()) - .collect(); - Ok::<_, DataFusionError>( - wildcard_schema - .field_names() - .iter() - .enumerate() - .filter(|(_, s)| !excluded.contains(s)) - .map(|(i, _)| wildcard_schema.qualified_field(i)) - .map(|(qualifier, f)| { - (qualifier.cloned(), Arc::new(f.to_owned())) - }) - .collect::>(), - ) + )?); + Ok(wildcard_schema + .iter() + .filter(|(q, f)| { + !excluded.contains(&Column::new(q.cloned(), f.name())) + }) + .map(|(q, f)| (q.cloned(), Arc::clone(f))) + .collect::>()) } Some(qualifier) => { let excluded: Vec = get_excluded_columns( diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index b93060988d20..24b585d2a5e7 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -4552,3 +4552,77 @@ fn test_error_message_invalid_window_aggregate_function_signature() { "Error during planning: sum does not support zero arguments", ); } + +// Test issue: https://github.com/apache/datafusion/issues/14058 +// Select with wildcard over a USING/NATURAL JOIN should deduplicate condition columns. +#[test] +fn test_using_join_wildcard_schema() { + let sql = "SELECT * FROM orders o1 JOIN orders o2 USING (order_id)"; + let plan = logical_plan(sql).unwrap(); + let count = plan + .schema() + .iter() + .filter(|(_, f)| f.name() == "order_id") + .count(); + // Only one order_id column + assert_eq!(count, 1); + + let sql = "SELECT * FROM orders o1 NATURAL JOIN orders o2"; + let plan = logical_plan(sql).unwrap(); + // Only columns from one join side should be present + let expected_fields = vec![ + "o1.order_id".to_string(), + "o1.customer_id".to_string(), + "o1.o_item_id".to_string(), + "o1.qty".to_string(), + "o1.price".to_string(), + "o1.delivered".to_string(), + ]; + assert_eq!(plan.schema().field_names(), expected_fields); + + // Reproducible example of issue #14058 + let sql = "WITH t1 AS (SELECT 1 AS id, 'a' AS value1), + t2 AS (SELECT 1 AS id, 'x' AS value2) + SELECT * FROM t1 NATURAL JOIN t2"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.id".to_string(), + "t1.value1".to_string(), + "t2.value2".to_string() + ] + ); + + // Multiple joins + let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b), + t2 AS (SELECT 1 AS a, 2 AS c), + t3 AS (SELECT 1 AS c, 2 AS d) + SELECT * FROM t1 NATURAL JOIN t2 RIGHT JOIN t3 USING (c)"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.a".to_string(), + "t1.b".to_string(), + "t2.c".to_string(), + "t3.d".to_string() + ] + ); + + // Subquery + let sql = "WITH t1 AS (SELECT 1 AS a, 1 AS b), + t2 AS (SELECT 1 AS a, 2 AS c), + t3 AS (SELECT 1 AS c, 2 AS d) + SELECT * FROM (SELECT * FROM t1 LEFT JOIN t2 USING(a)) NATURAL JOIN t3"; + let plan = logical_plan(sql).unwrap(); + assert_eq!( + plan.schema().field_names(), + [ + "t1.a".to_string(), + "t1.b".to_string(), + "t2.c".to_string(), + "t3.d".to_string() + ] + ); +}