diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 4082a22daac1..43bdd959c27e 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -40,7 +40,7 @@ //! # Examples //! //! The main entry point for interacting with DataFusion is the -//! [`SessionContext`]. +//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`. //! //! [`SessionContext`]: execution::context::SessionContext //! diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 83a9c90ebd70..58c257c97f99 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -150,7 +150,7 @@ impl ExternalSorter { // // The factor of 2 aims to avoid a degenerate case where the // memory required for `fetch` is just under the memory available, - // causing repeated resorting of data + // causing repeated re-sorting of data if self.reservation.size() > before / 2 || self.reservation.try_grow(size).is_err() { diff --git a/datafusion/expr/src/window_function.rs b/datafusion/expr/src/window_function.rs index ac8a731a1611..a5b58d173c1a 100644 --- a/datafusion/expr/src/window_function.rs +++ b/datafusion/expr/src/window_function.rs @@ -73,7 +73,9 @@ impl fmt::Display for WindowFunction { } } -/// An aggregate function that is part of a built-in window function +/// A [window function] built in to DataFusion +/// +/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL) #[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)] pub enum BuiltInWindowFunction { /// number of the current row within its partition, counting from 1 diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 698754ab3bff..c8a4797a5288 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -37,12 +37,9 @@ use crate::{ expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, }; -/// A window expr that takes the form of an aggregate function -/// Aggregate Window Expressions that have the form -/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)` -/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions -/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)` -/// e.g sliding window frames uses `SlidingAggregateWindowExpr`. +/// A window expr that takes the form of an aggregate function. +/// +/// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct PlainAggregateWindowExpr { aggregate: Arc, diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index c09e6db7442b..030c20c5743c 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -39,7 +39,7 @@ use datafusion_common::utils::evaluate_partition_ranges; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::WindowFrame; -/// A window expr that takes the form of a built in window function +/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`]. #[derive(Debug)] pub struct BuiltInWindowExpr { expr: Arc, diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 6f41ec599a83..59438a72f275 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -24,20 +24,24 @@ use datafusion_common::Result; use std::any::Any; use std::sync::Arc; -/// A window expression that is a built-in window function. +/// Evaluates a window function by instantiating a +/// `[PartitionEvaluator]` for calculating the function's output in +/// that partition. /// -/// Note that unlike aggregation based window functions, built-in window functions normally ignore -/// window frame spec, with the exception of first_value, last_value, and nth_value. +/// Note that unlike aggregation based window functions, some window +/// functions such as `rank` ignore the values in the window frame, +/// but others such as `first_value`, `last_value`, and +/// `nth_value` need the value. +#[allow(rustdoc::private_intra_doc_links)] pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { /// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. fn as_any(&self) -> &dyn Any; - /// the field of the final result of this aggregation. + /// The field of the final result of evaluating this window function. fn field(&self) -> Result; - /// expressions that are passed to the Accumulator. - /// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many. + /// Expressions that are passed to the [`PartitionEvaluator`]. fn expressions(&self) -> Vec>; /// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default @@ -46,8 +50,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { "BuiltInWindowFunctionExpr: default name" } - /// Evaluate window function arguments against the batch and return - /// an array ref. Typically, the resulting vector is a single element vector. + /// Evaluate window function's arguments against the input window + /// batch and return an [`ArrayRef`]. + /// + /// Typically, the resulting vector is a single element vector. fn evaluate_args(&self, batch: &RecordBatch) -> Result> { self.expressions() .iter() @@ -56,19 +62,36 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { .collect() } - /// Create built-in window evaluator with a batch + /// Create a [`PartitionEvaluator`] for evaluating the function on + /// a particular partition. fn create_evaluator(&self) -> Result>; - /// Construct Reverse Expression that produces the same result - /// on a reversed window. For example `lead(10)` --> `lag(10)` + /// Construct a new [`BuiltInWindowFunctionExpr`] that produces + /// the same result as this function on a window with reverse + /// order. The return value of this function is used by the + /// DataFusion optimizer to avoid re-sorting the data when + /// possible. + /// + /// Returns `None` (the default) if no reverse is known (or possible). + /// + /// For example, the reverse of `lead(10)` is `lag(10)`. fn reverse_expr(&self) -> Option> { None } + /// Can the window function be incrementally computed using + /// bounded memory? + /// + /// If this function returns true, [`Self::create_evaluator`] must + /// implement [`PartitionEvaluator::evaluate_stateful`] fn supports_bounded_execution(&self) -> bool { false } + /// Does the window function use the values from its window frame? + /// + /// If this function returns true, [`Self::create_evaluator`] must + /// implement [`PartitionEvaluator::evaluate_inside_range`] fn uses_window_frame(&self) -> bool { false } diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs index 758f7c3b1b23..db60fdd5f1fa 100644 --- a/datafusion/physical-expr/src/window/partition_evaluator.rs +++ b/datafusion/physical-expr/src/window/partition_evaluator.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! partition evaluation module +//! Partition evaluation module use crate::window::window_expr::BuiltinWindowState; use crate::window::WindowAggState; @@ -25,24 +25,97 @@ use datafusion_common::{DataFusionError, ScalarValue}; use std::fmt::Debug; use std::ops::Range; -/// Partition evaluator +/// Partition evaluator for Window Functions +/// +/// # Background +/// +/// An implementation of this trait is created and used for each +/// partition defined by an `OVER` clause and is instantiated by +/// [`BuiltInWindowFunctionExpr::create_evaluator`] +/// +/// For example, evaluating `window_func(val) OVER (PARTITION BY col)` +/// on the following data: +/// +/// ```text +/// col | val +/// --- + ---- +/// A | 10 +/// A | 10 +/// C | 20 +/// D | 30 +/// D | 30 +/// ``` +/// +/// Will instantiate three `PartitionEvaluator`s, one each for the +/// partitions defined by `col=A`, `col=B`, and `col=C`. +/// +/// ```text +/// col | val +/// --- + ---- +/// A | 10 <--- partition 1 +/// A | 10 +/// +/// col | val +/// --- + ---- +/// C | 20 <--- partition 2 +/// +/// col | val +/// --- + ---- +/// D | 30 <--- partition 3 +/// D | 30 +/// ``` +/// +/// Different methods on this trait will be called depending on the +/// capabilities described by [`BuiltInWindowFunctionExpr`]: +/// +/// # Stateless `PartitionEvaluator` +/// +/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or +/// [`Self::evaluate_inside_range`] is called with values for the +/// entire partition. +/// +/// # Stateful `PartitionEvaluator` +/// +/// In this case, [`Self::evaluate_stateful`] is called to calculate +/// the results of the window function incrementally for each new +/// batch, saving and restoring any state needed to do so as +/// [`BuiltinWindowState`]. +/// +/// For example, when computing `ROW_NUMBER` incrementally, +/// [`Self::evaluate_stateful`] will be called multiple times with +/// different batches. For all batches after the first, the output +/// `row_number` must start from last `row_number` produced for the +/// previous batch. The previous row number is saved and restored as +/// the state. +/// +/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr +/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator pub trait PartitionEvaluator: Debug + Send { - /// Whether the evaluator should be evaluated with rank + /// Can this evaluator be evaluated with (only) rank + /// + /// If `include_rank` is true, then [`Self::evaluate_with_rank`] + /// will be called for each partition, which includes the + /// `rank`. fn include_rank(&self) -> bool { false } - /// Returns state of the Built-in Window Function + /// Returns the internal state of the window function + /// + /// Only used for stateful evaluation fn state(&self) -> Result { // If we do not use state we just return Default Ok(BuiltinWindowState::Default) } - /// Updates the internal state for Built-in window function - // state is useful to update internal state for Built-in window function. - // idx is the index of last row for which result is calculated. - // range_columns is the result of order by column values. It is used to calculate rank boundaries - // sort_partition_points is the boundaries of each rank in the range_column. It is used to update rank. + /// Updates the internal state for window function + /// + /// Only used for stateful evaluation + /// + /// `state`: is useful to update internal state for window function. + /// `idx`: is the index of last row for which result is calculated. + /// `range_columns`: is the result of order by column values. It is used to calculate rank boundaries + /// `sort_partition_points`: is the boundaries of each rank in the range_column. It is used to update rank. fn update_state( &mut self, _state: &WindowAggState, @@ -54,36 +127,72 @@ pub trait PartitionEvaluator: Debug + Send { Ok(()) } + /// Sets the internal state for window function + /// + /// Only used for stateful evaluation fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> { Err(DataFusionError::NotImplemented( "set_state is not implemented for this window function".to_string(), )) } - /// Gets the range where Built-in window function result is calculated. - // idx is the index of last row for which result is calculated. - // n_rows is the number of rows of the input record batch (Used during bound check) + /// Gets the range where the window function result is calculated. + /// + /// `idx`: is the index of last row for which result is calculated. + /// `n_rows`: is the number of rows of the input record batch (Used during bounds check) fn get_range(&self, _idx: usize, _n_rows: usize) -> Result> { Err(DataFusionError::NotImplemented( "get_range is not implemented for this window function".to_string(), )) } - /// Evaluate the partition evaluator against the partition + /// Called for window functions that *do not use* values from the + /// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`, + /// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`). fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result { Err(DataFusionError::NotImplemented( "evaluate is not implemented by default".into(), )) } - /// Evaluate window function result inside given range + /// Evaluate window function result inside given range. + /// + /// Only used for stateful evaluation fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result { Err(DataFusionError::NotImplemented( "evaluate_stateful is not implemented by default".into(), )) } - /// evaluate the partition evaluator against the partition but with rank + /// [`PartitionEvaluator::evaluate_with_rank`] is called for window + /// functions that only need the rank of a row within its window + /// frame. + /// + /// Evaluate the partition evaluator against the partition using + /// the row ranks. For example, `RANK(col)` produces + /// + /// ```text + /// col | rank + /// --- + ---- + /// A | 1 + /// A | 1 + /// C | 3 + /// D | 4 + /// D | 5 + /// ``` + /// + /// For this case, `num_rows` would be `5` and the + /// `ranks_in_partition` would be called with + /// + /// ```text + /// [ + /// (0,1), + /// (2,2), + /// (3,4), + /// ] + /// ``` + /// + /// See [`Self::include_rank`] for more details fn evaluate_with_rank( &self, _num_rows: usize, @@ -94,7 +203,11 @@ pub trait PartitionEvaluator: Debug + Send { )) } - /// evaluate window function result inside given range + /// Called for window functions that use values from window frame, + /// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a + /// single value for every row in the partition. + /// + /// Returns a [`ScalarValue`] that is the value of the window function for the entire partition fn evaluate_inside_range( &self, _values: &[ArrayRef], diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs b/datafusion/physical-expr/src/window/sliding_aggregate.rs index 8ce3f42bea60..709f8d23be36 100644 --- a/datafusion/physical-expr/src/window/sliding_aggregate.rs +++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs @@ -36,12 +36,10 @@ use crate::{ expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr, }; -/// A window expr that takes the form of an aggregate function -/// Aggregate Window Expressions that have the form -/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)` -/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions -/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)` -/// e.g sliding window frames uses `SlidingAggregateWindowExpr`. +/// A window expr that takes the form of an aggregate function that +/// can be incrementally computed over sliding windows. +/// +/// See comments on [`WindowExpr`] for more details. #[derive(Debug)] pub struct SlidingAggregateWindowExpr { aggregate: Arc, @@ -72,10 +70,11 @@ impl SlidingAggregateWindowExpr { } } -/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns -/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same -/// results for peers) and concatenate the results. - +/// Incrementally update window function using the fact that batch is +/// pre-sorted given the sort columns and then per partition point. +/// +/// Evaluates the peer group (e.g. `SUM` or `MAX` gives the same results +/// for peers) and concatenate the results. impl WindowExpr for SlidingAggregateWindowExpr { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index ec0d929b7dd4..7fe616feda61 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -32,8 +32,31 @@ use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; -/// A window expression that: -/// * knows its resulting field +/// Common trait for [window function] implementations +/// +/// # Aggregate Window Expressions +/// +/// These expressions take the form +/// +/// ```text +/// OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...) +/// ``` +/// +/// For example, cumulative window frames uses `PlainAggregateWindowExpr`. +/// +/// # Non Aggregate Window Expressions +/// +/// The expressions have the form +/// +/// ```text +/// OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...) +/// ``` +/// +/// For example, sliding window frames use [`SlidingAggregateWindowExpr`]. +/// +/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL) +/// [`PlainAggregateWindowExpr`]: crate::window::PlainAggregateWindowExpr +/// [`SlidingAggregateWindowExpr`]: crate::window::SlidingAggregateWindowExpr pub trait WindowExpr: Send + Sync + Debug { /// Returns the window expression as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. @@ -123,7 +146,7 @@ pub trait WindowExpr: Send + Sync + Debug { fn get_reverse_expr(&self) -> Option>; } -/// Trait for different `AggregateWindowExpr`s (`PlainAggregateWindowExpr`, `SlidingAggregateWindowExpr`) +/// Extension trait that adds common functionality to [`AggregateWindowExpr`]s pub trait AggregateWindowExpr: WindowExpr { /// Get the accumulator for the window expression. Note that distinct /// window expressions may return distinct accumulators; e.g. sliding