Skip to content

Commit

Permalink
Add 'Entry' class in MapToListProcessorConfig to support multiple entries format, and modify MapToListProcessor to convert the existing config to entries format and use entry objects for executing the processor task.
Browse files Browse the repository at this point in the history

Signed-off-by: Niketan Chandarana <[email protected]>
  • Loading branch information
niketan16 committed Feb 5, 2025
1 parent b181a8d commit c699113
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 45 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/mutate-event-processors/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation project(':data-prepper-test-event')
testImplementation testLibs.slf4j.simple
testImplementation testLibs.spring.test
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MapToListProcessor extends AbstractProcessor<Record<Event>, Record<
private final MapToListProcessorConfig config;
private final ExpressionEvaluator expressionEvaluator;
private final Set<String> excludeKeySet = new HashSet<>();
private final List<MapToListProcessorConfig.Entry> configEntries;

@DataPrepperPluginConstructor
public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
Expand All @@ -50,80 +51,107 @@ public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProc
String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
config.getMapToListWhen()));
}

if (config.getSource() != null || config.getMapToListWhen() != null) {
MapToListProcessorConfig.Entry entry = new MapToListProcessorConfig.Entry(
config.getSource(),
config.getTarget(),
config.getKeyName(),
config.getValueName(),
config.getRemoveProcessedFields(),
config.getConvertFieldToList(),
config.getExcludeKeys(),
config.getTagsOnFailure(),
config.getMapToListWhen()
);
configEntries = List.of(entry);
} else {
configEntries = config.getEntries();
}

for (MapToListProcessorConfig.Entry configEntry : configEntries) {
if (configEntry.getMapToListWhen() != null && !expressionEvaluator.isValidExpressionStatement(configEntry.getMapToListWhen())) {
throw new InvalidPluginConfigurationException(
String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax",
configEntry.getMapToListWhen()));
}
}
}

@Override
public Collection<Record<Event>> doExecute(final Collection<Record<Event>> records) {
for (final Record<Event> record : records) {
final Event recordEvent = record.getData();

try {
for (MapToListProcessorConfig.Entry configEntry : configEntries) {
try {

if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) {
continue;
}
if (configEntry.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(configEntry.getMapToListWhen(), recordEvent)) {
continue;
}

try {
final Map<String, Object> sourceMap = getSourceMap(recordEvent);
try {
final Map<String, Object> sourceMap = getSourceMap(recordEvent, configEntry.getSource());

if (config.getConvertFieldToList()) {
final List<List<Object>> targetNestedList = new ArrayList<>();
if (configEntry.getConvertFieldToList()) {
final List<List<Object>> targetNestedList = new ArrayList<>();

for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue()));
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue()));
}
}
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetNestedList);
} else {
final List<Map<String, Object>> targetList = new ArrayList<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
final Map<String, Object> listItem = new HashMap<>();
listItem.put(config.getKeyName(), entry.getKey());
listItem.put(config.getValueName(), entry.getValue());
targetList.add(listItem);
removeProcessedFields(sourceMap, recordEvent, configEntry);
recordEvent.put(configEntry.getTarget(), targetNestedList);
} else {
final List<Map<String, Object>> targetList = new ArrayList<>();
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (!excludeKeySet.contains(entry.getKey())) {
final Map<String, Object> listItem = new HashMap<>();
listItem.put(configEntry.getKeyName(), entry.getKey());
listItem.put(configEntry.getValueName(), entry.getValue());
targetList.add(listItem);
}
}
removeProcessedFields(sourceMap, recordEvent, configEntry);
recordEvent.put(configEntry.getTarget(), targetList);
}
removeProcessedFields(sourceMap, recordEvent);
recordEvent.put(config.getTarget(), targetList);
} catch (Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Fail to perform Map to List operation")
.setCause(e)
.log();
recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure());
}
} catch (Exception e) {
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("Fail to perform Map to List operation")
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure());
}
} catch (final Exception e) {
LOG.atError()
.addMarker(EVENT)
.addMarker(NOISY)
.setMessage("There was an exception while processing Event [{}]")
.addArgument(recordEvent)
.setCause(e)
.log();
recordEvent.getMetadata().addTags(config.getTagsOnFailure());
}
}
return records;
}

private Map<String, Object> getSourceMap(Event recordEvent) throws JsonProcessingException {
private Map<String, Object> getSourceMap(Event recordEvent, String source) throws JsonProcessingException {
final Map<String, Object> sourceMap;
sourceMap = recordEvent.get(config.getSource(), Map.class);
sourceMap = recordEvent.get(source, Map.class);
return sourceMap;
}

private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent) {
if (!config.getRemoveProcessedFields()) {
private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEvent, MapToListProcessorConfig.Entry configEntry) {
if (!configEntry.getRemoveProcessedFields()) {
return;
}

if (Objects.equals(config.getSource(), "")) {
if (Objects.equals(configEntry.getSource(), "")) {
// Source is root
for (final Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (excludeKeySet.contains(entry.getKey())) {
Expand All @@ -138,7 +166,7 @@ private void removeProcessedFields(Map<String, Object> sourceMap, Event recordEv
modifiedSourceMap.put(entry.getKey(), entry.getValue());
}
}
recordEvent.put(config.getSource(), modifiedSourceMap);
recordEvent.put(configEntry.getSource(), modifiedSourceMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse;
import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty;
import org.opensearch.dataprepper.model.annotations.ExampleValues;
import org.opensearch.dataprepper.model.annotations.ExampleValues.Example;

import java.util.ArrayList;
import java.util.List;

@ConditionalRequired(value = {
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "entries", value = "null")},
thenExpect = {@SchemaProperty(field = "source")}
),
@IfThenElse(
ifFulfilled = {@SchemaProperty(field = "source", value = "null")},
thenExpect = {@SchemaProperty(field = "entries")}
)
})
@JsonPropertyOrder
@JsonClassDescription("The <code>map_to_list</code> processor converts a map of key-value pairs to a list of objects. " +
"Each object contains the key and value in separate fields.")
Expand All @@ -26,14 +41,101 @@ public class MapToListProcessorConfig {
private static final List<String> DEFAULT_EXCLUDE_KEYS = new ArrayList<>();
private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false;

@NotNull
/**
 * Settings for a single map-to-list conversion: which map to read, where to write the
 * generated list, and how the conversion is shaped.
 *
 * <p>Instances are created either through the no-argument constructor (fields keep their
 * declared defaults and are then populated by field, presumably by Jackson — confirm against
 * the plugin framework) or through the all-arguments constructor.
 */
public static class Entry {
    @NotNull
    @JsonProperty("source")
    @JsonPropertyDescription("The source map used to perform the mapping operation. When set to an empty " +
            "string (<code>\"\"</code>), it will use the root of the event as the <code>source</code>.")
    private String source;

    @NotEmpty
    @NotNull
    @JsonProperty("target")
    @JsonPropertyDescription("The target for the generated list.")
    private String target;

    @JsonProperty(value = "key_name", defaultValue = DEFAULT_KEY_NAME)
    @JsonPropertyDescription("The name of the field in which to store the original key. Default is <code>key</code>.")
    @ExampleValues({
            @Example(value = "og_key", description = "The original key in the map is stored in 'og_key' in the list.")
    })
    private String keyName = DEFAULT_KEY_NAME;

    @JsonProperty(value = "value_name", defaultValue = DEFAULT_VALUE_NAME)
    @JsonPropertyDescription("The name of the field in which to store the original value. Default is <code>value</code>.")
    @ExampleValues({
            @Example(value = "og_value", description = "The original value in the map is stored in 'og_value' in the list.")
    })
    private String valueName = DEFAULT_VALUE_NAME;

    @JsonProperty("remove_processed_fields")
    @JsonPropertyDescription("When <code>true</code>, the processor will remove the processed fields from the source map. " +
            "Default is <code>false</code>.")
    private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS;

    @JsonProperty("convert_field_to_list")
    @JsonPropertyDescription("If <code>true</code>, the processor will convert the fields from the source map into lists and " +
            "place them in fields in the target list. Default is <code>false</code>.")
    private boolean convertFieldToList = false;

    // Each entry gets its own copy of the default list. Assigning DEFAULT_EXCLUDE_KEYS directly
    // would make every entry share one mutable static list, so a caller mutating the list
    // returned by getExcludeKeys() would silently change the default for all other entries.
    @JsonProperty("exclude_keys")
    @JsonPropertyDescription("The keys in the source map that will be excluded from processing. Default is an " +
            "empty list (<code>[]</code>).")
    private List<String> excludeKeys = new ArrayList<>(DEFAULT_EXCLUDE_KEYS);

    @JsonProperty("tags_on_failure")
    @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.")
    private List<String> tagsOnFailure;

    @JsonProperty("map_to_list_when")
    @JsonPropertyDescription("A <a href=\"https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/\">conditional expression</a>, " +
            "such as <code>/some-key == \"test\"</code>, that will be evaluated to determine whether the processor will " +
            "be run on the event. By default, all events will be processed unless otherwise stated.")
    @ExampleValues({
            @Example(value = "/some-key == \"test\"", description = "When the key is 'test', the processor will be applied to the event.")
    })
    private String mapToListWhen;

    /**
     * Creates an entry whose fields all keep their declared defaults.
     */
    public Entry() {

    }

    /**
     * Creates an entry with every setting supplied explicitly. The arguments are stored
     * as given; no defaulting or copying is applied.
     *
     * @param source                key of the source map; the property description states that an
     *                              empty string selects the event root
     * @param target                key under which the generated list is written
     * @param keyName               field name that receives each original key
     * @param valueName             field name that receives each original value
     * @param removeProcessedFields whether processed fields are removed from the source map
     * @param convertFieldToList    whether each key/value pair becomes a nested list instead of an object
     * @param excludeKeys           source keys to leave out of the conversion
     * @param tagsOnFailure         tags to add to the event metadata on failure; may be {@code null}
     * @param mapToListWhen         conditional expression gating the conversion; may be {@code null}
     */
    public Entry(final String source,
                 final String target,
                 final String keyName,
                 final String valueName,
                 final boolean removeProcessedFields,
                 final boolean convertFieldToList,
                 final List<String> excludeKeys,
                 final List<String> tagsOnFailure,
                 final String mapToListWhen) {
        this.source = source;
        this.target = target;
        this.keyName = keyName;
        this.valueName = valueName;
        this.removeProcessedFields = removeProcessedFields;
        this.convertFieldToList = convertFieldToList;
        this.excludeKeys = excludeKeys;
        this.tagsOnFailure = tagsOnFailure;
        this.mapToListWhen = mapToListWhen;
    }

    /** @return the key of the source map */
    public String getSource() {
        return source;
    }

    /** @return the key under which the generated list is written */
    public String getTarget() {
        return target;
    }

    /** @return the field name that receives each original key */
    public String getKeyName() {
        return keyName;
    }

    /** @return the field name that receives each original value */
    public String getValueName() {
        return valueName;
    }

    /** @return {@code true} if processed fields are removed from the source map */
    public boolean getRemoveProcessedFields() {
        return removeProcessedFields;
    }

    /** @return {@code true} if each key/value pair is emitted as a nested list */
    public boolean getConvertFieldToList() {
        return convertFieldToList;
    }

    /** @return the source keys excluded from the conversion */
    public List<String> getExcludeKeys() {
        return excludeKeys;
    }

    /** @return the tags to add on failure, or {@code null} if none were configured */
    public List<String> getTagsOnFailure() {
        return tagsOnFailure;
    }

    /** @return the conditional expression gating this entry, or {@code null} if none was configured */
    public String getMapToListWhen() {
        return mapToListWhen;
    }
}

@JsonProperty("source")
@JsonPropertyDescription("The source map used to perform the mapping operation. When set to an empty " +
"string (<code>\"\"</code>), it will use the root of the event as the <code>source</code>.")
private String source;

@NotEmpty
@NotNull
@JsonProperty("target")
@JsonPropertyDescription("The target for the generated list.")
private String target;
Expand Down Expand Up @@ -80,6 +182,21 @@ public class MapToListProcessorConfig {
})
private String mapToListWhen;

@Valid
@JsonProperty("entries")
@JsonPropertyDescription("A list of entries to process map-to-list operations.")
private List<Entry> entries;

/**
 * Validation hook: passes when at least one of {@code entries} or {@code source} is set.
 *
 * @return {@code false} only when both {@code entries} and {@code source} are {@code null}
 */
@AssertTrue(message = "Must use either entries configuration or individual configuration")
boolean isUsingAtLeastOneConfiguration() {
    final boolean entriesMissing = entries == null;
    final boolean sourceMissing = source == null;
    return !(entriesMissing && sourceMissing);
}

/**
 * Validation hook: passes unless {@code entries} and {@code source} are both set.
 *
 * @return {@code false} only when both {@code entries} and {@code source} are non-null
 */
@AssertTrue(message = "Cannot use both entries configuration and individual configuration together")
boolean isNotUsingBothConfigurations() {
    final boolean bothConfigured = entries != null && source != null;
    return !bothConfigured;
}

/**
 * @return the top-level {@code source} setting, or {@code null} when it was not configured
 */
public String getSource() {
    return this.source;
}
Expand Down Expand Up @@ -115,4 +232,8 @@ public boolean getConvertFieldToList() {
/**
 * @return the top-level {@code tagsOnFailure} list exactly as stored in this configuration
 */
public List<String> getTagsOnFailure() {
    return this.tagsOnFailure;
}

/**
 * @return the configured {@code entries} list, or {@code null} when it was not configured
 */
public List<Entry> getEntries() {
    return this.entries;
}
}
Loading

0 comments on commit c699113

Please sign in to comment.