Skip to content

[SPARK-52551][SQL] Add a new v2 Predicate BOOLEAN_EXPRESSION #51247

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@
* <li>Since version: 3.3.0</li>
* </ul>
* </li>
* <li>Name: <code>BOOLEAN_EXPRESSION</code>
* <ul>
* <li>A simple wrapper for any expression that returns boolean type.</li>
* <li>Since version: 4.1.0</li>
* </ul>
* </li>
* </ol>
*
* @since 3.3.0
Expand All @@ -145,5 +151,8 @@ public class Predicate extends GeneralScalarExpression {

public Predicate(String name, Expression[] children) {
super(name, children);
if ("BOOLEAN_EXPRESSION".equals(name)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "BOOLEAN_EXPRESSION" should be extracted into a constant declared with the final modifier.

assert children.length == 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public String build(Expression expr) {
case "CONTAINS" -> visitContains(build(e.children()[0]), build(e.children()[1]));
case "=", "<>", "<=>", "<", "<=", ">", ">=" ->
visitBinaryComparison(name, e.children()[0], e.children()[1]);
case "BOOLEAN_EXPRESSION" ->
build(expr.children()[0]);
case "+", "*", "/", "%", "&", "|", "^" ->
visitBinaryArithmetic(name, inputToSQL(e.children()[0]), inputToSQL(e.children()[1]));
case "-" -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ object V2ExpressionUtils extends SQLConfHelper with Logging {
case "ENDS_WITH" => convertBinaryExpr(expr, EndsWith)
case "CONTAINS" => convertBinaryExpr(expr, Contains)
case "IN" => convertExpr(expr, children => In(children.head, children.tail))
case "BOOLEAN_EXPRESSION" => toCatalyst(expr.children().head)
case _ => None
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,21 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) extends L
def build(): Option[V2Expression] = generateExpression(e, isPredicate)

def buildPredicate(): Option[V2Predicate] = {

if (isPredicate) {
val translated = build()
val translated0 = build()
val conf = SQLConf.get
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Holding a reference to the SQLConf here means we will not be able to pick up real-time changes to SQLConf.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is within a single method, so I don't think we want to be that dynamic. And in practice this should run within the same session, so it won't change.

val alwaysCreateV2Predicate = conf.getConf(SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE)
val translated = if (alwaysCreateV2Predicate && e.dataType == BooleanType) {
translated0.map {
case p: V2Predicate => p
case other => new V2Predicate("BOOLEAN_EXPRESSION", Array(other))
}
} else {
translated0
}

val modifiedExprOpt = if (
SQLConf.get.getConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE)
conf.getConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE)
&& translated.isDefined
&& !translated.get.isInstanceOf[V2Predicate]) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1695,9 +1695,19 @@ object SQLConf {
buildConf("spark.sql.dataSource.skipAssertOnPredicatePushdown")
.internal()
.doc("Enable skipping assert when expression in not translated to predicate.")
.version("4.0.0")
.booleanConf
.createWithDefault(!Utils.isTesting)

// Internal boolean switch for the v2 push-down framework, enabled by default.
// When true, an expression that returns boolean type is always wrapped with a v2 Predicate
// so that it can be pushed down.
val DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE =
buildConf("spark.sql.dataSource.alwaysCreateV2Predicate")
.internal()
.doc("When true, the v2 push-down framework always wraps the expression that returns " +
"boolean type with a v2 Predicate so that it can be pushed down.")
.version("4.1.0")
.booleanConf
.createWithDefault(true)

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
.doc("The default data source to use in input/output.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,71 @@
package org.apache.spark.sql.connector

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.connector.expressions.filter.{AlwaysTrue, Predicate => V2Predicate}
import org.apache.spark.sql.execution.datasources.v2.PushablePredicate
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.BooleanType

class PushablePredicateSuite extends QueryTest with SharedSparkSession {

// With the "don't assert on predicate" flag switched on, a string literal is expected to be
// reported as not pushable (no result) rather than tripping an assertion.
test("PushablePredicate None returned - flag on") {
  val skipAssertFlag = SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "true"
  withSQLConf(skipAssertFlag) {
    val stringLiteral = Literal.create("string")
    assert(PushablePredicate.unapply(stringLiteral).isEmpty)
  }
}

test("PushablePredicate success - flag on") {
withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "true") {
val pushable = PushablePredicate.unapply(Literal.create(true))
assert(pushable.isDefined)
// Whatever the two push-down flags are set to, a `true` literal is expected to be pushable
// and to come back as AlwaysTrue.
test("simple boolean expression should always return v2 Predicate") {
  for {
    wrapInPredicate <- Seq(true, false)
    skipAssert <- Seq(true, false)
  } {
    withSQLConf(
      SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE.key -> wrapInPredicate.toString,
      SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> skipAssert.toString) {
      val result = PushablePredicate.unapply(Literal.create(true))
      assert(result.nonEmpty)
      assert(result.get.isInstanceOf[AlwaysTrue])
    }
  }
}

test("PushablePredicate success") {
withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "false") {
val pushable = PushablePredicate.unapply(Literal.create(true))
assert(pushable.isDefined)
// A string literal is never pushable. For every combination of the two flags the test expects
// either an empty result (assertions skipped) or an AssertionError (assertions active).
test("non-boolean expression") {
  val flagValues = Seq(true, false)
  for (wrapInPredicate <- flagValues; skipAssert <- flagValues) {
    withSQLConf(
      SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE.key -> wrapInPredicate.toString,
      SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> skipAssert.toString) {
      val stringLiteral = Literal.create("string")
      if (!skipAssert) {
        // Assertions are active: the failed translation must surface as an error.
        intercept[java.lang.AssertionError] {
          PushablePredicate.unapply(stringLiteral)
        }
      } else {
        assert(PushablePredicate.unapply(stringLiteral).isEmpty)
      }
    }
  }
}

test("PushablePredicate throws") {
withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "false") {
intercept[java.lang.AssertionError] {
PushablePredicate.unapply(Literal.create("string"))
// A cast of a string literal to boolean is checked under every combination of the two flags:
//  - always-create-predicate on: a V2Predicate is expected regardless of the assert flag;
//  - off, assertions skipped: nothing is pushable;
//  - off, assertions active: an AssertionError is expected.
test("non-trivial boolean expression") {
  val flagValues = Seq(true, false)
  for (wrapInPredicate <- flagValues; skipAssert <- flagValues) {
    withSQLConf(
      SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE.key -> wrapInPredicate.toString,
      SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> skipAssert.toString) {
      val castToBoolean = Cast(Literal.create("true"), BooleanType)
      (wrapInPredicate, skipAssert) match {
        case (true, _) =>
          val result = PushablePredicate.unapply(castToBoolean)
          assert(result.isDefined)
          assert(result.get.isInstanceOf[V2Predicate])
        case (false, true) =>
          assert(PushablePredicate.unapply(castToBoolean).isEmpty)
        case (false, false) =>
          intercept[java.lang.AssertionError] {
            PushablePredicate.unapply(castToBoolean)
          }
      }
    }
  }
}
Expand Down