Skip to content

Commit b03f745

Browse files
add env for max level of flattening allowed for events
env `P_MAX_FLATTEN_LEVEL` to control the maximum level of flattening allowed default to 10 this is to ensure nested list type fields do not get created eg. with current implementation of hard coded level of 4, field gets created with data type - ``` { "name": "Records_resources_accountId", "data_type": { "List": { "name": "item", "data_type": { "List": { "name": "item", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } }, "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ``` after this change, data type changes to - ``` { "name": "Records_resources_accountId", "data_type": "Utf8", "nullable": true, "dict_id": 0, "dict_is_ordered": false, "metadata": {} } ```
1 parent 26422df commit b03f745

File tree

4 files changed

+22
-24
lines changed

4 files changed

+22
-24
lines changed

src/cli.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ pub struct Options {
378378
help = "total number of fields recommended in a dataset"
379379
)]
380380
pub dataset_fields_allowed_limit: usize,
381+
382+
// maximum level of flattening allowed for events
383+
// this is to prevent nested list type fields from getting created
384+
#[arg(
385+
long,
386+
env = "P_MAX_FLATTEN_LEVEL",
387+
default_value = "10",
388+
help = "Maximum level of flattening allowed for events"
389+
)]
390+
pub event_flatten_level: usize,
381391
}
382392

383393
#[derive(Parser, Debug)]

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub async fn flatten_and_push_logs(
6262
//custom flattening required for Amazon Kinesis
6363
let message: Message = serde_json::from_value(json)?;
6464
for record in flatten_kinesis_logs(message) {
65-
push_logs(stream_name, record, &LogSource::default(), p_custom_fields).await?;
65+
push_logs(stream_name, record, log_source, p_custom_fields).await?;
6666
}
6767
}
6868
LogSource::OtelLogs => {

src/utils/json/flatten.rs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ use serde_json::value::Value;
2525

2626
use thiserror::Error;
2727

28+
use crate::parseable::PARSEABLE;
29+
2830
#[derive(Error, Debug)]
2931
pub enum JsonFlattenError {
3032
#[error("Cannot flatten this JSON")]
@@ -314,21 +316,21 @@ pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError>
314316
}
315317

316318
/// recursively checks the level of nesting for the serde Value
317-
/// if Value has more than 4 levels of hierarchy, returns true
318-
/// example -
319+
/// if Value has more than configured `P_MAX_FLATTEN_LEVEL` levels of hierarchy, returns true
320+
/// example - if `P_MAX_FLATTEN_LEVEL` is 4, and the JSON is
319321
/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
320322
/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
321-
pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
322-
if current_level > 4 {
323+
pub fn has_more_than_max_allowed_levels(value: &Value, current_level: usize) -> bool {
324+
if current_level > PARSEABLE.options.event_flatten_level {
323325
return true;
324326
}
325327
match value {
326328
Value::Array(arr) => arr
327329
.iter()
328-
.any(|item| has_more_than_four_levels(item, current_level)),
330+
.any(|item| has_more_than_max_allowed_levels(item, current_level)),
329331
Value::Object(map) => map
330332
.values()
331-
.any(|val| has_more_than_four_levels(val, current_level + 1)),
333+
.any(|val| has_more_than_max_allowed_levels(val, current_level + 1)),
332334
_ => false,
333335
}
334336
}
@@ -344,9 +346,7 @@ pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError
344346

345347
#[cfg(test)]
346348
mod tests {
347-
use crate::utils::json::flatten::{
348-
flatten_array_objects, generic_flattening, has_more_than_four_levels,
349-
};
349+
use crate::utils::json::flatten::{flatten_array_objects, generic_flattening};
350350

351351
use super::{flatten, JsonFlattenError};
352352
use serde_json::{json, Map, Value};
@@ -605,18 +605,6 @@ mod tests {
605605
);
606606
}
607607

608-
#[test]
609-
fn unacceptable_levels_of_nested_json() {
610-
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
611-
assert!(has_more_than_four_levels(&value, 1));
612-
}
613-
614-
#[test]
615-
fn acceptable_levels_of_nested_json() {
616-
let value = json!({"a":{"b":{"e":["a","b"]}}});
617-
assert!(!has_more_than_four_levels(&value, 1));
618-
}
619-
620608
#[test]
621609
fn flatten_json() {
622610
let value = json!({"a":{"b":{"e":["a","b"]}}});

src/utils/json/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use std::fmt;
2020
use std::num::NonZeroU32;
2121

22-
use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels};
22+
use flatten::{convert_to_array, generic_flattening, has_more_than_max_allowed_levels};
2323
use serde::de::Visitor;
2424
use serde_json;
2525
use serde_json::Value;
@@ -43,7 +43,7 @@ pub fn flatten_json_body(
4343
) -> Result<Value, anyhow::Error> {
4444
// Flatten the json body only if new schema and has less than 4 levels of nesting
4545
let mut nested_value = if schema_version == SchemaVersion::V1
46-
&& !has_more_than_four_levels(&body, 1)
46+
&& !has_more_than_max_allowed_levels(&body, 1)
4747
&& matches!(
4848
log_source,
4949
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis

0 commit comments

Comments
 (0)