Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use datafusion::logical_expr::utils::split_conjunction;
use datafusion::logical_expr::{
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
};
use datafusion::scalar::ScalarValue;

use substrait::proto::{join_rel, JoinRel};

Expand Down Expand Up @@ -75,15 +76,22 @@ pub async fn from_join_rel(
.build()
}
None => {
let on: Vec<String> = vec![];
left.join_detailed(
right.build()?,
join_type,
(on.clone(), on),
None,
NullEquality::NullEqualsNothing,
)?
.build()
// For joins without conditions, use cross_join if inner, otherwise use a filter with Boolean(true)
if join_type == JoinType::Inner {
left.cross_join(right.build()?)?.build()
} else {
// For every non-inner join type without a join condition, use a Boolean(true) filter.
// This is semantically equivalent to having no condition (all rows match).
let on: Vec<String> = vec![];
left.join_detailed(
right.build()?,
join_type,
(on.clone(), on),
Some(Expr::Literal(ScalarValue::Boolean(Some(true)), None)),
NullEquality::NullEqualsNothing,
)?
.build()
}
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,30 @@ async fn roundtrip_read_filter() -> Result<()> {
roundtrip_verify_read_filter_count("SELECT a FROM data where a < 5", 1).await
}

/// Regression test: a scalar subquery in the SELECT list must produce a plan
/// containing a `Left Join` that has no equi-join keys.
///
/// The plan returned by `generate_plan_from_sql` is compared against the
/// inline snapshot below; any `?` failure while producing the plan fails the
/// test.
#[tokio::test]
async fn scalar_subquery_in_select() -> Result<()> {
// Scalar subqueries get converted to LEFT joins during optimization
// This tests that the LEFT join without equi-join conditions can roundtrip
// NOTE(review): the roundtrip itself presumably happens inside
// `generate_plan_from_sql` (not visible here) — confirm against the helper.
let plan = generate_plan_from_sql(
"SELECT a, (SELECT MAX(b) FROM data2) as max_b FROM data",
// NOTE(review): the meaning of the two boolean flags is defined by
// `generate_plan_from_sql`; verify against its signature.
false,
true,
)
.await?;

// Expected shape: the uncorrelated subquery becomes the right side of a
// `Left Join` (an ungrouped `max` aggregate over `data2`), with no join keys
// listed after `Left Join:`.
assert_snapshot!(
plan,
@r#"
Projection: data.a, max(data2.b) AS max_b
Left Join:
TableScan: data projection=[a]
Aggregate: groupBy=[[]], aggr=[[max(data2.b)]]
TableScan: data2 projection=[b], partial_filters=[Boolean(true)]
"#
);
Ok(())
}

fn check_post_join_filters(rel: &Rel) -> Result<()> {
// search for target_rel and field value in proto
match &rel.rel_type {
Expand Down
Loading