
Commit 3128589

fix: ingestion performance
current: multiple loops through each event in the batch
1. to find the level of hierarchy in the nested json
2. flattening to convert arrays to rows
3. adding the separator `_` between keys for objects

change: merge all the loops into one to improve performance
add env var `P_JSON_FLATTEN_DEPTH_LIMIT` to limit the level of hierarchy; defaults to 3
throw an exception if the level of hierarchy in the json exceeds the allowed limit
refactor code
1 parent 5a39e22 commit 3128589
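
For intuition, here is a minimal sketch of the single-pass idea the message describes, assuming a recursive walk over a serde_json::Value: nested object keys are joined with `_` and the walk errors out once the depth limit is exceeded. The function name, error text, and depth-counting convention are illustrative only, not the code added in this commit, and the array-to-rows step is omitted.

use serde_json::{Map, Value};

// Illustrative only: flatten nested objects into a single-level map,
// joining keys with `_` and erroring once nesting exceeds `depth_limit`.
fn flatten_with_limit(
    value: &Value,
    prefix: Option<&str>,
    depth: usize,
    depth_limit: usize,
    out: &mut Map<String, Value>,
) -> Result<(), String> {
    match value {
        Value::Object(map) => {
            if depth > depth_limit {
                return Err(format!(
                    "JSON is nested deeper than the allowed limit of {depth_limit}"
                ));
            }
            for (key, val) in map {
                // Build `parent_child` style keys as we descend.
                let flat_key = match prefix {
                    Some(p) => format!("{p}_{key}"),
                    None => key.clone(),
                };
                flatten_with_limit(val, Some(&flat_key), depth + 1, depth_limit, out)?;
            }
        }
        // Arrays and scalars are kept as-is under the current flattened key.
        other => {
            out.insert(prefix.unwrap_or_default().to_string(), other.clone());
        }
    }
    Ok(())
}

fn main() {
    let event = serde_json::json!({"a": 1, "b": {"c": {"d": 2}}});
    let mut flat = Map::new();
    // A limit of 3 mirrors the default of P_JSON_FLATTEN_DEPTH_LIMIT.
    match flatten_with_limit(&event, None, 1, 3, &mut flat) {
        Ok(()) => println!("{}", Value::Object(flat)), // {"a":1,"b_c_d":2}
        Err(e) => eprintln!("{e}"),
    }
}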


7 files changed: +361 -282 lines changed

src/cli.rs

Lines changed: 8 additions & 0 deletions
@@ -294,6 +294,14 @@ pub struct Options {
     )]
     pub ingestor_endpoint: String,
 
+    #[arg(
+        long,
+        env = "P_JSON_FLATTEN_DEPTH_LIMIT",
+        default_value = "3",
+        help = "Set the depth limit for flattening nested JSON"
+    )]
+    pub json_flatten_depth_limit: usize,
+
     #[command(flatten)]
     oidc: Option<OidcConfig>,
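
For reference, a standalone sketch of how an env-backed clap argument like this resolves, assuming clap with the derive and env features; the struct and binary names are stand-ins, not the real Options struct. An explicit --json-flatten-depth-limit flag wins over P_JSON_FLATTEN_DEPTH_LIMIT, which wins over the default of 3.

use clap::Parser;

#[derive(Parser, Debug)]
struct DemoOptions {
    #[arg(
        long,
        env = "P_JSON_FLATTEN_DEPTH_LIMIT",
        default_value = "3",
        help = "Set the depth limit for flattening nested JSON"
    )]
    json_flatten_depth_limit: usize,
}

fn main() {
    // Precedence in clap: explicit flag, then environment variable, then default.
    //   ./demo                                   -> 3
    //   P_JSON_FLATTEN_DEPTH_LIMIT=5 ./demo      -> 5
    //   ./demo --json-flatten-depth-limit 7      -> 7
    let opts = DemoOptions::parse();
    println!("depth limit = {}", opts.json_flatten_depth_limit);
}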

src/event/format/json.rs

Lines changed: 2 additions & 2 deletions
@@ -94,8 +94,8 @@ impl EventFormat for Event {
         };
 
         if value_arr
-            .iter()
-            .any(|value| fields_mismatch(&schema, value, schema_version))
+            .iter()
+            .any(|value| fields_mismatch(&schema, value, schema_version))
         {
             return Err(anyhow!(
                 "Could not process this event due to mismatch in datatype"

src/event/format/mod.rs

Lines changed: 2 additions & 5 deletions
@@ -112,11 +112,8 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first) = self.to_data(
-            storage_schema,
-            time_partition,
-            schema_version,
-        )?;
+        let (data, mut schema, is_first) =
+            self.to_data(storage_schema, time_partition, schema_version)?;
 
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(

src/handlers/http/ingest.rs

Lines changed: 75 additions & 97 deletions
@@ -383,13 +383,12 @@ mod tests {
     use arrow::datatypes::Int64Type;
     use arrow_array::{ArrayRef, Float64Array, Int64Array, ListArray, StringArray};
     use arrow_schema::{DataType, Field};
-    use serde_json::json;
+    use serde_json::{json, Value};
     use std::{collections::HashMap, sync::Arc};
 
     use crate::{
-        handlers::http::modal::utils::ingest_utils::into_event_batch,
-        metadata::SchemaVersion,
-        utils::json::{convert_array_to_object, flatten::convert_to_array},
+        handlers::http::modal::utils::ingest_utils::into_event_batch, metadata::SchemaVersion,
+        utils::json::flatten_json_body,
     };
 
     trait TestExt {
@@ -534,21 +533,6 @@ mod tests {
         assert_eq!(rb.num_columns(), 1);
     }
 
-    #[test]
-    fn non_object_arr_is_err() {
-        let json = json!([1]);
-
-        assert!(convert_array_to_object(
-            json,
-            None,
-            None,
-            None,
-            SchemaVersion::V0,
-            &crate::event::format::LogSource::default()
-        )
-        .is_err())
-    }
-
     #[test]
     fn array_into_recordbatch_inffered_schema() {
         let json = json!([
@@ -717,11 +701,11 @@ mod tests {
         let json = json!([
             {
                 "a": 1,
-                "b": "hello",
+                "b": "hello"
             },
             {
                 "a": 1,
-                "b": "hello",
+                "b": "hello"
             },
             {
                 "a": 1,
@@ -732,72 +716,66 @@ mod tests {
                 "a": 1,
                 "b": "hello",
                 "c": [{"a": 1, "b": 2}]
-            },
+            }
         ]);
-        let flattened_json = convert_to_array(
-            convert_array_to_object(
-                json,
-                None,
-                None,
-                None,
-                SchemaVersion::V0,
-                &crate::event::format::LogSource::default(),
-            )
-            .unwrap(),
-        )
-        .unwrap();
-
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            false,
+        let data = flatten_json_body(
+            json,
+            None,
+            None,
             None,
             SchemaVersion::V0,
+            &crate::event::format::LogSource::default(),
+            3,
         )
         .unwrap();
-        assert_eq!(rb.num_rows(), 4);
-        assert_eq!(rb.num_columns(), 5);
-        assert_eq!(
-            rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
-            &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
-        );
-        assert_eq!(
-            rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(),
-            &StringArray::from(vec![
-                Some("hello"),
-                Some("hello"),
-                Some("hello"),
-                Some("hello")
-            ])
-        );
-
-        assert_eq!(
-            rb.column_by_name("c_a")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<ListArray>()
-                .unwrap(),
-            &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
-                None,
-                None,
-                Some(vec![Some(1i64)]),
-                Some(vec![Some(1)])
-            ])
-        );
-
-        assert_eq!(
-            rb.column_by_name("c_b")
-                .unwrap()
-                .as_any()
-                .downcast_ref::<ListArray>()
-                .unwrap(),
-            &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
-                None,
-                None,
-                None,
-                Some(vec![Some(2i64)])
-            ])
-        );
+        for value in data {
+            let (rb, _) =
+                into_event_batch(value, HashMap::default(), false, None, SchemaVersion::V0)
+                    .unwrap();
+            assert_eq!(rb.num_rows(), 4);
+            assert_eq!(rb.num_columns(), 5);
+            assert_eq!(
+                rb.column_by_name("a").unwrap().as_int64_arr().unwrap(),
+                &Int64Array::from(vec![Some(1), Some(1), Some(1), Some(1)])
+            );
+            assert_eq!(
+                rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(),
+                &StringArray::from(vec![
+                    Some("hello"),
+                    Some("hello"),
+                    Some("hello"),
+                    Some("hello")
+                ])
+            );
+
+            assert_eq!(
+                rb.column_by_name("c_a")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<ListArray>()
+                    .unwrap(),
+                &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+                    None,
+                    None,
+                    Some(vec![Some(1i64)]),
+                    Some(vec![Some(1)])
+                ])
+            );
+
+            assert_eq!(
+                rb.column_by_name("c_b")
+                    .unwrap()
+                    .as_any()
+                    .downcast_ref::<ListArray>()
+                    .unwrap(),
+                &ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+                    None,
+                    None,
+                    None,
+                    Some(vec![Some(2i64)])
+                ])
+            );
+        }
     }
 
     #[test]
@@ -822,52 +800,52 @@ mod tests {
                 "c": [{"a": 1, "b": 2}]
             },
         ]);
-        let flattened_json = convert_to_array(
-            convert_array_to_object(
-                json,
-                None,
-                None,
-                None,
-                SchemaVersion::V1,
-                &crate::event::format::LogSource::default(),
-            )
-            .unwrap(),
+        let flattened_json = flatten_json_body(
+            json,
+            None,
+            None,
+            None,
+            SchemaVersion::V1,
+            &crate::event::format::LogSource::default(),
+            3,
         )
         .unwrap();
+        let arr_flattened_json = Value::Array(flattened_json);
 
         let (rb, _) = into_event_batch(
-            flattened_json,
+            arr_flattened_json,
             HashMap::default(),
             false,
             None,
             SchemaVersion::V1,
         )
         .unwrap();
 
-        assert_eq!(rb.num_rows(), 4);
+        assert_eq!(rb.num_rows(), 5);
         assert_eq!(rb.num_columns(), 5);
         assert_eq!(
             rb.column_by_name("a").unwrap().as_float64_arr().unwrap(),
-            &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
+            &Float64Array::from(vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(1.0)])
         );
         assert_eq!(
             rb.column_by_name("b").unwrap().as_utf8_arr().unwrap(),
             &StringArray::from(vec![
                 Some("hello"),
                 Some("hello"),
                 Some("hello"),
+                Some("hello"),
                 Some("hello")
             ])
         );
 
         assert_eq!(
             rb.column_by_name("c_a").unwrap().as_float64_arr().unwrap(),
-            &Float64Array::from(vec![None, None, Some(1.0), Some(1.0)])
+            &Float64Array::from(vec![None, None, Some(1.0), Some(1.0), None])
         );
 
         assert_eq!(
             rb.column_by_name("c_b").unwrap().as_float64_arr().unwrap(),
-            &Float64Array::from(vec![None, None, None, Some(2.0)])
+            &Float64Array::from(vec![None, None, None, None, Some(2.0)])
         );
     }
 }

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 11 additions & 8 deletions
@@ -32,8 +32,9 @@ use crate::{
         kinesis::{flatten_kinesis_logs, Message},
     },
     metadata::{SchemaVersion, STREAM_INFO},
+    option::CONFIG,
     storage::StreamType,
-    utils::json::{convert_array_to_object, flatten::convert_to_array},
+    utils::json::flatten_json_body,
 };
 
 pub async fn flatten_and_push_logs(
@@ -69,25 +70,27 @@ pub async fn push_logs(
     let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?;
     let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
     let schema_version = STREAM_INFO.get_schema_version(stream_name)?;
-
+    let json_flatten_depth_limit = CONFIG.options.json_flatten_depth_limit;
     let data = if time_partition.is_some() || custom_partition.is_some() {
-        convert_array_to_object(
+        flatten_json_body(
             json,
             time_partition.as_ref(),
             time_partition_limit,
             custom_partition.as_ref(),
             schema_version,
             log_source,
+            json_flatten_depth_limit,
         )?
     } else {
-        vec![convert_to_array(convert_array_to_object(
+        vec![Value::Array(flatten_json_body(
             json,
-            None,
-            None,
-            None,
+            time_partition.as_ref(),
+            time_partition_limit,
+            custom_partition.as_ref(),
             schema_version,
             log_source,
-        )?)?]
+            json_flatten_depth_limit,
+        )?)]
     };
 
     for value in data {

0 commit comments
