Skip to content

Chore: update wasm-supported crates, add tests #14005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion datafusion/wasmtest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ chrono = { version = "0.4", features = ["wasmbind"] }
# code size when deploying.
console_error_panic_hook = { version = "0.1.1", optional = true }
datafusion = { workspace = true }
datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-common-runtime = { workspace = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-functions = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
datafusion-functions-table = { workspace = true }
datafusion-optimizer = { workspace = true, default-features = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-sql = { workspace = true }

# getrandom must be compiled with js feature
getrandom = { version = "0.2.8", features = ["js"] }

Expand Down
12 changes: 10 additions & 2 deletions datafusion/wasmtest/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,13 @@ The following DataFusion crates are verified to work in a wasm-pack environment
- `datafusion-physical-expr`
- `datafusion-physical-plan`
- `datafusion-sql`

The difficulty with getting the remaining DataFusion crates compiled to WASM is that they have non-optional dependencies on the [`parquet`](https://docs.rs/crate/parquet/) crate with its default features enabled. Several of the default parquet crate features require native dependencies that are not compatible with WASM, in particular the `lz4` and `zstd` features. If we can arrange our feature flags to make it possible to depend on parquet with these features disabled, then it should be possible to compile the core `datafusion` crate to WASM as well.
- `datafusion-expr-common`
- `datafusion-physical-expr-common`
- `datafusion-functions`
- `datafusion-functions-aggregate`
- `datafusion-functions-aggregate-common`
- `datafusion-functions-table`
- `datafusion-catalog`
- `datafusion-common-runtime`

The `datafusion-ffi` crate cannot compile for the wasm32-unknown-unknown target because it relies on lzma-sys, which depends on native C libraries (liblzma). The wasm32-unknown-unknown target lacks a standard C library (stdlib.h) and POSIX-like environment, preventing the native code from being compiled.
70 changes: 61 additions & 9 deletions datafusion/wasmtest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use std::sync::Arc;
use wasm_bindgen::prelude::*;

pub fn set_panic_hook() {
// When the `console_error_panic_hook` feature is enabled, we can call the
// `set_panic_hook` function at least once during initialization, and then
Expand Down Expand Up @@ -77,7 +76,14 @@ pub fn basic_parse() {
#[cfg(test)]
mod test {
use super::*;
use datafusion::execution::context::SessionContext;
use datafusion::{
arrow::{
array::{ArrayRef, Int32Array, RecordBatch, StringArray},
datatypes::{DataType, Field, Schema},
},
datasource::MemTable,
execution::context::SessionContext,
};
use datafusion_execution::{
config::SessionConfig, disk_manager::DiskManagerConfig,
runtime_env::RuntimeEnvBuilder,
Expand All @@ -95,19 +101,21 @@ mod test {
basic_parse();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn basic_execute() {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)
fn get_ctx() -> Arc<SessionContext> {
let rt = RuntimeEnvBuilder::new()
.with_disk_manager(DiskManagerConfig::Disabled)
.build_arc()
.unwrap();
let session_config = SessionConfig::new().with_target_partitions(1);
let session_context =
Arc::new(SessionContext::new_with_config_rt(session_config, rt));
Arc::new(SessionContext::new_with_config_rt(session_config, rt))
}
#[wasm_bindgen_test(unsupported = tokio::test)]
async fn basic_execute() {
let sql = "SELECT 2 + 2;";

// Execute SQL (using datafusion)

let session_context = get_ctx();
let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();

let logical_plan = session_context
Expand All @@ -124,4 +132,48 @@ mod test {
let task_ctx = session_context.task_ctx();
let _ = collect(physical_plan, task_ctx).await.unwrap();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

async fn basic_df_function_execute() {
let sql = "SELECT abs(-1.0);";
let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();
let ctx = get_ctx();
let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
let physical_plan = data_frame.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect(physical_plan, task_ctx).await.unwrap();
}

#[wasm_bindgen_test(unsupported = tokio::test)]
async fn test_basic_aggregate() {
let sql =
"SELECT FIRST_VALUE(value) OVER (ORDER BY id) as first_val FROM test_table;";

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("value", DataType::Utf8, false),
]));

let data: Vec<ArrayRef> = vec![
Arc::new(Int32Array::from(vec![1])),
Arc::new(StringArray::from(vec!["a"])),
];

let batch = RecordBatch::try_new(schema.clone(), data).unwrap();
let table = MemTable::try_new(schema.clone(), vec![vec![batch]]).unwrap();

let ctx = get_ctx();
ctx.register_table("test_table", Arc::new(table)).unwrap();

let statement = DFParser::parse_sql(sql).unwrap().pop_back().unwrap();

let logical_plan = ctx.state().statement_to_plan(statement).await.unwrap();
let data_frame = ctx.execute_logical_plan(logical_plan).await.unwrap();
let physical_plan = data_frame.create_physical_plan().await.unwrap();

let task_ctx = ctx.task_ctx();
let _ = collect(physical_plan, task_ctx).await.unwrap();
}
}
Loading