Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datafusion-examples/examples/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ async fn main() -> Result<()> {
read_parquet(&ctx).await?;
read_csv(&ctx).await?;
read_memory(&ctx).await?;
read_memory_macro().await?;
write_out(&ctx).await?;
register_aggregate_test_data("t1", &ctx).await?;
register_aggregate_test_data("t2", &ctx).await?;
Expand Down Expand Up @@ -173,6 +174,24 @@ async fn read_memory(ctx: &SessionContext) -> Result<()> {
Ok(())
}

/// Use the DataFrame API to:
/// 1. Build a DataFrame from in-memory columns with the `df!` macro.
/// 2. Build an empty DataFrame with the same macro.
async fn read_memory_macro() -> Result<()> {
    // Columns are given as `name => values` pairs; print the result directly.
    df!(
        "a" => ["a", "b", "c", "d"],
        "b" => [1, 10, 10, 100]
    )?
    .show()
    .await?;

    // Invoking the macro without arguments creates an empty DataFrame.
    df!()?.show().await?;

    Ok(())
}

/// Use the DataFrame API to:
/// 1. Write out a DataFrame to a table
/// 2. Write out a DataFrame to a parquet file
Expand Down
145 changes: 145 additions & 0 deletions datafusion/common/src/array_conversion.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Float32Array, Int32Array, StringArray};

/// Converts a vector or array into an ArrayRef.
///
/// The method takes `self: Box<Self>`, which lets it be called on a boxed
/// trait object (`Box<dyn IntoArrayRef>`) while still consuming the value.
pub trait IntoArrayRef {
/// Consumes the boxed value and returns its contents as an [`ArrayRef`].
fn into_array_ref(self: Box<Self>) -> ArrayRef;
}

/// Turns an owned vector of `i32` values into an `Int32Array`.
impl IntoArrayRef for Vec<i32> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = Int32Array::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of optional `i32` values into an `Int32Array`.
impl IntoArrayRef for Vec<Option<i32>> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = Int32Array::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of `i32` values into an `Int32Array`.
impl<const N: usize> IntoArrayRef for [i32; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = Int32Array::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of optional `i32` values into an `Int32Array`.
impl<const N: usize> IntoArrayRef for [Option<i32>; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = Int32Array::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of `f32` values into a `Float32Array`.
impl IntoArrayRef for Vec<f32> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = Float32Array::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of optional `f32` values into a `Float32Array`.
impl IntoArrayRef for Vec<Option<f32>> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = Float32Array::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of `f32` values into a `Float32Array`.
impl<const N: usize> IntoArrayRef for [f32; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = Float32Array::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of optional `f32` values into a `Float32Array`.
impl<const N: usize> IntoArrayRef for [Option<f32>; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = Float32Array::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of string slices into a `StringArray`.
impl IntoArrayRef for Vec<&str> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of optional string slices into a `StringArray`.
impl IntoArrayRef for Vec<Option<&str>> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of `'static` string slices into a `StringArray`.
impl<const N: usize> IntoArrayRef for [&'static str; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of optional `'static` string slices into a
/// `StringArray`.
impl<const N: usize> IntoArrayRef for [Option<&'static str>; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of `String` values into a `StringArray`.
impl IntoArrayRef for Vec<String> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of optional `String` values into a `StringArray`.
impl IntoArrayRef for Vec<Option<String>> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of `String` values into a `StringArray`.
impl<const N: usize> IntoArrayRef for [String; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = StringArray::from(values);
        Arc::new(array)
    }
}

impl<const N: usize> IntoArrayRef for [Option<String>; N] {
fn into_array_ref(self: Box<Self>) -> ArrayRef {
    // The fixed-size array is first converted to a `Vec`.
    let values = Vec::from(*self);
    let array = StringArray::from(values);
    Arc::new(array)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this file somewhat repeats what was done in #12846. Can we reuse it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point! I'm going to try to reuse it.

}

/// Turns an owned vector of `bool` values into a `BooleanArray`.
impl IntoArrayRef for Vec<bool> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = BooleanArray::from(values);
        Arc::new(array)
    }
}

/// Turns an owned vector of optional `bool` values into a `BooleanArray`.
impl IntoArrayRef for Vec<Option<bool>> {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // Move the vector out of its box before handing it to Arrow.
        let values = *self;
        let array = BooleanArray::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of `bool` values into a `BooleanArray`.
impl<const N: usize> IntoArrayRef for [bool; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = BooleanArray::from(values);
        Arc::new(array)
    }
}

/// Turns a fixed-size array of optional `bool` values into a `BooleanArray`.
impl<const N: usize> IntoArrayRef for [Option<bool>; N] {
    fn into_array_ref(self: Box<Self>) -> ArrayRef {
        // The fixed-size array is first converted to a `Vec`.
        let values = Vec::from(*self);
        let array = BooleanArray::from(values);
        Arc::new(array)
    }
}
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod table_reference;
mod unnest;

pub mod alias;
pub mod array_conversion;
pub mod cast;
pub mod config;
pub mod cse;
Expand Down
72 changes: 72 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use arrow::compute::{cast, concat};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::array_conversion::IntoArrayRef;
use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
exec_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema,
Expand Down Expand Up @@ -166,9 +167,12 @@ impl Default for DataFrameWriteOptions {
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use datafusion::prelude::*;
/// # use datafusion::error::Result;
/// # use datafusion::functions_aggregate::expr_fn::min;
/// # use datafusion::arrow::array::{Int32Array, RecordBatch, StringArray};
/// # use datafusion::arrow::datatypes::{DataType, Field, Schema};
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
Expand All @@ -181,6 +185,28 @@ impl Default for DataFrameWriteOptions {
/// .limit(0, Some(100))?;
/// // Perform the actual computation
/// let results = df.collect();
///
/// // Create a new dataframe with in-memory data
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, true),
/// Field::new("name", DataType::Utf8, true),
/// ]);
/// let batch = RecordBatch::try_new(
/// Arc::new(schema),
/// vec![
/// Arc::new(Int32Array::from(vec![1, 2, 3])),
/// Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
/// ],
/// )?;
/// let df = ctx.read_batch(batch)?;
/// df.show().await?;
///
/// // Create a new dataframe with in-memory data using macro
/// let df = df!(
/// "id" => [1, 2, 3],
/// "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// # Ok(())
/// # }
/// ```
Expand Down Expand Up @@ -2199,6 +2225,52 @@ impl DataFrame {
})
.collect()
}

/// Creates a [`DataFrame`] from a list of named, in-memory columns.
///
/// Each entry pairs a column name with a boxed value that is converted to an
/// Arrow array through [`IntoArrayRef`]. Every field is declared nullable and
/// takes its data type from the converted array.
///
/// # Errors
///
/// Returns an execution error when `columns` is empty or when the columns do
/// not all have the same length. Errors raised while building the record
/// batch or reading it into the session context are propagated.
///
/// # Example
/// ```
/// use datafusion::prelude::DataFrame;
/// let id = Box::new([1, 2, 3]);
/// let name = Box::new(["foo", "bar", "baz"]);
/// let df = DataFrame::from_columns(vec![("id", id), ("name", name)]).unwrap();
/// // +----+------+
/// // | id | name |
/// // +----+------+
/// // | 1  | foo  |
/// // | 2  | bar  |
/// // | 3  | baz  |
/// // +----+------+
/// ```
pub fn from_columns(columns: Vec<(&str, Box<dyn IntoArrayRef>)>) -> Result<Self> {
    // The number of columns is known up front, so allocate once.
    let mut fields: Vec<Field> = Vec::with_capacity(columns.len());
    let mut arrays: Vec<ArrayRef> = Vec::with_capacity(columns.len());

    for (name, array_like) in columns {
        let array = array_like.into_array_ref();
        fields.push(Field::new(name, array.data_type().clone(), true));
        arrays.push(array);
    }

    // Build the error lazily so the success path does not allocate a message
    // that is immediately thrown away.
    let len = arrays.first().map(|arr| arr.len()).ok_or_else(|| {
        DataFusionError::Execution("Column must not be empty".to_string())
    })?;
    if arrays.iter().any(|arr| arr.len() != len) {
        return Err(DataFusionError::Execution(
            "All columns must have the same length".to_string(),
        ));
    }

    let schema = Arc::new(Schema::new(fields));
    let batch = RecordBatch::try_new(schema, arrays)?;
    SessionContext::new().read_batch(batch)
}
}

#[derive(Debug)]
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,10 @@ pub mod dataframe;
pub mod datasource;
pub mod error;
pub mod execution;

/// Helper macros for creating a DataFrame from in-memory data.
pub mod macros;

pub mod physical_planner;
pub mod prelude;
pub mod scalar;
Expand Down
66 changes: 66 additions & 0 deletions datafusion/core/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Macro for creating DataFrame.
/// # Example
/// ```
/// use datafusion::prelude::df;
/// # use datafusion::error::Result;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let df = df!(
/// "id" => [1, 2, 3],
/// "name" => ["foo", "bar", "baz"]
/// )?;
/// df.show().await?;
/// // +----+------+,
/// // | id | name |,
/// // +----+------+,
/// // | 1 | foo |,
/// // | 2 | bar |,
/// // | 3 | baz |,
/// // +----+------+,
/// let df_empty = df!()?; // empty DataFrame
/// assert_eq!(df_empty.schema().fields().len(), 0);
/// assert_eq!(df_empty.count().await?, 0);
/// # Ok(())
/// # }
/// ```
#[macro_export]
macro_rules! df {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call it `make_dataframe`; `df` can easily be confused with DataFusion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the concern about potential confusion with df and DataFusion. Personally, I like the name df because it mirrors the concise syntax seen in other data libraries. If we want to avoid potential ambiguity, I’d prefer dataframe over make_dataframe, since it's still clear but a bit shorter to type.
What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dataframe! sounds good to me!

() => {{
    // Empty invocation: build a DataFrame over an empty record batch.
    // Paths go through `$crate` and `::std` so the expansion does not depend
    // on the caller's imports or on the name the crate is imported under.
    let ctx = $crate::prelude::SessionContext::new();
    let batch = $crate::arrow::array::RecordBatch::new_empty(
        ::std::sync::Arc::new($crate::arrow::datatypes::Schema::empty()),
    );
    ctx.read_batch(batch)
}};

($($col_name:expr => $data:expr),+ $(,)?) => {{
    // Each `name => data` pair becomes a `(name, Box<dyn IntoArrayRef>)`
    // entry; the explicit cast erases the concrete column type so that
    // differently typed columns can share one `Vec`.
    let columns = vec![
        $(
            (
                $col_name,
                ::std::boxed::Box::new($data)
                    as ::std::boxed::Box<
                        dyn $crate::common::array_conversion::IntoArrayRef,
                    >,
            )
        ),+
    ];
    $crate::prelude::DataFrame::from_columns(columns)
}};
}
1 change: 1 addition & 0 deletions datafusion/core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
//! ```

pub use crate::dataframe::DataFrame;
pub use crate::df;
pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
pub use crate::execution::options::{
AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
Expand Down
Loading