Skip to content
Merged
201 changes: 169 additions & 32 deletions datafusion/functions/src/datetime/to_timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use std::any::Any;
use std::sync::Arc;

use crate::datetime::common::*;
use arrow::array::Float64Array;
use arrow::array::timezone::Tz;
use arrow::array::{
Array, Decimal128Array, Decimal256Array, Float64Array, TimestampNanosecondArray,
};
use arrow::datatypes::DataType::*;
use arrow::datatypes::TimeUnit::{Microsecond, Millisecond, Nanosecond, Second};
use arrow::datatypes::{
Expand Down Expand Up @@ -325,6 +327,75 @@ impl_to_timestamp_constructors!(ToTimestampMillisFunc);
impl_to_timestamp_constructors!(ToTimestampMicrosFunc);
impl_to_timestamp_constructors!(ToTimestampNanosFunc);

fn decimal_to_nanoseconds(value: i128, scale: i8) -> i64 {
let nanos_exponent = 9_i16 - scale as i16;
let timestamp_nanos = if nanos_exponent >= 0 {
value * 10_i128.pow(nanos_exponent as u32)
} else {
value / 10_i128.pow(nanos_exponent.unsigned_abs() as u32)
};
timestamp_nanos as i64
}

fn decimal128_to_timestamp_nanos(
arg: &ColumnarValue,
tz: Option<Arc<str>>,
) -> Result<ColumnarValue> {
match arg {
ColumnarValue::Scalar(ScalarValue::Decimal128(Some(value), _, scale)) => {
let timestamp_nanos = decimal_to_nanoseconds(*value, *scale);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(timestamp_nanos),
tz,
)))
}
ColumnarValue::Scalar(ScalarValue::Decimal128(None, _, _)) => Ok(
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, tz)),
),
ColumnarValue::Array(arr) => {
let decimal_arr = downcast_arg!(arr, Decimal128Array);
let scale = decimal_arr.scale();
let result: TimestampNanosecondArray = decimal_arr
.iter()
.map(|v| v.map(|val| decimal_to_nanoseconds(val, scale)))
.collect();
let result = result.with_timezone_opt(tz);
Ok(ColumnarValue::Array(Arc::new(result)))
}
_ => exec_err!("Invalid Decimal128 value for to_timestamp"),
}
}

fn decimal256_to_timestamp_nanos(
arg: &ColumnarValue,
tz: Option<Arc<str>>,
) -> Result<ColumnarValue> {
match arg {
ColumnarValue::Scalar(ScalarValue::Decimal256(Some(value), _, scale)) => {
let value_i128 = value.as_i128();
let timestamp_nanos = decimal_to_nanoseconds(value_i128, *scale);
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(timestamp_nanos),
tz,
)))
}
ColumnarValue::Scalar(ScalarValue::Decimal256(None, _, _)) => Ok(
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None, tz)),
),
ColumnarValue::Array(arr) => {
let decimal_arr = downcast_arg!(arr, Decimal256Array);
let scale = decimal_arr.scale();
let result: TimestampNanosecondArray = decimal_arr
.iter()
.map(|v| v.map(|val| decimal_to_nanoseconds(val.as_i128(), scale)))
.collect();
let result = result.with_timezone_opt(tz);
Ok(ColumnarValue::Array(Arc::new(result)))
}
_ => exec_err!("Invalid Decimal256 value for to_timestamp"),
}
}

/// to_timestamp SQL function
///
/// Note: `to_timestamp` returns `Timestamp(Nanosecond)` though its arguments are interpreted as **seconds**.
Expand Down Expand Up @@ -380,13 +451,14 @@ impl ScalarUDFImpl for ToTimestampFunc {
let tz = self.timezone.clone();

match args[0].data_type() {
Int32 | Int64 => args[0]
Int8 | Int16 | Int32 | Int64 | UInt8 | UInt16 | UInt32 | UInt64 => args[0]
.cast_to(&Timestamp(Second, None), None)?
.cast_to(&Timestamp(Nanosecond, tz), None),
Null | Timestamp(_, _) => args[0].cast_to(&Timestamp(Nanosecond, tz), None),
Float64 => {
Float16 | Float32 | Float64 => {
let arg = args[0].cast_to(&Float64, None)?;
let rescaled = arrow::compute::kernels::numeric::mul(
&args[0].to_array(1)?,
&arg.to_array(1)?,
&arrow::array::Scalar::new(Float64Array::from(vec![
1_000_000_000f64,
])),
Expand All @@ -397,31 +469,15 @@ impl ScalarUDFImpl for ToTimestampFunc {
&DEFAULT_CAST_OPTIONS,
)?))
}
Decimal32(_, _) | Decimal64(_, _) => {
let arg = args[0].cast_to(&Decimal128(38, 9), None)?;
decimal128_to_timestamp_nanos(&arg, tz)
}
Decimal128(_, _) => decimal128_to_timestamp_nanos(&args[0], tz),
Decimal256(_, _) => decimal256_to_timestamp_nanos(&args[0], tz),
Utf8View | LargeUtf8 | Utf8 => {
to_timestamp_impl::<TimestampNanosecondType>(&args, "to_timestamp", &tz)
}
Decimal128(_, _) => {
match &args[0] {
ColumnarValue::Scalar(ScalarValue::Decimal128(
Some(value),
_,
scale,
)) => {
// Convert decimal to seconds and nanoseconds
let scale_factor = 10_i128.pow(*scale as u32);
let seconds = value / scale_factor;
let fraction = value % scale_factor;
let nanos = (fraction * 1_000_000_000) / scale_factor;
let timestamp_nanos = seconds * 1_000_000_000 + nanos;

Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
Some(timestamp_nanos as i64),
tz,
)))
}
_ => exec_err!("Invalid decimal value"),
}
}
other => {
exec_err!("Unsupported data type {other} for function to_timestamp")
}
Expand Down Expand Up @@ -473,9 +529,23 @@ impl ScalarUDFImpl for ToTimestampSecondsFunc {
let tz = self.timezone.clone();

match args[0].data_type() {
Null | Int32 | Int64 | Timestamp(_, _) | Decimal128(_, _) => {
args[0].cast_to(&Timestamp(Second, tz), None)
}
Null
| Int8
| Int16
| Int32
| Int64
| UInt8
| UInt16
| UInt32
| UInt64
| Timestamp(_, _)
| Decimal32(_, _)
| Decimal64(_, _)
| Decimal128(_, _)
| Decimal256(_, _) => args[0].cast_to(&Timestamp(Second, tz), None),
Float16 | Float32 | Float64 => args[0]
.cast_to(&Int64, None)?
.cast_to(&Timestamp(Second, tz), None),
Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampSecondType>(
&args,
"to_timestamp_seconds",
Expand Down Expand Up @@ -533,9 +603,25 @@ impl ScalarUDFImpl for ToTimestampMillisFunc {
}

match args[0].data_type() {
Null | Int32 | Int64 | Timestamp(_, _) => {
Null
| Int8
| Int16
| Int32
| Int64
| UInt8
| UInt16
| UInt32
| UInt64
| Timestamp(_, _)
| Decimal32(_, _)
| Decimal64(_, _)
| Decimal128(_, _)
| Decimal256(_, _) => {
args[0].cast_to(&Timestamp(Millisecond, self.timezone.clone()), None)
}
Float16 | Float32 | Float64 => args[0]
.cast_to(&Int64, None)?
.cast_to(&Timestamp(Millisecond, self.timezone.clone()), None),
Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMillisecondType>(
&args,
"to_timestamp_millis",
Expand Down Expand Up @@ -593,9 +679,25 @@ impl ScalarUDFImpl for ToTimestampMicrosFunc {
}

match args[0].data_type() {
Null | Int32 | Int64 | Timestamp(_, _) => {
Null
| Int8
| Int16
| Int32
| Int64
| UInt8
| UInt16
| UInt32
| UInt64
| Timestamp(_, _)
| Decimal32(_, _)
| Decimal64(_, _)
| Decimal128(_, _)
| Decimal256(_, _) => {
args[0].cast_to(&Timestamp(Microsecond, self.timezone.clone()), None)
}
Float16 | Float32 | Float64 => args[0]
.cast_to(&Int64, None)?
.cast_to(&Timestamp(Microsecond, self.timezone.clone()), None),
Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampMicrosecondType>(
&args,
"to_timestamp_micros",
Expand Down Expand Up @@ -653,9 +755,25 @@ impl ScalarUDFImpl for ToTimestampNanosFunc {
}

match args[0].data_type() {
Null | Int32 | Int64 | Timestamp(_, _) => {
Null
| Int8
| Int16
| Int32
| Int64
| UInt8
| UInt16
| UInt32
| UInt64
| Timestamp(_, _)
| Decimal32(_, _)
| Decimal64(_, _)
| Decimal128(_, _)
| Decimal256(_, _) => {
args[0].cast_to(&Timestamp(Nanosecond, self.timezone.clone()), None)
}
Float16 | Float32 | Float64 => args[0]
.cast_to(&Int64, None)?
.cast_to(&Timestamp(Nanosecond, self.timezone.clone()), None),
Utf8View | LargeUtf8 | Utf8 => to_timestamp_impl::<TimestampNanosecondType>(
&args,
"to_timestamp_nanos",
Expand Down Expand Up @@ -1735,4 +1853,23 @@ mod tests {
assert_contains!(actual, expected);
}
}

#[test]
fn test_decimal_to_nanoseconds_negative_scale() {
// scale -2: internal value 5 represents 5 * 10^2 = 500 seconds
let nanos = decimal_to_nanoseconds(5, -2);
assert_eq!(nanos, 500_000_000_000); // 500 seconds in nanoseconds

// scale -1: internal value 10 represents 10 * 10^1 = 100 seconds
let nanos = decimal_to_nanoseconds(10, -1);
assert_eq!(nanos, 100_000_000_000);

// scale 0: internal value 5 represents 5 seconds
let nanos = decimal_to_nanoseconds(5, 0);
assert_eq!(nanos, 5_000_000_000);

// scale 3: internal value 1500 represents 1.5 seconds
let nanos = decimal_to_nanoseconds(1500, 3);
assert_eq!(nanos, 1_500_000_000);
}
}
Loading