
Commit 6995ad1

refactor: override_inferred_data_type

Author: Devdutt Shenoi
1 parent 77ae28e

3 files changed: +89 -99 lines changed

src/event/format/json.rs

Lines changed: 65 additions & 5 deletions
@@ -22,17 +22,31 @@
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
-use arrow_schema::{DataType, Field, Fields, Schema};
+use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use itertools::Itertools;
 use serde_json::Value;
 use std::{collections::HashMap, sync::Arc};
 use tracing::error;
 
-use super::EventFormat;
+use super::{update_field_type_in_schema, EventFormat};
 use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
 
+static TIME_FIELD_NAME_PARTS: [&str; 11] = [
+    "time",
+    "date",
+    "timestamp",
+    "created",
+    "received",
+    "ingested",
+    "collected",
+    "start",
+    "end",
+    "ts",
+    "dt",
+];
+
 pub struct Event {
     pub json: Value,
     pub p_timestamp: DateTime<Utc>,
@@ -87,12 +101,10 @@ impl EventFormat for Event {
             .map_err(|err| {
                 anyhow!("Could not infer schema for this event due to err {:?}", err)
             })?;
-        let new_infer_schema = super::update_field_type_in_schema(
+        let new_infer_schema = update_field_type_in_schema(
             Arc::new(infer_schema),
             Some(stream_schema),
             time_partition,
-            Some(&value_arr),
-            schema_version,
         );
         infer_schema = Schema::new(new_infer_schema.fields().clone());
         Schema::try_merge(vec![
@@ -222,6 +234,54 @@ fn extract_and_parse_time(
     Ok(parsed_time.naive_utc())
 }
 
+// From Schema v1 onwards, convert json fields whose name contains "date"/"time" and whose
+// string value parses as a timestamp to the timestamp type, and all numbers to float64.
+pub fn override_inferred_data_type(
+    inferred_schema: Schema,
+    log_record: &Value,
+    schema_version: SchemaVersion,
+) -> Schema {
+    let Value::Object(map) = log_record else {
+        return inferred_schema;
+    };
+    let updated_fields = inferred_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let field_name = field.name().as_str();
+            match (schema_version, map.get(field.name())) {
+                // in V1 for new fields in json named "time"/"date" or such and having inferred
+                // type string, that can be parsed as timestamp, use the timestamp type.
+                // NOTE: support even more datetime string formats
+                (SchemaVersion::V1, Some(Value::String(s)))
+                    if TIME_FIELD_NAME_PARTS
+                        .iter()
+                        .any(|part| field_name.to_lowercase().contains(part))
+                        && field.data_type() == &DataType::Utf8
+                        && (DateTime::parse_from_rfc3339(s).is_ok()
+                            || DateTime::parse_from_rfc2822(s).is_ok()) =>
+                {
+                    // Update the field's data type to Timestamp
+                    Field::new(
+                        field_name,
+                        DataType::Timestamp(TimeUnit::Millisecond, None),
+                        true,
+                    )
+                }
+                // in V1 for new fields in json with inferred type number, cast as float64.
+                (SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => {
+                    // Update the field's data type to Float64
+                    Field::new(field_name, DataType::Float64, true)
+                }
+                // Return the original field if no update is needed
+                _ => Field::new(field_name, field.data_type().clone(), true),
+            }
+        })
+        .collect_vec();
+
+    Schema::new(updated_fields)
+}
+
 // Returns arrow schema with the fields that are present in the request body
 // This schema is an input to convert the request body to arrow record batch
 fn derive_arrow_schema(
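
For orientation, here is a minimal sketch (not part of the commit) of what the relocated helper does to an inferred schema; the sample record, the `sketch` function, and the assertion are illustrative only:

    use arrow_json::reader::infer_json_schema_from_iterator;
    use arrow_schema::DataType;
    use serde_json::{json, Value};

    use crate::event::format::json::override_inferred_data_type;
    use crate::metadata::SchemaVersion;

    fn sketch() {
        // "created_time" matches the "time"/"created" entries of TIME_FIELD_NAME_PARTS,
        // so its RFC 3339 string value is retyped; plain strings stay Utf8.
        let record: Value = json!({
            "created_time": "2024-01-01T00:00:00Z", // Utf8 + parseable value -> Timestamp(ms)
            "count": 5,                             // inferred Int64 -> Float64
            "message": "hello"                      // left as Utf8
        });
        let inferred = infer_json_schema_from_iterator(std::iter::once(Ok(&record))).unwrap();
        let schema = override_inferred_data_type(inferred, &record, SchemaVersion::V1);
        assert_eq!(
            schema.field_with_name("count").unwrap().data_type(),
            &DataType::Float64
        );
    }

Note that a string field is only retyped when both conditions hold: its name contains one of the TIME_FIELD_NAME_PARTS entries and its value parses as RFC 3339 or RFC 2822.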

src/event/format/mod.rs

Lines changed: 1 addition & 73 deletions
@@ -28,7 +28,6 @@ use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
@@ -40,19 +39,6 @@ use super::{Event, DEFAULT_TIMESTAMP_KEY};
 
 pub mod json;
 
-static TIME_FIELD_NAME_PARTS: [&str; 11] = [
-    "time",
-    "date",
-    "timestamp",
-    "created",
-    "received",
-    "ingested",
-    "collected",
-    "start",
-    "end",
-    "ts",
-    "dt",
-];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
@@ -167,8 +153,7 @@ pub trait EventFormat: Sized {
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
-        new_schema =
-            update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
+        new_schema = update_field_type_in_schema(new_schema, None, time_partition);
 
         let rb = Self::decode(data, new_schema.clone())?;
 
@@ -269,8 +254,6 @@ pub fn update_field_type_in_schema(
     inferred_schema: Arc<Schema>,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
     time_partition: Option<&String>,
-    log_records: Option<&Vec<Value>>,
-    schema_version: SchemaVersion,
 ) -> Arc<Schema> {
     let mut updated_schema = inferred_schema.clone();
     let existing_field_names = get_existing_field_names(inferred_schema.clone(), existing_schema);
@@ -280,13 +263,6 @@
         updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema);
     }
 
-    if let Some(log_records) = log_records {
-        for log_record in log_records {
-            updated_schema =
-                override_data_type(updated_schema.clone(), log_record.clone(), schema_version);
-        }
-    }
-
     let Some(time_partition) = time_partition else {
         return updated_schema;
     };
@@ -309,51 +285,3 @@
         .collect();
     Arc::new(Schema::new(new_schema))
 }
-
-// From Schema v1 onwards, convert json fields with name containig "date"/"time" and having
-// a string value parseable into timestamp as timestamp type and all numbers as float64.
-pub fn override_data_type(
-    inferred_schema: Arc<Schema>,
-    log_record: Value,
-    schema_version: SchemaVersion,
-) -> Arc<Schema> {
-    let Value::Object(map) = log_record else {
-        return inferred_schema;
-    };
-    let updated_schema: Vec<Field> = inferred_schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let field_name = field.name().as_str();
-            match (schema_version, map.get(field.name())) {
-                // in V1 for new fields in json named "time"/"date" or such and having inferred
-                // type string, that can be parsed as timestamp, use the timestamp type.
-                // NOTE: support even more datetime string formats
-                (SchemaVersion::V1, Some(Value::String(s)))
-                    if TIME_FIELD_NAME_PARTS
-                        .iter()
-                        .any(|part| field_name.to_lowercase().contains(part))
-                        && field.data_type() == &DataType::Utf8
-                        && (DateTime::parse_from_rfc3339(s).is_ok()
-                            || DateTime::parse_from_rfc2822(s).is_ok()) =>
-                {
-                    // Update the field's data type to Timestamp
-                    Field::new(
-                        field_name,
-                        DataType::Timestamp(TimeUnit::Millisecond, None),
-                        true,
-                    )
-                }
-                // in V1 for new fields in json with inferred type number, cast as float64.
-                (SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => {
-                    // Update the field's data type to Float64
-                    Field::new(field_name, DataType::Float64, true)
-                }
-                // Return the original field if no update is needed
-                _ => Field::new(field_name, field.data_type().clone(), true),
-            }
-        })
-        .collect();
-
-    Arc::new(Schema::new(updated_schema))
-}
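
For reference, the slimmed-down signature can be called as below; a hedged sketch assuming the surrounding crate imports, with all variable names illustrative:

    use std::{collections::HashMap, sync::Arc};

    use arrow_schema::{Field, Schema};

    use crate::event::format::update_field_type_in_schema;

    fn sketch(
        inferred_schema: Schema,
        stream_schema: &HashMap<String, Arc<Field>>,
        time_partition: Option<&String>,
    ) -> Arc<Schema> {
        // log_records and schema_version are no longer threaded through here;
        // per-record type overrides now live in json::override_inferred_data_type.
        update_field_type_in_schema(Arc::new(inferred_schema), Some(stream_schema), time_partition)
    }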

src/handlers/http/logstream.rs

Lines changed: 23 additions & 21 deletions
@@ -16,34 +16,35 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
-use crate::event::format::override_data_type;
-use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
-use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::parseable::{StreamNotFound, PARSEABLE};
-use crate::rbac::role::Action;
-use crate::rbac::Users;
-use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::retention::Retention;
-use crate::storage::{StreamInfo, StreamType};
-use crate::utils::actix::extract_session_key_from_req;
-use crate::{stats, validator, LOCK_EXPECT};
+use std::fs::remove_dir_all;
 
 use actix_web::http::StatusCode;
 use actix_web::web::{Json, Path};
 use actix_web::{web, HttpRequest, Responder};
 use arrow_json::reader::infer_json_schema_from_iterator;
 use bytes::Bytes;
 use chrono::Utc;
+use error::StreamError;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
 use tracing::warn;
 
+use crate::event::format::json::override_inferred_data_type;
+use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
+use crate::metadata::SchemaVersion;
+use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
+use crate::parseable::{StreamNotFound, PARSEABLE};
+use crate::rbac::role::Action;
+use crate::rbac::Users;
+use crate::stats::{self, event_labels_date, storage_size_labels_date, Stats};
+use crate::storage::retention::Retention;
+use crate::storage::{StreamInfo, StreamType};
+use crate::utils::actix::extract_session_key_from_req;
+use crate::{validator, LOCK_EXPECT};
+
+use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
+use super::query::update_schema_when_distributed;
+
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
@@ -57,7 +58,7 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamE
     objectstore.delete_stream(&stream_name).await?;
     // Delete from staging
     let stream_dir = PARSEABLE.get_or_create_stream(&stream_name);
-    if fs::remove_dir_all(&stream_dir.data_path).is_err() {
+    if remove_dir_all(&stream_dir.data_path).is_err() {
         warn!(
             "failed to delete local data for stream {}. Clean {} manually",
             stream_name,
@@ -113,10 +114,11 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
         }
     };
 
-    let mut schema = Arc::new(infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap());
-    for log_record in log_records {
-        schema = override_data_type(schema, log_record, SchemaVersion::V1);
+    let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
+    for log_record in log_records.iter() {
+        schema = override_inferred_data_type(schema, log_record, SchemaVersion::V1);
     }
+
     Ok((web::Json(schema), StatusCode::OK))
 }
 
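
A usage note, not from the commit: because the override inspects one record at a time, detect_schema folds it over every record, so a field that appears in only some records can still be retyped. A hedged sketch of that fold, with illustrative sample records:

    use arrow_json::reader::infer_json_schema_from_iterator;
    use serde_json::json;

    use crate::event::format::json::override_inferred_data_type;
    use crate::metadata::SchemaVersion;

    fn sketch() {
        // "latency" only appears in the second record, so only that iteration
        // observes its numeric value and widens the field to Float64; "ts" is
        // retyped to Timestamp on the first pass and then left alone.
        let log_records = vec![
            json!({"ts": "2024-01-01T00:00:00Z"}),
            json!({"ts": "2024-01-01T00:00:01Z", "latency": 3}),
        ];
        let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
        for log_record in log_records.iter() {
            schema = override_inferred_data_type(schema, log_record, SchemaVersion::V1);
        }
    }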
