Skip to content

Commit

Permalink
Parse JSON Processor: README, more testing, support for JSON Pointer (#…
Browse files Browse the repository at this point in the history
…1696)

Signed-off-by: Finn Roblin <[email protected]>
  • Loading branch information
finnroblin authored Sep 26, 2022
1 parent 6d35560 commit 27e39d2
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 3 deletions.
64 changes: 64 additions & 0 deletions data-prepper-plugins/parse-json-processor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Parse JSON Processor
This is a processor that takes in an Event and parses its JSON data, including any nested fields.
## Basic Usage
To get started, create the following `pipelines.yaml`.
```yaml
parse-json-pipeline:
  source:
    stdin:
  processor:
    - json:
  sink:
    - stdout:
```
#### Basic Example:
If you wish to test the JSON Processor with the above config then you may find the following example useful.
Run the pipeline and paste the following line into your console, and then enter `exit` on a new line.
```
{"outer_key": {"inner_key": "inner_value"}}
```
The processor will parse the message into the following:
```
{"message": "{\"outer_key\": {\"inner_key\": \"inner_value\"}}", "outer_key": {"inner_key": "inner_value"}}
```
#### Example with JSON Pointer:
If you wish to parse a selection of the JSON data, you can specify a JSON Pointer using the `pointer` option in the configuration.
The following configuration file and example demonstrate a basic pointer use case.
```yaml
parse-json-pipeline:
  source:
    stdin:
  processor:
    - json:
        pointer: "outer_key/inner_key"
  sink:
    - stdout:
```
Run the pipeline and paste the following line into your console, and then enter `exit` on a new line.
```
{"outer_key": {"inner_key": "inner_value"}}
```

The processor will parse the message into the following:
```
{"message": "{\"outer_key\": {\"inner_key\": \"inner_value\"}}", "inner_key": "inner_value"}
```
## Configuration
* `source` (Optional) — The field in the `Event` that will be parsed.
* Default: `message`

* `destination` (Optional) — The destination field of the parsed JSON. Defaults to the root of the `Event`.
* Defaults to writing to the root of the `Event` (The processor will write to root when `destination` is `null`).
* Cannot be `""`, `/`, or any whitespace-only `String` because these are not valid `Event` fields.

* `pointer` (Optional) — A JSON Pointer to the field to be parsed.
* There is no `pointer` by default, meaning the entire `source` is parsed.
* The `pointer` can access JSON Array indices as well.
* If the JSON Pointer is invalid then the entire `source` data is parsed into the outgoing `Event`.
* If the pointed-to key already exists in the `Event` and the `destination` is the root, then the entire pointer path is used as the key instead of only its last element, so the existing field is not overwritten.

## Developer Guide
This plugin is compatible with Java 8 and up. See
- [CONTRIBUTING](https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md)
- [monitoring](https://github.com/opensearch-project/data-prepper/blob/main/docs/monitoring.md)
1 change: 1 addition & 0 deletions data-prepper-plugins/parse-json-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:common')
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-common')
}

test {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.amazon.dataprepper.model.annotations.DataPrepperPlugin;
import com.amazon.dataprepper.model.annotations.DataPrepperPluginConstructor;
import com.amazon.dataprepper.model.event.Event;
import com.amazon.dataprepper.model.event.JacksonEvent;
import com.amazon.dataprepper.model.processor.AbstractProcessor;
import com.amazon.dataprepper.model.processor.Processor;
import com.amazon.dataprepper.model.record.Record;
Expand All @@ -18,7 +19,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -29,25 +33,33 @@ public class ParseJsonProcessor extends AbstractProcessor<Record<Event>, Record<

private final String source;
private final String destination;
private final String pointer;

/**
 * Creates the processor and captures the {@code source}, {@code destination}, and {@code pointer}
 * settings from the supplied configuration.
 *
 * @param pluginMetrics metrics handle forwarded to the {@link AbstractProcessor} constructor
 * @param parseJsonProcessorConfig configuration object the three settings are read from
 */
@DataPrepperPluginConstructor
public ParseJsonProcessor(final PluginMetrics pluginMetrics, final ParseJsonProcessorConfig parseJsonProcessorConfig) {
    super(pluginMetrics);

    this.source = parseJsonProcessorConfig.getSource();
    this.destination = parseJsonProcessorConfig.getDestination();
    this.pointer = parseJsonProcessorConfig.getPointer();
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
final ObjectMapper objectMapper = new ObjectMapper();
final boolean doWriteToRoot = Objects.isNull(destination);
final boolean doUsePointer = Objects.nonNull(pointer);

for (final Record<Event> record : records) {
final Event event = record.getData();
final String message = event.get(source, String.class);
try {
final TypeReference<HashMap<String, Object>> hashMapTypeReference = new TypeReference<HashMap<String, Object>>() {};
final Map<String, Object> parsedJson = objectMapper.readValue(message, hashMapTypeReference);
Map<String, Object> parsedJson = objectMapper.readValue(message, hashMapTypeReference);

if (doUsePointer) {
parsedJson = parseUsingPointer(event, parsedJson, pointer, doWriteToRoot);
}

if (doWriteToRoot) {
writeToRoot(event, parsedJson);
Expand Down Expand Up @@ -76,9 +88,62 @@ public void shutdown() {

}

/**
 * Narrows the parsed JSON down to the single value addressed by the given pointer.
 *
 * If the pointer does not resolve against the parsed JSON, an error is logged and the parsed JSON
 * is returned unchanged. Otherwise a single-entry map is returned whose value is the pointed-to
 * value. Its key is the end of the pointer, or the whole pointer when that end key already exists
 * in the Event and the result is being written to the root; slashes in the key are replaced by dots.
 *
 * @param event the Event being processed; consulted only to detect key collisions
 * @param parsedJson the JSON already parsed from the source field
 * @param pointer the configured JSON Pointer
 * @param doWriteToRoot whether the result will be written to the root of the Event
 * @return the parsed JSON itself when the pointer is invalid, otherwise a single-entry map
 */
private Map<String, Object> parseUsingPointer(final Event event, final Map<String, Object> parsedJson, final String pointer,
                                              final boolean doWriteToRoot) {
    // Stage the parsed JSON under the source key of a scratch Event so the Event's own key lookup resolves the pointer.
    final Event scratchEvent = JacksonEvent.builder().withEventType("event").build();
    scratchEvent.put(source, parsedJson);

    final String pointerWithoutOuterSlashes = trimPointer(pointer);
    final String lookupKey = source + "/" + pointerWithoutOuterSlashes;

    if (!scratchEvent.containsKey(lookupKey)) {
        LOG.error("Writing entire JSON because the pointer {} is invalid on Event {}", pointer, event);
        return parsedJson;
    }

    final Object pointedValue = scratchEvent.get(lookupKey, Object.class);
    final String lastSegment = getEndOfPointer(pointerWithoutOuterSlashes);

    // The collision check on the Event is evaluated first, exactly as before, regardless of doWriteToRoot.
    final boolean keyCollidesAtRoot = event.containsKey(lastSegment) && doWriteToRoot;
    final String outputKey = keyCollidesAtRoot ? pointerWithoutOuterSlashes : lastSegment;

    return Collections.singletonMap(normalizePointerStructure(outputKey), pointedValue);
}

/**
 * Returns the trailing part of an already-trimmed pointer.
 *
 * A pointer with at most one element is returned as-is. When the last element consists only of
 * digits (an array index), the last two elements joined by a slash are returned; otherwise only
 * the last element is returned.
 *
 * @param trimmedPointer pointer without leading or trailing slash
 * @return the last element, the last two elements, or the pointer itself
 */
private String getEndOfPointer(final String trimmedPointer) {
    final ArrayList<String> segments = new ArrayList<>(Arrays.asList(trimmedPointer.split("/")));
    final int segmentCount = segments.size();

    if (segmentCount <= 1) {
        return trimmedPointer;
    }

    final String lastSegment = segments.get(segmentCount - 1);
    if (!lastSegment.matches("[0-9]+")) {
        return lastSegment;
    }

    // An array index alone is not a meaningful key, so keep the element that owns the array as well.
    return String.join("/", segments.subList(segmentCount - 2, segmentCount));
}

/**
 * Replaces every forward slash (/) in the pointer with a dot (.).
 * No trimming is performed here.
 *
 * @param pointer the pointer text to convert
 * @return the pointer with each slash replaced by a dot
 */
private String normalizePointerStructure(final String pointer) {
    final String dotSeparated = pointer.replace("/", ".");
    return dotSeparated;
}

/**
 * Removes at most one leading and at most one trailing forward slash from the pointer.
 *
 * @param pointer the pointer as configured
 * @return the pointer without its outermost leading and trailing slash
 */
private String trimPointer(String pointer) {
    String trimmed = pointer;
    if (trimmed.startsWith("/")) {
        trimmed = trimmed.substring(1);
    }
    if (trimmed.endsWith("/")) {
        trimmed = trimmed.substring(0, trimmed.length() - 1);
    }
    return trimmed;
}

/**
 * Puts every top-level entry of the parsed JSON onto the Event, using the entry key as the Event key.
 *
 * @param event the Event to write to
 * @param parsedJson the entries to write
 */
private void writeToRoot(final Event event, final Map<String, Object> parsedJson) {
    parsedJson.forEach((key, value) -> event.put(key, value));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
package com.amazon.dataprepper.plugins.processor.parsejson;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotBlank;

import java.util.Objects;

public class ParseJsonProcessorConfig {
static final String DEFAULT_SOURCE = "message";

Expand All @@ -18,6 +21,9 @@ public class ParseJsonProcessorConfig {
@JsonProperty("destination")
private String destination;

@JsonProperty("pointer")
private String pointer;

/**
* The field of the Event that contains the JSON data.
*
Expand All @@ -36,4 +42,24 @@ public String getSource() {
public String getDestination() {
return destination;
}

/**
 * Gets the optional JSON Pointer setting. The pointer selects the JSON key that is parsed into the
 * destination. No pointer is set by default, in which case the entirety of source is parsed. If the
 * target key would overwrite a key that already exists in the Event, the absolute path of the target
 * key is placed into destination instead.
 *
 * TODO: open question carried over from the original note - should this conflict handling be
 * configurable, and what should happen on a double conflict?
 *
 * @return String representing the JSON Pointer, or {@code null} when none is configured
 */
public String getPointer() {
    return this.pointer;
}

/**
 * Validation hook for the {@code destination} setting.
 *
 * A {@code null} destination is accepted. Any other value is accepted only if, after trimming,
 * it is neither empty nor exactly a single front slash.
 *
 * @return {@code true} when the destination is acceptable, {@code false} otherwise
 */
@AssertTrue(message = "destination cannot be empty, whitespace, or a front slash (/)")
boolean isValidDestination() {
    if (destination == null) {
        return true;
    }

    final String candidate = destination.trim();
    final boolean isBlank = candidate.isEmpty();
    final boolean isBareSlash = "/".equals(candidate);
    return !(isBlank || isBareSlash);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazon.dataprepper.plugins.processor.parsejson;

import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import static com.amazon.dataprepper.plugins.processor.parsejson.ParseJsonProcessorConfig.DEFAULT_SOURCE;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import static com.amazon.dataprepper.test.helper.ReflectivelySetField.setField;

/**
 * Unit tests for {@link ParseJsonProcessorConfig}: default values and destination validation.
 */
public class ParseJsonProcessorConfigTest {

    private ParseJsonProcessorConfig createObjectUnderTest() {
        return new ParseJsonProcessorConfig();
    }

    @Test
    public void test_when_defaultParseJsonProcessorConfig_then_returns_default_values() {
        final ParseJsonProcessorConfig objectUnderTest = createObjectUnderTest();

        assertThat(objectUnderTest.getSource(), equalTo(DEFAULT_SOURCE));
        assertThat(objectUnderTest.getDestination(), equalTo(null));
        assertThat(objectUnderTest.getPointer(), equalTo(null));
    }

    @Nested
    class Validation {
        final ParseJsonProcessorConfig config = createObjectUnderTest();

        // A freshly created config has a null destination, which the validation accepts.
        @Test
        void test_when_destinationIsNull_then_isValidDestinationTrue() {
            assertThat(config.getDestination(), equalTo(null));
            assertThat(config.isValidDestination(), equalTo(true));
        }

        @Test
        void test_when_destinationIsNonBlank_then_isValidDestinationTrue()
                throws NoSuchFieldException, IllegalAccessException {
            setField(ParseJsonProcessorConfig.class, config, "destination", "good destination");

            assertThat(config.isValidDestination(), equalTo(true));
        }

        @Test
        void test_when_destinationIsWhiteSpaceOrFrontSlash_then_isValidDestinationFalse()
                throws NoSuchFieldException, IllegalAccessException {
            // Empty, whitespace-only, a bare slash, and a slash surrounded by whitespace must all be rejected.
            final String[] invalidDestinations = {"", " ", "/", " / "};

            for (final String invalidDestination : invalidDestinations) {
                setField(ParseJsonProcessorConfig.class, config, "destination", invalidDestination);

                assertThat("destination: '" + invalidDestination + "'",
                        config.isValidDestination(), equalTo(false));
            }
        }
    }
}
Loading

0 comments on commit 27e39d2

Please sign in to comment.