Skip to content

Commit d316931

Browse files
Weijun-Halamb
authored and committed
fix: Use dynamic timezone in now() function for accurate timestamp (apache#18017)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#17993 ## Rationale for this change ``` DataFusion CLI v50.1.0 > SET TIME ZONE = '+08:00'; 0 row(s) fetched. Elapsed 0.011 seconds. > SELECT arrow_typeof(now()); +---------------------------------------+ | arrow_typeof(now()) | +---------------------------------------+ | Timestamp(Nanosecond, Some("+08:00")) | +---------------------------------------+ 1 row(s) fetched. Elapsed 0.015 seconds. > SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date; +--------+ | result | +--------+ | 1 | +--------+ 1 row(s) fetched. Elapsed 0.029 seconds. ``` <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? When the timezone changes, re-register `now()` function <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. 
--> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --------- Co-authored-by: Andrew Lamb <[email protected]>
1 parent 5bd66a9 commit d316931

File tree

9 files changed

+148
-15
lines changed

9 files changed

+148
-15
lines changed

datafusion/core/src/execution/context/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,6 +1081,26 @@ impl SessionContext {
10811081
} else {
10821082
let mut state = self.state.write();
10831083
state.config_mut().options_mut().set(&variable, &value)?;
1084+
1085+
// Re-initialize any UDFs that depend on configuration
1086+
// This allows both built-in and custom functions to respond to configuration changes
1087+
let config_options = state.config().options();
1088+
1089+
// Collect updated UDFs in a separate vector
1090+
let udfs_to_update: Vec<_> = state
1091+
.scalar_functions()
1092+
.values()
1093+
.filter_map(|udf| {
1094+
udf.inner()
1095+
.with_updated_config(config_options)
1096+
.map(Arc::new)
1097+
})
1098+
.collect();
1099+
1100+
for udf in udfs_to_update {
1101+
state.register_udf(udf)?;
1102+
}
1103+
10841104
drop(state);
10851105
}
10861106

datafusion/core/src/execution/session_state.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1433,12 +1433,31 @@ impl SessionStateBuilder {
14331433
}
14341434

14351435
if let Some(scalar_functions) = scalar_functions {
1436-
scalar_functions.into_iter().for_each(|udf| {
1437-
let existing_udf = state.register_udf(udf);
1438-
if let Ok(Some(existing_udf)) = existing_udf {
1439-
debug!("Overwrote an existing UDF: {}", existing_udf.name());
1436+
for udf in scalar_functions {
1437+
let config_options = state.config().options();
1438+
match udf.inner().with_updated_config(config_options) {
1439+
Some(new_udf) => {
1440+
if let Err(err) = state.register_udf(Arc::new(new_udf)) {
1441+
debug!(
1442+
"Failed to re-register updated UDF '{}': {}",
1443+
udf.name(),
1444+
err
1445+
);
1446+
}
1447+
}
1448+
None => match state.register_udf(Arc::clone(&udf)) {
1449+
Ok(Some(existing)) => {
1450+
debug!("Overwrote existing UDF '{}'", existing.name());
1451+
}
1452+
Ok(None) => {
1453+
debug!("Registered UDF '{}'", udf.name());
1454+
}
1455+
Err(err) => {
1456+
debug!("Failed to register UDF '{}': {}", udf.name(), err);
1457+
}
1458+
},
14401459
}
1441-
});
1460+
}
14421461
}
14431462

14441463
if let Some(aggregate_functions) = aggregate_functions {

datafusion/core/tests/expr_api/simplification.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -514,8 +514,7 @@ fn multiple_now() -> Result<()> {
514514
// expect the same timestamp appears in both exprs
515515
let actual = get_optimized_plan_formatted(plan, &time);
516516
let expected = format!(
517-
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\
518-
\n TableScan: test",
517+
"Projection: TimestampNanosecond({}, Some(\"+00:00\")) AS now(), TimestampNanosecond({}, Some(\"+00:00\")) AS t2\n TableScan: test",
519518
time.timestamp_nanos_opt().unwrap(),
520519
time.timestamp_nanos_opt().unwrap()
521520
);

datafusion/core/tests/optimizer/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,9 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
144144
let statement = &ast[0];
145145

146146
// create a logical query plan
147+
let config = ConfigOptions::default();
147148
let context_provider = MyContextProvider::default()
148-
.with_udf(datetime::now())
149+
.with_udf(datetime::now(&config))
149150
.with_udf(datafusion_functions::core::arrow_cast())
150151
.with_udf(datafusion_functions::string::concat())
151152
.with_udf(datafusion_functions::string::concat_ws());

datafusion/expr/src/udf.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -546,6 +546,33 @@ pub trait ScalarUDFImpl: Debug + DynEq + DynHash + Send + Sync {
546546
/// [`DataFusionError::Internal`]: datafusion_common::DataFusionError::Internal
547547
fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
548548

549+
/// Create a new instance of this function with updated configuration.
550+
///
551+
/// This method is called when configuration options change at runtime
552+
/// (e.g., via `SET` statements) to allow functions that depend on
553+
/// configuration to update themselves accordingly.
554+
///
555+
/// Note the current [`ConfigOptions`] are also passed to [`Self::invoke_with_args`] so
556+
/// this API is not needed for functions where only the computed values
557+
/// (and not the return type) depend on the current options.
558+
///
559+
/// This API is useful for functions where the return
560+
/// **type** depends on the configuration options, such as the `now()` function
561+
/// which depends on the current timezone.
562+
///
563+
/// # Arguments
564+
///
565+
/// * `config` - The updated configuration options
566+
///
567+
/// # Returns
568+
///
569+
/// * `Some(ScalarUDF)` - A new instance of this function configured with the new settings
570+
/// * `None` - If this function does not change with new configuration settings (the default)
571+
///
572+
fn with_updated_config(&self, _config: &ConfigOptions) -> Option<ScalarUDF> {
573+
None
574+
}
575+
549576
/// What type will be returned by this function, given the arguments?
550577
///
551578
/// By default, this function calls [`Self::return_type`] with the
@@ -879,6 +906,10 @@ impl ScalarUDFImpl for AliasedScalarUDFImpl {
879906
self.inner.invoke_with_args(args)
880907
}
881908

909+
fn with_updated_config(&self, _config: &ConfigOptions) -> Option<ScalarUDF> {
910+
None
911+
}
912+
882913
fn aliases(&self) -> &[String] {
883914
&self.aliases
884915
}

datafusion/functions/src/datetime/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ make_udf_function!(date_part::DatePartFunc, date_part);
4545
make_udf_function!(date_trunc::DateTruncFunc, date_trunc);
4646
make_udf_function!(make_date::MakeDateFunc, make_date);
4747
make_udf_function!(from_unixtime::FromUnixtimeFunc, from_unixtime);
48-
make_udf_function!(now::NowFunc, now);
4948
make_udf_function!(to_char::ToCharFunc, to_char);
5049
make_udf_function!(to_date::ToDateFunc, to_date);
5150
make_udf_function!(to_local_time::ToLocalTimeFunc, to_local_time);
@@ -56,6 +55,9 @@ make_udf_function!(to_timestamp::ToTimestampMillisFunc, to_timestamp_millis);
5655
make_udf_function!(to_timestamp::ToTimestampMicrosFunc, to_timestamp_micros);
5756
make_udf_function!(to_timestamp::ToTimestampNanosFunc, to_timestamp_nanos);
5857

58+
// create UDF with config
59+
make_udf_function_with_config!(now::NowFunc, now);
60+
5961
// we cannot currently use the export_functions macro since it doesn't handle
6062
// functions with varargs currently
6163

@@ -91,6 +93,7 @@ pub mod expr_fn {
9193
),(
9294
now,
9395
"returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement",
96+
@config
9497
),
9598
(
9699
to_local_time,
@@ -255,6 +258,7 @@ pub mod expr_fn {
255258

256259
/// Returns all DataFusion functions defined in this package
257260
pub fn functions() -> Vec<Arc<ScalarUDF>> {
261+
use datafusion_common::config::ConfigOptions;
258262
vec![
259263
current_date(),
260264
current_time(),
@@ -263,7 +267,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
263267
date_trunc(),
264268
from_unixtime(),
265269
make_date(),
266-
now(),
270+
now(&ConfigOptions::default()),
267271
to_char(),
268272
to_date(),
269273
to_local_time(),

datafusion/functions/src/datetime/now.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@ use arrow::datatypes::DataType::Timestamp;
1919
use arrow::datatypes::TimeUnit::Nanosecond;
2020
use arrow::datatypes::{DataType, Field, FieldRef};
2121
use std::any::Any;
22+
use std::sync::Arc;
2223

24+
use datafusion_common::config::ConfigOptions;
2325
use datafusion_common::{internal_err, Result, ScalarValue};
2426
use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyInfo};
2527
use datafusion_expr::{
26-
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDFImpl, Signature,
27-
Volatility,
28+
ColumnarValue, Documentation, Expr, ReturnFieldArgs, ScalarUDF, ScalarUDFImpl,
29+
Signature, Volatility,
2830
};
2931
use datafusion_macros::user_doc;
3032

@@ -41,19 +43,30 @@ The `now()` return value is determined at query time and will return the same ti
4143
pub struct NowFunc {
4244
signature: Signature,
4345
aliases: Vec<String>,
46+
timezone: Option<Arc<str>>,
4447
}
4548

4649
impl Default for NowFunc {
4750
fn default() -> Self {
48-
Self::new()
51+
Self::new_with_config(&ConfigOptions::default())
4952
}
5053
}
5154

5255
impl NowFunc {
56+
#[deprecated(since = "50.2.0", note = "use `new_with_config` instead")]
5357
pub fn new() -> Self {
5458
Self {
5559
signature: Signature::nullary(Volatility::Stable),
5660
aliases: vec!["current_timestamp".to_string()],
61+
timezone: Some(Arc::from("+00")),
62+
}
63+
}
64+
65+
pub fn new_with_config(config: &ConfigOptions) -> Self {
66+
Self {
67+
signature: Signature::nullary(Volatility::Stable),
68+
aliases: vec!["current_timestamp".to_string()],
69+
timezone: Some(Arc::from(config.execution.time_zone.as_str())),
5770
}
5871
}
5972
}
@@ -77,10 +90,14 @@ impl ScalarUDFImpl for NowFunc {
7790
&self.signature
7891
}
7992

93+
fn with_updated_config(&self, config: &ConfigOptions) -> Option<ScalarUDF> {
94+
Some(Self::new_with_config(config).into())
95+
}
96+
8097
fn return_field_from_args(&self, _args: ReturnFieldArgs) -> Result<FieldRef> {
8198
Ok(Field::new(
8299
self.name(),
83-
Timestamp(Nanosecond, Some("+00:00".into())),
100+
Timestamp(Nanosecond, self.timezone.clone()),
84101
false,
85102
)
86103
.into())
@@ -106,8 +123,9 @@ impl ScalarUDFImpl for NowFunc {
106123
.execution_props()
107124
.query_execution_start_time
108125
.timestamp_nanos_opt();
126+
109127
Ok(ExprSimplifyResult::Simplified(Expr::Literal(
110-
ScalarValue::TimestampNanosecond(now_ts, Some("+00:00".into())),
128+
ScalarValue::TimestampNanosecond(now_ts, self.timezone.clone()),
111129
None,
112130
)))
113131
}

datafusion/functions/src/macros.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
/// Exported functions accept:
4141
/// - `Vec<Expr>` argument (single argument followed by a comma)
4242
/// - Variable number of `Expr` arguments (zero or more arguments, must be without commas)
43+
/// - Functions that require config (marked with a trailing `@config` argument)
4344
#[macro_export]
4445
macro_rules! export_functions {
4546
($(($FUNC:ident, $DOC:expr, $($arg:tt)*)),*) => {
@@ -49,6 +50,15 @@ macro_rules! export_functions {
4950
)*
5051
};
5152

53+
// function that requires config (marked with @config)
54+
(single $FUNC:ident, $DOC:expr, @config) => {
55+
#[doc = $DOC]
56+
pub fn $FUNC() -> datafusion_expr::Expr {
57+
use datafusion_common::config::ConfigOptions;
58+
super::$FUNC(&ConfigOptions::default()).call(vec![])
59+
}
60+
};
61+
5262
// single vector argument (a single argument followed by a comma)
5363
(single $FUNC:ident, $DOC:expr, $arg:ident,) => {
5464
#[doc = $DOC]
@@ -89,6 +99,22 @@ macro_rules! make_udf_function {
8999
};
90100
}
91101

102+
/// Creates a function named `$NAME` that builds and returns a new `ScalarUDF`
103+
/// for the `$UDF` function on every call (no singleton is cached). The function
104+
/// takes a `&ConfigOptions` argument, which is passed to `<$UDF>::new_with_config`.
105+
#[macro_export]
106+
macro_rules! make_udf_function_with_config {
107+
($UDF:ty, $NAME:ident) => {
108+
#[allow(rustdoc::redundant_explicit_links)]
109+
#[doc = concat!("Return a [`ScalarUDF`](datafusion_expr::ScalarUDF) implementation of ", stringify!($NAME))]
110+
pub fn $NAME(config: &datafusion_common::config::ConfigOptions) -> std::sync::Arc<datafusion_expr::ScalarUDF> {
111+
std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl(
112+
<$UDF>::new_with_config(&config),
113+
))
114+
}
115+
};
116+
}
117+
92118
/// Macro creates a sub module if the feature is not enabled
93119
///
94120
/// The rationale for providing stub functions is to help users to configure datafusion

datafusion/sqllogictest/test_files/timestamps.slt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,21 @@ true
7373
##########
7474
## Current time Tests
7575
##########
76+
statement ok
77+
SET TIME ZONE = '+08'
78+
79+
query T
80+
select arrow_typeof(now());
81+
----
82+
Timestamp(Nanosecond, Some("+08"))
83+
84+
query I
85+
SELECT count(1) result FROM (SELECT now() as n) a WHERE n > '2000-01-01'::date;
86+
----
87+
1
88+
89+
statement ok
90+
SET TIME ZONE = '+00'
7691

7792
query B
7893
select cast(now() as time) = current_time();

0 commit comments

Comments
 (0)