Skip to content

least (copy), greatest (copy) and dateadd mock #70

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 30, 2024
10 changes: 6 additions & 4 deletions crates/control_plane/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,17 @@ impl ControlService for ControlServiceImpl {

// let mut buffer = Vec::new();
// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
// let mut stream_writer = StreamWriter::try_new_with_options(
// &mut buffer, &records[0].schema_ref(), options).unwrap();
// let mut stream_writer =
// StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
// .unwrap();
// stream_writer.write(&records[0]).unwrap();
// stream_writer.finish().unwrap();
// drop(stream_writer);
//

// // Try to add flatbuffer verification
// println!("{:?}", buffer.len());
// let res = general_purpose::STANDARD.encode(buffer);
// let base64 = general_purpose::STANDARD.encode(buffer);
// Ok((base64, columns))
// let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
//
// let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
Expand Down
171 changes: 114 additions & 57 deletions crates/control_plane/src/sql/functions/common.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use crate::models::ColumnInfo;
use arrow::array::{
Array, Int32Array, Int64Array, StructArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UnionArray,
Array, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray, UnionArray,
};
use arrow::datatypes::{Field, Schema, TimeUnit};
use arrow::record_batch::RecordBatch;
use chrono::{DateTime, Utc};
use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::datatypes::DataType;
use datafusion::common::Result as DataFusionResult;
use std::sync::Arc;

const TIMESTAMP_FORMAT: &str = "%Y-%m-%d-%H:%M:%S%.9f";

pub fn first_non_empty_type(union_array: &UnionArray) -> Option<(DataType, ArrayRef)> {
for i in 0..union_array.type_ids().len() {
let type_id = union_array.type_id(i);
Expand Down Expand Up @@ -71,8 +74,6 @@ pub fn convert_record_batches(
columns.push(converted_column);
}
let new_schema = Arc::new(Schema::new(fields));
println!("new schema: {:?}", new_schema);
println!("columns: {:?}", columns);
let converted_batch = RecordBatch::try_new(new_schema, columns)?;
converted_batches.push(converted_batch);
}
Expand All @@ -81,60 +82,116 @@ pub fn convert_record_batches(
}

fn convert_timestamp_to_struct(column: &ArrayRef, unit: &TimeUnit) -> ArrayRef {
let (epoch, fraction) = match unit {
TimeUnit::Second => {
let array = column
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
let epoch: Int64Array = array.clone().unary(|x| x);
let fraction: Int32Array = Int32Array::from(vec![0; column.len()]);
(epoch, fraction)
}
TimeUnit::Millisecond => {
let array = column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
let epoch: Int64Array = array.clone().unary(|x| x / 1_000);
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000 * 1_000_000) as i32);
(epoch, fraction)
}
TimeUnit::Microsecond => {
let array = column
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap();
let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000);
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000 * 1_000) as i32);
(epoch, fraction)
}
TimeUnit::Nanosecond => {
let array = column
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap();
let epoch: Int64Array = array.clone().unary(|x| x / 1_000_000_000);
let fraction: Int32Array = array.clone().unary(|x| (x % 1_000_000_000) as i32);
(epoch, fraction)
}
let now = Utc::now();
let timestamps: Vec<_> = match unit {
TimeUnit::Second => column
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.iter()
.map(|x| {
let ts = DateTime::from_timestamp(x.unwrap_or(now.timestamp()), 0).unwrap();
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
})
.collect(),
TimeUnit::Millisecond => column
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.iter()
.map(|x| {
let ts =
DateTime::from_timestamp_millis(x.unwrap_or(now.timestamp_millis())).unwrap();
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
})
.collect(),
TimeUnit::Microsecond => column
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.iter()
.map(|x| {
let ts =
DateTime::from_timestamp_micros(x.unwrap_or(now.timestamp_micros())).unwrap();
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
})
.collect(),
TimeUnit::Nanosecond => column
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.iter()
.map(|x| {
let ts =
DateTime::from_timestamp_nanos(x.unwrap_or(now.timestamp_nanos_opt().unwrap()));
format!("{}.{}", ts.timestamp(), ts.timestamp_subsec_micros())
})
.collect(),
};

// let (epoch, fraction) = match unit {
// TimeUnit::Second => {
// let array = column
// .as_any()
// .downcast_ref::<TimestampSecondArray>()
// .unwrap();
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now)).collect();
// let fraction: Int32Array = Int32Array::from(vec![0; column.len()]);
// (epoch, fraction)
// }
// TimeUnit::Millisecond => {
// let array = column
// .as_any()
// .downcast_ref::<TimestampMillisecondArray>()
// .unwrap();
// let now_millis = now * 1_000;
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_millis) / 1_000).collect();
// let fraction: Int32Array = array
// .iter()
// .map(|x| (x.unwrap_or(0) % 1_000 * 1_000_000) as i32)
// .collect();
// (epoch, fraction)
// }
// TimeUnit::Microsecond => {
// let array = column
// .as_any()
// .downcast_ref::<TimestampMicrosecondArray>()
// .unwrap();
// let now_micros = now * 1_000_000;
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_micros) / 1_000_000).collect();
// let fraction: Int32Array = array
// .iter()
// .map(|x| (x.unwrap_or(0) % 1_000_000 * 1_000) as i32)
// .collect();
// (epoch, fraction)
// }
// TimeUnit::Nanosecond => {
// let array = column
// .as_any()
// .downcast_ref::<TimestampNanosecondArray>()
// .unwrap();
// let now_nanos = now * 1_000_000_000;
// let epoch: Int64Array = array.iter().map(|x| x.unwrap_or(now_nanos) / 1_000_000_000).collect();
// let fraction: Int32Array = array
// .iter()
// .map(|x| (x.unwrap_or(0) % 1_000_000_000) as i32)
// .collect();
// (epoch, fraction)
// }
// };
// let string_values: Vec<_> = epoch.iter().map(|x| x.unwrap_or(0).to_string()).collect();
// let string_array = StringArray::from(string_values);
// let timezone = Int32Array::from(vec![1440; column.len()]); // Assuming UTC timezone
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("epoch", DataType::Int64, false)),
Arc::new(epoch) as ArrayRef,
),
(
Arc::new(Field::new("fraction", DataType::Int32, false)),
Arc::new(fraction) as ArrayRef,
),
// (
// Arc::new(Field::new("timezone", DataType::Int32, false)),
// Arc::new(timezone) as ArrayRef,
// ),
]);
// let struct_array = StructArray::try_new(
// vec![
// Arc::new(Field::new("epoch", DataType::Int64, false)),
// Arc::new(Field::new("fraction", DataType::Int32, false)),
// ]
// .into(),
// vec![Arc::new(epoch) as ArrayRef, Arc::new(fraction) as ArrayRef],
// None,
// )
// .unwrap();
// Arc::new(epoch) as ArrayRef

Arc::new(struct_array) as ArrayRef
Arc::new(StringArray::from(timestamps)) as ArrayRef
}
Loading
Loading