Skip to content

fix: add an "expr_planners" method to SessionState #15119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1711,7 +1711,7 @@ impl FunctionRegistry for SessionContext {
}

fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
self.state.read().expr_planners()
self.state.read().expr_planners().to_vec()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, it's the FunctionRegistry::expr_planners method gets called and a vector of expression planners is returned.
Now, the SessionState::expr_planners takes precedence and a slice of expression planners is returned.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb Fixed CI. PTAL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand how the code in this PR is any different than what is in on main

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, both the FunctionRegistry and SessionState provide an expr_planners method, but they return different data type. FunctionRegistry::expr_planners returns Vec<dyn ExprPlanner, and SessionState::expr_planners returns &[dyn ExprPlanner].

Now, the SessionState::expr_planners takes precedence and a slice of expression planners is returned. Therefore, we have add a to_vec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ContextProvider that uses a slice vs Vec. Bit annoying the types are not aligned. SessionState uses expr_planners: Vec<Arc<dyn ExprPlanner>>,

Copy link
Contributor Author

@niebayes niebayes Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Omega359 I agree, it's better to align the types.

I tried to align the types but ran into some difficulties. If both functions return a slice, SessionContext::expr_planners would encounter the error Cannot return a reference of temporary values.

If both return a Vec, then SessionState's implementation of ContextProvider::get_expr_planners would also face the same issue. The only way to resolve this would be to change the return type of ContextProvider::get_expr_planners to Vec, but that would be a breaking change.

}

fn register_expr_planner(
Expand Down
161 changes: 160 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,11 @@ impl SessionState {
&self.optimizer
}

/// Returns the [`ExprPlanner`]s for this session
pub fn expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
&self.expr_planners
}

/// Returns the [`QueryPlanner`] for this session
pub fn query_planner(&self) -> &Arc<dyn QueryPlanner + Send + Sync> {
&self.query_planner
Expand Down Expand Up @@ -1637,7 +1642,7 @@ struct SessionContextProvider<'a> {

impl ContextProvider for SessionContextProvider<'_> {
fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
&self.state.expr_planners
self.state.expr_planners()
}

fn get_type_planner(&self) -> Option<Arc<dyn TypePlanner>> {
Expand Down Expand Up @@ -1959,8 +1964,17 @@ pub(crate) struct PreparedPlan {
#[cfg(test)]
mod tests {
use super::{SessionContextProvider, SessionStateBuilder};
use crate::common::assert_contains;
use crate::config::ConfigOptions;
use crate::datasource::empty::EmptyTable;
use crate::datasource::provider_as_source;
use crate::datasource::MemTable;
use crate::execution::context::SessionState;
use crate::logical_expr::planner::ExprPlanner;
use crate::logical_expr::{AggregateUDF, ScalarUDF, TableSource, WindowUDF};
use crate::physical_plan::ExecutionPlan;
use crate::sql::planner::ContextProvider;
use crate::sql::{ResolvedTableReference, TableReference};
use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_catalog::MemoryCatalogProviderList;
Expand All @@ -1970,6 +1984,7 @@ mod tests {
use datafusion_expr::Expr;
use datafusion_optimizer::optimizer::OptimizerRule;
use datafusion_optimizer::Optimizer;
use datafusion_physical_plan::display::DisplayableExecutionPlan;
use datafusion_sql::planner::{PlannerContext, SqlToRel};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -2127,4 +2142,148 @@ mod tests {

Ok(())
}

/// This test demonstrates why it's more convenient and somewhat necessary to provide
/// an `expr_planners` method for `SessionState`.
#[tokio::test]
async fn test_with_expr_planners() -> Result<()> {
// A helper method for planning count wildcard with or without expr planners.
async fn plan_count_wildcard(
with_expr_planners: bool,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut context_provider = MyContextProvider::new().with_table(
"t",
provider_as_source(Arc::new(EmptyTable::new(Schema::empty().into()))),
);
if with_expr_planners {
context_provider = context_provider.with_expr_planners();
}

let state = &context_provider.state;
let statement = state.sql_to_statement("select count(*) from t", "mysql")?;
let plan = SqlToRel::new(&context_provider).statement_to_plan(statement)?;
state.create_physical_plan(&plan).await
}

// Planning count wildcard without expr planners should fail.
let got = plan_count_wildcard(false).await;
assert_contains!(
got.unwrap_err().to_string(),
"Physical plan does not support logical expression Wildcard"
);

// Planning count wildcard with expr planners should succeed.
let got = plan_count_wildcard(true).await?;
let displayable = DisplayableExecutionPlan::new(got.as_ref());
assert_eq!(
displayable.indent(false).to_string(),
"ProjectionExec: expr=[0 as count(*)]\n PlaceholderRowExec\n"
);

Ok(())
}

/// A `ContextProvider` based on `SessionState`.
///
/// Almost all planning context are retrieved from the `SessionState`.
struct MyContextProvider {
/// The session state.
state: SessionState,
/// Registered tables.
tables: HashMap<ResolvedTableReference, Arc<dyn TableSource>>,
/// Controls whether to return expression planners when called `ContextProvider::expr_planners`.
return_expr_planners: bool,
}

impl MyContextProvider {
/// Creates a new `SessionContextProvider`.
pub fn new() -> Self {
Self {
state: SessionStateBuilder::default()
.with_default_features()
.build(),
tables: HashMap::new(),
return_expr_planners: false,
}
}

/// Registers a table.
///
/// The catalog and schema are provided by default.
pub fn with_table(mut self, table: &str, source: Arc<dyn TableSource>) -> Self {
self.tables.insert(
ResolvedTableReference {
catalog: "default".to_string().into(),
schema: "public".to_string().into(),
table: table.to_string().into(),
},
source,
);
self
}

/// Sets the `return_expr_planners` flag to true.
pub fn with_expr_planners(self) -> Self {
Self {
return_expr_planners: true,
..self
}
}
}

impl ContextProvider for MyContextProvider {
fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
let resolved_table_ref = ResolvedTableReference {
catalog: "default".to_string().into(),
schema: "public".to_string().into(),
table: name.table().to_string().into(),
};
let source = self.tables.get(&resolved_table_ref).cloned().unwrap();
Ok(source)
}

/// We use a `return_expr_planners` flag to demonstrate why it's necessary to
/// return the expression planners in the `SessionState`.
///
/// Note, the default implementation returns an empty slice.
fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
if self.return_expr_planners {
self.state.expr_planners()
} else {
&[]
}
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.state.scalar_functions().get(name).cloned()
}

fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
self.state.aggregate_functions().get(name).cloned()
}

fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
self.state.window_functions().get(name).cloned()
}

fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
None
}

fn options(&self) -> &ConfigOptions {
self.state.config_options()
}

fn udf_names(&self) -> Vec<String> {
self.state.scalar_functions().keys().cloned().collect()
}

fn udaf_names(&self) -> Vec<String> {
self.state.aggregate_functions().keys().cloned().collect()
}

fn udwf_names(&self) -> Vec<String> {
self.state.window_functions().keys().cloned().collect()
}
}
}