From f1c14f8ca0f0b1ee72532359ef59140b1edc1f6e Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Mon, 19 May 2025 19:11:40 +0300 Subject: [PATCH 1/4] Add macro for creating DataFrame (#16090) --- datafusion-examples/examples/dataframe.rs | 19 ++++ datafusion/common/src/array_conversion.rs | 128 ++++++++++++++++++++++ datafusion/common/src/lib.rs | 1 + datafusion/core/src/dataframe/mod.rs | 54 +++++++++ datafusion/core/src/lib.rs | 4 + datafusion/core/src/macros.rs | 51 +++++++++ datafusion/core/src/prelude.rs | 1 + datafusion/core/tests/dataframe/mod.rs | 60 ++++++++++ 8 files changed, 318 insertions(+) create mode 100644 datafusion/common/src/array_conversion.rs create mode 100644 datafusion/core/src/macros.rs diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index ea06ee2a11f2..2a699b2555d6 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -63,6 +63,7 @@ async fn main() -> Result<()> { read_parquet(&ctx).await?; read_csv(&ctx).await?; read_memory(&ctx).await?; + read_memory_macro().await?; write_out(&ctx).await?; register_aggregate_test_data("t1", &ctx).await?; register_aggregate_test_data("t2", &ctx).await?; @@ -173,6 +174,24 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> { Ok(()) } +/// Use the DataFrame API to: +/// 1. Read in-memory data. +async fn read_memory_macro() -> Result<()> { + // create a DataFrame using macro + let df = df!( + "a" => ["a", "b", "c", "d"], + "b" => [1, 10, 10, 100] + )?; + // print the results + df.show().await?; + + // create empty DataFrame using macro + let df_empty = df!()?; + df_empty.show().await?; + + Ok(()) +} + /// Use the DataFrame API to: /// 1. Write out a DataFrame to a table /// 2. Write out a DataFrame to a parquet file diff --git a/datafusion/common/src/array_conversion.rs b/datafusion/common/src/array_conversion.rs new file mode 100644 index 000000000000..cc5ac1049efd --- /dev/null +++ b/datafusion/common/src/array_conversion.rs @@ -0,0 +1,128 @@ +use std::sync::Arc; + +use arrow::array::{ArrayRef, BooleanArray, Float32Array, Int32Array, StringArray}; + +/// Converts a vector or array into an ArrayRef. +pub trait IntoArrayRef { + fn into_array_ref(self: Box) -> ArrayRef; +} + +impl IntoArrayRef for Vec { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Int32Array::from(*self)) + } +} + +impl IntoArrayRef for Vec> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Int32Array::from(*self)) + } +} + +impl IntoArrayRef for [i32; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Int32Array::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for [Option; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Int32Array::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for Vec { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Float32Array::from(*self)) + } +} + +impl IntoArrayRef for Vec> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Float32Array::from(*self)) + } +} + +impl IntoArrayRef for [f32; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Float32Array::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for [Option; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(Float32Array::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for Vec<&str> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(*self)) + } +} + +impl IntoArrayRef for Vec> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(*self)) + } +} + +impl IntoArrayRef for [&'static str; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for [Option<&'static str>; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for Vec { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(*self)) + } +} + +impl IntoArrayRef for Vec> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(*self)) + } +} + +impl IntoArrayRef for [String; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for [Option; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(StringArray::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for Vec { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(BooleanArray::from(*self)) + } +} + +impl IntoArrayRef for Vec> { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(BooleanArray::from(*self)) + } +} + +impl IntoArrayRef for [bool; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(BooleanArray::from(Vec::from(*self))) + } +} + +impl IntoArrayRef for [Option; N] { + fn into_array_ref(self: Box) -> ArrayRef { + Arc::new(BooleanArray::from(Vec::from(*self))) + } +} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 2576ab431202..72493e7fb8ac 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -36,6 +36,7 @@ mod table_reference; mod unnest; pub mod alias; +pub mod array_conversion; pub mod cast; pub mod config; pub mod cse; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index e385125692bd..c1e88c8d8607 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -49,6 +49,7 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::array_conversion::IntoArrayRef; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, @@ -181,6 +182,13 @@ impl Default for DataFrameWriteOptions { /// .limit(0, Some(100))?; /// // Perform the actual computation /// let results = df.collect(); +/// +/// // Create a new dataframe with in-memory data using macro +/// let df = df!( +/// "a" => ["a", "b", "c", "d"], +/// "b" => [1, 10, 10, 100] +/// )?; +/// df.show().await?; /// # Ok(()) /// # } /// ``` @@ -2199,6 +2207,52 @@ impl DataFrame { }) .collect() } + + /// Helper for creating DataFrame. + /// # Example + /// ``` + /// use datafusion::prelude::DataFrame; + /// let id = Box::new([1, 2, 3]); + /// let name = Box::new(["foo", "bar", "baz"]); + /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap(); + /// // +----+------+, + /// // | id | name |, + /// // +----+------+, + /// // | 1 | foo |, + /// // | 2 | bar |, + /// // | 3 | baz |, + /// // +----+------+, + /// ``` + pub fn from_columns(columns: Vec<(&str, Box)>) -> Result { + let mut fields: Vec = vec![]; + let mut arrays: Vec = vec![]; + + for (name, array_like) in columns { + let array = array_like.into_array_ref(); + let dtype = array.data_type().clone(); + fields.push(Field::new(name, dtype, true)); + arrays.push(array); + } + + let len = + arrays + .first() + .map(|arr| arr.len()) + .ok_or(DataFusionError::Execution( + "Column must not be empty".to_string(), + ))?; + let all_same_len = arrays.iter().all(|arr| arr.len() == len); + if !all_same_len { + return Err(DataFusionError::Execution( + "All columns must have the same length".to_string(), + )); + } + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema.clone(), arrays)?; + let ctx = SessionContext::new(); + ctx.read_batch(batch) + } } #[derive(Debug)] diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index db8cc919c59b..f62d6e0c2bad 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -723,6 +723,10 @@ pub mod dataframe; pub mod datasource; pub mod error; pub mod execution; + +/// helpers for creating [`DataFrame`] +pub mod macros; + pub mod physical_planner; pub mod prelude; pub mod scalar; diff --git a/datafusion/core/src/macros.rs b/datafusion/core/src/macros.rs new file mode 100644 index 000000000000..9adc281a0644 --- /dev/null +++ b/datafusion/core/src/macros.rs @@ -0,0 +1,51 @@ +/// Macro for creating DataFrame. +/// # Example +/// ``` +/// use datafusion::prelude::df; +/// # use datafusion::error::Result; +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// let df = df!( +/// "id" => [1, 2, 3], +/// "name" => ["foo", "bar", "baz"] +/// )?; +/// df.show().await?; +/// // +----+------+, +/// // | id | name |, +/// // +----+------+, +/// // | 1 | foo |, +/// // | 2 | bar |, +/// // | 3 | baz |, +/// // +----+------+, +/// let df = df!()?; // empty DataFrame +/// df.show().await?; +/// // ++ +/// // ++ +/// // ++ +/// # Ok(()) +/// # } +/// ``` +#[macro_export] +macro_rules! df { + () => {{ + use std::sync::Arc; + + use datafusion::prelude::SessionContext; + use datafusion::arrow::array::RecordBatch; + use datafusion::arrow::datatypes::Schema; + + let ctx = SessionContext::new(); + let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + ctx.read_batch(batch) + }}; + + ($($col_name:expr => $data:expr),+ $(,)?) => {{ + use datafusion::prelude::DataFrame; + use datafusion::common::array_conversion::IntoArrayRef; + + let columns = vec![ + $( ($col_name, Box::new($data) as Box) ),+ + ]; + DataFrame::from_columns(columns) + }}; +} diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 9c9fcd04bf09..fd3c315adde4 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -26,6 +26,7 @@ //! ``` pub use crate::dataframe::DataFrame; +pub use crate::df; pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 827808d923f5..0c6944134328 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -32,6 +32,7 @@ use arrow::datatypes::{ }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; +use datafusion::{assert_batches_eq, df}; use datafusion_functions_aggregate::count::{count_all, count_all_window}; use datafusion_functions_aggregate::expr_fn::{ array_agg, avg, count, count_distinct, max, median, min, sum, @@ -6017,3 +6018,62 @@ async fn test_insert_into_casting_support() -> Result<()> { ); Ok(()) } + +#[tokio::test] +async fn test_df_from_columns() -> Result<()> { + let a = Box::new([1, 2, 3]); + let b = Box::new([true, true, false]); + let c = Box::new([Some("foo"), Some("bar"), None]); + let df = DataFrame::from_columns(vec![("a", a), ("b", b), ("c", c)])?; + + assert_eq!(df.schema().fields().len(), 3); + assert_eq!(df.clone().count().await?, 3); + + let rows = df.sort(vec![col("a").sort(true, true)])?; + assert_batches_eq!( + &[ + "+---+-------+-----+", + "| a | b | c |", + "+---+-------+-----+", + "| 1 | true | foo |", + "| 2 | true | bar |", + "| 3 | false | |", + "+---+-------+-----+", + ], + &rows.collect().await? + ); + + Ok(()) +} + +#[tokio::test] +async fn test_df_macro() -> Result<()> { + let df = df!( + "a" => [1, 2, 3], + "b" => [true, true, false], + "c" => [Some("foo"), Some("bar"), None] + )?; + + assert_eq!(df.schema().fields().len(), 3); + assert_eq!(df.clone().count().await?, 3); + + let rows = df.sort(vec![col("a").sort(true, true)])?; + assert_batches_eq!( + &[ + "+---+-------+-----+", + "| a | b | c |", + "+---+-------+-----+", + "| 1 | true | foo |", + "| 2 | true | bar |", + "| 3 | false | |", + "+---+-------+-----+", + ], + &rows.collect().await? + ); + + let df_empty = df!()?; + assert_eq!(df_empty.schema().fields().len(), 0); + assert_eq!(df_empty.count().await?, 0); + + Ok(()) +} From 6972cf01b74b357635f92ab75629c3a7603a6ceb Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Tue, 20 May 2025 09:56:26 +0300 Subject: [PATCH 2/4] Fix issues causing GitHub checks to fail --- datafusion/common/src/array_conversion.rs | 17 ++++++++++++++++ datafusion/core/src/dataframe/mod.rs | 24 ++++++++++++++++++++--- datafusion/core/src/lib.rs | 2 +- datafusion/core/src/macros.rs | 8 +++----- 4 files changed, 42 insertions(+), 9 deletions(-) diff --git a/datafusion/common/src/array_conversion.rs b/datafusion/common/src/array_conversion.rs index cc5ac1049efd..86d3efea2fc6 100644 --- a/datafusion/common/src/array_conversion.rs +++ b/datafusion/common/src/array_conversion.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use std::sync::Arc; use arrow::array::{ArrayRef, BooleanArray, Float32Array, Int32Array, StringArray}; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c1e88c8d8607..1cd3d265e3cb 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -167,9 +167,12 @@ impl Default for DataFrameWriteOptions { /// /// # Example /// ``` +/// # use std::sync::Arc; /// # use datafusion::prelude::*; /// # use datafusion::error::Result; /// # use datafusion::functions_aggregate::expr_fn::min; +/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray}; +/// # use datafusion::arrow::datatypes::{DataType, Field, Schema}; /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); @@ -183,10 +186,25 @@ impl Default for DataFrameWriteOptions { /// // Perform the actual computation /// let results = df.collect(); /// +/// // Create a new dataframe with in-memory data +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Int32, true), +/// Field::new("name", DataType::Utf8, true), +/// ]); +/// let batch = RecordBatch::try_new( +/// Arc::new(schema), +/// vec![ +/// Arc::new(Int32Array::from(vec![1, 2, 3])), +/// Arc::new(StringArray::from(vec!["foo", "bar", "baz"])), +/// ], +/// )?; +/// let df = ctx.read_batch(batch)?; +/// df.show().await?; +/// /// // Create a new dataframe with in-memory data using macro /// let df = df!( -/// "a" => ["a", "b", "c", "d"], -/// "b" => [1, 10, 10, 100] +/// "id" => [1, 2, 3], +/// "name" => ["foo", "bar", "baz"] /// )?; /// df.show().await?; /// # Ok(()) @@ -2249,7 +2267,7 @@ impl DataFrame { } let schema = Arc::new(Schema::new(fields)); - let batch = RecordBatch::try_new(schema.clone(), arrays)?; + let batch = RecordBatch::try_new(schema, arrays)?; let ctx = SessionContext::new(); ctx.read_batch(batch) } diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index f62d6e0c2bad..b9abf46c85dc 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -724,7 +724,7 @@ pub mod datasource; pub mod error; pub mod execution; -/// helpers for creating [`DataFrame`] +/// helpers for creating DataFrame pub mod macros; pub mod physical_planner; diff --git a/datafusion/core/src/macros.rs b/datafusion/core/src/macros.rs index 9adc281a0644..172cda981d3c 100644 --- a/datafusion/core/src/macros.rs +++ b/datafusion/core/src/macros.rs @@ -17,11 +17,9 @@ /// // | 2 | bar |, /// // | 3 | baz |, /// // +----+------+, -/// let df = df!()?; // empty DataFrame -/// df.show().await?; -/// // ++ -/// // ++ -/// // ++ +/// let df_empty = df!()?; // empty DataFrame +/// assert_eq!(df_empty.schema().fields().len(), 0); +/// assert_eq!(df_empty.count().await?, 0); /// # Ok(()) /// # } /// ``` From 741265654cd5cdef9fed7e1df15e225a776d11d0 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Tue, 20 May 2025 10:25:36 +0300 Subject: [PATCH 3/4] Fix issues causing GitHub checks to fail --- datafusion/core/src/macros.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/datafusion/core/src/macros.rs b/datafusion/core/src/macros.rs index 172cda981d3c..498535d6c80c 100644 --- a/datafusion/core/src/macros.rs +++ b/datafusion/core/src/macros.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + /// Macro for creating DataFrame. /// # Example /// ``` From e5e2d29f588546dc311da35d6d3ebd5d07ebf748 Mon Sep 17 00:00:00 2001 From: Sergey Zhukov Date: Sun, 25 May 2025 15:11:32 +0300 Subject: [PATCH 4/4] Add macro for creating DataFrame (#16090) --- datafusion-examples/examples/dataframe.rs | 4 +- datafusion/common/src/array_conversion.rs | 145 ---------- datafusion/common/src/lib.rs | 1 - datafusion/common/src/test_util.rs | 327 +++++++++++++++++++++- datafusion/core/src/dataframe/mod.rs | 97 +++++-- datafusion/core/src/lib.rs | 4 - datafusion/core/src/macros.rs | 66 ----- datafusion/core/src/prelude.rs | 2 +- datafusion/core/tests/dataframe/mod.rs | 16 +- docs/source/user-guide/dataframe.md | 28 +- 10 files changed, 433 insertions(+), 257 deletions(-) delete mode 100644 datafusion/common/src/array_conversion.rs delete mode 100644 datafusion/core/src/macros.rs diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index 2a699b2555d6..0ddf6aa2d036 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -178,7 +178,7 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> { /// 1. Read in-memory data. async fn read_memory_macro() -> Result<()> { // create a DataFrame using macro - let df = df!( + let df = dataframe!( "a" => ["a", "b", "c", "d"], "b" => [1, 10, 10, 100] )?; @@ -186,7 +186,7 @@ async fn read_memory_macro() -> Result<()> { df.show().await?; // create empty DataFrame using macro - let df_empty = df!()?; + let df_empty = dataframe!()?; df_empty.show().await?; Ok(()) diff --git a/datafusion/common/src/array_conversion.rs b/datafusion/common/src/array_conversion.rs deleted file mode 100644 index 86d3efea2fc6..000000000000 --- a/datafusion/common/src/array_conversion.rs +++ /dev/null @@ -1,145 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow::array::{ArrayRef, BooleanArray, Float32Array, Int32Array, StringArray}; - -/// Converts a vector or array into an ArrayRef. -pub trait IntoArrayRef { - fn into_array_ref(self: Box) -> ArrayRef; -} - -impl IntoArrayRef for Vec { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Int32Array::from(*self)) - } -} - -impl IntoArrayRef for Vec> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Int32Array::from(*self)) - } -} - -impl IntoArrayRef for [i32; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Int32Array::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for [Option; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Int32Array::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for Vec { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Float32Array::from(*self)) - } -} - -impl IntoArrayRef for Vec> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Float32Array::from(*self)) - } -} - -impl IntoArrayRef for [f32; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Float32Array::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for [Option; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(Float32Array::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for Vec<&str> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(*self)) - } -} - -impl IntoArrayRef for Vec> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(*self)) - } -} - -impl IntoArrayRef for [&'static str; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for [Option<&'static str>; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for Vec { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(*self)) - } -} - -impl IntoArrayRef for Vec> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(*self)) - } -} - -impl IntoArrayRef for [String; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for [Option; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(StringArray::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for Vec { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(BooleanArray::from(*self)) - } -} - -impl IntoArrayRef for Vec> { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(BooleanArray::from(*self)) - } -} - -impl IntoArrayRef for [bool; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(BooleanArray::from(Vec::from(*self))) - } -} - -impl IntoArrayRef for [Option; N] { - fn into_array_ref(self: Box) -> ArrayRef { - Arc::new(BooleanArray::from(Vec::from(*self))) - } -} diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 72493e7fb8ac..2576ab431202 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -36,7 +36,6 @@ mod table_reference; mod unnest; pub mod alias; -pub mod array_conversion; pub mod cast; pub mod config; pub mod cse; diff --git a/datafusion/common/src/test_util.rs b/datafusion/common/src/test_util.rs index ff6320d17a05..820a230bf6e1 100644 --- a/datafusion/common/src/test_util.rs +++ b/datafusion/common/src/test_util.rs @@ -18,11 +18,16 @@ //! Utility functions to make testing DataFusion based crates easier use crate::arrow::util::pretty::pretty_format_batches_with_options; -use arrow::array::RecordBatch; +use arrow::array::{ArrayRef, RecordBatch}; use arrow::error::ArrowError; use std::fmt::Display; use std::{error::Error, path::PathBuf}; +/// Converts a vector or array into an ArrayRef. +pub trait IntoArrayRef { + fn into_array_ref(self) -> ArrayRef; +} + pub fn format_batches(results: &[RecordBatch]) -> Result { let datafusion_format_options = crate::config::FormatOptions::default(); @@ -383,6 +388,326 @@ macro_rules! record_batch { } } +pub mod array_conversion { + use arrow::array::ArrayRef; + + use super::IntoArrayRef; + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self) + } + } + + impl IntoArrayRef for &[bool] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self) + } + } + + impl IntoArrayRef for &[i8] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self) + } + } + + impl IntoArrayRef for &[i16] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self) + } + } + + impl IntoArrayRef for &[i32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self) + } + } + + impl IntoArrayRef for &[i64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self) + } + } + + impl IntoArrayRef for &[u8] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self) + } + } + + impl IntoArrayRef for &[u16] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self) + } + } + + impl IntoArrayRef for &[u32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self) + } + } + + impl IntoArrayRef for &[u64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self.to_vec()) + } + } + + //#TODO add impl for f16 + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self) + } + } + + impl IntoArrayRef for &[f32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self) + } + } + + impl IntoArrayRef for &[f64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self.to_vec()) + } + } + + impl IntoArrayRef for Vec<&str> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for &[&str] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option<&str>] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for &[String] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } +} + #[cfg(test)] mod tests { use crate::cast::{as_float64_array, as_int32_array, as_string_array}; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 1cd3d265e3cb..a7b2065248a8 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -49,7 +49,6 @@ use std::sync::Arc; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::array_conversion::IntoArrayRef; use datafusion_common::config::{CsvOptions, JsonOptions}; use datafusion_common::{ exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, @@ -202,7 +201,7 @@ impl Default for DataFrameWriteOptions { /// df.show().await?; /// /// // Create a new dataframe with in-memory data using macro -/// let df = df!( +/// let df = dataframe!( /// "id" => [1, 2, 3], /// "name" => ["foo", "bar", "baz"] /// )?; @@ -2229,9 +2228,11 @@ impl DataFrame { /// Helper for creating DataFrame. /// # Example /// ``` + /// use std::sync::Arc; + /// use arrow::array::{ArrayRef, Int32Array, StringArray}; /// use datafusion::prelude::DataFrame; - /// let id = Box::new([1, 2, 3]); - /// let name = Box::new(["foo", "bar", "baz"]); + /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"])); /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap(); /// // +----+------+, /// // | id | name |, @@ -2241,38 +2242,78 @@ impl DataFrame { /// // | 3 | baz |, /// // +----+------+, /// ``` - pub fn from_columns(columns: Vec<(&str, Box)>) -> Result { - let mut fields: Vec = vec![]; - let mut arrays: Vec = vec![]; - - for (name, array_like) in columns { - let array = array_like.into_array_ref(); - let dtype = array.data_type().clone(); - fields.push(Field::new(name, dtype, true)); - arrays.push(array); - } + pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result { + let fields = columns + .iter() + .map(|(name, array)| Field::new(*name, array.data_type().clone(), true)) + .collect::>(); - let len = - arrays - .first() - .map(|arr| arr.len()) - .ok_or(DataFusionError::Execution( - "Column must not be empty".to_string(), - ))?; - let all_same_len = arrays.iter().all(|arr| arr.len() == len); - if !all_same_len { - return Err(DataFusionError::Execution( - "All columns must have the same length".to_string(), - )); - } + let arrays = columns + .into_iter() + .map(|(_, array)| array) + .collect::>(); let schema = Arc::new(Schema::new(fields)); let batch = RecordBatch::try_new(schema, arrays)?; let ctx = SessionContext::new(); - ctx.read_batch(batch) + let df = ctx.read_batch(batch)?; + Ok(df) } } +/// Macro for creating DataFrame. +/// # Example +/// ``` +/// use datafusion::prelude::dataframe; +/// # use datafusion::error::Result; +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// let df = dataframe!( +/// "id" => [1, 2, 3], +/// "name" => ["foo", "bar", "baz"] +/// )?; +/// df.show().await?; +/// // +----+------+, +/// // | id | name |, +/// // +----+------+, +/// // | 1 | foo |, +/// // | 2 | bar |, +/// // | 3 | baz |, +/// // +----+------+, +/// let df_empty = dataframe!()?; // empty DataFrame +/// assert_eq!(df_empty.schema().fields().len(), 0); +/// assert_eq!(df_empty.count().await?, 0); +/// # Ok(()) +/// # } +/// ``` +#[macro_export] +macro_rules! dataframe { + () => {{ + use std::sync::Arc; + + use datafusion::prelude::SessionContext; + use datafusion::arrow::array::RecordBatch; + use datafusion::arrow::datatypes::Schema; + + let ctx = SessionContext::new(); + let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + ctx.read_batch(batch) + }}; + + ($($name:expr => $data:expr),+ $(,)?) => {{ + use datafusion::prelude::DataFrame; + use datafusion::common::test_util::IntoArrayRef; + + let columns = vec![ + $( + ($name, $data.into_array_ref()), + )+ + ]; + + DataFrame::from_columns(columns) + }}; +} + #[derive(Debug)] struct DataFrameTableProvider { plan: LogicalPlan, diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index b9abf46c85dc..db8cc919c59b 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -723,10 +723,6 @@ pub mod dataframe; pub mod datasource; pub mod error; pub mod execution; - -/// helpers for creating DataFrame -pub mod macros; - pub mod physical_planner; pub mod prelude; pub mod scalar; diff --git a/datafusion/core/src/macros.rs b/datafusion/core/src/macros.rs deleted file mode 100644 index 498535d6c80c..000000000000 --- a/datafusion/core/src/macros.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -/// Macro for creating DataFrame. -/// # Example -/// ``` -/// use datafusion::prelude::df; -/// # use datafusion::error::Result; -/// # #[tokio::main] -/// # async fn main() -> Result<()> { -/// let df = df!( -/// "id" => [1, 2, 3], -/// "name" => ["foo", "bar", "baz"] -/// )?; -/// df.show().await?; -/// // +----+------+, -/// // | id | name |, -/// // +----+------+, -/// // | 1 | foo |, -/// // | 2 | bar |, -/// // | 3 | baz |, -/// // +----+------+, -/// let df_empty = df!()?; // empty DataFrame -/// assert_eq!(df_empty.schema().fields().len(), 0); -/// assert_eq!(df_empty.count().await?, 0); -/// # Ok(()) -/// # } -/// ``` -#[macro_export] -macro_rules! df { - () => {{ - use std::sync::Arc; - - use datafusion::prelude::SessionContext; - use datafusion::arrow::array::RecordBatch; - use datafusion::arrow::datatypes::Schema; - - let ctx = SessionContext::new(); - let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); - ctx.read_batch(batch) - }}; - - ($($col_name:expr => $data:expr),+ $(,)?) => {{ - use datafusion::prelude::DataFrame; - use datafusion::common::array_conversion::IntoArrayRef; - - let columns = vec![ - $( ($col_name, Box::new($data) as Box) ),+ - ]; - DataFrame::from_columns(columns) - }}; -} diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index fd3c315adde4..d723620d3232 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -25,8 +25,8 @@ //! use datafusion::prelude::*; //! ``` +pub use crate::dataframe; pub use crate::dataframe::DataFrame; -pub use crate::df; pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions, diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 0c6944134328..246787b6d78e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -32,7 +32,7 @@ use arrow::datatypes::{ }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; -use datafusion::{assert_batches_eq, df}; +use datafusion::{assert_batches_eq, dataframe}; use datafusion_functions_aggregate::count::{count_all, count_all_window}; use datafusion_functions_aggregate::expr_fn::{ array_agg, avg, count, count_distinct, max, median, min, sum, @@ -6020,10 +6020,10 @@ async fn test_insert_into_casting_support() -> Result<()> { } #[tokio::test] -async fn test_df_from_columns() -> Result<()> { - let a = Box::new([1, 2, 3]); - let b = Box::new([true, true, false]); - let c = Box::new([Some("foo"), Some("bar"), None]); +async fn test_dataframe_from_columns() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let b: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false])); + let c: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar"), None])); let df = DataFrame::from_columns(vec![("a", a), ("b", b), ("c", c)])?; assert_eq!(df.schema().fields().len(), 3); @@ -6047,8 +6047,8 @@ async fn test_df_from_columns() -> Result<()> { } #[tokio::test] -async fn test_df_macro() -> Result<()> { - let df = df!( +async fn test_dataframe_macro() -> Result<()> { + let df = dataframe!( "a" => [1, 2, 3], "b" => [true, true, false], "c" => [Some("foo"), Some("bar"), None] @@ -6071,7 +6071,7 @@ async fn test_df_macro() -> Result<()> { &rows.collect().await? ); - let df_empty = df!()?; + let df_empty = dataframe!()?; assert_eq!(df_empty.schema().fields().len(), 0); assert_eq!(df_empty.count().await?, 0); diff --git a/docs/source/user-guide/dataframe.md b/docs/source/user-guide/dataframe.md index 96be1bb9e256..ef67bdeb0d1d 100644 --- a/docs/source/user-guide/dataframe.md +++ b/docs/source/user-guide/dataframe.md @@ -51,12 +51,16 @@ use datafusion::prelude::*; Here is a minimal example showing the execution of a query using the DataFrame API. ```rust -use datafusion::prelude::*; +use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::functions_aggregate::expr_fn::min; +use datafusion::prelude::*; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { + // Read the data from a csv file let ctx = SessionContext::new(); let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; let df = df.filter(col("a").lt_eq(col("b")))? @@ -64,6 +68,28 @@ async fn main() -> Result<()> { .limit(0, Some(100))?; // Print results df.show().await?; + + // Create a new dataframe with in-memory data + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["foo", "bar", "baz"])), + ], + )?; + let df = ctx.read_batch(batch)?; + df.show().await?; + + // Create a new dataframe with in-memory data using macro + let df = dataframe!( + "id" => [1, 2, 3], + "name" => ["foo", "bar", "baz"] + )?; + df.show().await?; Ok(()) } ```