681 changes: 644 additions & 37 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
@@ -18,12 +18,15 @@ resolver = "3"
anyhow = { version = "1.0.100", default-features = false }
arrow = { version = "55.2.0", default-features = false, features = ["ipc"] }
chrono = { version = "0.4.42", default-features = false }
datafusion = { version = "49.0.1", default-features = false }
datafusion-common = { version = "49.0.1", default-features = false }
datafusion-expr = { version = "49.0.1", default-features = false }
datafusion-sql = { version = "49.0.1", default-features = false }
datafusion-udf-wasm-arrow2bytes = { path = "arrow2bytes", version = "0.1.0" }
datafusion-udf-wasm-bundle = { path = "guests/bundle", version = "0.1.0" }
datafusion-udf-wasm-guest = { path = "guests/rust", version = "0.1.0" }
datafusion-udf-wasm-python = { path = "guests/python", version = "0.1.0" }
sqlparser = { version = "0.55.0", default-features = false, features = ["std", "visitor"] }
http = { version = "1.3.1", default-features = false }
hyper = { version = "1.7", default-features = false }
tokio = { version = "1.48.0", default-features = false }
@@ -65,8 +68,10 @@ private_intra_doc_links = "deny"
[patch.crates-io]
# use same DataFusion fork as InfluxDB
# See https://github.com/influxdata/arrow-datafusion/pull/72
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
datafusion-common = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
datafusion-expr = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }
datafusion-sql = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "8347a71f62d4fef8d37548f22b93877170039357" }

# faster tests
[profile.dev.package]
3 changes: 3 additions & 0 deletions host/Cargo.toml
@@ -11,9 +11,12 @@ workspace = true
[dependencies]
anyhow.workspace = true
arrow.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-udf-wasm-arrow2bytes.workspace = true
datafusion-sql.workspace = true
sqlparser.workspace = true
http.workspace = true
hyper.workspace = true
rand = { version = "0.9" }
1 change: 1 addition & 0 deletions host/src/lib.rs
@@ -43,6 +43,7 @@ mod error;
pub mod http;
mod linker;
mod tokio_helpers;
pub mod udf_query;
mod vfs;

/// State of the WASM payload.
185 changes: 185 additions & 0 deletions host/src/udf_query.rs
@@ -0,0 +1,185 @@
//! Embedded SQL approach for executing UDFs within SQL queries.
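//!
//! An illustrative example of the combined query format this module expects
//! (hypothetical names; the `LANGUAGE` value must match a key in the registered
//! components map, e.g. "python", and the exact `CREATE FUNCTION` syntax
//! depends on the configured SQL dialect):
//!
//! ```sql
//! -- UDF definition: the single-quoted string after AS is handed to the
//! -- WASM component registered for the given language.
//! CREATE FUNCTION my_double(BIGINT) RETURNS BIGINT
//! LANGUAGE python
//! AS 'def my_double(x): return x * 2';
//!
//! -- Remaining statements are returned as `ParsedQuery::sql`.
//! SELECT my_double(v) FROM my_table;
//! ```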

use std::collections::HashMap;

use datafusion::execution::TaskContext;
use datafusion_common::{DataFusionError, Result as DataFusionResult};
use datafusion_sql::parser::{DFParserBuilder, Statement};
use sqlparser::ast::{CreateFunctionBody, Expr, Statement as SqlStatement, Value};
use sqlparser::dialect::dialect_from_str;

use crate::{WasmComponentPrecompiled, WasmPermissions, WasmScalarUdf};

/// A [ParsedQuery] contains the extracted UDFs and SQL query string
#[derive(Debug)]
pub struct ParsedQuery {
    /// Extracted UDFs from the query
    pub udfs: Vec<WasmScalarUdf>,
    /// SQL query string with UDF definitions removed
    pub sql: String,
}

/// Parses combined UDF + SQL queries and instantiates the contained UDFs from
/// pre-compiled WASM components.
pub struct UdfQueryParser<'a> {
    /// Pre-compiled WASM components, keyed by UDF language.
    /// Necessary to create UDFs.
    components: HashMap<String, &'a WasmComponentPrecompiled>,
}

impl std::fmt::Debug for UdfQueryParser<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("UdfQueryParser")
            .field("components", &self.components)
            .finish()
    }
}

impl<'a> UdfQueryParser<'a> {
    /// Creates a new parser from the given pre-compiled WASM components.
    pub fn new(components: HashMap<String, &'a WasmComponentPrecompiled>) -> Self {
        Self { components }
    }

    /// Parses a SQL query that defines & uses UDFs into a [ParsedQuery].
    pub async fn parse(
[Review thread on the line `pub async fn parse(`]

Collaborator:
Suggested change:
    -    pub async fn parse(
    +    pub fn parse(
I think this method is fully sync

@Sl1mb0 (Contributor, author), Nov 3, 2025:
This calls WasmScalarUdf::new, which is async. The more I look at the parser though, the more fine I am with it just returning the code and allowing the caller to handle UDF creation/registration. Thoughts?

@crepererum (Collaborator), Nov 3, 2025:
my oversight, sorry. keep it async 👍
        &self,
        udf_query: &str,
        permissions: &WasmPermissions,
        task_ctx: &TaskContext,
    ) -> DataFusionResult<ParsedQuery> {
        let (code, sql) = self.parse_inner(udf_query, task_ctx)?;

        let mut udfs = vec![];
        for (lang, blocks) in code {
            let component = self.components.get(&lang).ok_or_else(|| {
                DataFusionError::Plan(format!(
                    "no WASM component registered for language: {:?}",
                    lang
                ))
            })?;

            for code in blocks {
                udfs.extend(WasmScalarUdf::new(component, permissions, code).await?);
            }
        }

        Ok(ParsedQuery { udfs, sql })
    }

    /// Parse the combined query to extract the chosen UDF language, UDF
    /// definitions, and SQL statements.
    fn parse_inner(
        &self,
        query: &str,
        task_ctx: &TaskContext,
    ) -> DataFusionResult<(HashMap<String, Vec<String>>, String)> {
        let options = task_ctx.session_config().options();

        let dialect = dialect_from_str(options.sql_parser.dialect.clone()).expect("valid dialect");
        let recursion_limit = options.sql_parser.recursion_limit;

        let statements = DFParserBuilder::new(query)
            .with_dialect(dialect.as_ref())
            .with_recursion_limit(recursion_limit)
            .build()?
            .parse_statements()?;

        let mut sql = String::new();
        let mut udf_blocks: HashMap<String, Vec<String>> = HashMap::new();
        for s in statements {
            let Statement::Statement(stmt) = s else {
                continue;
            };

            match parse_udf(*stmt)? {
                Parsed::Udf { code, language } => {
                    if let Some(existing) = udf_blocks.get_mut(&language) {
                        existing.push(code);
                    } else {
                        udf_blocks.insert(language.clone(), vec![code]);
                    }
                }
                Parsed::Other(statement) => {
                    sql.push_str(&statement);
                    sql.push_str(";\n");
                }
            }
        }

        if sql.is_empty() {
            return Err(DataFusionError::Plan("no SQL query found".to_string()));
        }

        Ok((udf_blocks, sql))
    }
}

/// Represents a parsed SQL statement
enum Parsed {
    /// A UDF definition
    Udf {
        /// UDF code
        code: String,
        /// UDF language
        language: String,
    },
    /// Any other SQL statement
    Other(String),
}

/// Parse a single SQL statement to extract a UDF
fn parse_udf(stmt: SqlStatement) -> DataFusionResult<Parsed> {
    match stmt {
        SqlStatement::CreateFunction(cf) => {
            let function_body = cf.function_body.as_ref();

            let language = if let Some(lang) = cf.language.as_ref() {
                lang.to_string()
            } else {
                return Err(DataFusionError::Plan(
                    "function language is required for UDFs".to_string(),
                ));
            };

            let code = match function_body {
                Some(body) => extract_function_body(body),
                None => Err(DataFusionError::Plan(
                    "function body is required for UDFs".to_string(),
                )),
            }?;

            Ok(Parsed::Udf {
                code: code.to_string(),
                language,
            })
        }
        _ => Ok(Parsed::Other(stmt.to_string())),
    }
}

/// Extracts the UDF code from the function body.
fn extract_function_body(body: &CreateFunctionBody) -> DataFusionResult<&str> {
    match body {
        CreateFunctionBody::AsAfterOptions(e) | CreateFunctionBody::AsBeforeOptions(e) => {
            expression_into_str(e)
        }
        CreateFunctionBody::Return(_) => Err(DataFusionError::Plan(
            "`RETURN` function body not supported for UDFs".to_string(),
        )),
    }
}

/// Attempt to convert an `Expr` into a `str`
fn expression_into_str(expr: &Expr) -> DataFusionResult<&str> {
    match expr {
        Expr::Value(v) => match &v.value {
            Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => Ok(s),
            _ => Err(DataFusionError::Plan("expected string value".to_string())),
        },
        _ => Err(DataFusionError::Plan(
            "expected value expression".to_string(),
        )),
    }
}
1 change: 1 addition & 0 deletions host/tests/integration_tests/mod.rs
@@ -1,3 +1,4 @@
mod python;
mod rust;
mod test_utils;
mod udf_query;
2 changes: 1 addition & 1 deletion host/tests/integration_tests/python/mod.rs
@@ -3,5 +3,5 @@ mod examples;
mod inspection;
mod runtime;
mod state;
mod test_utils;
pub(crate) mod test_utils;
mod types;