Commit b43b1c2

feat: support concat for strings

1 parent: b45789d
6 files changed (+54 −7 lines)

docs/source/user-guide/latest/configs.md

Lines changed: 1 addition & 0 deletions

@@ -200,6 +200,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.CheckOverflow.enabled` | Enable Comet acceleration for `CheckOverflow` | true |
 | `spark.comet.expression.Chr.enabled` | Enable Comet acceleration for `Chr` | true |
 | `spark.comet.expression.Coalesce.enabled` | Enable Comet acceleration for `Coalesce` | true |
+| `spark.comet.expression.Concat.enabled` | Enable Comet acceleration for `Concat` | true |
 | `spark.comet.expression.ConcatWs.enabled` | Enable Comet acceleration for `ConcatWs` | true |
 | `spark.comet.expression.Contains.enabled` | Enable Comet acceleration for `Contains` | true |
 | `spark.comet.expression.Cos.enabled` | Enable Comet acceleration for `Cos` | true |
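For reference, a minimal Scala sketch of toggling the new flag from a Spark session; the config key and its default of true come from the table above, while the SparkSession setup and app name are assumptions, not part of this commit:

// Hypothetical usage sketch: turn Comet's native concat off to fall back to Spark.
// Assumes a SparkSession that already has the Comet plugin configured.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("comet-concat-config").getOrCreate()

// Key documented in configs.md above; it defaults to true.
spark.conf.set("spark.comet.expression.Concat.enabled", "false")
spark.sql("SELECT concat('a', 'b', 'c') AS x").show()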

native/Cargo.lock

Lines changed: 1 addition & 0 deletions

Generated file; diff not rendered by default.

native/core/src/execution/jni_api.rs

Lines changed: 10 additions & 6 deletions

@@ -47,6 +47,7 @@ use datafusion_spark::function::hash::sha1::SparkSha1;
 use datafusion_spark::function::hash::sha2::SparkSha2;
 use datafusion_spark::function::math::expm1::SparkExpm1;
 use datafusion_spark::function::string::char::CharFunc;
+use datafusion_spark::function::string::concat::SparkConcat;
 use futures::poll;
 use futures::stream::StreamExt;
 use jni::objects::JByteBuffer;
@@ -317,20 +318,23 @@ fn prepare_datafusion_session_context(
     let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));

     datafusion::functions_nested::register_all(&mut session_ctx)?;
+    register_datafusion_spark_function(&session_ctx);
+    // Must be the last one to override existing functions with the same name
+    datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
+
+    Ok(session_ctx)
+}

-    // register UDFs from datafusion-spark crate
+// register UDFs from datafusion-spark crate
+fn register_datafusion_spark_function(session_ctx: &SessionContext) {
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkExpm1::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha2::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(CharFunc::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBitGet::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateAdd::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkDateSub::default()));
     session_ctx.register_udf(ScalarUDF::new_from_impl(SparkSha1::default()));
-
-    // Must be the last one to override existing functions with the same name
-    datafusion_comet_spark_expr::register_all_comet_functions(&mut session_ctx)?;
-
-    Ok(session_ctx)
+    session_ctx.register_udf(ScalarUDF::new_from_impl(SparkConcat::default()));
 }

 /// Prepares arrow arrays for output.

native/spark-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions

@@ -30,6 +30,7 @@ edition = { workspace = true }
 arrow = { workspace = true }
 chrono = { workspace = true }
 datafusion = { workspace = true }
+datafusion-spark = { workspace = true }
 chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }

native/spark-expr/src/comet_scalar_funcs.rs

Lines changed: 11 additions & 0 deletions

@@ -180,6 +180,17 @@ pub fn create_comet_physical_fun_with_eval_mode(
             let func = Arc::new(spark_modulo);
             make_comet_scalar_udf!("spark_modulo", func, without data_type, fail_on_error)
         }
+        "concat" => {
+            // Use concat from datafusion-spark crate which has Spark semantics
+            // (returns null if any argument is null)
+            // The registry parameter already contains functions from datafusion-spark
+            // if they were registered in prepare_datafusion_session_context
+            registry.udf("concat").map_err(|e| {
+                DataFusionError::Execution(format!(
+                    "Function concat not found in the registry: {e}",
+                ))
+            })
+        }
         _ => registry.udf(fun_name).map_err(|e| {
             DataFusionError::Execution(format!(
                 "Function {fun_name} not found in the registry: {e}",

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 30 additions & 1 deletion

@@ -3149,10 +3149,39 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       sql(
         "create table t1 using parquet as select uuid() c1, uuid() c2, uuid() c3, uuid() c4, cast(null as string) c5 from range(10)")
       checkSparkAnswerAndOperator("select concat(c1, c2) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c1) AS x FROM t1")
       checkSparkAnswerAndOperator("select concat(c1, c2, c3) AS x FROM t1")
       checkSparkAnswerAndOperator("select concat(c1, c2, c3, c5) AS x FROM t1")
-      //checkSparkAnswerAndOperator("select concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM t1")
+      checkSparkAnswerAndOperator(
+        "select concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM t1")
+    }
+  }
+
+  // https://github.com/apache/datafusion-comet/issues/2647
+  ignore("test concat function - arrays") {
+    withTable("t1") {
+      sql(
+        "create table t1 using parquet as select array(id, id+1) c1, array(id+2, id+3) c2, array() c3, array(null) c4, cast(null as array<int>) c5 from range(10)")
+      checkSparkAnswerAndOperator("select concat(c1, c2) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c1) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c2, c3) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c2, c3, c5) AS x FROM t1")
+      checkSparkAnswerAndOperator(
+        "select concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM t1")
     }
   }

+  // https://github.com/apache/datafusion-comet/issues/2647
+  ignore("test concat function - binary") {
+    withTable("t1") {
+      sql(
+        "create table t1 using parquet as select cast(uuid() as binary) c1, cast(uuid() as binary) c2, cast(uuid() as binary) c3, cast(uuid() as binary) c4, cast(null as binary) c5 from range(10)")
+      checkSparkAnswerAndOperator("select concat(c1, c2) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c1) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c2, c3) AS x FROM t1")
+      checkSparkAnswerAndOperator("select concat(c1, c2, c3, c5) AS x FROM t1")
+      checkSparkAnswerAndOperator(
+        "select concat(concat(c1, c2, c3), concat(c1, c3)) AS x FROM t1")
+    }
+  }
 }
