Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements to map_to_list processor #4033

Merged
merged 11 commits into from
Feb 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.Objects;
import java.util.StringJoiner;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -201,8 +200,13 @@ public <T> T get(final String key, final Class<T> clazz) {
}

private JsonNode getNode(final String key) {
final JsonPointer jsonPointer = toJsonPointer(key);
return jsonNode.at(jsonPointer);
final String jsonPointerKey;
if (key.isEmpty() || key.startsWith("/")) {
jsonPointerKey = key;
} else {
jsonPointerKey = SEPARATOR + key;
}
return jsonNode.at(jsonPointerKey);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this an optimization we are doing? If yes, why not modify the toJsonPointer() function instead? Or delete that function if it's not needed anymore?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Moved the changes to toJsonPointer() method. In the process, found some corner case issues with previous changes, addressed those and added more unit tests.

}

private <T> T mapNodeToObject(final String key, final JsonNode node, final Class<T> clazz) {
Expand Down Expand Up @@ -399,24 +403,31 @@ public static boolean isValidEventKey(final String key) {
}
private String checkAndTrimKey(final String key) {
checkKey(key);
return trimKey(key);
return trimTrailingSlashInKey(key);
}

private static void checkKey(final String key) {
checkNotNull(key, "key cannot be null");
checkArgument(!key.isEmpty(), "key cannot be an empty string");
if (key.isEmpty()) {
// Empty string key is valid
return;
}
if (key.length() > MAX_KEY_LENGTH) {
throw new IllegalArgumentException("key cannot be longer than " + MAX_KEY_LENGTH + " characters");
}
if (!isValidKey(key)) {
throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_ and must follow JsonPointer (ie. 'field/to/key')");
throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_@/ and must follow JsonPointer (ie. 'field/to/key')");
}
}

private String trimKey(final String key) {

final String trimmedLeadingSlash = key.startsWith(SEPARATOR) ? key.substring(1) : key;
return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash;
return trimTrailingSlashInKey(trimmedLeadingSlash);
}

/**
 * Strips one trailing separator from a key.
 *
 * @param key the key to normalize
 * @return the key without its final character when it ends with {@code SEPARATOR};
 *         otherwise the key unchanged
 */
private String trimTrailingSlashInKey(final String key) {
    if (!key.endsWith(SEPARATOR)) {
        return key;
    }
    // Only the last character is dropped; anything before it is kept as-is.
    return key.substring(0, key.length() - 1);
}

private static boolean isValidKey(final String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,28 @@ public void testPutAndGet_withMultiLevelKeyWithADash() {
assertThat(result, is(equalTo(value)));
}

@ParameterizedTest
@ValueSource(strings = {"foo", "/foo", "/foo/", "foo/"})
void testGetAtRootLevel(final String key) {
    // Each key variant (with or without leading/trailing slash) is expected
    // to end up as the same top-level field "foo".
    final String storedValue = UUID.randomUUID().toString();
    event.put(key, storedValue);

    // Reading with the empty key is expected to return the whole event content.
    final Map<String, String> rootContent = event.get("", Map.class);

    assertThat(rootContent, is(Map.of("foo", storedValue)));
}

@ParameterizedTest
@ValueSource(strings = {"/foo/bar", "foo/bar", "foo/bar/"})
void testGetAtRootLevelWithMultiLevelKey(final String key) {
    // Each key variant is expected to produce the same nested structure foo -> bar.
    final String storedValue = UUID.randomUUID().toString();
    event.put(key, storedValue);

    // Reading with the empty key is expected to return the whole event content.
    final Map<String, String> rootContent = event.get("", Map.class);

    final Map<String, Object> expectedContent = Map.of("foo", Map.of("bar", storedValue));
    assertThat(rootContent, is(expectedContent));
}

@Test
public void testPutUpdateAndGet_withPojo() {
final String key = "foo/bar";
Expand Down Expand Up @@ -324,7 +346,7 @@ public void testIsValueAList_withNull() {
}

@ParameterizedTest
@ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
@ValueSource(strings = {"withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars",
"with,Comma", "with:Colon", "with[Bracket", "with|Brace"})
void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) {
assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey);
Expand Down
62 changes: 61 additions & 1 deletion data-prepper-plugins/mutate-event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -541,14 +541,74 @@ The processed event will have the following data:
}
```

If we enable the `convert_field_to_list` option:
```yaml
...
processor:
- map_to_list:
source: "my-map"
target: "my-list"
convert_field_to_list: true
...
```
the processed event will have the following data:
```json
{
"my-list": [
["key1", "value1"],
["key2", "value2"],
["key3", "value3"]
],
"my-map": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
```

If `source` is set to an empty string (""), the processor uses the event root as the source map.
```yaml
...
processor:
- map_to_list:
source: ""
target: "my-list"
convert_field_to_list: true
...
```
Input data like this:
```json
{
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
```
will end up with this after processing:
```json
{
"my-list": [
["key1", "value1"],
["key2", "value2"],
["key3", "value3"]
],
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
```

### Configuration
* `source` - (required): the source map to perform the operation
* `source` - (required): the source map to perform the operation; if set to an empty string (""), the event root is used as the source.
* `target` - (required): the target list to put the converted list
* `key_name` - (optional): the key name of the field to hold the original key, default is "key"
* `value_name` - (optional): the key name of the field to hold the original value, default is "value"
* `exclude_keys` - (optional): the keys in source map that will be excluded from processing, default is empty list
* `remove_processed_fields` - (optional): default is false; if true, will remove processed fields from source map
* `map_to_list_when` - (optional): used to configure a condition for event processing based on certain property of the incoming event. Default is null (all events will be processed).
* `convert_field_to_list` - (optional): default is false; if true, will convert each field to a list instead of an object
* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process


## Developer Guide
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.mutateevent;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
Expand All @@ -22,11 +24,13 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

@DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class)
public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<Event>> {
private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final MapToListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Set<String> excludeKeySet = new HashSet<>();
Expand All @@ -49,36 +53,70 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
}

try {
final Map<String, Object> sourceMap = recordEvent.get(config.getSource(), Map.class);
final List<Map<String, Object>> targetList = new ArrayList<>();

Map<String, Object> modifiedSourceMap = new HashMap<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (excludeKeySet.contains(entry.getKey())) {
if (config.getRemoveProcessedFields()) {
modifiedSourceMap.put(entry.getKey(), entry.getValue());
final Map<String, Object> sourceMap = getSourceMap(recordEvent);

if (config.getConvertFieldToList()) {
final List<List<Object>> targetNestedList = new ArrayList<>();

for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetNestedList.add(List.of(entry.getKey(), entry.getValue()));
}
continue;
}
targetList.add(Map.of(
config.getKeyName(), entry.getKey(),
config.getValueName(), entry.getValue()
));
}

if (config.getRemoveProcessedFields()) {
recordEvent.put(config.getSource(), modifiedSourceMap);
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetNestedList);
} else {
final List<Map<String, Object>> targetList = new ArrayList<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetList.add(Map.of(
config.getKeyName(), entry.getKey(),
config.getValueName(), entry.getValue()
));
}
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetList);
}

recordEvent.put(config.getTarget(), targetList);
} catch (Exception e) {
LOG.error("Fail to perform Map to List operation", e);
//TODO: add tagging on failure
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

/**
 * Reads the configured source of the event as a map.
 *
 * @param recordEvent the event to read from
 * @return the value found under the configured source key, as a map
 */
private Map<String, Object> getSourceMap(Event recordEvent) throws JsonProcessingException {
    return recordEvent.get(config.getSource(), Map.class);
}

/**
 * Removes the fields that were converted from the source, when the
 * remove-processed-fields option is enabled. Keys in the exclude set are
 * always kept.
 *
 * @param sourceMap   the map that was read from the event's source
 * @param recordEvent the event to update
 */
private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent) {
    if (!config.getRemoveProcessedFields()) {
        return;
    }

    final boolean sourceIsRoot = Objects.equals(config.getSource(), "");
    if (sourceIsRoot) {
        // The source is the event itself, so processed fields are deleted one by one.
        for (final String fieldName : sourceMap.keySet()) {
            if (!excludeKeySet.contains(fieldName)) {
                recordEvent.delete(fieldName);
            }
        }
        return;
    }

    // The source is a nested field: overwrite it with a map holding only the excluded keys.
    final Map<String, Object> retainedFields = new HashMap<>();
    sourceMap.forEach((fieldName, fieldValue) -> {
        if (excludeKeySet.contains(fieldName)) {
            retainedFields.put(fieldName, fieldValue);
        }
    });
    recordEvent.put(config.getSource(), retainedFields);
}

@Override
public void prepareForShutdown() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ public class MapToListProcessorConfig {
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false;

@NotEmpty
@NotNull
@JsonProperty("source")
private String source;
Expand All @@ -43,6 +42,12 @@ public class MapToListProcessorConfig {
@JsonProperty("remove_processed_fields")
private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS;

@JsonProperty("convert_field_to_list")
private boolean convertFieldToList = false;

@JsonProperty("tags_on_failure")
private List<String> tagsOnFailure;

public String getSource() {
return source;
}
Expand Down Expand Up @@ -70,4 +75,12 @@ public List<String> getExcludeKeys() {
public boolean getRemoveProcessedFields() {
return removeProcessedFields;
}

/**
 * @return the value of the {@code convert_field_to_list} option
 */
public boolean getConvertFieldToList() {
    return this.convertFieldToList;
}

/**
 * @return the tags configured under {@code tags_on_failure}; the field has no
 *         default value, so this is presumably {@code null} when the option is not set
 */
public List<String> getTagsOnFailure() {
    return this.tagsOnFailure;
}
}
Loading
Loading