Skip to content

Commit 569f6fe

Browse files
authored
Remove input schema from PhysicalExpr, move the validation logic to physical expression planner (#6122)
* Remove input schema from PhysicalExpr, move the validation logic to physical expression planner * fix fmt
1 parent cc8dec0 commit 569f6fe

File tree

8 files changed

+99
-145
lines changed

8 files changed

+99
-145
lines changed

datafusion/physical-expr/src/expressions/datetime.rs

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -37,45 +37,16 @@ pub struct DateTimeIntervalExpr {
3737
lhs: Arc<dyn PhysicalExpr>,
3838
op: Operator,
3939
rhs: Arc<dyn PhysicalExpr>,
40-
// TODO: move type checking to the planning phase and not in the physical expr
41-
// so we can remove this
42-
input_schema: Schema,
4340
}
4441

4542
impl DateTimeIntervalExpr {
4643
/// Create a new instance of DateIntervalExpr
47-
pub fn try_new(
44+
pub fn new(
4845
lhs: Arc<dyn PhysicalExpr>,
4946
op: Operator,
5047
rhs: Arc<dyn PhysicalExpr>,
51-
input_schema: &Schema,
52-
) -> Result<Self> {
53-
match (
54-
lhs.data_type(input_schema)?,
55-
op,
56-
rhs.data_type(input_schema)?,
57-
) {
58-
(
59-
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
60-
Operator::Plus | Operator::Minus,
61-
DataType::Interval(_),
62-
)
63-
| (DataType::Timestamp(_, _), Operator::Minus, DataType::Timestamp(_, _))
64-
| (DataType::Interval(_), Operator::Plus, DataType::Timestamp(_, _))
65-
| (
66-
DataType::Interval(_),
67-
Operator::Plus | Operator::Minus,
68-
DataType::Interval(_),
69-
) => Ok(Self {
70-
lhs,
71-
op,
72-
rhs,
73-
input_schema: input_schema.clone(),
74-
}),
75-
(lhs, _, rhs) => Err(DataFusionError::Execution(format!(
76-
"Invalid operation {op} between '{lhs}' and '{rhs}' for DateIntervalExpr"
77-
))),
78-
}
48+
) -> Self {
49+
Self { lhs, op, rhs }
7950
}
8051

8152
/// Get the left-hand side expression
@@ -202,12 +173,11 @@ impl PhysicalExpr for DateTimeIntervalExpr {
202173
self: Arc<Self>,
203174
children: Vec<Arc<dyn PhysicalExpr>>,
204175
) -> Result<Arc<dyn PhysicalExpr>> {
205-
Ok(Arc::new(DateTimeIntervalExpr::try_new(
176+
Ok(Arc::new(DateTimeIntervalExpr::new(
206177
children[0].clone(),
207178
self.op,
208179
children[1].clone(),
209-
&self.input_schema,
210-
)?))
180+
)))
211181
}
212182
}
213183

@@ -220,6 +190,36 @@ impl PartialEq<dyn Any> for DateTimeIntervalExpr {
220190
}
221191
}
222192

193+
/// create a DateIntervalExpr
194+
pub fn date_time_interval_expr(
195+
lhs: Arc<dyn PhysicalExpr>,
196+
op: Operator,
197+
rhs: Arc<dyn PhysicalExpr>,
198+
input_schema: &Schema,
199+
) -> Result<Arc<dyn PhysicalExpr>> {
200+
match (
201+
lhs.data_type(input_schema)?,
202+
op,
203+
rhs.data_type(input_schema)?,
204+
) {
205+
(
206+
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
207+
Operator::Plus | Operator::Minus,
208+
DataType::Interval(_),
209+
)
210+
| (DataType::Timestamp(_, _), Operator::Minus, DataType::Timestamp(_, _))
211+
| (DataType::Interval(_), Operator::Plus, DataType::Timestamp(_, _))
212+
| (
213+
DataType::Interval(_),
214+
Operator::Plus | Operator::Minus,
215+
DataType::Interval(_),
216+
) => Ok(Arc::new(DateTimeIntervalExpr::new(lhs, op, rhs))),
217+
(lhs, _, rhs) => Err(DataFusionError::Execution(format!(
218+
"Invalid operation {op} between '{lhs}' and '{rhs}' for DateIntervalExpr"
219+
))),
220+
}
221+
}
222+
223223
#[cfg(test)]
224224
mod tests {
225225
use super::*;
@@ -535,7 +535,7 @@ mod tests {
535535
let lhs = create_physical_expr(&dt, &dfs, &schema, &props)?;
536536
let rhs = create_physical_expr(&interval, &dfs, &schema, &props)?;
537537

538-
let cut = DateTimeIntervalExpr::try_new(lhs, op, rhs, &schema)?;
538+
let cut = date_time_interval_expr(lhs, op, rhs, &schema)?;
539539
let res = cut.evaluate(&batch)?;
540540

541541
let mut builder = Date32Builder::with_capacity(8);
@@ -613,7 +613,7 @@ mod tests {
613613
let lhs_str = format!("{lhs}");
614614
let rhs_str = format!("{rhs}");
615615

616-
let cut = DateTimeIntervalExpr::try_new(lhs, op, rhs, &schema)?;
616+
let cut = DateTimeIntervalExpr::new(lhs, op, rhs);
617617

618618
assert_eq!(lhs_str, format!("{}", cut.lhs()));
619619
assert_eq!(op, cut.op().clone());

datafusion/physical-expr/src/expressions/in_list.rs

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ pub struct InListExpr {
4747
expr: Arc<dyn PhysicalExpr>,
4848
list: Vec<Arc<dyn PhysicalExpr>>,
4949
negated: bool,
50-
static_filter: Option<Box<dyn Set>>,
51-
input_schema: Schema,
50+
static_filter: Option<Arc<dyn Set>>,
5251
}
5352

5453
impl Debug for InListExpr {
@@ -62,7 +61,7 @@ impl Debug for InListExpr {
6261
}
6362

6463
/// A type-erased container of array elements
65-
trait Set: Send + Sync {
64+
pub trait Set: Send + Sync {
6665
fn contains(&self, v: &dyn Array, negated: bool) -> Result<BooleanArray>;
6766
}
6867

@@ -172,36 +171,36 @@ where
172171
}
173172

174173
/// Creates an `Arc<dyn Set>` for the given list of `IN` expressions and `batch`
175-
fn make_set(array: &dyn Array) -> Result<Box<dyn Set>> {
174+
fn make_set(array: &dyn Array) -> Result<Arc<dyn Set>> {
176175
Ok(downcast_primitive_array! {
177-
array => Box::new(ArraySet::new(array, make_hash_set(array))),
176+
array => Arc::new(ArraySet::new(array, make_hash_set(array))),
178177
DataType::Boolean => {
179178
let array = as_boolean_array(array)?;
180-
Box::new(ArraySet::new(array, make_hash_set(array)))
179+
Arc::new(ArraySet::new(array, make_hash_set(array)))
181180
},
182181
DataType::Decimal128(_, _) => {
183182
let array = as_primitive_array::<Decimal128Type>(array)?;
184-
Box::new(ArraySet::new(array, make_hash_set(array)))
183+
Arc::new(ArraySet::new(array, make_hash_set(array)))
185184
}
186185
DataType::Decimal256(_, _) => {
187186
let array = as_primitive_array::<Decimal256Type>(array)?;
188-
Box::new(ArraySet::new(array, make_hash_set(array)))
187+
Arc::new(ArraySet::new(array, make_hash_set(array)))
189188
}
190189
DataType::Utf8 => {
191190
let array = as_string_array(array)?;
192-
Box::new(ArraySet::new(array, make_hash_set(array)))
191+
Arc::new(ArraySet::new(array, make_hash_set(array)))
193192
}
194193
DataType::LargeUtf8 => {
195194
let array = as_largestring_array(array);
196-
Box::new(ArraySet::new(array, make_hash_set(array)))
195+
Arc::new(ArraySet::new(array, make_hash_set(array)))
197196
}
198197
DataType::Binary => {
199198
let array = as_generic_binary_array::<i32>(array)?;
200-
Box::new(ArraySet::new(array, make_hash_set(array)))
199+
Arc::new(ArraySet::new(array, make_hash_set(array)))
201200
}
202201
DataType::LargeBinary => {
203202
let array = as_generic_binary_array::<i64>(array)?;
204-
Box::new(ArraySet::new(array, make_hash_set(array)))
203+
Arc::new(ArraySet::new(array, make_hash_set(array)))
205204
}
206205
DataType::Dictionary(_, _) => unreachable!("dictionary should have been flattened"),
207206
d => return Err(DataFusionError::NotImplemented(format!("DataType::{d} not supported in InList")))
@@ -233,7 +232,7 @@ fn evaluate_list(
233232
fn try_cast_static_filter_to_set(
234233
list: &[Arc<dyn PhysicalExpr>],
235234
schema: &Schema,
236-
) -> Result<Box<dyn Set>> {
235+
) -> Result<Arc<dyn Set>> {
237236
let batch = RecordBatch::new_empty(Arc::new(schema.clone()));
238237
make_set(evaluate_list(list, &batch)?.as_ref())
239238
}
@@ -244,15 +243,13 @@ impl InListExpr {
244243
expr: Arc<dyn PhysicalExpr>,
245244
list: Vec<Arc<dyn PhysicalExpr>>,
246245
negated: bool,
247-
schema: &Schema,
246+
static_filter: Option<Arc<dyn Set>>,
248247
) -> Self {
249-
let static_filter = try_cast_static_filter_to_set(&list, schema).ok();
250248
Self {
251249
expr,
252250
list,
253251
negated,
254252
static_filter,
255-
input_schema: schema.clone(),
256253
}
257254
}
258255

@@ -325,12 +322,13 @@ impl PhysicalExpr for InListExpr {
325322
self: Arc<Self>,
326323
children: Vec<Arc<dyn PhysicalExpr>>,
327324
) -> Result<Arc<dyn PhysicalExpr>> {
328-
in_list(
325+
// assume the static_filter will not change during the rewrite process
326+
Ok(Arc::new(InListExpr::new(
329327
children[0].clone(),
330328
children[1..].to_vec(),
331-
&self.negated,
332-
&self.input_schema,
333-
)
329+
self.negated,
330+
self.static_filter.clone(),
331+
)))
334332
}
335333
}
336334

@@ -364,7 +362,13 @@ pub fn in_list(
364362
)));
365363
}
366364
}
367-
Ok(Arc::new(InListExpr::new(expr, list, *negated, schema)))
365+
let static_filter = try_cast_static_filter_to_set(&list, schema).ok();
366+
Ok(Arc::new(InListExpr::new(
367+
expr,
368+
list,
369+
*negated,
370+
static_filter,
371+
)))
368372
}
369373

370374
#[cfg(test)]

datafusion/physical-expr/src/expressions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ pub use cast::{
7878
cast, cast_column, cast_with_options, CastExpr, DEFAULT_DATAFUSION_CAST_OPTIONS,
7979
};
8080
pub use column::{col, Column, UnKnownColumn};
81-
pub use datetime::DateTimeIntervalExpr;
81+
pub use datetime::{date_time_interval_expr, DateTimeIntervalExpr};
8282
pub use get_indexed_field::GetIndexedFieldExpr;
8383
pub use in_list::{in_list, InListExpr};
8484
pub use is_not_null::{is_not_null, IsNotNullExpr};

datafusion/physical-expr/src/intervals/test_utils.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
2020
use std::sync::Arc;
2121

22-
use crate::expressions::{BinaryExpr, DateTimeIntervalExpr, Literal};
22+
use crate::expressions::{date_time_interval_expr, BinaryExpr, Literal};
2323
use crate::PhysicalExpr;
2424
use arrow_schema::Schema;
2525
use datafusion_common::{DataFusionError, ScalarValue};
@@ -78,30 +78,22 @@ pub fn gen_conjunctive_temporal_expr(
7878
d: ScalarValue,
7979
schema: &Schema,
8080
) -> Result<Arc<dyn PhysicalExpr>, DataFusionError> {
81-
let left_and_1 = Arc::new(DateTimeIntervalExpr::try_new(
81+
let left_and_1 = date_time_interval_expr(
8282
left_col.clone(),
8383
op_1,
8484
Arc::new(Literal::new(a)),
8585
schema,
86-
)?);
87-
let left_and_2 = Arc::new(DateTimeIntervalExpr::try_new(
86+
)?;
87+
let left_and_2 = date_time_interval_expr(
8888
right_col.clone(),
8989
op_2,
9090
Arc::new(Literal::new(b)),
9191
schema,
92-
)?);
93-
let right_and_1 = Arc::new(DateTimeIntervalExpr::try_new(
94-
left_col,
95-
op_3,
96-
Arc::new(Literal::new(c)),
97-
schema,
98-
)?);
99-
let right_and_2 = Arc::new(DateTimeIntervalExpr::try_new(
100-
right_col,
101-
op_4,
102-
Arc::new(Literal::new(d)),
103-
schema,
104-
)?);
92+
)?;
93+
let right_and_1 =
94+
date_time_interval_expr(left_col, op_3, Arc::new(Literal::new(c)), schema)?;
95+
let right_and_2 =
96+
date_time_interval_expr(right_col, op_4, Arc::new(Literal::new(d)), schema)?;
10597
let left_expr = Arc::new(BinaryExpr::new(left_and_1, Operator::Gt, left_and_2));
10698
let right_expr = Arc::new(BinaryExpr::new(right_and_1, Operator::Lt, right_and_2));
10799
Ok(Arc::new(BinaryExpr::new(

datafusion/physical-expr/src/planner.rs

Lines changed: 5 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::var_provider::is_system_variables;
1919
use crate::{
2020
execution_props::ExecutionProps,
2121
expressions::{
22-
self, binary, like, Column, DateTimeIntervalExpr, GetIndexedFieldExpr, Literal,
22+
self, binary, date_time_interval_expr, like, Column, GetIndexedFieldExpr, Literal,
2323
},
2424
functions, udf,
2525
var_provider::VarType,
@@ -195,42 +195,22 @@ pub fn create_physical_expr(
195195
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
196196
Operator::Plus | Operator::Minus,
197197
DataType::Interval(_),
198-
) => Ok(Arc::new(DateTimeIntervalExpr::try_new(
199-
lhs,
200-
*op,
201-
rhs,
202-
input_schema,
203-
)?)),
198+
) => Ok(date_time_interval_expr(lhs, *op, rhs, input_schema)?),
204199
(
205200
DataType::Interval(_),
206201
Operator::Plus | Operator::Minus,
207202
DataType::Date32 | DataType::Date64 | DataType::Timestamp(_, _),
208-
) => Ok(Arc::new(DateTimeIntervalExpr::try_new(
209-
rhs,
210-
*op,
211-
lhs,
212-
input_schema,
213-
)?)),
203+
) => Ok(date_time_interval_expr(rhs, *op, lhs, input_schema)?),
214204
(
215205
DataType::Timestamp(_, _),
216206
Operator::Minus,
217207
DataType::Timestamp(_, _),
218-
) => Ok(Arc::new(DateTimeIntervalExpr::try_new(
219-
lhs,
220-
*op,
221-
rhs,
222-
input_schema,
223-
)?)),
208+
) => Ok(date_time_interval_expr(lhs, *op, rhs, input_schema)?),
224209
(
225210
DataType::Interval(_),
226211
Operator::Plus | Operator::Minus,
227212
DataType::Interval(_),
228-
) => Ok(Arc::new(DateTimeIntervalExpr::try_new(
229-
lhs,
230-
*op,
231-
rhs,
232-
input_schema,
233-
)?)),
213+
) => Ok(date_time_interval_expr(lhs, *op, rhs, input_schema)?),
234214
_ => {
235215
// Note that the logical planner is responsible
236216
// for type coercion on the arguments (e.g. if one

datafusion/physical-expr/src/utils.rs

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::equivalence::EquivalentClass;
19-
use crate::expressions::{BinaryExpr, Column, InListExpr, UnKnownColumn};
19+
use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
2020
use crate::{
2121
EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement,
2222
};
@@ -586,28 +586,7 @@ pub fn reassign_predicate_columns(
586586
column.name(),
587587
index,
588588
))));
589-
} else if let Some(in_list) = expr_any.downcast_ref::<InListExpr>() {
590-
// transform child first
591-
let expr = reassign_predicate_columns(
592-
in_list.expr().clone(),
593-
schema,
594-
ignore_not_found,
595-
)?;
596-
let list = in_list
597-
.list()
598-
.iter()
599-
.map(|expr| {
600-
reassign_predicate_columns(expr.clone(), schema, ignore_not_found)
601-
})
602-
.collect::<Result<Vec<_>>>()?;
603-
return Ok(Transformed::Yes(Arc::new(InListExpr::new(
604-
expr,
605-
list,
606-
in_list.negated(),
607-
schema.as_ref(),
608-
))));
609589
}
610-
611590
Ok(Transformed::No(expr))
612591
})
613592
}

0 commit comments

Comments
 (0)