diff --git a/src/cli.rs b/src/cli.rs
index 2bd86c5a4..26cab2e95 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -378,6 +378,16 @@ pub struct Options {
         help = "total number of fields recommended in a dataset"
     )]
     pub dataset_fields_allowed_limit: usize,
+
+    // maximum level of flattening allowed for events
+    // this is to prevent nested list type fields from getting created
+    #[arg(
+        long,
+        env = "P_MAX_FLATTEN_LEVEL",
+        default_value = "10",
+        help = "Maximum level of flattening allowed for events"
+    )]
+    pub event_flatten_level: usize,
 }
 
 #[derive(Parser, Debug)]
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 6cffb6d99..04efcbba4 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -62,7 +62,7 @@ pub async fn flatten_and_push_logs(
             //custom flattening required for Amazon Kinesis
             let message: Message = serde_json::from_value(json)?;
             for record in flatten_kinesis_logs(message) {
-                push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?;
+                push_logs(stream_name, record, log_source, p_custom_fields).await?;
             }
         }
         LogSource::OtelLogs => {
diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs
index 06f1e7201..11b41beb5 100644
--- a/src/utils/json/flatten.rs
+++ b/src/utils/json/flatten.rs
@@ -25,6 +25,8 @@
 use serde_json::value::Value;
 use thiserror::Error;
 
+use crate::parseable::PARSEABLE;
+
 #[derive(Error, Debug)]
 pub enum JsonFlattenError {
     #[error("Cannot flatten this JSON")]
@@ -274,19 +276,31 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
         let results = map
             .iter()
             .fold(vec![Map::new()], |results, (key, val)| match val {
-                Value::Array(arr) => arr
-                    .iter()
-                    .flat_map(|flatten_item| {
-                        generic_flattening(flatten_item).unwrap_or_default()
-                    })
-                    .flat_map(|flattened_item| {
-                        results.iter().map(move |result| {
-                            let mut new_obj = result.clone();
-                            new_obj.insert(key.clone(), flattened_item.clone());
-                            new_obj
-                        })
-                    })
-                    .collect(),
+                Value::Array(arr) => {
+                    if arr.is_empty() {
+                        // Insert empty array for this key in all current results
+                        results
+                            .into_iter()
+                            .map(|mut result| {
+                                result.insert(key.clone(), Value::Array(vec![]));
+                                result
+                            })
+                            .collect()
+                    } else {
+                        arr.iter()
+                            .flat_map(|flatten_item| {
+                                generic_flattening(flatten_item).unwrap_or_default()
+                            })
+                            .flat_map(|flattened_item| {
+                                results.iter().map(move |result| {
+                                    let mut new_obj = result.clone();
+                                    new_obj.insert(key.clone(), flattened_item.clone());
+                                    new_obj
+                                })
+                            })
+                            .collect()
+                    }
+                }
                 Value::Object(_) => generic_flattening(val)
                     .unwrap_or_default()
                     .iter()
@@ -314,21 +328,21 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
 }
 
 /// recursively checks the level of nesting for the serde Value
-/// if Value has more than 4 levels of hierarchy, returns true
-/// example -
+/// if Value has more than configured `P_MAX_FLATTEN_LEVEL` levels of hierarchy, returns true
+/// example - if `P_MAX_FLATTEN_LEVEL` is 4, and the JSON is
 /// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
 /// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
-pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
-    if current_level > 4 {
+pub fn has_more_than_max_allowed_levels(value: &Value, current_level: usize) -> bool {
+    if current_level > PARSEABLE.options.event_flatten_level {
         return true;
     }
     match value {
         Value::Array(arr) => arr
             .iter()
-            .any(|item| has_more_than_four_levels(item, current_level)),
+            .any(|item| has_more_than_max_allowed_levels(item, current_level)),
         Value::Object(map) => map
             .values()
-            .any(|val| has_more_than_four_levels(val, current_level + 1)),
+            .any(|val| has_more_than_max_allowed_levels(val, current_level + 1)),
         _ => false,
     }
 }
@@ -344,9 +358,7 @@ pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError> {
     // Flatten the json body only if new schema and has less than 4 levels of nesting
     let mut nested_value = if schema_version == SchemaVersion::V1
-        && !has_more_than_four_levels(&body, 1)
+        && !has_more_than_max_allowed_levels(&body, 1)
         && matches!(
             log_source,
             LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
@@ -144,50 +144,9 @@ where
 
 #[cfg(test)]
 mod tests {
-    use crate::event::format::LogSource;
-
     use super::*;
     use serde::{Deserialize, Serialize};
     use serde_json::json;
-
-    #[test]
-    fn hierarchical_json_flattening_success() {
-        let value = json!({"a":{"b":{"e":["a","b"]}}});
-        let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
-        assert_eq!(
-            flatten_json_body(
-                value,
-                None,
-                None,
-                None,
-                crate::metadata::SchemaVersion::V1,
-                false,
-                &LogSource::default()
-            )
-            .unwrap(),
-            expected
-        );
-    }
-
-    #[test]
-    fn hierarchical_json_flattening_failure() {
-        let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
-        let expected = json!({"a_b_c_d_e": ["a","b"]});
-        assert_eq!(
-            flatten_json_body(
-                value,
-                None,
-                None,
-                None,
-                crate::metadata::SchemaVersion::V1,
-                false,
-                &LogSource::default()
-            )
-            .unwrap(),
-            expected
-        );
-    }
-
     #[derive(Serialize, Deserialize)]
     struct TestBool {
         #[serde(
@@ -353,63 +312,4 @@ mod tests {
             flattened_json
         );
     }
-
-    #[test]
-    fn arr_obj_with_nested_type_v1() {
-        let json = json!([
-            {
-                "a": 1,
-                "b": "hello",
-            },
-            {
-                "a": 1,
-                "b": "hello",
-            },
-            {
-                "a": 1,
-                "b": "hello",
-                "c": [{"a": 1}]
-            },
-            {
-                "a": 1,
-                "b": "hello",
-                "c": [{"a": 1, "b": 2}]
-            },
-        ]);
-        let flattened_json = flatten_json_body(
-            json,
-            None,
-            None,
-            None,
-            SchemaVersion::V1,
-            false,
-            &crate::event::format::LogSource::default(),
-        )
-        .unwrap();
-
-        assert_eq!(
-            json!([
-                {
-                    "a": 1,
-                    "b": "hello",
-                },
-                {
-                    "a": 1,
-                    "b": "hello",
-                },
-                {
-                    "a": 1,
-                    "b": "hello",
-                    "c_a": 1,
-                },
-                {
-                    "a": 1,
-                    "b": "hello",
-                    "c_a": 1,
-                    "c_b": 2,
-                },
-            ]),
-            flattened_json
-        );
-    }
 }
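
Note on the depth check: `has_more_than_max_allowed_levels` replaces the hard-coded limit of 4 with the configurable `P_MAX_FLATTEN_LEVEL` (default 10), read from `PARSEABLE.options.event_flatten_level`. Below is a minimal standalone sketch of the same recursion with the limit passed in as a parameter instead of read from global options; the function name `exceeds_depth` is illustrative and not part of the codebase.

use serde_json::{json, Value};

// Returns true when `value` nests deeper than `max_level` object levels.
// Arrays do not add a level; only nested objects do.
fn exceeds_depth(value: &Value, current_level: usize, max_level: usize) -> bool {
    if current_level > max_level {
        return true;
    }
    match value {
        Value::Array(arr) => arr
            .iter()
            .any(|item| exceeds_depth(item, current_level, max_level)),
        Value::Object(map) => map
            .values()
            .any(|val| exceeds_depth(val, current_level + 1, max_level)),
        _ => false,
    }
}

fn main() {
    let deep = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
    let shallow = json!({"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}});
    // With the old hard-coded limit of 4, `deep` is rejected and `shallow` is allowed.
    assert!(exceeds_depth(&deep, 1, 4));
    assert!(!exceeds_depth(&shallow, 1, 4));
    // With the new default limit of 10, `deep` is also within bounds.
    assert!(!exceeds_depth(&deep, 1, 10));
}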
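
Note on the empty-array branch in `generic_flattening`: previously a key holding `[]` fed zero items into the `flat_map` chain, collapsing the accumulated results to an empty vec; the new branch instead keeps the empty array on every partial result. A small sketch of just that branch, assuming the same `Vec<Map<String, Value>>` accumulator shape; the helper name `keep_empty_array` is illustrative only.

use serde_json::{json, Map, Value};

// Insert an empty array under `key` in every partial result, mirroring the
// `arr.is_empty()` branch added to the fold in `generic_flattening`.
fn keep_empty_array(results: Vec<Map<String, Value>>, key: &str) -> Vec<Map<String, Value>> {
    results
        .into_iter()
        .map(|mut result| {
            result.insert(key.to_string(), Value::Array(vec![]));
            result
        })
        .collect()
}

fn main() {
    let partials = vec![Map::new()];
    let out = keep_empty_array(partials, "tags");
    assert_eq!(serde_json::to_value(&out).unwrap(), json!([{"tags": []}]));
}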