|
19 | 19 |
|
20 | 20 | # Using the DataFrame API |
21 | 21 |
|
22 | | -## What is a DataFrame |
| 22 | +The [Users Guide] introduces the [`DataFrame`] API and this section describes |
| 23 | +that API in more depth. |
23 | 24 |
|
24 | | -`DataFrame` in `DataFrame` is modeled after the Pandas DataFrame interface, and is a thin wrapper over LogicalPlan that adds functionality for building and executing those plans. |
| 25 | +## What is a DataFrame? |
25 | 26 |
|
26 | | -```rust |
27 | | -pub struct DataFrame { |
28 | | - session_state: SessionState, |
29 | | - plan: LogicalPlan, |
30 | | -} |
31 | | -``` |
32 | | - |
33 | | -You can build up `DataFrame`s using its methods, similarly to building `LogicalPlan`s using `LogicalPlanBuilder`: |
34 | | - |
35 | | -```rust |
36 | | -let df = ctx.table("users").await?; |
| 27 | +As described in the [Users Guide], DataFusion [`DataFrame`]s are modeled after |
| 28 | +the [Pandas DataFrame] interface, and are implemented as a thin wrapper over a
| 29 | +[`LogicalPlan`] that adds functionality for building and executing those plans. |
37 | 30 |
|
38 | | -// Create a new DataFrame sorted by `id`, `bank_account` |
39 | | -let new_df = df.select(vec![col("id"), col("bank_account")])? |
40 | | - .sort(vec![col("id")])?; |
41 | | - |
42 | | -// Build the same plan using the LogicalPlanBuilder |
43 | | -let plan = LogicalPlanBuilder::from(&df.to_logical_plan()) |
44 | | - .project(vec![col("id"), col("bank_account")])? |
45 | | - .sort(vec![col("id")])? |
46 | | - .build()?; |
47 | | -``` |
48 | | - |
49 | | -You can use `collect` or `execute_stream` to execute the query. |
| 31 | +The simplest possible `DataFrame` is one that scans a table; that table can be
| 32 | +stored in a file or in memory.
50 | 33 |
|
51 | 34 | ## How to generate a DataFrame |
52 | 35 |
|
53 | | -You can directly use the `DataFrame` API or generate a `DataFrame` from a SQL query. |
54 | | - |
55 | | -For example, to use `sql` to construct `DataFrame`: |
| 36 | +You can construct [`DataFrame`]s programmatically using the API, similarly to |
| 37 | +other DataFrame APIs. For example, you can read an in memory `RecordBatch` into |
| 38 | +a `DataFrame`: |
56 | 39 |
|
57 | 40 | ```rust |
58 | | -let ctx = SessionContext::new(); |
59 | | -// Register the in-memory table containing the data |
60 | | -ctx.register_table("users", Arc::new(create_memtable()?))?; |
61 | | -let dataframe = ctx.sql("SELECT * FROM users;").await?; |
| 41 | +use std::sync::Arc; |
| 42 | +use datafusion::prelude::*; |
| 43 | +use datafusion::arrow::array::{ArrayRef, Int32Array}; |
| 44 | +use datafusion::arrow::record_batch::RecordBatch; |
| 45 | +use datafusion::error::Result; |
| 46 | + |
| 47 | +#[tokio::main] |
| 48 | +async fn main() -> Result<()> { |
| 49 | + let ctx = SessionContext::new(); |
| 50 | + // Register an in-memory table containing the following data |
| 51 | + // id | bank_account |
| 52 | + // ---|------------- |
| 53 | + // 1 | 9000 |
| 54 | + // 2 | 8000 |
| 55 | + // 3 | 7000 |
| 56 | + let data = RecordBatch::try_from_iter(vec![ |
| 57 | + ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), |
| 58 | + ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))), |
| 59 | + ])?; |
| 60 | + // Create a DataFrame that scans the user table, and finds |
| 61 | + // all users with a bank account at least 8000 |
| 62 | + // and sorts the results by bank account in descending order |
| 63 | + let dataframe = ctx |
| 64 | + .read_batch(data)? |
| 65 | + .filter(col("bank_account").gt_eq(lit(8000)))? // bank_account >= 8000 |
| 66 | + .sort(vec![col("bank_account").sort(false, true)])?; // ORDER BY bank_account DESC |
| 67 | + |
| 68 | + Ok(()) |
| 69 | +} |
62 | 70 | ``` |
63 | 71 |
|
64 | | -To construct `DataFrame` using the API: |
| 72 | +You can _also_ generate a `DataFrame` from a SQL query and use the DataFrame's APIs |
| 73 | +to manipulate the output of the query. |
65 | 74 |
|
66 | 75 | ```rust |
67 | | -let ctx = SessionContext::new(); |
68 | | -// Register the in-memory table containing the data |
69 | | -ctx.register_table("users", Arc::new(create_memtable()?))?; |
70 | | -let dataframe = ctx |
71 | | - .table("users") |
72 | | - .filter(col("a").lt_eq(col("b")))? |
73 | | - .sort(vec![col("a").sort(true, true), col("b").sort(false, false)])?; |
| 76 | +use std::sync::Arc; |
| 77 | +use datafusion::prelude::*; |
| 78 | +use datafusion::assert_batches_eq; |
| 79 | +use datafusion::arrow::array::{ArrayRef, Int32Array}; |
| 80 | +use datafusion::arrow::record_batch::RecordBatch; |
| 81 | +use datafusion::error::Result; |
| 82 | + |
| 83 | +#[tokio::main] |
| 84 | +async fn main() -> Result<()> { |
| 85 | + let ctx = SessionContext::new(); |
| 86 | + // Register the same in-memory table as the previous example |
| 87 | + let data = RecordBatch::try_from_iter(vec![ |
| 88 | + ("id", Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef), |
| 89 | + ("bank_account", Arc::new(Int32Array::from(vec![9000, 8000, 7000]))), |
| 90 | + ])?; |
| 91 | + ctx.register_batch("users", data)?; |
| 92 | + // Create a DataFrame using SQL |
| 93 | + let dataframe = ctx.sql("SELECT * FROM users;") |
| 94 | + .await? |
| 95 | + // Note we can filter the output of the query using the DataFrame API |
| 96 | + .filter(col("bank_account").gt_eq(lit(8000)))?; // bank_account >= 8000 |
| 97 | + |
| 98 | + let results = &dataframe.collect().await?; |
| 99 | + |
| 100 | + // use the `assert_batches_eq` macro to show the output |
| 101 | + assert_batches_eq!( |
| 102 | + vec![ |
| 103 | + "+----+--------------+", |
| 104 | + "| id | bank_account |", |
| 105 | + "+----+--------------+", |
| 106 | + "| 1 | 9000 |", |
| 107 | + "| 2 | 8000 |", |
| 108 | + "+----+--------------+", |
| 109 | + ], |
| 110 | + &results |
| 111 | + ); |
| 112 | + Ok(()) |
| 113 | +} |
74 | 114 | ``` |
75 | 115 |
|
76 | 116 | ## Collect / Streaming Exec |
77 | 117 |
|
78 | | -DataFusion `DataFrame`s are "lazy", meaning they do not do any processing until they are executed, which allows for additional optimizations. |
| 118 | +DataFusion [`DataFrame`]s are "lazy", meaning they do no processing until |
| 119 | +they are executed, which allows for additional optimizations. |
79 | 120 |
|
80 | | -When you have a `DataFrame`, you can run it in one of three ways: |
| 121 | +You can run a `DataFrame` in one of three ways: |
81 | 122 |
|
82 | | -1. `collect` which executes the query and buffers all the output into a `Vec<RecordBatch>` |
83 | | -2. `streaming_exec`, which begins executions and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()` |
84 | | -3. `cache` which executes the query and buffers the output into a new in memory DataFrame. |
| 123 | +1. `collect`: executes the query and buffers all the output into a `Vec<RecordBatch>` |
| 124 | +2. `execute_stream`: begins execution and returns a `SendableRecordBatchStream` which incrementally computes output on each call to `next()`
| 125 | +3. `cache`: executes the query and buffers the output into a new in-memory `DataFrame`.
85 | 126 |
|
86 | | -You can just collect all outputs once like: |
| 127 | +To collect all outputs into a memory buffer, use the `collect` method: |
87 | 128 |
|
88 | 129 | ```rust |
89 | | -let ctx = SessionContext::new(); |
90 | | -let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
91 | | -let batches = df.collect().await?; |
| 130 | +use datafusion::prelude::*; |
| 131 | +use datafusion::error::Result; |
| 132 | + |
| 133 | +#[tokio::main] |
| 134 | +async fn main() -> Result<()> { |
| 135 | + let ctx = SessionContext::new(); |
| 136 | + // read the contents of a CSV file into a DataFrame |
| 137 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 138 | + // execute the query and collect the results as a Vec<RecordBatch> |
| 139 | + let batches = df.collect().await?; |
| 140 | + for record_batch in batches { |
| 141 | + println!("{record_batch:?}"); |
| 142 | + } |
| 143 | + Ok(()) |
| 144 | +} |
92 | 145 | ``` |
93 | 146 |
|
94 | | -You can also use stream output to incrementally generate output one `RecordBatch` at a time |
| 147 | +Use `execute_stream` to incrementally generate output one `RecordBatch` at a time: |
95 | 148 |
|
96 | 149 | ```rust |
97 | | -let ctx = SessionContext::new(); |
98 | | -let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
99 | | -let mut stream = df.execute_stream().await?; |
100 | | -while let Some(rb) = stream.next().await { |
101 | | - println!("{rb:?}"); |
| 150 | +use datafusion::prelude::*; |
| 151 | +use datafusion::error::Result; |
| 152 | +use futures::stream::StreamExt; |
| 153 | + |
| 154 | +#[tokio::main] |
| 155 | +async fn main() -> Result<()> { |
| 156 | + let ctx = SessionContext::new(); |
| 157 | + // read example.csv file into a DataFrame |
| 158 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 159 | + // begin execution (returns quickly, does not compute results) |
| 160 | + let mut stream = df.execute_stream().await?; |
| 161 | + // results are returned incrementally as they are computed |
| 162 | + while let Some(record_batch) = stream.next().await { |
| 163 | + println!("{record_batch:?}"); |
| 164 | + } |
| 165 | + Ok(()) |
102 | 166 | } |
103 | 167 | ``` |
104 | 168 |
|
105 | 169 | # Write DataFrame to Files |
106 | 170 |
|
107 | | -You can also serialize `DataFrame` to a file. For now, `Datafusion` supports write `DataFrame` to `csv`, `json` and `parquet`. |
108 | | - |
109 | | -When writing a file, DataFusion will execute the DataFrame and stream the results to a file. |
| 171 | +You can also write the contents of a `DataFrame` to a file. When writing a file, |
| 172 | +DataFusion executes the `DataFrame` and streams the results to the output. |
| 173 | +DataFusion comes with support for writing `csv`, `json`, `arrow`, `avro`, and
| 174 | +`parquet` files, and supports writing custom file formats via API (see
| 175 | +[`custom_file_format.rs`] for an example).
110 | 176 |
|
111 | | -For example, to write a csv_file |
| 177 | +For example, to read a CSV file and write it to a parquet file, use the |
| 178 | +[`DataFrame::write_parquet`] method:
112 | 179 |
|
113 | 180 | ```rust |
114 | | -let ctx = SessionContext::new(); |
115 | | -// Register the in-memory table containing the data |
116 | | -ctx.register_table("users", Arc::new(mem_table))?; |
117 | | -let dataframe = ctx.sql("SELECT * FROM users;").await?; |
118 | | - |
119 | | -dataframe |
120 | | - .write_csv("user_dataframe.csv", DataFrameWriteOptions::default(), None) |
121 | | - .await; |
| 181 | +use datafusion::prelude::*; |
| 182 | +use datafusion::error::Result; |
| 183 | +use datafusion::dataframe::DataFrameWriteOptions; |
| 184 | + |
| 185 | +#[tokio::main] |
| 186 | +async fn main() -> Result<()> { |
| 187 | + let ctx = SessionContext::new(); |
| 188 | + // read example.csv file into a DataFrame |
| 189 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 190 | + // stream the contents of the DataFrame to the `example.parquet` file |
| 191 | + df.write_parquet( |
| 192 | + "example.parquet", |
| 193 | + DataFrameWriteOptions::new(), |
| 194 | + None, // writer_options |
| 195 | +    ).await?;
| 196 | + Ok(()) |
| 197 | +} |
122 | 198 | ``` |
123 | 199 |
|
124 | | -and the file will look like (Example Output): |
| 200 | +[`custom_file_format.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/custom_file_format.rs |
125 | 201 |
|
126 | | -``` |
127 | | -id,bank_account |
128 | | -1,9000 |
| 202 | +The output file will look like (Example Output): |
| 203 | + |
| 204 | +```sql |
| 205 | +> select * from '../datafusion/core/example.parquet'; |
| 206 | ++---+---+---+ |
| 207 | +| a | b | c | |
| 208 | ++---+---+---+ |
| 209 | +| 1 | 2 | 3 | |
| 210 | ++---+---+---+ |
129 | 211 | ``` |
130 | 212 |
|
131 | | -## Transform between LogicalPlan and DataFrame |
| 213 | +## Relationship between `LogicalPlan`s and `DataFrame`s |
132 | 214 |
|
133 | | -As shown above, `DataFrame` is just a very thin wrapper of `LogicalPlan`, so you can easily go back and forth between them. |
| 215 | +The `DataFrame` struct is defined like this: |
134 | 216 |
|
135 | 217 | ```rust |
136 | | -// Just combine LogicalPlan with SessionContext and you get a DataFrame |
137 | | -let ctx = SessionContext::new(); |
138 | | -// Register the in-memory table containing the data |
139 | | -ctx.register_table("users", Arc::new(mem_table))?; |
140 | | -let dataframe = ctx.sql("SELECT * FROM users;").await?; |
| 218 | +use datafusion::execution::session_state::SessionState; |
| 219 | +use datafusion::logical_expr::LogicalPlan; |
| 220 | +pub struct DataFrame { |
| 221 | + // state required to execute a LogicalPlan |
| 222 | + session_state: Box<SessionState>, |
| 223 | + // LogicalPlan that describes the computation to perform |
| 224 | + plan: LogicalPlan, |
| 225 | +} |
| 226 | +``` |
141 | 227 |
|
142 | | -// get LogicalPlan in dataframe |
143 | | -let plan = dataframe.logical_plan().clone(); |
| 228 | +As shown above, `DataFrame` is a thin wrapper of `LogicalPlan`, so you can |
| 229 | +easily go back and forth between them. |
144 | 230 |
|
145 | | -// construct a DataFrame with LogicalPlan |
146 | | -let new_df = DataFrame::new(ctx.state(), plan); |
| 231 | +```rust |
| 232 | +use datafusion::prelude::*; |
| 233 | +use datafusion::error::Result; |
| 234 | +use datafusion::logical_expr::LogicalPlanBuilder; |
| 235 | + |
| 236 | +#[tokio::main] |
| 237 | +async fn main() -> Result<()>{ |
| 238 | + let ctx = SessionContext::new(); |
| 239 | + // read example.csv file into a DataFrame |
| 240 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 241 | + // You can easily get the LogicalPlan from the DataFrame |
| 242 | + let (_state, plan) = df.into_parts(); |
| 243 | +    // Combine the LogicalPlan with a SessionState to reconstruct
| 244 | +    // an equivalent DataFrame from the plan
| 245 | + let new_df = DataFrame::new(ctx.state(), plan); |
| 246 | + Ok(()) |
| 247 | +} |
147 | 248 | ``` |
| 249 | + |
| 250 | +In fact, using the [`DataFrame`]s methods you can create the same |
| 251 | +[`LogicalPlan`]s as when using [`LogicalPlanBuilder`]: |
| 252 | + |
| 253 | +```rust |
| 254 | +use datafusion::prelude::*; |
| 255 | +use datafusion::error::Result; |
| 256 | +use datafusion::logical_expr::LogicalPlanBuilder; |
| 257 | + |
| 258 | +#[tokio::main] |
| 259 | +async fn main() -> Result<()>{ |
| 260 | + let ctx = SessionContext::new(); |
| 261 | + // read example.csv file into a DataFrame |
| 262 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 263 | +    // Create a new DataFrame that selects columns `a`, `b` and sorts by `a`
| 264 | + let new_df = df.select(vec![col("a"), col("b")])? |
| 265 | + .sort(vec![col("a")])?; |
| 266 | + // Build the same plan using the LogicalPlanBuilder |
| 267 | + // Similar to `SELECT a, b FROM example.csv ORDER BY a` |
| 268 | + let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?; |
| 269 | + let (_state, plan) = df.into_parts(); // get the DataFrame's LogicalPlan |
| 270 | + let plan = LogicalPlanBuilder::from(plan) |
| 271 | + .project(vec![col("a"), col("b")])? |
| 272 | + .sort(vec![col("a")])? |
| 273 | + .build()?; |
| 274 | + // prove they are the same |
| 275 | + assert_eq!(new_df.logical_plan(), &plan); |
| 276 | + Ok(()) |
| 277 | +} |
| 278 | +``` |
| 279 | + |
| 280 | +[users guide]: ../user-guide/dataframe.md |
| 281 | +[pandas dataframe]: https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html |
| 282 | +[`dataframe`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html |
| 283 | +[`logicalplan`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.LogicalPlan.html |
| 284 | +[`logicalplanbuilder`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.LogicalPlanBuilder.html |
| 285 | +[`dataframe::write_parquet`]: https://docs.rs/datafusion/latest/datafusion/dataframe/struct.DataFrame.html#method.write_parquet |
0 commit comments