diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index 02cd2d163a5a..a03267c03532 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -95,9 +95,9 @@ impl WindowExpr for BuiltInWindowExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let evaluator = self.expr.create_evaluator()?; + let mut evaluator = self.expr.create_evaluator()?; let num_rows = batch.num_rows(); - if self.expr.uses_window_frame() { + if evaluator.uses_window_frame() { let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); let mut row_wise_results = vec![]; @@ -114,18 +114,18 @@ impl WindowExpr for BuiltInWindowExpr { num_rows, idx, )?; - let value = evaluator.evaluate_inside_range(&values, &range)?; + let value = evaluator.evaluate(&values, &range)?; row_wise_results.push(value); last_range = range; } ScalarValue::iter_to_array(row_wise_results.into_iter()) - } else if self.expr.include_rank() { + } else if evaluator.include_rank() { let columns = self.sort_columns(batch)?; let sort_partition_points = evaluate_partition_ranges(num_rows, &columns)?; - evaluator.evaluate_with_rank(num_rows, &sort_partition_points) + evaluator.evaluate_with_rank_all(num_rows, &sort_partition_points) } else { let (values, _) = self.get_values_orderbys(batch)?; - evaluator.evaluate(&values, num_rows) + evaluator.evaluate_all(&values, num_rows) } } @@ -164,7 +164,7 @@ impl WindowExpr for BuiltInWindowExpr { // We iterate on each row to perform a running calculation. let record_batch = &partition_batch_state.record_batch; let num_rows = record_batch.num_rows(); - let sort_partition_points = if self.expr.include_rank() { + let sort_partition_points = if evaluator.include_rank() { let columns = self.sort_columns(record_batch)?; evaluate_partition_ranges(num_rows, &columns)? } else { @@ -172,7 +172,7 @@ impl WindowExpr for BuiltInWindowExpr { }; let mut row_wise_results: Vec = vec![]; for idx in state.last_calculated_index..num_rows { - let frame_range = if self.expr.uses_window_frame() { + let frame_range = if evaluator.uses_window_frame() { state .window_frame_ctx .get_or_insert_with(|| { @@ -199,7 +199,8 @@ impl WindowExpr for BuiltInWindowExpr { // Update last range state.window_frame_range = frame_range; evaluator.update_state(state, idx, &order_bys, &sort_partition_points)?; - row_wise_results.push(evaluator.evaluate_stateful(&values)?); + row_wise_results + .push(evaluator.evaluate(&values, &state.window_frame_range)?); } let out_col = if row_wise_results.is_empty() { new_empty_array(out_type) @@ -231,8 +232,12 @@ impl WindowExpr for BuiltInWindowExpr { } fn uses_bounded_memory(&self) -> bool { - self.expr.supports_bounded_execution() - && (!self.expr.uses_window_frame() - || !self.window_frame.end_bound.is_unbounded()) + if let Ok(evaluator) = self.expr.create_evaluator() { + evaluator.supports_bounded_execution() + && (!evaluator.uses_window_frame() + || !self.window_frame.end_bound.is_unbounded()) + } else { + false + } } } diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 763bcfc2b17f..432bf78368ee 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -85,29 +85,4 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// /// The default implementation does nothing fn add_equal_orderings(&self, _builder: &mut OrderingEquivalenceBuilder) {} - - /// Can the window function be incrementally computed using - /// bounded memory? - /// - /// If this function returns true, [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_stateful`] - fn supports_bounded_execution(&self) -> bool { - false - } - - /// Does the window function use the values from its window frame? - /// - /// If this function returns true, [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_inside_range`] - fn uses_window_frame(&self) -> bool { - false - } - - /// Can this function be evaluated with (only) rank - /// - /// If `include_rank` is true, then [`Self::create_evaluator`] must - /// implement [`PartitionEvaluator::evaluate_with_rank`] - fn include_rank(&self) -> bool { - false - } } diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index 2214555dbe48..47f2e4208d71 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -64,17 +64,13 @@ impl BuiltInWindowFunctionExpr for CumeDist { fn create_evaluator(&self) -> Result> { Ok(Box::new(CumeDistEvaluator {})) } - - fn include_rank(&self) -> bool { - true - } } #[derive(Debug)] pub(crate) struct CumeDistEvaluator; impl PartitionEvaluator for CumeDistEvaluator { - fn evaluate_with_rank( + fn evaluate_with_rank_all( &self, num_rows: usize, ranks_in_partition: &[Range], @@ -94,6 +90,10 @@ impl PartitionEvaluator for CumeDistEvaluator { ); Ok(Arc::new(result)) } + + fn include_rank(&self) -> bool { + true + } } #[cfg(test)] @@ -109,7 +109,7 @@ mod tests { ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank(num_rows, &ranks)?; + .evaluate_with_rank_all(num_rows, &ranks)?; let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index bae5098edf77..24248f989eb9 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -110,10 +110,6 @@ impl BuiltInWindowFunctionExpr for WindowShift { })) } - fn supports_bounded_execution(&self) -> bool { - true - } - fn reverse_expr(&self) -> Option> { Some(Arc::new(Self { name: self.name.clone(), @@ -206,7 +202,11 @@ impl PartitionEvaluator for WindowShiftEvaluator { } } - fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + values: &[ArrayRef], + _range: &Range, + ) -> Result { let array = &values[0]; let dtype = array.data_type(); let idx = self.state.idx as i64 - self.shift_offset; @@ -217,11 +217,19 @@ impl PartitionEvaluator for WindowShiftEvaluator { } } - fn evaluate(&self, values: &[ArrayRef], _num_rows: usize) -> Result { + fn evaluate_all( + &mut self, + values: &[ArrayRef], + _num_rows: usize, + ) -> Result { // LEAD, LAG window functions take single column, values will have size 1 let value = &values[0]; shift_with_default_value(value, self.shift_offset, self.default_value.as_ref()) } + + fn supports_bounded_execution(&self) -> bool { + true + } } fn get_default_value( @@ -258,7 +266,7 @@ mod tests { let values = expr.evaluate_args(&batch)?; let result = expr .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_int32_array(&result)?; assert_eq!(expected, *result); Ok(()) diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index aa5fe77df039..e6dbeba8342e 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -122,14 +122,6 @@ impl BuiltInWindowFunctionExpr for NthValue { Ok(Box::new(NthValueEvaluator { state })) } - fn supports_bounded_execution(&self) -> bool { - true - } - - fn uses_window_frame(&self) -> bool { - true - } - fn reverse_expr(&self) -> Option> { let reversed_kind = match self.kind { NthValueKind::First => NthValueKind::Last, @@ -197,40 +189,44 @@ impl PartitionEvaluator for NthValueEvaluator { Ok(()) } - fn evaluate_stateful(&mut self, values: &[ArrayRef]) -> Result { - if let Some(ref result) = self.state.finalized_result { - Ok(result.clone()) - } else { - self.evaluate_inside_range(values, &self.state.range) - } - } - - fn evaluate_inside_range( - &self, + fn evaluate( + &mut self, values: &[ArrayRef], range: &Range, ) -> Result { - // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. - let arr = &values[0]; - let n_range = range.end - range.start; - if n_range == 0 { - // We produce None if the window is empty. - return ScalarValue::try_from(arr.data_type()); - } - match self.state.kind { - NthValueKind::First => ScalarValue::try_from_array(arr, range.start), - NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), - NthValueKind::Nth(n) => { - // We are certain that n > 0. - let index = (n as usize) - 1; - if index >= n_range { - ScalarValue::try_from(arr.data_type()) - } else { - ScalarValue::try_from_array(arr, range.start + index) + if let Some(ref result) = self.state.finalized_result { + Ok(result.clone()) + } else { + // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take a single column, values will have size 1. + let arr = &values[0]; + let n_range = range.end - range.start; + if n_range == 0 { + // We produce None if the window is empty. + return ScalarValue::try_from(arr.data_type()); + } + match self.state.kind { + NthValueKind::First => ScalarValue::try_from_array(arr, range.start), + NthValueKind::Last => ScalarValue::try_from_array(arr, range.end - 1), + NthValueKind::Nth(n) => { + // We are certain that n > 0. + let index = (n as usize) - 1; + if index >= n_range { + ScalarValue::try_from(arr.data_type()) + } else { + ScalarValue::try_from_array(arr, range.start + index) + } } } } } + + fn supports_bounded_execution(&self) -> bool { + true + } + + fn uses_window_frame(&self) -> bool { + true + } } #[cfg(test)] @@ -254,11 +250,11 @@ mod tests { end: i + 1, }) } - let evaluator = expr.create_evaluator()?; + let mut evaluator = expr.create_evaluator()?; let values = expr.evaluate_args(&batch)?; let result = ranges .iter() - .map(|range| evaluator.evaluate_inside_range(&values, range)) + .map(|range| evaluator.evaluate(&values, range)) .collect::>>()?; let result = ScalarValue::iter_to_array(result.into_iter())?; let result = as_int32_array(&result)?; diff --git a/datafusion/physical-expr/src/window/ntile.rs b/datafusion/physical-expr/src/window/ntile.rs index 479fa263337a..2feab9956a89 100644 --- a/datafusion/physical-expr/src/window/ntile.rs +++ b/datafusion/physical-expr/src/window/ntile.rs @@ -70,7 +70,11 @@ pub(crate) struct NtileEvaluator { } impl PartitionEvaluator for NtileEvaluator { - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { + fn evaluate_all( + &mut self, + _values: &[ArrayRef], + num_rows: usize, + ) -> Result { let num_rows = num_rows as u64; let mut vec: Vec = Vec::new(); for i in 0..num_rows { diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs index 9e665c56778e..553f631790b0 100644 --- a/datafusion/physical-expr/src/window/partition_evaluator.rs +++ b/datafusion/physical-expr/src/window/partition_evaluator.rs @@ -69,18 +69,17 @@ use std::ops::Range; /// /// # Stateless `PartitionEvaluator` /// -/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or -/// [`Self::evaluate_inside_range`] is called with values for the +/// In this case, either [`Self::evaluate_all`] or [`Self::evaluate_with_rank_all`] is called with values for the /// entire partition. /// /// # Stateful `PartitionEvaluator` /// -/// In this case, [`Self::evaluate_stateful`] is called to calculate +/// In this case, [`Self::evaluate`] is called to calculate /// the results of the window function incrementally for each new /// batch. /// /// For example, when computing `ROW_NUMBER` incrementally, -/// [`Self::evaluate_stateful`] will be called multiple times with +/// [`Self::evaluate`] will be called multiple times with /// different batches. For all batches after the first, the output /// `row_number` must start from last `row_number` produced for the /// previous batch. The previous row number is saved and restored as @@ -88,6 +87,15 @@ use std::ops::Range; /// /// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr /// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator +/// When implementing a new `PartitionEvaluator`, +/// `uses_window_frame` and `supports_bounded_execution` flags determine which evaluation method will be called +/// during runtime. Implement corresponding evaluator according to table below. +/// |uses_window_frame|supports_bounded_execution|function_to_implement| +/// |---|---|----| +/// |false|false|`evaluate_all` (if we were to implement `PERCENT_RANK` it would end up in this quadrant, we cannot produce any result without seeing whole data)| +/// |false|true|`evaluate` (optionally can also implement `evaluate_all` for more optimized implementation. However, there will be default implementation that is suboptimal) . If we were to implement `ROW_NUMBER` it will end up in this quadrant. Example `OddRowNumber` showcases this use case| +/// |true|false|`evaluate` (I think as long as `uses_window_frame` is `true`. There is no way for `supports_bounded_execution` to be false). I couldn't come up with any example for this quadrant | +/// |true|true|`evaluate`. If we were to implement `FIRST_VALUE`, it would end up in this quadrant|. pub trait PartitionEvaluator: Debug + Send { /// Updates the internal state for window function /// @@ -120,35 +128,58 @@ pub trait PartitionEvaluator: Debug + Send { Ok(()) } - /// Gets the range where the window function result is calculated. - /// - /// `idx`: is the index of last row for which result is calculated. - /// `n_rows`: is the number of rows of the input record batch (Used during bounds check) - fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { - Err(DataFusionError::NotImplemented( - "get_range is not implemented for this window function".to_string(), - )) + /// If `uses_window_frame` flag is `false`. This method is used to calculate required range for the window function + /// Generally there is no required range, hence by default this returns smallest range(current row). e.g seeing current row + /// is enough to calculate window result (such as row_number, rank, etc) + fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { + if self.uses_window_frame() { + Err(DataFusionError::Execution( + "Range should be calculated from window frame".to_string(), + )) + } else { + Ok(Range { + start: idx, + end: idx + 1, + }) + } } /// Called for window functions that *do not use* values from the /// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, - /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). - fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate is not implemented by default".into(), - )) + /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). It produces result + /// of all rows in a single pass. It expects to receive whole table data + /// as a single batch. + fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result { + // When window frame boundaries are not used and evaluator supports bounded execution + // We can calculate evaluate result by repeatedly calling `self.evaluate` `num_rows` times + // If user wants to implement more efficient version, this method should be overwritten + // Default implementation may behave suboptimally (For instance `NumRowEvaluator` overwrites it) + if !self.uses_window_frame() && self.supports_bounded_execution() { + let res = (0..num_rows) + .map(|idx| self.evaluate(values, &self.get_range(idx, num_rows)?)) + .collect::>>()?; + ScalarValue::iter_to_array(res.into_iter()) + } else { + Err(DataFusionError::NotImplemented( + "evaluate_all is not implemented by default".into(), + )) + } } /// Evaluate window function result inside given range. /// /// Only used for stateful evaluation - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { Err(DataFusionError::NotImplemented( - "evaluate_stateful is not implemented by default".into(), + "evaluate is not implemented by default".into(), )) } - /// [`PartitionEvaluator::evaluate_with_rank`] is called for window + /// [`PartitionEvaluator::evaluate_with_rank_all`] is called for window /// functions that only need the rank of a row within its window /// frame. /// @@ -175,7 +206,7 @@ pub trait PartitionEvaluator: Debug + Send { /// (3,4), /// ] /// ``` - fn evaluate_with_rank( + fn evaluate_with_rank_all( &self, _num_rows: usize, _ranks_in_partition: &[Range], @@ -185,18 +216,25 @@ pub trait PartitionEvaluator: Debug + Send { )) } - /// Called for window functions that use values from window frame, - /// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a - /// single value for every row in the partition. + /// Can the window function be incrementally computed using + /// bounded memory? /// - /// Returns a [`ScalarValue`] that is the value of the window function for the entire partition - fn evaluate_inside_range( - &self, - _values: &[ArrayRef], - _range: &Range, - ) -> Result { - Err(DataFusionError::NotImplemented( - "evaluate_inside_range is not implemented by default".into(), - )) + /// If this function returns true, implement [`PartitionEvaluator::evaluate`] + fn supports_bounded_execution(&self) -> bool { + false + } + + /// Does the window function use the values from its window frame? + /// + /// If this function returns true, implement [`PartitionEvaluator::evaluate`] + fn uses_window_frame(&self) -> bool { + false + } + + /// Can this function be evaluated with (only) rank + /// + /// If `include_rank` is true, implement [`PartitionEvaluator::evaluate_with_rank_all`] + fn include_rank(&self) -> bool { + false } } diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index 918fa89f0e0c..be184ca891de 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -100,14 +100,6 @@ impl BuiltInWindowFunctionExpr for Rank { &self.name } - fn supports_bounded_execution(&self) -> bool { - matches!(self.rank_type, RankType::Basic | RankType::Dense) - } - - fn include_rank(&self) -> bool { - true - } - fn create_evaluator(&self) -> Result> { Ok(Box::new(RankEvaluator { state: RankState::default(), @@ -123,12 +115,6 @@ pub(crate) struct RankEvaluator { } impl PartitionEvaluator for RankEvaluator { - fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - let start = idx; - let end = idx + 1; - Ok(Range { start, end }) - } - fn update_state( &mut self, state: &WindowAggState, @@ -157,7 +143,11 @@ impl PartitionEvaluator for RankEvaluator { } /// evaluate window function result inside given range - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { match self.rank_type { RankType::Basic => Ok(ScalarValue::UInt64(Some( self.state.last_rank_boundary as u64 + 1, @@ -169,7 +159,7 @@ impl PartitionEvaluator for RankEvaluator { } } - fn evaluate_with_rank( + fn evaluate_with_rank_all( &self, num_rows: usize, ranks_in_partition: &[Range], @@ -215,6 +205,14 @@ impl PartitionEvaluator for RankEvaluator { }; Ok(result) } + + fn supports_bounded_execution(&self) -> bool { + matches!(self.rank_type, RankType::Basic | RankType::Dense) + } + + fn include_rank(&self) -> bool { + true + } } #[cfg(test)] @@ -238,7 +236,7 @@ mod tests { ) -> Result<()> { let result = expr .create_evaluator()? - .evaluate_with_rank(num_rows, &ranks)?; + .evaluate_with_rank_all(num_rows, &ranks)?; let result = as_float64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); @@ -250,7 +248,7 @@ mod tests { ranks: Vec>, expected: Vec, ) -> Result<()> { - let result = expr.create_evaluator()?.evaluate_with_rank(8, &ranks)?; + let result = expr.create_evaluator()?.evaluate_with_rank_all(8, &ranks)?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(expected, *result); diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 0abc98289793..3c1f0583b47c 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -85,10 +85,6 @@ impl BuiltInWindowFunctionExpr for RowNumber { fn create_evaluator(&self) -> Result> { Ok(Box::::default()) } - - fn supports_bounded_execution(&self) -> bool { - true - } } #[derive(Default, Debug)] @@ -97,23 +93,29 @@ pub(crate) struct NumRowsEvaluator { } impl PartitionEvaluator for NumRowsEvaluator { - fn get_range(&self, idx: usize, _n_rows: usize) -> Result> { - let start = idx; - let end = idx + 1; - Ok(Range { start, end }) - } - /// evaluate window function result inside given range - fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { + fn evaluate( + &mut self, + _values: &[ArrayRef], + _range: &Range, + ) -> Result { self.state.n_rows += 1; Ok(ScalarValue::UInt64(Some(self.state.n_rows as u64))) } - fn evaluate(&self, _values: &[ArrayRef], num_rows: usize) -> Result { + fn evaluate_all( + &mut self, + _values: &[ArrayRef], + num_rows: usize, + ) -> Result { Ok(Arc::new(UInt64Array::from_iter_values( 1..(num_rows as u64) + 1, ))) } + + fn supports_bounded_execution(&self) -> bool { + true + } } #[cfg(test)] @@ -134,7 +136,7 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result); @@ -152,7 +154,7 @@ mod tests { let values = row_number.evaluate_args(&batch)?; let result = row_number .create_evaluator()? - .evaluate(&values, batch.num_rows())?; + .evaluate_all(&values, batch.num_rows())?; let result = as_uint64_array(&result)?; let result = result.values(); assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *result);