Skip to content

Commit fd17765

Browse files
authored
Add examples to use MemTable and TableProvider (#1864) (#1946)
* Add examples to use MemTable and TableProvider (#1864) * Avoid using Builder in the Memtable example
1 parent b161216 commit fd17765

File tree

3 files changed

+346
-0
lines changed

3 files changed

+346
-0
lines changed

datafusion-examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ tonic = "0.6"
4141
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
4242
futures = "0.3"
4343
num_cpus = "1.13.0"
44+
async-trait = "0.1.41"
Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use async_trait::async_trait;
19+
use datafusion::arrow::array::{Array, UInt64Builder, UInt8Builder};
20+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
21+
use datafusion::arrow::record_batch::RecordBatch;
22+
use datafusion::datasource::TableProvider;
23+
use datafusion::error::{DataFusionError, Result};
24+
use datafusion::execution::dataframe_impl::DataFrameImpl;
25+
use datafusion::execution::runtime_env::RuntimeEnv;
26+
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
27+
use datafusion::physical_plan::expressions::PhysicalSortExpr;
28+
use datafusion::physical_plan::memory::MemoryStream;
29+
use datafusion::physical_plan::{
30+
project_schema, ExecutionPlan, SendableRecordBatchStream, Statistics,
31+
};
32+
use datafusion::prelude::*;
33+
use std::any::Any;
34+
use std::collections::{BTreeMap, HashMap};
35+
use std::fmt::{Debug, Formatter};
36+
use std::sync::{Arc, Mutex};
37+
use std::time::Duration;
38+
use tokio::time::timeout;
39+
40+
/// This example demonstrates executing a simple query against a custom datasource
41+
#[tokio::main]
42+
async fn main() -> Result<()> {
43+
// create our custom datasource and adding some users
44+
let db = CustomDataSource::default();
45+
db.populate_users();
46+
47+
search_accounts(db.clone(), None, 3).await?;
48+
search_accounts(db.clone(), Some(col("bank_account").gt(lit(8000u64))), 1).await?;
49+
search_accounts(db.clone(), Some(col("bank_account").gt(lit(200u64))), 2).await?;
50+
51+
Ok(())
52+
}
53+
54+
async fn search_accounts(
55+
db: CustomDataSource,
56+
filter: Option<Expr>,
57+
expected_result_length: usize,
58+
) -> Result<()> {
59+
// create local execution context
60+
let ctx = ExecutionContext::new();
61+
62+
// create logical plan composed of a single TableScan
63+
let logical_plan =
64+
LogicalPlanBuilder::scan_with_filters("accounts", Arc::new(db), None, vec![])
65+
.unwrap()
66+
.build()
67+
.unwrap();
68+
69+
let mut dataframe = DataFrameImpl::new(ctx.state, &logical_plan)
70+
.select_columns(&["id", "bank_account"])?;
71+
72+
if let Some(f) = filter {
73+
dataframe = dataframe.filter(f)?;
74+
}
75+
76+
timeout(Duration::from_secs(10), async move {
77+
let result = dataframe.collect().await.unwrap();
78+
let record_batch = result.get(0).unwrap();
79+
80+
assert_eq!(expected_result_length, record_batch.column(1).len());
81+
dbg!(record_batch.columns());
82+
})
83+
.await
84+
.unwrap();
85+
86+
Ok(())
87+
}
88+
89+
/// A User, with an id and a bank account
90+
#[derive(Clone, Debug)]
91+
struct User {
92+
id: u8,
93+
bank_account: u64,
94+
}
95+
96+
/// A custom datasource, used to represent a datastore with a single index
97+
#[derive(Clone)]
98+
pub struct CustomDataSource {
99+
inner: Arc<Mutex<CustomDataSourceInner>>,
100+
}
101+
102+
struct CustomDataSourceInner {
103+
data: HashMap<u8, User>,
104+
bank_account_index: BTreeMap<u64, u8>,
105+
}
106+
107+
impl Debug for CustomDataSource {
108+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
109+
f.write_str("custom_db")
110+
}
111+
}
112+
113+
impl CustomDataSource {
114+
pub(crate) async fn create_physical_plan(
115+
&self,
116+
projections: &Option<Vec<usize>>,
117+
schema: SchemaRef,
118+
) -> Result<Arc<dyn ExecutionPlan>> {
119+
Ok(Arc::new(CustomExec::new(projections, schema, self.clone())))
120+
}
121+
122+
pub(crate) fn populate_users(&self) {
123+
self.add_user(User {
124+
id: 1,
125+
bank_account: 9_000,
126+
});
127+
self.add_user(User {
128+
id: 2,
129+
bank_account: 100,
130+
});
131+
self.add_user(User {
132+
id: 3,
133+
bank_account: 1_000,
134+
});
135+
}
136+
137+
fn add_user(&self, user: User) {
138+
let mut inner = self.inner.lock().unwrap();
139+
inner.bank_account_index.insert(user.bank_account, user.id);
140+
inner.data.insert(user.id, user);
141+
}
142+
}
143+
144+
impl Default for CustomDataSource {
145+
fn default() -> Self {
146+
CustomDataSource {
147+
inner: Arc::new(Mutex::new(CustomDataSourceInner {
148+
data: Default::default(),
149+
bank_account_index: Default::default(),
150+
})),
151+
}
152+
}
153+
}
154+
155+
#[async_trait]
156+
impl TableProvider for CustomDataSource {
157+
fn as_any(&self) -> &dyn Any {
158+
self
159+
}
160+
161+
fn schema(&self) -> SchemaRef {
162+
SchemaRef::new(Schema::new(vec![
163+
Field::new("id", DataType::UInt8, false),
164+
Field::new("bank_account", DataType::UInt64, true),
165+
]))
166+
}
167+
168+
async fn scan(
169+
&self,
170+
projection: &Option<Vec<usize>>,
171+
// filters and limit can be used here to inject some push-down operations if needed
172+
_filters: &[Expr],
173+
_limit: Option<usize>,
174+
) -> Result<Arc<dyn ExecutionPlan>> {
175+
return self.create_physical_plan(projection, self.schema()).await;
176+
}
177+
}
178+
179+
#[derive(Debug, Clone)]
180+
struct CustomExec {
181+
db: CustomDataSource,
182+
projected_schema: SchemaRef,
183+
}
184+
185+
impl CustomExec {
186+
fn new(
187+
projections: &Option<Vec<usize>>,
188+
schema: SchemaRef,
189+
db: CustomDataSource,
190+
) -> Self {
191+
let projected_schema = project_schema(&schema, projections.as_ref()).unwrap();
192+
Self {
193+
db,
194+
projected_schema,
195+
}
196+
}
197+
}
198+
199+
#[async_trait]
200+
impl ExecutionPlan for CustomExec {
201+
fn as_any(&self) -> &dyn Any {
202+
self
203+
}
204+
205+
fn schema(&self) -> SchemaRef {
206+
self.projected_schema.clone()
207+
}
208+
209+
fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
210+
datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
211+
}
212+
213+
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
214+
None
215+
}
216+
217+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
218+
vec![]
219+
}
220+
221+
fn with_new_children(
222+
&self,
223+
children: Vec<Arc<dyn ExecutionPlan>>,
224+
) -> Result<Arc<dyn ExecutionPlan>> {
225+
if children.is_empty() {
226+
Ok(Arc::new(self.clone()))
227+
} else {
228+
Err(DataFusionError::Internal(format!(
229+
"Children cannot be replaced in {:?}",
230+
self
231+
)))
232+
}
233+
}
234+
235+
async fn execute(
236+
&self,
237+
_partition: usize,
238+
_runtime: Arc<RuntimeEnv>,
239+
) -> Result<SendableRecordBatchStream> {
240+
let users: Vec<User> = {
241+
let db = self.db.inner.lock().unwrap();
242+
db.data.values().cloned().collect()
243+
};
244+
245+
let mut id_array = UInt8Builder::new(users.len());
246+
let mut account_array = UInt64Builder::new(users.len());
247+
248+
for user in users {
249+
id_array.append_value(user.id)?;
250+
account_array.append_value(user.bank_account)?;
251+
}
252+
253+
return Ok(Box::pin(MemoryStream::try_new(
254+
vec![RecordBatch::try_new(
255+
self.projected_schema.clone(),
256+
vec![
257+
Arc::new(id_array.finish()),
258+
Arc::new(account_array.finish()),
259+
],
260+
)?],
261+
self.schema(),
262+
None,
263+
)?));
264+
}
265+
266+
fn statistics(&self) -> Statistics {
267+
todo!()
268+
}
269+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion::arrow::array::{UInt64Array, UInt8Array};
19+
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20+
use datafusion::arrow::record_batch::RecordBatch;
21+
use datafusion::datasource::MemTable;
22+
use datafusion::error::Result;
23+
use datafusion::prelude::ExecutionContext;
24+
use std::sync::Arc;
25+
use std::time::Duration;
26+
use tokio::time::timeout;
27+
28+
/// This example demonstrates executing a simple query against a Memtable
29+
#[tokio::main]
30+
async fn main() -> Result<()> {
31+
let mem_table = create_memtable()?;
32+
33+
// create local execution context
34+
let mut ctx = ExecutionContext::new();
35+
36+
// Register the in-memory table containing the data
37+
ctx.register_table("users", Arc::new(mem_table))?;
38+
39+
let dataframe = ctx.sql("SELECT * FROM users;").await?;
40+
41+
timeout(Duration::from_secs(10), async move {
42+
let result = dataframe.collect().await.unwrap();
43+
let record_batch = result.get(0).unwrap();
44+
45+
assert_eq!(1, record_batch.column(0).len());
46+
dbg!(record_batch.columns());
47+
})
48+
.await
49+
.unwrap();
50+
51+
Ok(())
52+
}
53+
54+
fn create_memtable() -> Result<MemTable> {
55+
MemTable::try_new(get_schema(), vec![vec![create_record_batch()?]])
56+
}
57+
58+
fn create_record_batch() -> Result<RecordBatch> {
59+
let id_array = UInt8Array::from(vec![1]);
60+
let account_array = UInt64Array::from(vec![9000]);
61+
62+
Result::Ok(
63+
RecordBatch::try_new(
64+
get_schema(),
65+
vec![Arc::new(id_array), Arc::new(account_array)],
66+
)
67+
.unwrap(),
68+
)
69+
}
70+
71+
fn get_schema() -> SchemaRef {
72+
SchemaRef::new(Schema::new(vec![
73+
Field::new("id", DataType::UInt8, false),
74+
Field::new("bank_account", DataType::UInt64, true),
75+
]))
76+
}

0 commit comments

Comments
 (0)