Skip to content

Commit 63ba5b6

Browse files
authored
Consolidate Example: simplify_udwf_expression.rs into advanced_udwf.rs (#13883)
1 parent 405b99c commit 63ba5b6

File tree

3 files changed

+90
-139
lines changed

3 files changed

+90
-139
lines changed

datafusion-examples/examples/advanced_udwf.rs

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@ use arrow::{
2424
};
2525
use arrow_schema::Field;
2626
use datafusion::error::Result;
27+
use datafusion::functions_aggregate::average::avg_udaf;
2728
use datafusion::prelude::*;
2829
use datafusion_common::ScalarValue;
29-
use datafusion_expr::function::WindowUDFFieldArgs;
30+
use datafusion_expr::expr::WindowFunction;
31+
use datafusion_expr::function::{WindowFunctionSimplification, WindowUDFFieldArgs};
32+
use datafusion_expr::simplify::SimplifyInfo;
3033
use datafusion_expr::{
31-
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
34+
Expr, PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
3235
};
3336
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
3437

@@ -142,6 +145,67 @@ impl PartitionEvaluator for MyPartitionEvaluator {
142145
}
143146
}
144147

148+
/// This UDWF will show how to use the WindowUDFImpl::simplify() API
149+
#[derive(Debug, Clone)]
150+
struct SimplifySmoothItUdf {
151+
signature: Signature,
152+
}
153+
154+
impl SimplifySmoothItUdf {
155+
fn new() -> Self {
156+
Self {
157+
signature: Signature::exact(
158+
// this function will always take one argument of type f64
159+
vec![DataType::Float64],
160+
// this function is deterministic and will always return the same
161+
// result for the same input
162+
Volatility::Immutable,
163+
),
164+
}
165+
}
166+
}
167+
impl WindowUDFImpl for SimplifySmoothItUdf {
168+
fn as_any(&self) -> &dyn Any {
169+
self
170+
}
171+
172+
fn name(&self) -> &str {
173+
"simplify_smooth_it"
174+
}
175+
176+
fn signature(&self) -> &Signature {
177+
&self.signature
178+
}
179+
180+
fn partition_evaluator(
181+
&self,
182+
_partition_evaluator_args: PartitionEvaluatorArgs,
183+
) -> Result<Box<dyn PartitionEvaluator>> {
184+
todo!()
185+
}
186+
187+
/// this function will simplify `SimplifySmoothItUdf` to `AggregateUDF` for `Avg`
188+
/// because of this rewrite, `partition_evaluator` is never called (it is left as `todo!()`)
189+
fn simplify(&self) -> Option<WindowFunctionSimplification> {
190+
let simplify = |window_function: WindowFunction, _: &dyn SimplifyInfo| {
191+
Ok(Expr::WindowFunction(WindowFunction {
192+
fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()),
193+
args: window_function.args,
194+
partition_by: window_function.partition_by,
195+
order_by: window_function.order_by,
196+
window_frame: window_function.window_frame,
197+
null_treatment: window_function.null_treatment,
198+
}))
199+
};
200+
201+
Some(Box::new(simplify))
202+
}
203+
204+
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
205+
Ok(Field::new(field_args.name(), DataType::Float64, true))
206+
}
207+
}
208+
145209
// create local execution context with `cars.csv` registered as a table named `cars`
146210
async fn create_context() -> Result<SessionContext> {
147211
// declare a new context. In spark API, this corresponds to a new spark SQL session
@@ -162,12 +226,15 @@ async fn main() -> Result<()> {
162226
let smooth_it = WindowUDF::from(SmoothItUdf::new());
163227
ctx.register_udwf(smooth_it.clone());
164228

165-
// Use SQL to run the new window function
229+
let simplify_smooth_it = WindowUDF::from(SimplifySmoothItUdf::new());
230+
ctx.register_udwf(simplify_smooth_it.clone());
231+
232+
// Use SQL to retrieve the entire table
166233
let df = ctx.sql("SELECT * from cars").await?;
167234
// print the results
168235
df.show().await?;
169236

170-
// Use SQL to run the new window function:
237+
// Use SQL to run smooth_it:
171238
//
172239
// `PARTITION BY car`: each distinct value of car (red and green)
173240
// should be treated as a separate partition (and will result in
@@ -201,7 +268,7 @@ async fn main() -> Result<()> {
201268
// print the results
202269
df.show().await?;
203270

204-
// this time, call the new widow function with an explicit
271+
// this time, call the function with an explicit
205272
// window so evaluate will be invoked with each window.
206273
//
207274
// `ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING`: each invocation
@@ -232,5 +299,22 @@ async fn main() -> Result<()> {
232299
// print the results
233300
df.show().await?;
234301

302+
// Use SQL to run simplify_smooth_it
303+
let df = ctx
304+
.sql(
305+
"SELECT \
306+
car, \
307+
speed, \
308+
simplify_smooth_it(speed) OVER (PARTITION BY car ORDER BY time) AS smooth_speed,\
309+
time \
310+
from cars \
311+
ORDER BY \
312+
car",
313+
)
314+
.await?;
315+
316+
// print the results
317+
df.show().await?;
318+
235319
Ok(())
236320
}

datafusion-examples/examples/simplify_udwf_expression.rs

Lines changed: 0 additions & 133 deletions
This file was deleted.

datafusion/expr/src/udwf.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
344344
/// optimizations manually for specific UDFs.
345345
///
346346
/// Example:
347-
/// [`simplify_udwf_expression.rs`]: <https://github.com/apache/arrow-datafusion/blob/main/datafusion-examples/examples/simplify_udwf_expression.rs>
347+
/// [`advanced_udwf.rs`]: <https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_udwf.rs>
348348
///
349349
/// # Returns
350350
/// [None] if simplify is not defined or,

0 commit comments

Comments
 (0)