Skip to content

Commit c7119e6

Browse files
committed
Updates
1 parent c47a435 commit c7119e6

File tree

7 files changed

+61
-25
lines changed

7 files changed

+61
-25
lines changed

datafusion-examples/examples/simple_udwf.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717

1818
use std::sync::Arc;
1919

20+
use arrow::{
21+
array::{AsArray, Float64Array},
22+
datatypes::Float64Type,
23+
};
2024
use arrow_schema::DataType;
2125
use datafusion::datasource::file_format::options::CsvReadOptions;
2226

@@ -113,26 +117,19 @@ fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
113117
Ok(Box::new(MyPartitionEvaluator::new()))
114118
}
115119

116-
117-
118120
/// This implements the lowest level evaluation for a window function
119121
///
120122
/// It handles calculating the value of the window function for each
121123
/// distinct value of `PARTITION BY` (each car type in our example)
122124
#[derive(Clone, Debug)]
123-
struct MyPartitionEvaluator {
124-
}
125+
struct MyPartitionEvaluator {}
125126

126127
impl MyPartitionEvaluator {
127-
fn new() -> Self
128-
{
128+
fn new() -> Self {
129129
Self {}
130130
}
131131
}
132132

133-
134-
135-
136133
/// These different evaluation methods are called depending on the various settings of WindowUDF
137134
impl PartitionEvaluator for MyPartitionEvaluator {
138135
fn get_range(&self, _idx: usize, _n_rows: usize) -> Result<std::ops::Range<usize>> {
@@ -142,14 +139,48 @@ impl PartitionEvaluator for MyPartitionEvaluator {
142139
}
143140

144141
/// This function is given the values of each partition
145-
fn evaluate(&self, values: &[arrow::array::ArrayRef], num_rows: usize) -> Result<arrow::array::ArrayRef> {
146-
println!("processing num_rows={num_rows}, values:\n{values:#?}");
147-
Err(DataFusionError::NotImplemented(
148-
"evaluate is not implemented by default".into(),
149-
))
142+
fn evaluate(
143+
&self,
144+
values: &[arrow::array::ArrayRef],
145+
_num_rows: usize,
146+
) -> Result<arrow::array::ArrayRef> {
147+
// datafusion has handled ensuring we get the correct input argument
148+
assert_eq!(values.len(), 1);
149+
150+
// For this example, we convert the input argument to an
151+
// array of floating point numbers to calculate a moving average
152+
let arr: &Float64Array = values[0].as_ref().as_primitive::<Float64Type>();
153+
154+
// implement a simple moving average by averaging the current
155+
// value with the previous value
156+
//
157+
// value | avg
158+
// ------+------
159+
// 10 | 10
160+
// 20 | 15
161+
// 30 | 25
162+
// 30 | 30
163+
//
164+
let mut previous_value = None;
165+
let new_values: Float64Array = arr
166+
.values()
167+
.iter()
168+
.map(|&value| {
169+
let new_value = previous_value
170+
.map(|previous_value| (value + previous_value) / 2.0)
171+
.unwrap_or(value);
172+
previous_value = Some(value);
173+
new_value
174+
})
175+
.collect();
176+
177+
Ok(Arc::new(new_values))
150178
}
151179

152-
fn evaluate_stateful(&mut self, _values: &[arrow::array::ArrayRef]) -> Result<datafusion_common::ScalarValue> {
180+
fn evaluate_stateful(
181+
&mut self,
182+
_values: &[arrow::array::ArrayRef],
183+
) -> Result<datafusion_common::ScalarValue> {
153184
Err(DataFusionError::NotImplemented(
154185
"evaluate_stateful is not implemented by default".into(),
155186
))
@@ -176,5 +207,4 @@ impl PartitionEvaluator for MyPartitionEvaluator {
176207
}
177208
}
178209

179-
180210
// TODO show how to use other evaluate methods

datafusion/core/src/execution/context.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,6 @@ impl SessionContext {
815815
.insert(f.name.clone(), Arc::new(f));
816816
}
817817

818-
819818
/// Creates a [`DataFrame`] for reading a data source.
820819
///
821820
/// For more control such as reading multiple files, you can use
@@ -2042,7 +2041,6 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
20422041
fn options(&self) -> &ConfigOptions {
20432042
self.state.config_options()
20442043
}
2045-
20462044
}
20472045

20482046
impl FunctionRegistry for SessionState {
@@ -2079,7 +2077,6 @@ impl FunctionRegistry for SessionState {
20792077
))
20802078
})
20812079
}
2082-
20832080
}
20842081

20852082
impl OptimizerConfig for SessionState {

datafusion/core/src/physical_plan/windows/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
245245
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
246246
(self.fun.partition_evaluator)()
247247
}
248+
249+
fn name(&self) -> &str {
250+
&self.name
251+
}
248252
}
249253

250254
pub(crate) fn calc_requirements<

datafusion/expr/src/udwf.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
//! Support for user-defined window functions (UDWFs)
1919
20-
use std::fmt::{self, Debug, Formatter};
20+
use std::fmt::{self, Debug, Display, Formatter};
2121

2222
use crate::{function::PartitionEvaluatorFunctionFactory, ReturnTypeFunction, Signature};
2323

@@ -41,6 +41,13 @@ impl Debug for WindowUDF {
4141
}
4242
}
4343

44+
/// Defines how the WindowUDF is shown to users
45+
impl Display for WindowUDF {
46+
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
47+
write!(f, "{}", self.name)
48+
}
49+
}
50+
4451
impl PartialEq for WindowUDF {
4552
fn eq(&self, other: &Self) -> bool {
4653
self.name == other.name && self.signature == other.signature

datafusion/expr/src/window_function.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ impl fmt::Display for WindowFunction {
7474
WindowFunction::AggregateFunction(fun) => fun.fmt(f),
7575
WindowFunction::BuiltInWindowFunction(fun) => fun.fmt(f),
7676
WindowFunction::AggregateUDF(fun) => std::fmt::Debug::fmt(fun, f),
77-
WindowFunction::WindowUDF(fun) => std::fmt::Debug::fmt(fun, f),
77+
WindowFunction::WindowUDF(fun) => fun.fmt(f),
7878
}
7979
}
8080
}

datafusion/physical-expr/src/window/built_in_window_function_expr.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
4646

4747
/// Human readable name such as `"MIN(c2)"` or `"RANK()"`. Implementors
4848
/// must provide this; there is no default implementation.
49-
fn name(&self) -> &str {
50-
"BuiltInWindowFunctionExpr: default name"
51-
}
49+
fn name(&self) -> &str;
5250

5351
/// Evaluate window function's arguments against the input window
5452
/// batch and return an [`ArrayRef`].

datafusion/sql/src/expr/function.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
189189
.get_aggregate_meta(name)
190190
.map(WindowFunction::AggregateUDF)
191191
})
192-
// next check user defined window functions
192+
// next check user defined window functions
193193
.or_else(|| {
194194
self.schema_provider
195195
.get_window_meta(name)

0 commit comments

Comments
 (0)