Skip to content

Commit 4c56457

Browse files
alambjayzhan211
authored andcommitted
Minor: Add additional docstrings to Window function implementations (apache#6592)
* Add additional docstrings to Window function implementations * Update docs * updates * fix doc link * Change resorting --> re-sorting
1 parent 065a493 commit 4c56457

File tree

9 files changed

+207
-50
lines changed

9 files changed

+207
-50
lines changed

datafusion/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
//! # Examples
4141
//!
4242
//! The main entry point for interacting with DataFusion is the
43-
//! [`SessionContext`].
43+
//! [`SessionContext`]. [`Expr`]s represent expressions such as `a + b`.
4444
//!
4545
//! [`SessionContext`]: execution::context::SessionContext
4646
//!

datafusion/core/src/physical_plan/sorts/sort.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ impl ExternalSorter {
150150
//
151151
// The factor of 2 aims to avoid a degenerate case where the
152152
// memory required for `fetch` is just under the memory available,
153-
// causing repeated resorting of data
153+
// causing repeated re-sorting of data
154154
if self.reservation.size() > before / 2
155155
|| self.reservation.try_grow(size).is_err()
156156
{

datafusion/expr/src/window_function.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,9 @@ impl fmt::Display for WindowFunction {
7373
}
7474
}
7575

76-
/// An aggregate function that is part of a built-in window function
76+
/// A [window function] built in to DataFusion
77+
///
78+
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
7779
#[derive(Debug, Clone, PartialEq, Eq, Hash, EnumIter)]
7880
pub enum BuiltInWindowFunction {
7981
/// number of the current row within its partition, counting from 1

datafusion/physical-expr/src/window/aggregate.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,9 @@ use crate::{
3737
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
3838
};
3939

40-
/// A window expr that takes the form of an aggregate function
41-
/// Aggregate Window Expressions that have the form
42-
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
43-
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
44-
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
45-
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
40+
/// A window expr that takes the form of an aggregate function.
41+
///
42+
/// See comments on [`WindowExpr`] for more details.
4643
#[derive(Debug)]
4744
pub struct PlainAggregateWindowExpr {
4845
aggregate: Arc<dyn AggregateExpr>,

datafusion/physical-expr/src/window/built_in.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
3939
use datafusion_common::{Result, ScalarValue};
4040
use datafusion_expr::WindowFrame;
4141

42-
/// A window expr that takes the form of a built in window function
42+
/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
4343
#[derive(Debug)]
4444
pub struct BuiltInWindowExpr {
4545
expr: Arc<dyn BuiltInWindowFunctionExpr>,

datafusion/physical-expr/src/window/built_in_window_function_expr.rs

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,20 +24,24 @@ use datafusion_common::Result;
2424
use std::any::Any;
2525
use std::sync::Arc;
2626

27-
/// A window expression that is a built-in window function.
27+
/// Evaluates a window function by instantiating a
28+
/// `[PartitionEvaluator]` for calculating the function's output in
29+
/// that partition.
2830
///
29-
/// Note that unlike aggregation based window functions, built-in window functions normally ignore
30-
/// window frame spec, with the exception of first_value, last_value, and nth_value.
31+
/// Note that unlike aggregation based window functions, some window
32+
/// functions such as `rank` ignore the values in the window frame,
33+
/// but others such as `first_value`, `last_value`, and
34+
/// `nth_value` need the value.
35+
#[allow(rustdoc::private_intra_doc_links)]
3136
pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
3237
/// Returns the aggregate expression as [`Any`](std::any::Any) so that it can be
3338
/// downcast to a specific implementation.
3439
fn as_any(&self) -> &dyn Any;
3540

36-
/// the field of the final result of this aggregation.
41+
/// The field of the final result of evaluating this window function.
3742
fn field(&self) -> Result<Field>;
3843

39-
/// expressions that are passed to the Accumulator.
40-
/// Single-column aggregations such as `sum` return a single value, others (e.g. `cov`) return many.
44+
/// Expressions that are passed to the [`PartitionEvaluator`].
4145
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
4246

4347
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. The default
@@ -46,8 +50,10 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
4650
"BuiltInWindowFunctionExpr: default name"
4751
}
4852

49-
/// Evaluate window function arguments against the batch and return
50-
/// an array ref. Typically, the resulting vector is a single element vector.
53+
/// Evaluate window function's arguments against the input window
54+
/// batch and return an [`ArrayRef`].
55+
///
56+
/// Typically, the resulting vector is a single element vector.
5157
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
5258
self.expressions()
5359
.iter()
@@ -56,19 +62,36 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
5662
.collect()
5763
}
5864

59-
/// Create built-in window evaluator with a batch
65+
/// Create a [`PartitionEvaluator`] for evaluating the function on
66+
/// a particular partition.
6067
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
6168

62-
/// Construct Reverse Expression that produces the same result
63-
/// on a reversed window. For example `lead(10)` --> `lag(10)`
69+
/// Construct a new [`BuiltInWindowFunctionExpr`] that produces
70+
/// the same result as this function on a window with reverse
71+
/// order. The return value of this function is used by the
72+
/// DataFusion optimizer to avoid re-sorting the data when
73+
/// possible.
74+
///
75+
/// Returns `None` (the default) if no reverse is known (or possible).
76+
///
77+
/// For example, the reverse of `lead(10)` is `lag(10)`.
6478
fn reverse_expr(&self) -> Option<Arc<dyn BuiltInWindowFunctionExpr>> {
6579
None
6680
}
6781

82+
/// Can the window function be incrementally computed using
83+
/// bounded memory?
84+
///
85+
/// If this function returns true, [`Self::create_evaluator`] must
86+
/// implement [`PartitionEvaluator::evaluate_stateful`]
6887
fn supports_bounded_execution(&self) -> bool {
6988
false
7089
}
7190

91+
/// Does the window function use the values from its window frame?
92+
///
93+
/// If this function returns true, [`Self::create_evaluator`] must
94+
/// implement [`PartitionEvaluator::evaluate_inside_range`]
7295
fn uses_window_frame(&self) -> bool {
7396
false
7497
}

datafusion/physical-expr/src/window/partition_evaluator.rs

Lines changed: 129 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! partition evaluation module
18+
//! Partition evaluation module
1919
2020
use crate::window::window_expr::BuiltinWindowState;
2121
use crate::window::WindowAggState;
@@ -25,24 +25,97 @@ use datafusion_common::{DataFusionError, ScalarValue};
2525
use std::fmt::Debug;
2626
use std::ops::Range;
2727

28-
/// Partition evaluator
28+
/// Partition evaluator for Window Functions
29+
///
30+
/// # Background
31+
///
32+
/// An implementation of this trait is created and used for each
33+
/// partition defined by an `OVER` clause and is instantiated by
34+
/// [`BuiltInWindowFunctionExpr::create_evaluator`]
35+
///
36+
/// For example, evaluating `window_func(val) OVER (PARTITION BY col)`
37+
/// on the following data:
38+
///
39+
/// ```text
40+
/// col | val
41+
/// --- + ----
42+
/// A | 10
43+
/// A | 10
44+
/// C | 20
45+
/// D | 30
46+
/// D | 30
47+
/// ```
48+
///
49+
/// Will instantiate three `PartitionEvaluator`s, one each for the
50+
/// partitions defined by `col=A`, `col=B`, and `col=C`.
51+
///
52+
/// ```text
53+
/// col | val
54+
/// --- + ----
55+
/// A | 10 <--- partition 1
56+
/// A | 10
57+
///
58+
/// col | val
59+
/// --- + ----
60+
/// C | 20 <--- partition 2
61+
///
62+
/// col | val
63+
/// --- + ----
64+
/// D | 30 <--- partition 3
65+
/// D | 30
66+
/// ```
67+
///
68+
/// Different methods on this trait will be called depending on the
69+
/// capabilities described by [`BuiltInWindowFunctionExpr`]:
70+
///
71+
/// # Stateless `PartitionEvaluator`
72+
///
73+
/// In this case, [`Self::evaluate`], [`Self::evaluate_with_rank`] or
74+
/// [`Self::evaluate_inside_range`] is called with values for the
75+
/// entire partition.
76+
///
77+
/// # Stateful `PartitionEvaluator`
78+
///
79+
/// In this case, [`Self::evaluate_stateful`] is called to calculate
80+
/// the results of the window function incrementally for each new
81+
/// batch, saving and restoring any state needed to do so as
82+
/// [`BuiltinWindowState`].
83+
///
84+
/// For example, when computing `ROW_NUMBER` incrementally,
85+
/// [`Self::evaluate_stateful`] will be called multiple times with
86+
/// different batches. For all batches after the first, the output
87+
/// `row_number` must start from last `row_number` produced for the
88+
/// previous batch. The previous row number is saved and restored as
89+
/// the state.
90+
///
91+
/// [`BuiltInWindowFunctionExpr`]: crate::window::BuiltInWindowFunctionExpr
92+
/// [`BuiltInWindowFunctionExpr::create_evaluator`]: crate::window::BuiltInWindowFunctionExpr::create_evaluator
2993
pub trait PartitionEvaluator: Debug + Send {
30-
/// Whether the evaluator should be evaluated with rank
94+
/// Can this evaluator be evaluated with (only) rank
95+
///
96+
/// If `include_rank` is true, then [`Self::evaluate_with_rank`]
97+
/// will be called for each partition, which includes the
98+
/// `rank`.
3199
fn include_rank(&self) -> bool {
32100
false
33101
}
34102

35-
/// Returns state of the Built-in Window Function
103+
/// Returns the internal state of the window function
104+
///
105+
/// Only used for stateful evaluation
36106
fn state(&self) -> Result<BuiltinWindowState> {
37107
// If we do not use state we just return Default
38108
Ok(BuiltinWindowState::Default)
39109
}
40110

41-
/// Updates the internal state for Built-in window function
42-
// state is useful to update internal state for Built-in window function.
43-
// idx is the index of last row for which result is calculated.
44-
// range_columns is the result of order by column values. It is used to calculate rank boundaries
45-
// sort_partition_points is the boundaries of each rank in the range_column. It is used to update rank.
111+
/// Updates the internal state for window function
112+
///
113+
/// Only used for stateful evaluation
114+
///
115+
/// `state`: is useful to update internal state for window function.
116+
/// `idx`: is the index of last row for which result is calculated.
117+
/// `range_columns`: is the result of order by column values. It is used to calculate rank boundaries
118+
/// `sort_partition_points`: is the boundaries of each rank in the range_column. It is used to update rank.
46119
fn update_state(
47120
&mut self,
48121
_state: &WindowAggState,
@@ -54,36 +127,72 @@ pub trait PartitionEvaluator: Debug + Send {
54127
Ok(())
55128
}
56129

130+
/// Sets the internal state for window function
131+
///
132+
/// Only used for stateful evaluation
57133
fn set_state(&mut self, _state: &BuiltinWindowState) -> Result<()> {
58134
Err(DataFusionError::NotImplemented(
59135
"set_state is not implemented for this window function".to_string(),
60136
))
61137
}
62138

63-
/// Gets the range where Built-in window function result is calculated.
64-
// idx is the index of last row for which result is calculated.
65-
// n_rows is the number of rows of the input record batch (Used during bound check)
139+
/// Gets the range where the window function result is calculated.
140+
///
141+
/// `idx`: is the index of last row for which result is calculated.
142+
/// `n_rows`: is the number of rows of the input record batch (Used during bounds check)
66143
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<Range<usize>> {
67144
Err(DataFusionError::NotImplemented(
68145
"get_range is not implemented for this window function".to_string(),
69146
))
70147
}
71148

72-
/// Evaluate the partition evaluator against the partition
149+
/// Called for window functions that *do not use* values from the
150+
/// the window frame, such as `ROW_NUMBER`, `RANK`, `DENSE_RANK`,
151+
/// `PERCENT_RANK`, `CUME_DIST`, `LEAD`, `LAG`).
73152
fn evaluate(&self, _values: &[ArrayRef], _num_rows: usize) -> Result<ArrayRef> {
74153
Err(DataFusionError::NotImplemented(
75154
"evaluate is not implemented by default".into(),
76155
))
77156
}
78157

79-
/// Evaluate window function result inside given range
158+
/// Evaluate window function result inside given range.
159+
///
160+
/// Only used for stateful evaluation
80161
fn evaluate_stateful(&mut self, _values: &[ArrayRef]) -> Result<ScalarValue> {
81162
Err(DataFusionError::NotImplemented(
82163
"evaluate_stateful is not implemented by default".into(),
83164
))
84165
}
85166

86-
/// evaluate the partition evaluator against the partition but with rank
167+
/// [`PartitionEvaluator::evaluate_with_rank`] is called for window
168+
/// functions that only need the rank of a row within its window
169+
/// frame.
170+
///
171+
/// Evaluate the partition evaluator against the partition using
172+
/// the row ranks. For example, `RANK(col)` produces
173+
///
174+
/// ```text
175+
/// col | rank
176+
/// --- + ----
177+
/// A | 1
178+
/// A | 1
179+
/// C | 3
180+
/// D | 4
181+
/// D | 5
182+
/// ```
183+
///
184+
/// For this case, `num_rows` would be `5` and the
185+
/// `ranks_in_partition` would be called with
186+
///
187+
/// ```text
188+
/// [
189+
/// (0,1),
190+
/// (2,2),
191+
/// (3,4),
192+
/// ]
193+
/// ```
194+
///
195+
/// See [`Self::include_rank`] for more details
87196
fn evaluate_with_rank(
88197
&self,
89198
_num_rows: usize,
@@ -94,7 +203,11 @@ pub trait PartitionEvaluator: Debug + Send {
94203
))
95204
}
96205

97-
/// evaluate window function result inside given range
206+
/// Called for window functions that use values from window frame,
207+
/// such as `FIRST_VALUE`, `LAST_VALUE`, `NTH_VALUE` and produce a
208+
/// single value for every row in the partition.
209+
///
210+
/// Returns a [`ScalarValue`] that is the value of the window function for the entire partition
98211
fn evaluate_inside_range(
99212
&self,
100213
_values: &[ArrayRef],

datafusion/physical-expr/src/window/sliding_aggregate.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,10 @@ use crate::{
3636
expressions::PhysicalSortExpr, reverse_order_bys, AggregateExpr, PhysicalExpr,
3737
};
3838

39-
/// A window expr that takes the form of an aggregate function
40-
/// Aggregate Window Expressions that have the form
41-
/// `OVER({ROWS | RANGE| GROUPS} BETWEEN UNBOUNDED PRECEDING AND ...)`
42-
/// e.g cumulative window frames uses `PlainAggregateWindowExpr`. Where as Aggregate Window Expressions
43-
/// that have the form `OVER({ROWS | RANGE| GROUPS} BETWEEN M {PRECEDING| FOLLOWING} AND ...)`
44-
/// e.g sliding window frames uses `SlidingAggregateWindowExpr`.
39+
/// A window expr that takes the form of an aggregate function that
40+
/// can be incrementally computed over sliding windows.
41+
///
42+
/// See comments on [`WindowExpr`] for more details.
4543
#[derive(Debug)]
4644
pub struct SlidingAggregateWindowExpr {
4745
aggregate: Arc<dyn AggregateExpr>,
@@ -72,10 +70,11 @@ impl SlidingAggregateWindowExpr {
7270
}
7371
}
7472

75-
/// peer based evaluation based on the fact that batch is pre-sorted given the sort columns
76-
/// and then per partition point we'll evaluate the peer group (e.g. SUM or MAX gives the same
77-
/// results for peers) and concatenate the results.
78-
73+
/// Incrementally update window function using the fact that batch is
74+
/// pre-sorted given the sort columns and then per partition point.
75+
///
76+
/// Evaluates the peer group (e.g. `SUM` or `MAX` gives the same results
77+
/// for peers) and concatenate the results.
7978
impl WindowExpr for SlidingAggregateWindowExpr {
8079
/// Return a reference to Any that can be used for downcasting
8180
fn as_any(&self) -> &dyn Any {

0 commit comments

Comments
 (0)