Skip to content

Commit 660a3ac

Browse files
authored
Support different TimeUnits and timezones when reading Timestamps from INT96 (#7285)
* Support different Timestamp TimeUnit resolutions for INT96. * Use i64 for subtracting JULIAN_DAY_OF_EPOCH. * docs. * Add deprecation comment. * Fix typo. * Add timezone test. * Fix clippy. * Try to fix Clippy again.
1 parent 8dad535 commit 660a3ac

File tree

5 files changed

+214
-42
lines changed

5 files changed

+214
-42
lines changed

parquet/src/arrow/array_reader/primitive_array.rs

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ use crate::column::page::PageIterator;
2323
use crate::data_type::{DataType, Int96};
2424
use crate::errors::{ParquetError, Result};
2525
use crate::schema::types::ColumnDescPtr;
26-
use arrow_array::Decimal256Array;
2726
use arrow_array::{
28-
builder::TimestampNanosecondBufferBuilder, ArrayRef, BooleanArray, Decimal128Array,
29-
Float32Array, Float64Array, Int32Array, Int64Array, TimestampNanosecondArray, UInt32Array,
30-
UInt64Array,
27+
builder::{
28+
TimestampMicrosecondBufferBuilder, TimestampMillisecondBufferBuilder,
29+
TimestampNanosecondBufferBuilder, TimestampSecondBufferBuilder,
30+
},
31+
ArrayRef, BooleanArray, Decimal128Array, Decimal256Array, Float32Array, Float64Array,
32+
Int32Array, Int64Array, TimestampMicrosecondArray, TimestampMillisecondArray,
33+
TimestampNanosecondArray, TimestampSecondArray, UInt32Array, UInt64Array,
3134
};
3235
use arrow_buffer::{i256, BooleanBuffer, Buffer};
3336
use arrow_data::ArrayDataBuilder;
@@ -37,13 +40,13 @@ use std::sync::Arc;
3740

3841
/// Provides conversion from `Vec<T>` to `Buffer`
3942
pub trait IntoBuffer {
40-
fn into_buffer(self) -> Buffer;
43+
fn into_buffer(self, target_type: &ArrowType) -> Buffer;
4144
}
4245

4346
macro_rules! native_buffer {
4447
($($t:ty),*) => {
4548
$(impl IntoBuffer for Vec<$t> {
46-
fn into_buffer(self) -> Buffer {
49+
fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
4750
Buffer::from_vec(self)
4851
}
4952
})*
@@ -52,18 +55,44 @@ macro_rules! native_buffer {
5255
native_buffer!(i8, i16, i32, i64, u8, u16, u32, u64, f32, f64);
5356

5457
impl IntoBuffer for Vec<bool> {
55-
fn into_buffer(self) -> Buffer {
58+
fn into_buffer(self, _target_type: &ArrowType) -> Buffer {
5659
BooleanBuffer::from_iter(self).into_inner()
5760
}
5861
}
5962

6063
impl IntoBuffer for Vec<Int96> {
61-
fn into_buffer(self) -> Buffer {
62-
let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
63-
for v in self {
64-
builder.append(v.to_nanos())
64+
fn into_buffer(self, target_type: &ArrowType) -> Buffer {
65+
match target_type {
66+
ArrowType::Timestamp(TimeUnit::Second, _) => {
67+
let mut builder = TimestampSecondBufferBuilder::new(self.len());
68+
for v in self {
69+
builder.append(v.to_seconds())
70+
}
71+
builder.finish()
72+
}
73+
ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
74+
let mut builder = TimestampMillisecondBufferBuilder::new(self.len());
75+
for v in self {
76+
builder.append(v.to_millis())
77+
}
78+
builder.finish()
79+
}
80+
ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
81+
let mut builder = TimestampMicrosecondBufferBuilder::new(self.len());
82+
for v in self {
83+
builder.append(v.to_micros())
84+
}
85+
builder.finish()
86+
}
87+
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
88+
let mut builder = TimestampNanosecondBufferBuilder::new(self.len());
89+
for v in self {
90+
builder.append(v.to_nanos())
91+
}
92+
builder.finish()
93+
}
94+
_ => unreachable!("Invalid target_type for Int96."),
6595
}
66-
builder.finish()
6796
}
6897
}
6998

@@ -161,8 +190,11 @@ where
161190
PhysicalType::FLOAT => ArrowType::Float32,
162191
PhysicalType::DOUBLE => ArrowType::Float64,
163192
PhysicalType::INT96 => match target_type {
193+
ArrowType::Timestamp(TimeUnit::Second, _) => target_type.clone(),
194+
ArrowType::Timestamp(TimeUnit::Millisecond, _) => target_type.clone(),
195+
ArrowType::Timestamp(TimeUnit::Microsecond, _) => target_type.clone(),
164196
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => target_type.clone(),
165-
_ => unreachable!("INT96 must be timestamp nanosecond"),
197+
_ => unreachable!("INT96 must be a timestamp."),
166198
},
167199
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
168200
unreachable!("PrimitiveArrayReaders don't support complex physical types");
@@ -172,7 +204,10 @@ where
172204
// Convert to arrays by using the Parquet physical type.
173205
// The physical types are then cast to Arrow types if necessary
174206

175-
let record_data = self.record_reader.consume_record_data().into_buffer();
207+
let record_data = self
208+
.record_reader
209+
.consume_record_data()
210+
.into_buffer(target_type);
176211

177212
let array_data = ArrayDataBuilder::new(arrow_data_type)
178213
.len(self.record_reader.num_values())
@@ -194,7 +229,22 @@ where
194229
},
195230
PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)),
196231
PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)),
197-
PhysicalType::INT96 => Arc::new(TimestampNanosecondArray::from(array_data)),
232+
PhysicalType::INT96 => match target_type {
233+
ArrowType::Timestamp(TimeUnit::Second, _) => {
234+
Arc::new(TimestampSecondArray::from(array_data))
235+
}
236+
ArrowType::Timestamp(TimeUnit::Millisecond, _) => {
237+
Arc::new(TimestampMillisecondArray::from(array_data))
238+
}
239+
ArrowType::Timestamp(TimeUnit::Microsecond, _) => {
240+
Arc::new(TimestampMicrosecondArray::from(array_data))
241+
}
242+
ArrowType::Timestamp(TimeUnit::Nanosecond, _) => {
243+
Arc::new(TimestampNanosecondArray::from(array_data))
244+
}
245+
_ => unreachable!("INT96 must be a timestamp."),
246+
},
247+
198248
PhysicalType::BYTE_ARRAY | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
199249
unreachable!("PrimitiveArrayReaders don't support complex physical types");
200250
}

parquet/src/arrow/arrow_reader/mod.rs

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,7 +1015,7 @@ mod tests {
10151015
use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
10161016
use crate::data_type::{
10171017
BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
1018-
FloatType, Int32Type, Int64Type, Int96Type,
1018+
FloatType, Int32Type, Int64Type, Int96, Int96Type,
10191019
};
10201020
use crate::errors::Result;
10211021
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
@@ -1517,17 +1517,76 @@ mod tests {
15171517
#[test]
15181518
fn test_int96_single_column_reader_test() {
15191519
let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1520-
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1521-
2,
1522-
ConvertedType::NONE,
1523-
None,
1524-
|vals| {
1520+
1521+
type TypeHintAndConversionFunction =
1522+
(Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);
1523+
1524+
let resolutions: Vec<TypeHintAndConversionFunction> = vec![
1525+
// Test without a specified ArrowType hint.
1526+
(None, |vals: &[Option<Int96>]| {
15251527
Arc::new(TimestampNanosecondArray::from_iter(
15261528
vals.iter().map(|x| x.map(|x| x.to_nanos())),
1527-
)) as _
1528-
},
1529-
encodings,
1530-
);
1529+
)) as ArrayRef
1530+
}),
1531+
// Test other TimeUnits as ArrowType hints.
1532+
(
1533+
Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
1534+
|vals: &[Option<Int96>]| {
1535+
Arc::new(TimestampSecondArray::from_iter(
1536+
vals.iter().map(|x| x.map(|x| x.to_seconds())),
1537+
)) as ArrayRef
1538+
},
1539+
),
1540+
(
1541+
Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
1542+
|vals: &[Option<Int96>]| {
1543+
Arc::new(TimestampMillisecondArray::from_iter(
1544+
vals.iter().map(|x| x.map(|x| x.to_millis())),
1545+
)) as ArrayRef
1546+
},
1547+
),
1548+
(
1549+
Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
1550+
|vals: &[Option<Int96>]| {
1551+
Arc::new(TimestampMicrosecondArray::from_iter(
1552+
vals.iter().map(|x| x.map(|x| x.to_micros())),
1553+
)) as ArrayRef
1554+
},
1555+
),
1556+
(
1557+
Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
1558+
|vals: &[Option<Int96>]| {
1559+
Arc::new(TimestampNanosecondArray::from_iter(
1560+
vals.iter().map(|x| x.map(|x| x.to_nanos())),
1561+
)) as ArrayRef
1562+
},
1563+
),
1564+
// Test another timezone with TimeUnit as ArrowType hints.
1565+
(
1566+
Some(ArrowDataType::Timestamp(
1567+
TimeUnit::Second,
1568+
Some(Arc::from("-05:00")),
1569+
)),
1570+
|vals: &[Option<Int96>]| {
1571+
Arc::new(
1572+
TimestampSecondArray::from_iter(
1573+
vals.iter().map(|x| x.map(|x| x.to_seconds())),
1574+
)
1575+
.with_timezone("-05:00"),
1576+
) as ArrayRef
1577+
},
1578+
),
1579+
];
1580+
1581+
resolutions.iter().for_each(|(arrow_type, converter)| {
1582+
run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1583+
2,
1584+
ConvertedType::NONE,
1585+
arrow_type.clone(),
1586+
converter,
1587+
encodings,
1588+
);
1589+
})
15311590
}
15321591

15331592
struct RandUtf8Gen {}

parquet/src/arrow/schema/primitive.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,24 @@ fn apply_hint(parquet: DataType, hint: DataType) -> DataType {
5050
// Coerce Date32 back to Date64 (#1666)
5151
(DataType::Date32, DataType::Date64) => hint,
5252

53-
// Determine timezone
53+
// Timestamps of the same resolution can be converted to a different timezone.
5454
(DataType::Timestamp(p, _), DataType::Timestamp(h, Some(_))) if p == h => hint,
5555

56+
// INT96 defaults to Timestamp(TimeUnit::Nanosecond, None) (see from_parquet below).
57+
// Allow different resolutions to support larger date ranges.
58+
(
59+
DataType::Timestamp(TimeUnit::Nanosecond, None),
60+
DataType::Timestamp(TimeUnit::Second, _),
61+
) => hint,
62+
(
63+
DataType::Timestamp(TimeUnit::Nanosecond, None),
64+
DataType::Timestamp(TimeUnit::Millisecond, _),
65+
) => hint,
66+
(
67+
DataType::Timestamp(TimeUnit::Nanosecond, None),
68+
DataType::Timestamp(TimeUnit::Microsecond, _),
69+
) => hint,
70+
5671
// Determine offset size
5772
(DataType::Utf8, DataType::LargeUtf8) => hint,
5873
(DataType::Binary, DataType::LargeBinary) => hint,

parquet/src/data_type.rs

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@ pub struct Int96 {
3838
value: [u32; 3],
3939
}
4040

41+
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
42+
43+
/// Number of seconds in a day
44+
const SECONDS_IN_DAY: i64 = 86_400;
45+
/// Number of milliseconds in a second
46+
const MILLISECONDS: i64 = 1_000;
47+
/// Number of microseconds in a second
48+
const MICROSECONDS: i64 = 1_000_000;
49+
/// Number of nanoseconds in a second
50+
const NANOSECONDS: i64 = 1_000_000_000;
51+
52+
/// Number of milliseconds in a day
53+
const MILLISECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MILLISECONDS;
54+
/// Number of microseconds in a day
55+
const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
56+
/// Number of nanoseconds in a day
57+
const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;
58+
4159
impl Int96 {
4260
/// Creates new INT96 type struct with no data set.
4361
pub fn new() -> Self {
@@ -57,30 +75,60 @@ impl Int96 {
5775
}
5876

5977
/// Converts this INT96 into an i64 representing the number of MILLISECONDS since EPOCH
78+
#[deprecated(since = "54.0.0", note = "Use `to_millis` instead")]
6079
pub fn to_i64(&self) -> i64 {
61-
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
62-
seconds * 1_000 + nanoseconds / 1_000_000
80+
self.to_millis()
81+
}
82+
83+
/// Converts this INT96 into an i64 representing the number of SECONDS since EPOCH
84+
///
85+
/// Will wrap around on overflow
86+
#[inline]
87+
pub fn to_seconds(&self) -> i64 {
88+
let (day, nanos) = self.data_as_days_and_nanos();
89+
(day as i64 - JULIAN_DAY_OF_EPOCH)
90+
.wrapping_mul(SECONDS_IN_DAY)
91+
.wrapping_add(nanos / 1_000_000_000)
92+
}
93+
94+
/// Converts this INT96 into an i64 representing the number of MILLISECONDS since EPOCH
95+
///
96+
/// Will wrap around on overflow
97+
#[inline]
98+
pub fn to_millis(&self) -> i64 {
99+
let (day, nanos) = self.data_as_days_and_nanos();
100+
(day as i64 - JULIAN_DAY_OF_EPOCH)
101+
.wrapping_mul(MILLISECONDS_IN_DAY)
102+
.wrapping_add(nanos / 1_000_000)
103+
}
104+
105+
/// Converts this INT96 into an i64 representing the number of MICROSECONDS since EPOCH
106+
///
107+
/// Will wrap around on overflow
108+
#[inline]
109+
pub fn to_micros(&self) -> i64 {
110+
let (day, nanos) = self.data_as_days_and_nanos();
111+
(day as i64 - JULIAN_DAY_OF_EPOCH)
112+
.wrapping_mul(MICROSECONDS_IN_DAY)
113+
.wrapping_add(nanos / 1_000)
63114
}
64115

65116
/// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
66117
///
67118
/// Will wrap around on overflow
119+
#[inline]
68120
pub fn to_nanos(&self) -> i64 {
69-
let (seconds, nanoseconds) = self.to_seconds_and_nanos();
70-
seconds
71-
.wrapping_mul(1_000_000_000)
72-
.wrapping_add(nanoseconds)
121+
let (day, nanos) = self.data_as_days_and_nanos();
122+
(day as i64 - JULIAN_DAY_OF_EPOCH)
123+
.wrapping_mul(NANOSECONDS_IN_DAY)
124+
.wrapping_add(nanos)
73125
}
74126

75-
/// Converts this INT96 to a number of seconds and nanoseconds since EPOCH
76-
pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
77-
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
78-
const SECONDS_PER_DAY: i64 = 86_400;
79-
80-
let day = self.data()[2] as i64;
81-
let nanoseconds = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
82-
let seconds = (day - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;
83-
(seconds, nanoseconds)
127+
#[inline]
128+
fn data_as_days_and_nanos(&self) -> (i32, i64) {
129+
let day = self.data()[2] as i32;
130+
let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
131+
(day, nanos)
84132
}
85133
}
86134

parquet/src/record/api.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ impl Field {
701701
/// `Timestamp` value.
702702
#[inline]
703703
pub fn convert_int96(_descr: &ColumnDescPtr, value: Int96) -> Self {
704-
Field::TimestampMillis(value.to_i64())
704+
Field::TimestampMillis(value.to_millis())
705705
}
706706

707707
/// Converts Parquet FLOAT type with logical type into `f32` value.

0 commit comments

Comments
 (0)