Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple entries of source + target + map_to_list_when condition in map_to_list processor #5418

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<
private final MapToListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Set<String> excludeKeySet = new HashSet<>();
private final List<MapToListProcessorConfig.Entry> configEntries;

@DataPrepperPluginConstructor
public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -50,80 +51,107 @@ public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProc
String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
config.getMapToListWhen()));
}

if (config.getSource() != null || config.getMapToListWhen() != null) {
MapToListProcessorConfig.Entry entry = new MapToListProcessorConfig.Entry(
config.getSource(),
config.getTarget(),
config.getKeyName(),
config.getValueName(),
config.getRemoveProcessedFields(),
config.getConvertFieldToList(),
config.getExcludeKeys(),
config.getTagsOnFailure(),
config.getMapToListWhen()
);
configEntries = List.of(entry);
} else {
configEntries = config.getEntries();
}

for (MapToListProcessorConfig.Entry configEntry : configEntries) {
if (configEntry.getMapToListWhen() != null && !expressionEvaluator.isValidExpressionStatement(configEntry.getMapToListWhen())) {
throw new InvalidPluginConfigurationException(
String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
configEntry.getMapToListWhen()));
}
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
for (MapToListProcessorConfig.Entry configEntry : configEntries) {
try {

if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) {
continue;
}
if (configEntry.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(configEntry.getMapToListWhen(), recordEvent)) {
continue;
}

try {
final Map<String, Object> sourceMap = getSourceMap(recordEvent);
try {
final Map<String, Object> sourceMap = getSourceMap(recordEvent, configEntry.getSource());

if (config.getConvertFieldToList()) {
final List<List<Object>> targetNestedList = new ArrayList<>();
if (configEntry.getConvertFieldToList()) {
final List<List<Object>> targetNestedList = new ArrayList<>();

for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue()));
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue()));
}
}
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetNestedList);
} else {
final List<Map<String, Object>> targetList = new ArrayList<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
final Map<String, Object> listItem = new HashMap<>();
listItem.put(config.getKeyName(), entry.getKey());
listItem.put(config.getValueName(), entry.getValue());
targetList.add(listItem);
removeProcessedFields(sourceMap, recordEvent, configEntry);
recordEvent.put(configEntry.getTarget(), targetNestedList);
} else {
final List<Map<String, Object>> targetList = new ArrayList<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
final Map<String, Object> listItem = new HashMap<>();
listItem.put(configEntry.getKeyName(), entry.getKey());
listItem.put(configEntry.getValueName(), entry.getValue());
targetList.add(listItem);
}
}
removeProcessedFields(sourceMap, recordEvent, configEntry);
recordEvent.put(configEntry.getTarget(), targetList);
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetList);
} catch (Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Fail to perform Map to List operation")
.setCause(e)
.log();
recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure());
}
} catch (Exception e) {
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Fail to perform Map to List operation")
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

private Map<String, Object> getSourceMap(Event recordEvent) throws JsonProcessingException {
private Map<String, Object> getSourceMap(Event recordEvent, String source) throws JsonProcessingException {
final Map<String, Object> sourceMap;
sourceMap = recordEvent.get(config.getSource(), Map.class);
sourceMap = recordEvent.get(source, Map.class);
return sourceMap;
}

private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent) {
if (!config.getRemoveProcessedFields()) {
private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent, MapToListProcessorConfig.Entry configEntry) {
if (!configEntry.getRemoveProcessedFields()) {
return;
}

if (Objects.equals(config.getSource(), "")) {
if (Objects.equals(configEntry.getSource(), "")) {
// Source is root
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (excludeKeySet.contains(entry.getKey())) {
Expand All @@ -138,7 +166,7 @@ private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEv
modifiedSourceMap.put(entry.getKey(), entry.getValue());
}
}
recordEvent.put(config.getSource(), modifiedSourceMap);
recordEvent.put(configEntry.getSource(), modifiedSourceMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;

import java.util.ArrayList;
import java.util.List;

@ConditionalRequired(value = {
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null")},
thenExpect = {@SchemaProperty(field = "source")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "source", value = "null")},
thenExpect = {@SchemaProperty(field = "entries")}
)
})
@JsonPropertyOrder
@JsonClassDescription("The <code>map_to_list</code> processor converts a map of key-value pairs to a list of objects. " +
"Each object contains the key and value in separate fields.")
Expand All @@ -26,14 +41,101 @@ public class MapToListProcessorConfig {
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false;

@NotNull
// One map-to-list conversion: which source map to read, where to write the
// generated list, and how to shape and filter it. Instances are either bound
// from the "entries" list of the processor configuration or built by the
// processor from the top-level (single-configuration) options via the
// all-args constructor below.
public static class Entry {
/** Key of the source map in the event; the empty string selects the event root. */
@NotNull
@JsonProperty("source")
@JsonPropertyDescription("The source map used to perform the mapping operation. When set to an empty " +
"string (<code>\"\"</code>), it will use the root of the event as the <code>source</code>.")
private String source;

/** Key under which the generated list is written; must be present and non-empty. */
@NotEmpty
@NotNull
@JsonProperty("target")
@JsonPropertyDescription("The target for the generated list.")
private String target;

/** Name of the field that holds the original map key in each list item. */
@JsonProperty(value = "key_name", defaultValue = DEFAULT_KEY_NAME)
@JsonPropertyDescription("The name of the field in which to store the original key. Default is <code>key</code>.")
@ExampleValues({
@Example(value = "og_key", description = "The original key in the map is stored in 'og_key' in the list.")
})
private String keyName = DEFAULT_KEY_NAME;

/** Name of the field that holds the original map value in each list item. */
@JsonProperty(value = "value_name", defaultValue = DEFAULT_VALUE_NAME)
@JsonPropertyDescription("The name of the field in which to store the original value. Default is <code>value</code>.")
@ExampleValues({
@Example(value = "og_value", description = "The original value in the map is stored in 'og_value' in the list.")
})
private String valueName = DEFAULT_VALUE_NAME;

/** Whether processed keys are removed from the source map after conversion. */
@JsonProperty("remove_processed_fields")
@JsonPropertyDescription("When <code>true</code>, the processor will remove the processed fields from the source map. " +
"Default is <code>false</code>.")
private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS;

/** Whether each source field becomes a two-element [key, value] list instead of a key/value object. */
@JsonProperty("convert_field_to_list")
@JsonPropertyDescription("If <code>true</code>, the processor will convert the fields from the source map into lists and " +
"place them in fields in the target list. Default is <code>false</code>.")
private boolean convertFieldToList = false;

/**
 * Keys of the source map to leave out of the generated list.
 * NOTE(review): the default is the shared static DEFAULT_EXCLUDE_KEYS instance, not a
 * per-entry copy, so mutating the returned list would affect every entry that uses the default.
 */
@JsonProperty("exclude_keys")
@JsonPropertyDescription("The keys in the source map that will be excluded from processing. Default is an " +
"empty list (<code>[]</code>).")
private List<String> excludeKeys = DEFAULT_EXCLUDE_KEYS;

/** Tags added to the event metadata when this entry fails; has no default, so it is null when unset. */
@JsonProperty("tags_on_failure")
@JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
private List<String> tagsOnFailure;

/** Optional condition expression; when set, the entry is applied only to events for which it evaluates to true. */
@JsonProperty("map_to_list_when")
@JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
"such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will " +
"be run on the event. By default, all events will be processed unless otherwise stated.")
@ExampleValues({
@Example(value = "/some-key == \"test\"", description = "When the key is 'test', the processor will be applied to the event.")
})
private String mapToListWhen;

// Plain accessors; each returns the stored field as-is (no copies are made of the lists).
public String getSource() { return source; }
public String getTarget() { return target; }
public String getKeyName() { return keyName; }
public String getValueName() { return valueName; }
public boolean getRemoveProcessedFields() { return removeProcessedFields; }
public boolean getConvertFieldToList() { return convertFieldToList; }
public List<String> getExcludeKeys() { return excludeKeys; }
public List<String> getTagsOnFailure() { return tagsOnFailure; }
public String getMapToListWhen() { return mapToListWhen; }

/**
 * Creates an entry from explicit values. The processor uses this to wrap the
 * top-level (single-configuration) options in an entry.
 * Arguments are stored directly: there are no null checks or defensive copies, and
 * the field defaults declared above are overwritten even when a null is passed.
 *
 * @param source key of the source map ("" for the event root)
 * @param target key that receives the generated list
 * @param keyName field name for the original key in each list item
 * @param valueName field name for the original value in each list item
 * @param removeProcessedFields whether to remove processed keys from the source map
 * @param convertFieldToList whether to emit [key, value] lists instead of objects
 * @param excludeKeys source keys to skip
 * @param tagsOnFailure tags to add to the event metadata on failure
 * @param mapToListWhen optional condition expression gating this entry
 */
public Entry(final String source,
final String target,
final String keyName,
final String valueName,
final boolean removeProcessedFields,
final boolean convertFieldToList,
final List<String> excludeKeys,
final List<String> tagsOnFailure,
final String mapToListWhen) {
this.source = source;
this.target = target;
this.keyName = keyName;
this.valueName = valueName;
this.removeProcessedFields = removeProcessedFields;
this.convertFieldToList = convertFieldToList;
this.excludeKeys = excludeKeys;
this.tagsOnFailure = tagsOnFailure;
this.mapToListWhen = mapToListWhen;
}

/**
 * Creates an entry with every field at its declared default.
 * Presumably the constructor Jackson uses when binding "entries" items - TODO confirm.
 */
public Entry() {

}
}

@JsonProperty("source")
@JsonPropertyDescription("The source map used to perform the mapping operation. When set to an empty " +
"string (<code>\"\"</code>), it will use the root of the event as the <code>source</code>.")
private String source;

@NotEmpty
@NotNull
@JsonProperty("target")
@JsonPropertyDescription("The target for the generated list.")
private String target;
Expand Down Expand Up @@ -80,6 +182,21 @@ public class MapToListProcessorConfig {
})
private String mapToListWhen;

@Valid
@JsonProperty("entries")
@JsonPropertyDescription("A list of entries to process map-to-list operations.")
niketan16 marked this conversation as resolved.
Show resolved Hide resolved
private List<Entry> entries;

/**
 * Bean Validation check that some mapping configuration was supplied.
 *
 * @return {@code false} only when both {@code entries} and the top-level
 *         {@code source} are absent; {@code true} otherwise
 */
@AssertTrue(message = "Must use either entries configuration or individual configuration")
boolean isUsingAtLeastOneConfiguration() {
    final boolean nothingConfigured = entries == null && source == null;
    return !nothingConfigured;
}

/**
 * Bean Validation check that the {@code entries} list is not combined with the
 * top-level (individual) configuration.
 *
 * <p>Both the top-level {@code source} and the top-level {@code map_to_list_when}
 * count as individual configuration. The processor switches to the
 * single-configuration path when either one is set, which would silently ignore
 * {@code entries} and, if only {@code map_to_list_when} is set, build an entry with
 * a null source. Rejecting that combination here turns it into a configuration
 * error instead of a runtime failure on every event.
 *
 * @return {@code true} when {@code entries} is absent, or when it is present and
 *         neither top-level {@code source} nor top-level {@code map_to_list_when} is set
 */
@AssertTrue(message = "Cannot use both entries configuration and individual configuration together")
boolean isNotUsingBothConfigurations() {
    if (entries == null) {
        return true;
    }
    return source == null && mapToListWhen == null;
}

/**
 * Returns the top-level {@code source} option.
 *
 * @return the configured source key, or {@code null} when it was not set
 *         (for example when the {@code entries} list is used instead)
 */
public String getSource() {
return source;
}
Expand Down Expand Up @@ -115,4 +232,8 @@ public boolean getConvertFieldToList() {
/**
 * Returns the top-level {@code tagsOnFailure} value.
 *
 * @return the stored list as-is (no copy is made); the field declaration is not
 *         visible here, so whether it can be {@code null} is unconfirmed - TODO confirm
 */
public List<String> getTagsOnFailure() {
return tagsOnFailure;
}

/**
 * Returns the list bound from the {@code entries} option.
 *
 * @return the stored list as-is (no copy is made), or {@code null} when
 *         {@code entries} was not configured
 */
public List<Entry> getEntries() {
return entries;
}
}
Loading
Loading