Skip to content

Commit 5257fe3

Browse files
committed
impl execution support
1 parent 128b2c6 commit 5257fe3

File tree

23 files changed

+1119
-47
lines changed

23 files changed

+1119
-47
lines changed

datafusion-cli/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ config_namespace! {
282282
/// Should DataFusion support recursive CTEs
283283
/// Defaults to true. Note that this feature is a work in progress and may not
284284
/// behave as expected
285-
pub enable_recursive_ctes: bool, default = false
285+
pub enable_recursive_ctes: bool, default = true
286286
}
287287
}
288288

datafusion/common/src/dfschema.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,11 @@ impl DFField {
915915
self.field = f.into();
916916
self
917917
}
918+
919+
pub fn with_qualifier(mut self, qualifier: impl Into<OwnedTableReference>) -> Self {
920+
self.qualifier = Some(qualifier.into());
921+
self
922+
}
918923
}
919924

920925
impl From<FieldRef> for DFField {

datafusion/core/src/datasource/cte.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! CteWorkTable implementation used for recursive queries
19+
20+
use std::any::Any;
21+
use std::sync::Arc;
22+
23+
use arrow::datatypes::SchemaRef;
24+
use async_trait::async_trait;
25+
26+
use crate::{
27+
error::Result,
28+
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
29+
physical_plan::{work_table::WorkTableExec, ExecutionPlan},
30+
};
31+
32+
use crate::datasource::{TableProvider, TableType};
33+
use crate::execution::context::SessionState;
34+
35+
/// TODO: add docs
36+
pub struct CteWorkTable {
37+
name: String,
38+
table_schema: SchemaRef,
39+
}
40+
41+
impl CteWorkTable {
42+
/// TODO: add doc
43+
pub fn new(name: &str, table_schema: SchemaRef) -> Self {
44+
Self {
45+
name: name.to_owned(),
46+
table_schema,
47+
}
48+
}
49+
}
50+
51+
#[async_trait]
52+
impl TableProvider for CteWorkTable {
53+
fn as_any(&self) -> &dyn Any {
54+
self
55+
}
56+
57+
fn get_logical_plan(&self) -> Option<&LogicalPlan> {
58+
None
59+
}
60+
61+
fn schema(&self) -> SchemaRef {
62+
self.table_schema.clone()
63+
}
64+
65+
fn table_type(&self) -> TableType {
66+
TableType::Temporary
67+
}
68+
69+
async fn scan(
70+
&self,
71+
_state: &SessionState,
72+
_projection: Option<&Vec<usize>>,
73+
_filters: &[Expr],
74+
_limit: Option<usize>,
75+
) -> Result<Arc<dyn ExecutionPlan>> {
76+
// TODO: pushdown filters and limits
77+
Ok(Arc::new(WorkTableExec::new(
78+
self.name.clone(),
79+
self.table_schema.clone(),
80+
)))
81+
}
82+
83+
fn supports_filter_pushdown(
84+
&self,
85+
_filter: &Expr,
86+
) -> Result<TableProviderFilterPushDown> {
87+
// TODO: should we support filter pushdown?
88+
Ok(TableProviderFilterPushDown::Unsupported)
89+
}
90+
}

datafusion/core/src/datasource/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
//! [`ListingTable`]: crate::datasource::listing::ListingTable
2121
2222
pub mod avro_to_arrow;
23+
pub mod cte;
2324
pub mod default_table_source;
2425
pub mod empty;
2526
pub mod file_format;

datafusion/core/src/execution/context/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod parquet;
2626
use crate::{
2727
catalog::{CatalogList, MemoryCatalogList},
2828
datasource::{
29+
cte::CteWorkTable,
2930
function::{TableFunction, TableFunctionImpl},
3031
listing::{ListingOptions, ListingTable},
3132
provider::TableProviderFactory,
@@ -1899,6 +1900,15 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
18991900
Ok(provider_as_source(provider))
19001901
}
19011902

1903+
fn create_cte_work_table(
1904+
&self,
1905+
name: &str,
1906+
schema: SchemaRef,
1907+
) -> Result<Arc<dyn TableSource>> {
1908+
let table = Arc::new(CteWorkTable::new(name, schema));
1909+
Ok(provider_as_source(table))
1910+
}
1911+
19021912
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
19031913
self.state.scalar_functions().get(name).cloned()
19041914
}

datafusion/core/src/physical_planner.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use crate::physical_plan::joins::{
5858
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
5959
use crate::physical_plan::memory::MemoryExec;
6060
use crate::physical_plan::projection::ProjectionExec;
61+
use crate::physical_plan::recursive_query::RecursiveQueryExec;
6162
use crate::physical_plan::repartition::RepartitionExec;
6263
use crate::physical_plan::sorts::sort::SortExec;
6364
use crate::physical_plan::union::UnionExec;
@@ -87,8 +88,8 @@ use datafusion_expr::expr::{
8788
use datafusion_expr::expr_rewriter::unnormalize_cols;
8889
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
8990
use datafusion_expr::{
90-
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
91-
WindowFrameBound, WriteOp,
91+
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
92+
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
9293
};
9394
use datafusion_physical_expr::expressions::Literal;
9495
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
@@ -1311,6 +1312,11 @@ impl DefaultPhysicalPlanner {
13111312
Ok(plan)
13121313
}
13131314
}
1315+
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct,.. }) => {
1316+
let static_term = self.create_initial_plan(static_term, session_state).await?;
1317+
let recursive_term = self.create_initial_plan(&recursive_term, session_state).await?;
1318+
Ok(Arc::new(RecursiveQueryExec::try_new(name.clone(), static_term, recursive_term, *is_distinct)?))
1319+
}
13141320
};
13151321
exec_plan
13161322
}.boxed()

datafusion/execution/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,12 @@ object_store = { workspace = true }
4545
parking_lot = { workspace = true }
4646
rand = { workspace = true }
4747
tempfile = { workspace = true }
48+
tokio = { version = "1.28", features = [
49+
"macros",
50+
"rt",
51+
"rt-multi-thread",
52+
"sync",
53+
"fs",
54+
"parking_lot",
55+
] }
4856
url = { workspace = true }

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ use datafusion_common::{
5555
ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5656
};
5757

58+
use super::plan::RecursiveQuery;
59+
5860
/// Default table name for unnamed table
5961
pub const UNNAMED_TABLE: &str = "?table?";
6062

@@ -121,6 +123,27 @@ impl LogicalPlanBuilder {
121123
}))
122124
}
123125

126+
/// Convert a regular plan into a recursive query.
127+
pub fn to_recursive_query(
128+
&self,
129+
name: String,
130+
recursive_term: LogicalPlan,
131+
is_distinct: bool,
132+
) -> Result<Self> {
133+
// TODO: we need to do a bunch of validation here. Maybe more.
134+
if is_distinct {
135+
return Err(DataFusionError::NotImplemented(
136+
"Recursive queries with distinct is not supported".to_string(),
137+
));
138+
}
139+
Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery {
140+
name,
141+
static_term: Arc::new(self.plan.clone()),
142+
recursive_term: Arc::new(recursive_term),
143+
is_distinct,
144+
})))
145+
}
146+
124147
/// Create a values list based relation, and the schema is inferred from data, consuming
125148
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
126149
/// documentation for more details.

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ pub use plan::{
3636
projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct,
3737
DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
3838
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
39-
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
40-
ToStringifiedPlan, Union, Unnest, Values, Window,
39+
RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
40+
TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
4141
};
4242
pub use statement::{
4343
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ pub enum LogicalPlan {
154154
/// Unnest a column that contains a nested list type such as an
155155
/// ARRAY. This is used to implement SQL `UNNEST`
156156
Unnest(Unnest),
157+
/// A recursive query (e.g. a recursive CTE), made of a static term and a recursive term
158+
RecursiveQuery(RecursiveQuery),
157159
}
158160

159161
impl LogicalPlan {
@@ -191,6 +193,10 @@ impl LogicalPlan {
191193
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
192194
LogicalPlan::Ddl(ddl) => ddl.schema(),
193195
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
196+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
197+
// we take the schema of the static term as the schema of the entire recursive query
198+
static_term.schema()
199+
}
194200
}
195201
}
196202

@@ -243,6 +249,10 @@ impl LogicalPlan {
243249
| LogicalPlan::TableScan(_) => {
244250
vec![self.schema()]
245251
}
252+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
253+
// return only the schema of the static term
254+
static_term.all_schemas()
255+
}
246256
// return children schemas
247257
LogicalPlan::Limit(_)
248258
| LogicalPlan::Subquery(_)
@@ -384,6 +394,7 @@ impl LogicalPlan {
384394
.try_for_each(f),
385395
// plans without expressions
386396
LogicalPlan::EmptyRelation(_)
397+
| LogicalPlan::RecursiveQuery(_)
387398
| LogicalPlan::Subquery(_)
388399
| LogicalPlan::SubqueryAlias(_)
389400
| LogicalPlan::Limit(_)
@@ -430,6 +441,11 @@ impl LogicalPlan {
430441
LogicalPlan::Ddl(ddl) => ddl.inputs(),
431442
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
432443
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
444+
LogicalPlan::RecursiveQuery(RecursiveQuery {
445+
static_term,
446+
recursive_term,
447+
..
448+
}) => vec![static_term, recursive_term],
433449
// plans without inputs
434450
LogicalPlan::TableScan { .. }
435451
| LogicalPlan::Statement { .. }
@@ -510,6 +526,9 @@ impl LogicalPlan {
510526
cross.left.head_output_expr()
511527
}
512528
}
529+
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
530+
static_term.head_output_expr()
531+
}
513532
LogicalPlan::Union(union) => Ok(Some(Expr::Column(
514533
union.schema.fields()[0].qualified_column(),
515534
))),
@@ -835,6 +854,14 @@ impl LogicalPlan {
835854
};
836855
Ok(LogicalPlan::Distinct(distinct))
837856
}
857+
LogicalPlan::RecursiveQuery(RecursiveQuery {
858+
name, is_distinct, ..
859+
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
860+
name: name.clone(),
861+
static_term: Arc::new(inputs[0].clone()),
862+
recursive_term: Arc::new(inputs[1].clone()),
863+
is_distinct: *is_distinct,
864+
})),
838865
LogicalPlan::Analyze(a) => {
839866
assert!(expr.is_empty());
840867
assert_eq!(inputs.len(), 1);
@@ -1073,6 +1100,7 @@ impl LogicalPlan {
10731100
}),
10741101
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
10751102
LogicalPlan::EmptyRelation(_) => Some(0),
1103+
LogicalPlan::RecursiveQuery(_) => None,
10761104
LogicalPlan::Subquery(_) => None,
10771105
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
10781106
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
@@ -1408,6 +1436,11 @@ impl LogicalPlan {
14081436
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
14091437
match self.0 {
14101438
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
1439+
LogicalPlan::RecursiveQuery(RecursiveQuery {
1440+
is_distinct, ..
1441+
}) => {
1442+
write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
1443+
}
14111444
LogicalPlan::Values(Values { ref values, .. }) => {
14121445
let str_values: Vec<_> = values
14131446
.iter()
@@ -1718,6 +1751,19 @@ pub struct EmptyRelation {
17181751
pub schema: DFSchemaRef,
17191752
}
17201753

1754+
/// A variadic query operation
1755+
#[derive(Clone, PartialEq, Eq, Hash)]
1756+
pub struct RecursiveQuery {
1757+
/// Name of the query
1758+
pub name: String,
1759+
/// The static term
1760+
pub static_term: Arc<LogicalPlan>,
1761+
/// The recursive term
1762+
pub recursive_term: Arc<LogicalPlan>,
1763+
/// Distinction
1764+
pub is_distinct: bool,
1765+
}
1766+
17211767
/// Values expression. See
17221768
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
17231769
/// documentation for more details.

datafusion/optimizer/src/common_subexpr_eliminate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ impl OptimizerRule for CommonSubexprEliminate {
364364
| LogicalPlan::Dml(_)
365365
| LogicalPlan::Copy(_)
366366
| LogicalPlan::Unnest(_)
367+
| LogicalPlan::RecursiveQuery(_)
367368
| LogicalPlan::Prepare(_) => {
368369
// apply the optimization to all inputs of the plan
369370
utils::optimize_children(self, plan, config)?

datafusion/optimizer/src/optimize_projections.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ fn optimize_projections(
162162
.collect::<Vec<_>>()
163163
}
164164
LogicalPlan::EmptyRelation(_)
165+
| LogicalPlan::RecursiveQuery(_)
165166
| LogicalPlan::Statement(_)
166167
| LogicalPlan::Values(_)
167168
| LogicalPlan::Extension(_)

0 commit comments

Comments
 (0)