diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index ea06ee2a11f25..0ddf6aa2d0363 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -63,6 +63,7 @@ async fn main() -> Result<()> { read_parquet(&ctx).await?; read_csv(&ctx).await?; read_memory(&ctx).await?; + read_memory_macro().await?; write_out(&ctx).await?; register_aggregate_test_data("t1", &ctx).await?; register_aggregate_test_data("t2", &ctx).await?; @@ -173,6 +174,24 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> { Ok(()) } +/// Use the DataFrame API to: +/// 1. Read in-memory data. +async fn read_memory_macro() -> Result<()> { + // create a DataFrame using macro + let df = dataframe!( + "a" => ["a", "b", "c", "d"], + "b" => [1, 10, 10, 100] + )?; + // print the results + df.show().await?; + + // create empty DataFrame using macro + let df_empty = dataframe!()?; + df_empty.show().await?; + + Ok(()) +} + /// Use the DataFrame API to: /// 1. Write out a DataFrame to a table /// 2. Write out a DataFrame to a parquet file diff --git a/datafusion/common/src/test_util.rs b/datafusion/common/src/test_util.rs index ff6320d17a05e..820a230bf6e17 100644 --- a/datafusion/common/src/test_util.rs +++ b/datafusion/common/src/test_util.rs @@ -18,11 +18,16 @@ //! Utility functions to make testing DataFusion based crates easier use crate::arrow::util::pretty::pretty_format_batches_with_options; -use arrow::array::RecordBatch; +use arrow::array::{ArrayRef, RecordBatch}; use arrow::error::ArrowError; use std::fmt::Display; use std::{error::Error, path::PathBuf}; +/// Converts a vector or array into an ArrayRef. +pub trait IntoArrayRef { + fn into_array_ref(self) -> ArrayRef; +} + pub fn format_batches(results: &[RecordBatch]) -> Result { let datafusion_format_options = crate::config::FormatOptions::default(); @@ -383,6 +388,326 @@ macro_rules! record_batch { } } +pub mod array_conversion { + use arrow::array::ArrayRef; + + use super::IntoArrayRef; + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self) + } + } + + impl IntoArrayRef for &[bool] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Boolean, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self) + } + } + + impl IntoArrayRef for &[i8] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self) + } + } + + impl IntoArrayRef for &[i16] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int16, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self) + } + } + + impl IntoArrayRef for &[i32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self) + } + } + + impl IntoArrayRef for &[i64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Int64, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self) + } + } + + impl IntoArrayRef for &[u8] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self) + } + } + + impl IntoArrayRef for &[u16] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt16, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self) + } + } + + impl IntoArrayRef for &[u32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self) + } + } + + impl IntoArrayRef for &[u64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(UInt64, self.to_vec()) + } + } + + //#TODO add impl for f16 + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self) + } + } + + impl IntoArrayRef for &[f32] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float32, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self) + } + } + + impl IntoArrayRef for &[f64] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Float64, self.to_vec()) + } + } + + impl IntoArrayRef for Vec<&str> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for &[&str] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option<&str>] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for Vec { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for Vec> { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self) + } + } + + impl IntoArrayRef for &[String] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } + + impl IntoArrayRef for &[Option] { + fn into_array_ref(self) -> ArrayRef { + create_array!(Utf8, self.to_vec()) + } + } +} + #[cfg(test)] mod tests { use crate::cast::{as_float64_array, as_int32_array, as_string_array}; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index e385125692bd1..a7b2065248a87 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -166,9 +166,12 @@ impl Default for DataFrameWriteOptions { /// /// # Example /// ``` +/// # use std::sync::Arc; /// # use datafusion::prelude::*; /// # use datafusion::error::Result; /// # use datafusion::functions_aggregate::expr_fn::min; +/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray}; +/// # use datafusion::arrow::datatypes::{DataType, Field, Schema}; /// # #[tokio::main] /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); @@ -181,6 +184,28 @@ impl Default for DataFrameWriteOptions { /// .limit(0, Some(100))?; /// // Perform the actual computation /// let results = df.collect(); +/// +/// // Create a new dataframe with in-memory data +/// let schema = Schema::new(vec![ +/// Field::new("id", DataType::Int32, true), +/// Field::new("name", DataType::Utf8, true), +/// ]); +/// let batch = RecordBatch::try_new( +/// Arc::new(schema), +/// vec![ +/// Arc::new(Int32Array::from(vec![1, 2, 3])), +/// Arc::new(StringArray::from(vec!["foo", "bar", "baz"])), +/// ], +/// )?; +/// let df = ctx.read_batch(batch)?; +/// df.show().await?; +/// +/// // Create a new dataframe with in-memory data using macro +/// let df = dataframe!( +/// "id" => [1, 2, 3], +/// "name" => ["foo", "bar", "baz"] +/// )?; +/// df.show().await?; /// # Ok(()) /// # } /// ``` @@ -2199,6 +2224,94 @@ impl DataFrame { }) .collect() } + + /// Helper for creating DataFrame. + /// # Example + /// ``` + /// use std::sync::Arc; + /// use arrow::array::{ArrayRef, Int32Array, StringArray}; + /// use datafusion::prelude::DataFrame; + /// let id: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + /// let name: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar", "baz"])); + /// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap(); + /// // +----+------+, + /// // | id | name |, + /// // +----+------+, + /// // | 1 | foo |, + /// // | 2 | bar |, + /// // | 3 | baz |, + /// // +----+------+, + /// ``` + pub fn from_columns(columns: Vec<(&str, ArrayRef)>) -> Result { + let fields = columns + .iter() + .map(|(name, array)| Field::new(*name, array.data_type().clone(), true)) + .collect::>(); + + let arrays = columns + .into_iter() + .map(|(_, array)| array) + .collect::>(); + + let schema = Arc::new(Schema::new(fields)); + let batch = RecordBatch::try_new(schema, arrays)?; + let ctx = SessionContext::new(); + let df = ctx.read_batch(batch)?; + Ok(df) + } +} + +/// Macro for creating DataFrame. +/// # Example +/// ``` +/// use datafusion::prelude::dataframe; +/// # use datafusion::error::Result; +/// # #[tokio::main] +/// # async fn main() -> Result<()> { +/// let df = dataframe!( +/// "id" => [1, 2, 3], +/// "name" => ["foo", "bar", "baz"] +/// )?; +/// df.show().await?; +/// // +----+------+, +/// // | id | name |, +/// // +----+------+, +/// // | 1 | foo |, +/// // | 2 | bar |, +/// // | 3 | baz |, +/// // +----+------+, +/// let df_empty = dataframe!()?; // empty DataFrame +/// assert_eq!(df_empty.schema().fields().len(), 0); +/// assert_eq!(df_empty.count().await?, 0); +/// # Ok(()) +/// # } +/// ``` +#[macro_export] +macro_rules! dataframe { + () => {{ + use std::sync::Arc; + + use datafusion::prelude::SessionContext; + use datafusion::arrow::array::RecordBatch; + use datafusion::arrow::datatypes::Schema; + + let ctx = SessionContext::new(); + let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); + ctx.read_batch(batch) + }}; + + ($($name:expr => $data:expr),+ $(,)?) => {{ + use datafusion::prelude::DataFrame; + use datafusion::common::test_util::IntoArrayRef; + + let columns = vec![ + $( + ($name, $data.into_array_ref()), + )+ + ]; + + DataFrame::from_columns(columns) + }}; } #[derive(Debug)] diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 9c9fcd04bf09a..d723620d32323 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -25,6 +25,7 @@ //! use datafusion::prelude::*; //! ``` +pub use crate::dataframe; pub use crate::dataframe::DataFrame; pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext}; pub use crate::execution::options::{ diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 827808d923f59..246787b6d78eb 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -32,6 +32,7 @@ use arrow::datatypes::{ }; use arrow::error::ArrowError; use arrow::util::pretty::pretty_format_batches; +use datafusion::{assert_batches_eq, dataframe}; use datafusion_functions_aggregate::count::{count_all, count_all_window}; use datafusion_functions_aggregate::expr_fn::{ array_agg, avg, count, count_distinct, max, median, min, sum, @@ -6017,3 +6018,62 @@ async fn test_insert_into_casting_support() -> Result<()> { ); Ok(()) } + +#[tokio::test] +async fn test_dataframe_from_columns() -> Result<()> { + let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + let b: ArrayRef = Arc::new(BooleanArray::from(vec![true, true, false])); + let c: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar"), None])); + let df = DataFrame::from_columns(vec![("a", a), ("b", b), ("c", c)])?; + + assert_eq!(df.schema().fields().len(), 3); + assert_eq!(df.clone().count().await?, 3); + + let rows = df.sort(vec![col("a").sort(true, true)])?; + assert_batches_eq!( + &[ + "+---+-------+-----+", + "| a | b | c |", + "+---+-------+-----+", + "| 1 | true | foo |", + "| 2 | true | bar |", + "| 3 | false | |", + "+---+-------+-----+", + ], + &rows.collect().await? + ); + + Ok(()) +} + +#[tokio::test] +async fn test_dataframe_macro() -> Result<()> { + let df = dataframe!( + "a" => [1, 2, 3], + "b" => [true, true, false], + "c" => [Some("foo"), Some("bar"), None] + )?; + + assert_eq!(df.schema().fields().len(), 3); + assert_eq!(df.clone().count().await?, 3); + + let rows = df.sort(vec![col("a").sort(true, true)])?; + assert_batches_eq!( + &[ + "+---+-------+-----+", + "| a | b | c |", + "+---+-------+-----+", + "| 1 | true | foo |", + "| 2 | true | bar |", + "| 3 | false | |", + "+---+-------+-----+", + ], + &rows.collect().await? + ); + + let df_empty = dataframe!()?; + assert_eq!(df_empty.schema().fields().len(), 0); + assert_eq!(df_empty.count().await?, 0); + + Ok(()) +} diff --git a/docs/source/user-guide/dataframe.md b/docs/source/user-guide/dataframe.md index 96be1bb9e2568..ef67bdeb0d1de 100644 --- a/docs/source/user-guide/dataframe.md +++ b/docs/source/user-guide/dataframe.md @@ -51,12 +51,16 @@ use datafusion::prelude::*; Here is a minimal example showing the execution of a query using the DataFrame API. ```rust -use datafusion::prelude::*; +use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::functions_aggregate::expr_fn::min; +use datafusion::prelude::*; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<()> { + // Read the data from a csv file let ctx = SessionContext::new(); let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; let df = df.filter(col("a").lt_eq(col("b")))? @@ -64,6 +68,28 @@ async fn main() -> Result<()> { .limit(0, Some(100))?; // Print results df.show().await?; + + // Create a new dataframe with in-memory data + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("name", DataType::Utf8, true), + ]); + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["foo", "bar", "baz"])), + ], + )?; + let df = ctx.read_batch(batch)?; + df.show().await?; + + // Create a new dataframe with in-memory data using macro + let df = dataframe!( + "id" => [1, 2, 3], + "name" => ["foo", "bar", "baz"] + )?; + df.show().await?; Ok(()) } ```