From 27e39d2ab6a9b73883af8e5d55457462012213c1 Mon Sep 17 00:00:00 2001 From: Finn <67562851+finnroblin@users.noreply.github.com> Date: Mon, 26 Sep 2022 19:36:57 -0400 Subject: [PATCH] Parse JSON Processor: README, more testing, support for JSON Pointer (#1696) Signed-off-by: Finn Roblin --- .../parse-json-processor/README.md | 64 +++++++++ .../parse-json-processor/build.gradle | 1 + .../parsejson/ParseJsonProcessor.java | 69 +++++++++- .../parsejson/ParseJsonProcessorConfig.java | 26 ++++ .../ParseJsonProcessorConfigTest.java | 56 ++++++++ .../parsejson/ParseJsonProcessorTest.java | 126 +++++++++++++++++- 6 files changed, 339 insertions(+), 3 deletions(-) create mode 100644 data-prepper-plugins/parse-json-processor/README.md create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java diff --git a/data-prepper-plugins/parse-json-processor/README.md b/data-prepper-plugins/parse-json-processor/README.md new file mode 100644 index 0000000000..debbeb3f95 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/README.md @@ -0,0 +1,64 @@ +# Parse JSON Processor +This is a processor that takes in an Event and parses its JSON data, including any nested fields. +## Basic Usage +To get started, create the following `pipelines.yaml`. +```yaml +parse-json-pipeline: + source: + stdin: + processor: + - json: + sink: + - stdout: +``` +#### Basic Example: +If you wish to test the JSON Processor with the above config then you may find the following example useful. +Run the pipeline and paste the following line into your console, and then enter `exit` on a new line. +``` +{"outer_key": {"inner_key": "inner_value"}} +``` + +The processor will parse the message into the following: +``` +{"message": {"outer_key": {"inner_key": "inner_value"}}", "outer_key":{"inner_key":"inner_value"}}} +``` +#### Example with JSON Pointer: +If you wish to parse a selection of the JSON data, you can specify a JSON Pointer using the `pointer` option in the configuration. +The following configuration file and example demonstrates a basic pointer use case. +```yaml +parse-json-pipeline: + source: + stdin: + processor: + - json: + pointer: "outer_key/inner_key" + sink: + - stdout: +``` +Run the pipeline and paste the following line into your console, and then enter `exit` on a new line. +``` +{"outer_key": {"inner_key": "inner_value"}} +``` + +The processor will parse the message into the following: +``` +{"message": {"outer_key": {"inner_key": "inner_value"}}", "inner_key": "inner_value"} +``` +## Configuration +* `source` (Optional) — The field in the `Event` that will be parsed. + * Default: `message` + +* `destination` (Optional) — The destination field of the parsed JSON. Defaults to the root of the `Event`. + * Defaults to writing to the root of the `Event` (The processor will write to root when `destination` is `null`). + * Cannot be `""`, `/`, or any whitespace-only `String` because these are not valid `Event` fields. + +* `pointer` (Optional) — A JSON Pointer to the field to be parsed. + * There is no `pointer` by default, meaning the entire `source` is parsed. + * The `pointer` can access JSON Array indices as well. + * If the JSON Pointer is invalid then the entire `source` data is parsed into the outgoing `Event`. + * If the pointed-to key already exists in the `Event` and the `destination` is the root, then the entire path of the key will be used. + +## Developer Guide +This plugin is compatible with Java 8 and up. See +- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md) +- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md) diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index e8bf5805b0..27c478a586 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:common') implementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation project(':data-prepper-test-common') } test { diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java b/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java index 1635d7c335..c6f5c8d39b 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessor.java @@ -9,6 +9,7 @@ import com.amazon.dataprepper.model.annotations.DataPrepperPlugin; import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor; import com.amazon.dataprepper.model.event.Event; +import com.amazon.dataprepper.model.event.JacksonEvent; import com.amazon.dataprepper.model.processor.AbstractProcessor; import com.amazon.dataprepper.model.processor.Processor; import com.amazon.dataprepper.model.record.Record; @@ -18,7 +19,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -29,25 +33,33 @@ public class ParseJsonProcessor extends AbstractProcessor, Record< private final String source; private final String destination; + private final String pointer; + @DataPrepperPluginConstructor public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig) { super(pluginMetrics); source = parseJsonProcessorConfig.getSource(); destination = parseJsonProcessorConfig.getDestination(); + pointer = parseJsonProcessorConfig.getPointer(); } @Override public Collection> doExecute(final Collection> records) { final ObjectMapper objectMapper = new ObjectMapper(); final boolean doWriteToRoot = Objects.isNull(destination); + final boolean doUsePointer = Objects.nonNull(pointer); for (final Record record : records) { final Event event = record.getData(); final String message = event.get(source, String.class); try { final TypeReference> hashMapTypeReference = new TypeReference>() {}; - final Map parsedJson = objectMapper.readValue(message, hashMapTypeReference); + Map parsedJson = objectMapper.readValue(message, hashMapTypeReference); + + if (doUsePointer) { + parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot); + } if (doWriteToRoot) { writeToRoot(event, parsedJson); @@ -76,9 +88,62 @@ public void shutdown() { } + private Map parseUsingPointer(final Event event, final Map parsedJson, final String pointer, + final boolean doWriteToRoot) { + final Event temporaryEvent = JacksonEvent.builder().withEventType("event").build(); + temporaryEvent.put(source, parsedJson); + + final String trimmedPointer = trimPointer(pointer); + final String actualPointer = source + "/" + trimmedPointer; + + final boolean pointerIsValid = temporaryEvent.containsKey(actualPointer); + if (!pointerIsValid) { + LOG.error("Writing entire JSON because the pointer {} is invalid on Event {}", pointer, event); + return parsedJson; + } + + final Object valueAtPointer = temporaryEvent.get(actualPointer, Object.class); + final String endOfPointer = getEndOfPointer(trimmedPointer); + + final boolean shouldUseEntirePointerAsKey = event.containsKey(endOfPointer) && doWriteToRoot; + if (shouldUseEntirePointerAsKey) { + return Collections.singletonMap(normalizePointerStructure(trimmedPointer), valueAtPointer); + } + + return Collections.singletonMap(normalizePointerStructure(endOfPointer), valueAtPointer); + } + + private String getEndOfPointer(final String trimmedPointer) { + final ArrayList elements = new ArrayList<>(Arrays.asList(trimmedPointer.split("/"))); + if (elements.size() <= 1) return trimmedPointer; + + final boolean lastElementInPathIsAnArrayIndex = elements.get(elements.size()-1).matches("[0-9]+"); + + if (lastElementInPathIsAnArrayIndex) { + final String lastTwoElements = elements.get(elements.size() - 2) + "/" + elements.get(elements.size() - 1); + return lastTwoElements; + } + + return elements.get(elements.size()-1); + } + + /** + * Trim the pointer and change each front slash / to be a dot (.) to proccess + * @param pointer + * @return + */ + private String normalizePointerStructure(final String pointer) { + return pointer.replace('/','.'); + } + + private String trimPointer(String pointer) { + final String trimmedLeadingSlash = pointer.startsWith("/") ? pointer.substring(1) : pointer; + return trimmedLeadingSlash.endsWith("/") ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 1) : trimmedLeadingSlash; + } + private void writeToRoot(final Event event, final Map parsedJson) { for (Map.Entry entry : parsedJson.entrySet()) { event.put(entry.getKey(), entry.getValue()); } } -} +} \ No newline at end of file diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java index c4d2ccb2e0..1d4373a3c5 100644 --- a/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java +++ b/data-prepper-plugins/parse-json-processor/src/main/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfig.java @@ -6,8 +6,11 @@ package com.amazon.dataprepper.plugins.processor.parsejson; import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotBlank; +import java.util.Objects; + public class ParseJsonProcessorConfig { static final String DEFAULT_SOURCE = "message"; @@ -18,6 +21,9 @@ public class ParseJsonProcessorConfig { @JsonProperty("destination") private String destination; + @JsonProperty("pointer") + private String pointer; + /** * The field of the Event that contains the JSON data. * @@ -36,4 +42,24 @@ public String getSource() { public String getDestination() { return destination; } + + /** + * An optional setting used to specify a JSON Pointer. Pointer points to the JSON key that will be parsed into the destination. + * There is no pointer by default, meaning that the entirety of source will be parsed. If the target key would overwrite an existing + * key in the Event then the absolute path of the target key will be placed into destination + * + * Note: (should this be configurable/what about double conflicts?) + * @return String representing JSON Pointer + */ + public String getPointer() { + return pointer; + } + + @AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)") + boolean isValidDestination() { + if (Objects.isNull(destination)) return true; + + final String trimmedDestination = destination.trim(); + return trimmedDestination.length() != 0 && !(trimmedDestination.equals("/")); + } } \ No newline at end of file diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java new file mode 100644 index 0000000000..63a9d37a90 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorConfigTest.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.processor.parsejson; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import static com.amazon.dataprepper.plugins.processor.parsejson.ParseJsonProcessorConfig.DEFAULT_SOURCE; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +import static com.amazon.dataprepper.test.helper.ReflectivelySetField.setField; + +public class ParseJsonProcessorConfigTest { + + private ParseJsonProcessorConfig createObjectUnderTest() { + return new ParseJsonProcessorConfig(); + } + + @Test + public void test_when_defaultParseJsonProcessorConfig_then_returns_default_values() { + final ParseJsonProcessorConfig objectUnderTest = createObjectUnderTest(); + + assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE)); + assertThat(objectUnderTest.getDestination(), equalTo(null)); + assertThat(objectUnderTest.getPointer(), equalTo(null)); + } + + @Nested + class Validation { + final ParseJsonProcessorConfig config = createObjectUnderTest(); + + @Test + void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse() + throws NoSuchFieldException, IllegalAccessException { + setField(ParseJsonProcessorConfig.class, config, "destination", "good destination"); + + assertThat(config.isValidDestination(), equalTo(true)); + + setField(ParseJsonProcessorConfig.class, config, "destination", ""); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseJsonProcessorConfig.class, config, "destination", " "); + + assertThat(config.isValidDestination(), equalTo(false)); + + setField(ParseJsonProcessorConfig.class, config, "destination", " / "); + + assertThat(config.isValidDestination(), equalTo(false)); + } + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java index f8fd8fd6f7..7526b0e11a 100644 --- a/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java +++ b/data-prepper-plugins/parse-json-processor/src/test/java/com/amazon/dataprepper/plugins/processor/parsejson/ParseJsonProcessorTest.java @@ -15,6 +15,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -43,6 +44,7 @@ void setup() { ParseJsonProcessorConfig defaultConfig = new ParseJsonProcessorConfig(); when(processorConfig.getSource()).thenReturn(defaultConfig.getSource()); when(processorConfig.getDestination()).thenReturn(defaultConfig.getDestination()); + when(processorConfig.getPointer()).thenReturn(defaultConfig.getPointer()); parseJsonProcessor = createObjectUnderTest(); } @@ -130,6 +132,128 @@ void test_when_deeplyNestedFieldInKey_then_canReachDeepestLayer() { assertThat(parsedEvent.get(jsonPointerToValue, String.class), equalTo("value")); } + @Test + void test_when_nestedJSONArray_then_parsedIntoArrayAndIndicesAccessible() { + final String key = "key"; + final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); + final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}"; + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); + final String pointerToFirstElement = key + "/0"; + assertThat(parsedEvent.get(pointerToFirstElement, String.class), equalTo(value.get(0))); + } + + @Test + void test_when_nestedJSONArrayOfJSON_then_parsedIntoArrayAndIndicesAccessible() { + final String key = "key"; + final ArrayList> value = new ArrayList<>(List.of(Collections.singletonMap("key0","value0"), + Collections.singletonMap("key1","value1"))); + final String jsonArray = "{\"key\":[{\"key0\":\"value0\"},{\"key1\":\"value1\"}]}"; + + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + assertThat(parsedEvent.get(key, ArrayList.class), equalTo(value)); + + final String pointerToInternalValue = key + "/0/key0"; + assertThat(parsedEvent.get(pointerToInternalValue, String.class), equalTo("value0")); + } + + @Test + void test_when_nestedJSONArrayOfJSONAndPointer_then_parsedIntoValue() { + final String pointer = "/key/0/key0"; + when(processorConfig.getPointer()).thenReturn(pointer); + parseJsonProcessor = createObjectUnderTest(); + + final ArrayList> value = new ArrayList<>(List.of(Collections.singletonMap("key0","value0"), + Collections.singletonMap("key1","value1"))); + final String jsonArray = "{\"key\":[{\"key0\":\"value0\"},{\"key1\":\"value1\"}]}"; + + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + + assertThat(parsedEvent.get("key0", String.class), equalTo("value0")); + assertThat(parsedEvent.containsKey("key1"),equalTo(false)); + } + + @Test + void test_when_nestedJSONArrayAndIndexPointer_then_parsedIntoArrayAndIndicesAccessible() { + final String pointer = "/key/0/"; + when(processorConfig.getPointer()).thenReturn(pointer); + parseJsonProcessor = createObjectUnderTest(); + final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); + final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}"; + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThat(parsedEvent.containsKey(processorConfig.getSource()), equalTo(true)); + assertThat(parsedEvent.get("key.0", String.class), equalTo(value.get(0))); + } + + @Test + void test_when_pointerKeyAlreadyPresentInEvent_then_usesAbsolutePath() { + final String pointer = "/log/s3/"; + when(processorConfig.getPointer()).thenReturn(pointer); + parseJsonProcessor = createObjectUnderTest(); + final Map s3Data = Collections.singletonMap("bucket","sampleBucket"); + final Map data = new HashMap<>(); + data.put("message", "{\"log\": {\"s3\": {\"data\":\"sample data\"}}}"); + data.put("s3", s3Data); + + Record record = buildRecordWithEvent(data); + final Event parsedEvent = ((List>) parseJsonProcessor.doExecute(Collections.singletonList(record))) + .get(0).getData(); + + assertThatKeyEquals(parsedEvent, "message", data.get("message")); + assertThatKeyEquals(parsedEvent, "s3", data.get("s3")); + + assertThatKeyEquals(parsedEvent, "log.s3", Collections.singletonMap("data", "sample data")); + } + + @Test + void test_when_nestedDestinationField_then_writesToNestedDestination() { + final String destination = "/destination/nested"; + when(processorConfig.getDestination()).thenReturn(destination); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + final Map data = Collections.singletonMap("key", "value"); + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + final String location = destination + "/key"; + + assertThat(parsedEvent.get(location, String.class), equalTo("value")); + assertThat(parsedEvent.get(destination, Map.class), equalTo(data)); + } + + @Test + void test_when_invalidPointer_then_logsErrorAndParsesEntireEvent() { + final String pointer = "key/10000"; + when(processorConfig.getPointer()).thenReturn(pointer); + parseJsonProcessor = createObjectUnderTest(); // need to recreate so that new config options are used + + final ArrayList value = new ArrayList<>(List.of("Element0","Element1","Element2")); + final String jsonArray = "{\"key\":[\"Element0\",\"Element1\",\"Element2\"]}"; + + final Event parsedEvent = createAndParseMessageEvent(jsonArray); + + assertThatKeyEquals(parsedEvent, processorConfig.getSource(), jsonArray); + assertThatKeyEquals(parsedEvent, "key", value); + } + + + + @Test + void test_when_multipleChildren_then_allAreParsedOut() { + final Map data = Collections.singletonMap("key", "{inner1:value1,inner2:value2}"); + final String serializedMessage = convertMapToJSONString(data); + final Event parsedEvent = createAndParseMessageEvent(serializedMessage); + + assertThatKeyEquals(parsedEvent, "key/inner1", "value1"); + assertThatKeyEquals(parsedEvent, "key/inner2", "value2"); + } + private String constructDeeplyNestedJsonPointer(final int numberOfLayers) { String pointer = "/" + DEEPLY_NESTED_KEY_NAME; for (int layer = 0; layer < numberOfLayers; layer++) { @@ -205,4 +329,4 @@ private void assertThatFirstMapIsSubsetOfSecondMap(final Map sub assertThat(secondMap.get(key), equalTo(value)); } } -} +} \ No newline at end of file