Skip to content

add env for max level of flattening allowed for events #1320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ pub struct Options {
help = "total number of fields recommended in a dataset"
)]
pub dataset_fields_allowed_limit: usize,

// Maximum level of nesting up to which events are flattened.
// This limit prevents nested list type fields from getting created.
#[arg(
long,
env = "P_MAX_FLATTEN_LEVEL",
default_value = "10",
help = "Maximum level of flattening allowed for events"
)]
pub event_flatten_level: usize,
}

#[derive(Parser, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/http/modal/utils/ingest_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn flatten_and_push_logs(
//custom flattening required for Amazon Kinesis
let message: Message = serde_json::from_value(json)?;
for record in flatten_kinesis_logs(message) {
push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?;
push_logs(stream_name, record, log_source, p_custom_fields).await?;
}
}
LogSource::OtelLogs => {
Expand Down
68 changes: 34 additions & 34 deletions src/utils/json/flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use serde_json::value::Value;

use thiserror::Error;

use crate::parseable::PARSEABLE;

#[derive(Error, Debug)]
pub enum JsonFlattenError {
#[error("Cannot flatten this JSON")]
Expand Down Expand Up @@ -274,19 +276,31 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
let results = map
.iter()
.fold(vec![Map::new()], |results, (key, val)| match val {
Value::Array(arr) => arr
.iter()
.flat_map(|flatten_item| {
generic_flattening(flatten_item).unwrap_or_default()
})
.flat_map(|flattened_item| {
results.iter().map(move |result| {
let mut new_obj = result.clone();
new_obj.insert(key.clone(), flattened_item.clone());
new_obj
})
})
.collect(),
Value::Array(arr) => {
if arr.is_empty() {
// Insert empty array for this key in all current results
results
.into_iter()
.map(|mut result| {
result.insert(key.clone(), Value::Array(vec![]));
result
})
.collect()
} else {
arr.iter()
.flat_map(|flatten_item| {
generic_flattening(flatten_item).unwrap_or_default()
})
.flat_map(|flattened_item| {
results.iter().map(move |result| {
let mut new_obj = result.clone();
new_obj.insert(key.clone(), flattened_item.clone());
new_obj
})
})
.collect()
}
}
Value::Object(_) => generic_flattening(val)
.unwrap_or_default()
.iter()
Expand Down Expand Up @@ -314,21 +328,21 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
}

/// recursively checks the level of nesting for the serde Value
/// if Value has more than 4 levels of hierarchy, returns true
/// example -
/// returns true if the Value has more than the configured `P_MAX_FLATTEN_LEVEL` levels of hierarchy
/// example - with `P_MAX_FLATTEN_LEVEL` set to 4:
/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
if current_level > 4 {
pub fn has_more_than_max_allowed_levels(value: &Value, current_level: usize) -> bool {
if current_level > PARSEABLE.options.event_flatten_level {
return true;
}
match value {
Value::Array(arr) => arr
.iter()
.any(|item| has_more_than_four_levels(item, current_level)),
.any(|item| has_more_than_max_allowed_levels(item, current_level)),
Value::Object(map) => map
.values()
.any(|val| has_more_than_four_levels(val, current_level + 1)),
.any(|val| has_more_than_max_allowed_levels(val, current_level + 1)),
_ => false,
}
}
Expand All @@ -344,9 +358,7 @@ pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError

#[cfg(test)]
mod tests {
use crate::utils::json::flatten::{
flatten_array_objects, generic_flattening, has_more_than_four_levels,
};
use crate::utils::json::flatten::{flatten_array_objects, generic_flattening};

use super::{flatten, JsonFlattenError};
use serde_json::{json, Map, Value};
Expand Down Expand Up @@ -605,18 +617,6 @@ mod tests {
);
}

#[test]
fn unacceptable_levels_of_nested_json() {
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
assert!(has_more_than_four_levels(&value, 1));
}

#[test]
fn acceptable_levels_of_nested_json() {
let value = json!({"a":{"b":{"e":["a","b"]}}});
assert!(!has_more_than_four_levels(&value, 1));
}

#[test]
fn flatten_json() {
let value = json!({"a":{"b":{"e":["a","b"]}}});
Expand Down
104 changes: 2 additions & 102 deletions src/utils/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use std::fmt;
use std::num::NonZeroU32;

use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels};
use flatten::{convert_to_array, generic_flattening, has_more_than_max_allowed_levels};
use serde::de::Visitor;
use serde_json;
use serde_json::Value;
Expand All @@ -43,7 +43,7 @@ pub fn flatten_json_body(
) -> Result<Value, anyhow::Error> {
// Flatten the json body only if new schema and nesting does not exceed the configured max flatten level (`P_MAX_FLATTEN_LEVEL`)
let mut nested_value = if schema_version == SchemaVersion::V1
&& !has_more_than_four_levels(&body, 1)
&& !has_more_than_max_allowed_levels(&body, 1)
&& matches!(
log_source,
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
Expand Down Expand Up @@ -144,50 +144,9 @@ where

#[cfg(test)]
mod tests {
use crate::event::format::LogSource;

use super::*;
use serde::{Deserialize, Serialize};
use serde_json::json;

#[test]
fn hierarchical_json_flattening_success() {
let value = json!({"a":{"b":{"e":["a","b"]}}});
let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
assert_eq!(
flatten_json_body(
value,
None,
None,
None,
crate::metadata::SchemaVersion::V1,
false,
&LogSource::default()
)
.unwrap(),
expected
);
}

#[test]
fn hierarchical_json_flattening_failure() {
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
let expected = json!({"a_b_c_d_e": ["a","b"]});
assert_eq!(
flatten_json_body(
value,
None,
None,
None,
crate::metadata::SchemaVersion::V1,
false,
&LogSource::default()
)
.unwrap(),
expected
);
}

#[derive(Serialize, Deserialize)]
struct TestBool {
#[serde(
Expand Down Expand Up @@ -353,63 +312,4 @@ mod tests {
flattened_json
);
}

#[test]
fn arr_obj_with_nested_type_v1() {
let json = json!([
{
"a": 1,
"b": "hello",
},
{
"a": 1,
"b": "hello",
},
{
"a": 1,
"b": "hello",
"c": [{"a": 1}]
},
{
"a": 1,
"b": "hello",
"c": [{"a": 1, "b": 2}]
},
]);
let flattened_json = flatten_json_body(
json,
None,
None,
None,
SchemaVersion::V1,
false,
&crate::event::format::LogSource::default(),
)
.unwrap();

assert_eq!(
json!([
{
"a": 1,
"b": "hello",
},
{
"a": 1,
"b": "hello",
},
{
"a": 1,
"b": "hello",
"c_a": 1,
},
{
"a": 1,
"b": "hello",
"c_a": 1,
"c_b": 2,
},
]),
flattened_json
);
}
}
Loading