Skip to content

Commit e5c2493

Browse files
author
Devdutt Shenoi
committed
refactor: direct mutation vs reconstruction
1 parent 6995ad1 commit e5c2493

File tree

3 files changed

+31
-43
lines changed

3 files changed

+31
-43
lines changed

src/event/format/json.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use serde_json::Value;
3030
use std::{collections::HashMap, sync::Arc};
3131
use tracing::error;
3232

33-
use super::{update_field_type_in_schema, EventFormat};
33+
use super::EventFormat;
3434
use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
3535

3636
static TIME_FIELD_NAME_PARTS: [&str; 11] = [
@@ -74,7 +74,6 @@ impl EventFormat for Event {
7474
fn to_data(
7575
self,
7676
schema: &HashMap<String, Arc<Field>>,
77-
time_partition: Option<&String>,
7877
schema_version: SchemaVersion,
7978
static_schema_flag: bool,
8079
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
@@ -101,12 +100,11 @@ impl EventFormat for Event {
101100
.map_err(|err| {
102101
anyhow!("Could not infer schema for this event due to err {:?}", err)
103102
})?;
104-
let new_infer_schema = update_field_type_in_schema(
105-
Arc::new(infer_schema),
106-
Some(stream_schema),
107-
time_partition,
108-
);
109-
infer_schema = Schema::new(new_infer_schema.fields().clone());
103+
104+
for log_record in value_arr.iter() {
105+
override_inferred_data_type(&mut infer_schema, log_record, schema_version);
106+
}
107+
110108
Schema::try_merge(vec![
111109
Schema::new(stream_schema.values().cloned().collect::<Fields>()),
112110
infer_schema.clone(),
@@ -237,14 +235,14 @@ fn extract_and_parse_time(
237235
// From Schema v1 onwards, convert json fields with name containing "date"/"time" and having
238236
// a string value parseable into timestamp as timestamp type and all numbers as float64.
239237
pub fn override_inferred_data_type(
240-
inferred_schema: Schema,
238+
schema: &mut Schema,
241239
log_record: &Value,
242240
schema_version: SchemaVersion,
243-
) -> Schema {
241+
) {
244242
let Value::Object(map) = log_record else {
245-
return inferred_schema;
243+
return;
246244
};
247-
let updated_fields = inferred_schema
245+
schema.fields = schema
248246
.fields()
249247
.iter()
250248
.map(|field| {
@@ -277,9 +275,7 @@ pub fn override_inferred_data_type(
277275
_ => Field::new(field_name, field.data_type().clone(), true),
278276
}
279277
})
280-
.collect_vec();
281-
282-
Schema::new(updated_fields)
278+
.collect();
283279
}
284280

285281
// Returns arrow schema with the fields that are present in the request body

src/event/format/mod.rs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ pub trait EventFormat: Sized {
115115
fn to_data(
116116
self,
117117
schema: &HashMap<String, Arc<Field>>,
118-
time_partition: Option<&String>,
119118
schema_version: SchemaVersion,
120119
static_schema_flag: bool,
121120
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
@@ -134,12 +133,8 @@ pub trait EventFormat: Sized {
134133
p_custom_fields: &HashMap<String, String>,
135134
) -> Result<(RecordBatch, bool), AnyError> {
136135
let p_timestamp = self.get_p_timestamp();
137-
let (data, schema, is_first) = self.to_data(
138-
storage_schema,
139-
time_partition,
140-
schema_version,
141-
static_schema_flag,
142-
)?;
136+
let (data, schema, is_first) =
137+
self.to_data(storage_schema, schema_version, static_schema_flag)?;
143138

144139
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
145140
return Err(anyhow!(
@@ -149,21 +144,22 @@ pub trait EventFormat: Sized {
149144
};
150145

151146
// prepare the record batch and new fields to be added
152-
let mut new_schema = Arc::new(Schema::new(schema));
153-
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
147+
let mut new_schema = Schema::new(schema);
148+
if !Self::is_schema_matching(&new_schema, storage_schema, static_schema_flag) {
154149
return Err(anyhow!("Schema mismatch"));
155150
}
156-
new_schema = update_field_type_in_schema(new_schema, None, time_partition);
157151

158-
let rb = Self::decode(data, new_schema.clone())?;
152+
update_field_type_in_schema(&mut new_schema, Some(storage_schema), time_partition);
153+
let updated_schema = Arc::new(new_schema);
159154

155+
let rb = Self::decode(data, updated_schema)?;
160156
let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
161157

162158
Ok((rb, is_first))
163159
}
164160

165161
fn is_schema_matching(
166-
new_schema: Arc<Schema>,
162+
new_schema: &Schema,
167163
storage_schema: &HashMap<String, Arc<Field>>,
168164
static_schema_flag: bool,
169165
) -> bool {
@@ -200,7 +196,7 @@ pub trait EventFormat: Sized {
200196
}
201197

202198
pub fn get_existing_field_names(
203-
inferred_schema: Arc<Schema>,
199+
inferred_schema: &Schema,
204200
existing_schema: Option<&HashMap<String, Arc<Field>>>,
205201
) -> HashSet<String> {
206202
let mut existing_field_names = HashSet::new();
@@ -219,8 +215,8 @@ pub fn get_existing_field_names(
219215

220216
pub fn override_existing_timestamp_fields(
221217
existing_schema: &HashMap<String, Arc<Field>>,
222-
inferred_schema: Arc<Schema>,
223-
) -> Arc<Schema> {
218+
inferred_schema: &mut Schema,
219+
) {
224220
let timestamp_field_names: HashSet<String> = existing_schema
225221
.values()
226222
.filter_map(|field| {
@@ -231,7 +227,8 @@ pub fn override_existing_timestamp_fields(
231227
}
232228
})
233229
.collect();
234-
let updated_fields: Vec<Arc<Field>> = inferred_schema
230+
231+
inferred_schema.fields = inferred_schema
235232
.fields()
236233
.iter()
237234
.map(|field| {
@@ -246,28 +243,24 @@ pub fn override_existing_timestamp_fields(
246243
}
247244
})
248245
.collect();
249-
250-
Arc::new(Schema::new(updated_fields))
251246
}
252247

253248
pub fn update_field_type_in_schema(
254-
inferred_schema: Arc<Schema>,
249+
inferred_schema: &mut Schema,
255250
existing_schema: Option<&HashMap<String, Arc<Field>>>,
256251
time_partition: Option<&String>,
257-
) -> Arc<Schema> {
258-
let mut updated_schema = inferred_schema.clone();
259-
let existing_field_names = get_existing_field_names(inferred_schema.clone(), existing_schema);
260-
252+
) {
253+
let existing_field_names = get_existing_field_names(inferred_schema, existing_schema);
261254
if let Some(existing_schema) = existing_schema {
262255
// overriding known timestamp fields which were inferred as string fields
263-
updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema);
256+
override_existing_timestamp_fields(existing_schema, inferred_schema);
264257
}
265258

266259
let Some(time_partition) = time_partition else {
267-
return updated_schema;
260+
return;
268261
};
269262

270-
let new_schema: Vec<Field> = updated_schema
263+
inferred_schema.fields = inferred_schema
271264
.fields()
272265
.iter()
273266
.map(|field| {
@@ -283,5 +276,4 @@ pub fn update_field_type_in_schema(
283276
}
284277
})
285278
.collect();
286-
Arc::new(Schema::new(new_schema))
287279
}

src/handlers/http/logstream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
116116

117117
let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
118118
for log_record in log_records.iter() {
119-
schema = override_inferred_data_type(schema, log_record, SchemaVersion::V1);
119+
override_inferred_data_type(&mut schema, log_record, SchemaVersion::V1);
120120
}
121121

122122
Ok((web::Json(schema), StatusCode::OK))

0 commit comments

Comments
 (0)