Skip to content

Commit 5388bbb

Browse files
adriangb and claude
authored and committed
feat: Add percentile_cont aggregate function (apache#17988)
## Summary

Adds exact `percentile_cont` aggregate function as the counterpart to the existing `approx_percentile_cont` function.

## What changes were made?

### New Implementation

- Created `percentile_cont.rs` with full implementation
- `PercentileCont` struct implementing `AggregateUDFImpl`
- `PercentileContAccumulator` for standard aggregation
- `DistinctPercentileContAccumulator` for DISTINCT mode
- `PercentileContGroupsAccumulator` for efficient grouped aggregation
- `calculate_percentile` function with linear interpolation

### Features

- **Exact calculation**: Stores all values in memory for precise results
- **WITHIN GROUP syntax**: Supports `WITHIN GROUP (ORDER BY ...)`
- **Interpolation**: Uses linear interpolation between values
- **All numeric types**: Works with integers, floats, and decimals
- **Ordered-set aggregate**: Properly marked as `is_ordered_set_aggregate()`
- **GROUP BY support**: Efficient grouped aggregation via GroupsAccumulator

### Tests

Added comprehensive tests in `aggregate.slt`:

- Error conditions validation
- Basic percentile calculations (0.0, 0.25, 0.5, 0.75, 1.0)
- Comparison with `median` function
- Ascending and descending order
- GROUP BY aggregation
- NULL handling
- Edge cases (empty sets, single values)
- Float interpolation
- Various numeric data types

## Example Usage

```sql
-- Basic usage with WITHIN GROUP syntax
SELECT percentile_cont(0.75) WITHIN GROUP (ORDER BY column_name)
FROM table_name;

-- With GROUP BY
SELECT category, percentile_cont(0.95) WITHIN GROUP (ORDER BY value)
FROM sales
GROUP BY category;

-- Compare with median (percentile_cont(0.5) == median)
SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY price)
FROM products;
```

## Performance Considerations

Like `median`, this function stores all values in memory before computing results. For large datasets or when approximation is acceptable, use `approx_percentile_cont` instead.
## Related Issues

Closes apache#6714

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude <[email protected]>
1 parent fd07e73 commit 5388bbb

File tree

7 files changed

+1294
-50
lines changed

7 files changed

+1294
-50
lines changed

datafusion/functions-aggregate/src/approx_percentile_cont.rs

Lines changed: 19 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -20,34 +20,36 @@ use std::fmt::{Debug, Formatter};
2020
use std::mem::size_of_val;
2121
use std::sync::Arc;
2222

23-
use arrow::array::{Array, RecordBatch};
23+
use arrow::array::Array;
2424
use arrow::compute::{filter, is_not_null};
2525
use arrow::datatypes::FieldRef;
2626
use arrow::{
2727
array::{
2828
ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
2929
Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
3030
},
31-
datatypes::{DataType, Field, Schema},
31+
datatypes::{DataType, Field},
3232
};
3333
use datafusion_common::{
34-
downcast_value, internal_err, not_impl_datafusion_err, not_impl_err, plan_err,
35-
Result, ScalarValue,
34+
downcast_value, internal_err, not_impl_err, plan_err, DataFusionError, Result,
35+
ScalarValue,
3636
};
3737
use datafusion_expr::expr::{AggregateFunction, Sort};
3838
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
3939
use datafusion_expr::type_coercion::aggregates::{INTEGERS, NUMERICS};
4040
use datafusion_expr::utils::format_state_name;
4141
use datafusion_expr::{
42-
Accumulator, AggregateUDFImpl, ColumnarValue, Documentation, Expr, Signature,
43-
TypeSignature, Volatility,
42+
Accumulator, AggregateUDFImpl, Documentation, Expr, Signature, TypeSignature,
43+
Volatility,
4444
};
4545
use datafusion_functions_aggregate_common::tdigest::{
4646
TDigest, TryIntoF64, DEFAULT_MAX_SIZE,
4747
};
4848
use datafusion_macros::user_doc;
4949
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
5050

51+
use crate::utils::{get_scalar_value, validate_percentile_expr};
52+
5153
create_func!(ApproxPercentileCont, approx_percentile_cont_udaf);
5254

5355
/// Computes the approximate percentile continuous of a set of numbers
@@ -164,7 +166,8 @@ impl ApproxPercentileCont {
164166
&self,
165167
args: AccumulatorArgs,
166168
) -> Result<ApproxPercentileAccumulator> {
167-
let percentile = validate_input_percentile_expr(&args.exprs[1])?;
169+
let percentile =
170+
validate_percentile_expr(&args.exprs[1], "APPROX_PERCENTILE_CONT")?;
168171

169172
let is_descending = args
170173
.order_bys
@@ -214,45 +217,15 @@ impl ApproxPercentileCont {
214217
}
215218
}
216219

217-
fn get_scalar_value(expr: &Arc<dyn PhysicalExpr>) -> Result<ScalarValue> {
218-
let empty_schema = Arc::new(Schema::empty());
219-
let batch = RecordBatch::new_empty(Arc::clone(&empty_schema));
220-
if let ColumnarValue::Scalar(s) = expr.evaluate(&batch)? {
221-
Ok(s)
222-
} else {
223-
internal_err!("Didn't expect ColumnarValue::Array")
224-
}
225-
}
226-
227-
fn validate_input_percentile_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<f64> {
228-
let percentile = match get_scalar_value(expr)
229-
.map_err(|_| not_impl_datafusion_err!("Percentile value for 'APPROX_PERCENTILE_CONT' must be a literal, got: {expr}"))? {
230-
ScalarValue::Float32(Some(value)) => {
231-
value as f64
232-
}
233-
ScalarValue::Float64(Some(value)) => {
234-
value
235-
}
236-
sv => {
237-
return not_impl_err!(
238-
"Percentile value for 'APPROX_PERCENTILE_CONT' must be Float32 or Float64 literal (got data type {})",
239-
sv.data_type()
240-
)
241-
}
242-
};
243-
244-
// Ensure the percentile is between 0 and 1.
245-
if !(0.0..=1.0).contains(&percentile) {
246-
return plan_err!(
247-
"Percentile value must be between 0.0 and 1.0 inclusive, {percentile} is invalid"
248-
);
249-
}
250-
Ok(percentile)
251-
}
252-
253220
fn validate_input_max_size_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<usize> {
254-
let max_size = match get_scalar_value(expr)
255-
.map_err(|_| not_impl_datafusion_err!("Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal, got: {expr}"))? {
221+
let scalar_value = get_scalar_value(expr).map_err(|_e| {
222+
DataFusionError::Plan(
223+
"Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be a literal"
224+
.to_string(),
225+
)
226+
})?;
227+
228+
let max_size = match scalar_value {
256229
ScalarValue::UInt8(Some(q)) => q as usize,
257230
ScalarValue::UInt16(Some(q)) => q as usize,
258231
ScalarValue::UInt32(Some(q)) => q as usize,
@@ -262,7 +235,7 @@ fn validate_input_max_size_expr(expr: &Arc<dyn PhysicalExpr>) -> Result<usize> {
262235
ScalarValue::Int16(Some(q)) if q > 0 => q as usize,
263236
ScalarValue::Int8(Some(q)) if q > 0 => q as usize,
264237
sv => {
265-
return not_impl_err!(
238+
return plan_err!(
266239
"Tdigest max_size value for 'APPROX_PERCENTILE_CONT' must be UInt > 0 literal (got data type {}).",
267240
sv.data_type()
268241
)

datafusion/functions-aggregate/src/lib.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -81,13 +81,15 @@ pub mod hyperloglog;
8181
pub mod median;
8282
pub mod min_max;
8383
pub mod nth_value;
84+
pub mod percentile_cont;
8485
pub mod regr;
8586
pub mod stddev;
8687
pub mod string_agg;
8788
pub mod sum;
8889
pub mod variance;
8990

9091
pub mod planner;
92+
mod utils;
9193

9294
use crate::approx_percentile_cont::approx_percentile_cont_udaf;
9395
use crate::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight_udaf;
@@ -123,6 +125,7 @@ pub mod expr_fn {
123125
pub use super::min_max::max;
124126
pub use super::min_max::min;
125127
pub use super::nth_value::nth_value;
128+
pub use super::percentile_cont::percentile_cont;
126129
pub use super::regr::regr_avgx;
127130
pub use super::regr::regr_avgy;
128131
pub use super::regr::regr_count;
@@ -171,6 +174,7 @@ pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
171174
approx_distinct::approx_distinct_udaf(),
172175
approx_percentile_cont_udaf(),
173176
approx_percentile_cont_with_weight_udaf(),
177+
percentile_cont::percentile_cont_udaf(),
174178
string_agg::string_agg_udaf(),
175179
bit_and_or_xor::bit_and_udaf(),
176180
bit_and_or_xor::bit_or_udaf(),

0 commit comments

Comments
 (0)