/*
 * Reviewer summary. This text is a unified git diff, not a single Java source file; it appears to have had its
 * newlines collapsed and its generic type arguments dropped during extraction.
 * The patch touches build.gradle, MapToListProcessor, MapToListProcessorConfig and MapToListProcessorTest and adds an
 * entries list to the map_to_list processor config, so that one processor instance can run several map-to-list
 * conversions against each event. Inline reviewer notes are attached to added lines only.
 */
diff --git a/data-prepper-plugins/mutate-event-processors/build.gradle b/data-prepper-plugins/mutate-event-processors/build.gradle index e4b0c63cea..96a6aef38f 100644 --- a/data-prepper-plugins/mutate-event-processors/build.gradle +++ b/data-prepper-plugins/mutate-event-processors/build.gradle @@ -24,4 +24,5 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' testImplementation project(':data-prepper-test-event') testImplementation testLibs.slf4j.simple +/* Test-scope dependency; the updated test imports org.springframework.test.util.ReflectionTestUtils. */ testImplementation testLibs.spring.test } \ No newline at end of file diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java index d5af649c84..350e71a967 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java @@ -36,6 +36,7 @@ public class MapToListProcessor extends AbstractProcessor, Record< private final MapToListProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; private final Set excludeKeySet = new HashSet<>(); +/* Conversions to run per event; assigned in the constructor from the top-level fields or from config.getEntries(). */ private final List configEntries; @DataPrepperPluginConstructor public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProcessorConfig config, final ExpressionEvaluator expressionEvaluator) { @@ -50,6 +51,31 @@ public MapToListProcessor(final PluginMetrics pluginMetrics, final MapToListProc String.format("map_to_list_when %s is not a valid expression statement. 
See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", config.getMapToListWhen())); } + +/* The top-level (single conversion) fields are wrapped in one Entry so doExecute only handles Entry objects. NOTE(review): this branch is also taken when only map_to_list_when is set and source is null; config.getEntries() is then ignored and the Entry carries a null source - confirm this is intended. */ if (config.getSource() != null || config.getMapToListWhen() != null) { + MapToListProcessorConfig.Entry entry = new MapToListProcessorConfig.Entry( + config.getSource(), + config.getTarget(), + config.getKeyName(), + config.getValueName(), + config.getRemoveProcessedFields(), + config.getConvertFieldToList(), + config.getExcludeKeys(), + config.getTagsOnFailure(), + config.getMapToListWhen() + ); + configEntries = List.of(entry); + } else { + configEntries = config.getEntries(); + } + +/* Validates the condition of every entry when the processor is constructed. NOTE(review): for the single conversion case this looks like a repeat of the config.getMapToListWhen() check just above - confirm and consider dropping one. */ for (MapToListProcessorConfig.Entry configEntry : configEntries) { + if (configEntry.getMapToListWhen() != null && !expressionEvaluator.isValidExpressionStatement(configEntry.getMapToListWhen())) { + throw new InvalidPluginConfigurationException( + String.format("map_to_list_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", + configEntry.getMapToListWhen())); + } + } } @Override @@ -57,73 +83,75 @@ public Collection> doExecute(final Collection> recor for (final Record record : records) { final Event recordEvent = record.getData(); - try { +/* Applies each configured entry to the event in list order; both catch blocks sit inside this loop, so a failing entry is logged and tagged and the remaining entries still run. */ for (MapToListProcessorConfig.Entry configEntry : configEntries) { + try { - if (config.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(config.getMapToListWhen(), recordEvent)) { - continue; - } + if (configEntry.getMapToListWhen() != null && !expressionEvaluator.evaluateConditional(configEntry.getMapToListWhen(), recordEvent)) { +/* Skips this entry only; the loop moves on to the next entry for the same event. */ continue; + } - try { - final Map sourceMap = getSourceMap(recordEvent); + try { + final Map sourceMap = getSourceMap(recordEvent, configEntry.getSource()); - if (config.getConvertFieldToList()) { - final List> targetNestedList = new ArrayList<>(); + if (configEntry.getConvertFieldToList()) { + final List> targetNestedList = new ArrayList<>(); - for 
(final Map.Entry entry : sourceMap.entrySet()) { - if (!excludeKeySet.contains(entry.getKey())) { - targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue())); + for (final Map.Entry entry : sourceMap.entrySet()) { +/* NOTE(review): excludeKeySet is a processor-level field whose population is not visible in this patch, and configEntry.getExcludeKeys() is never read in the visible code; per-entry exclude_keys may therefore be ignored - confirm. */ if (!excludeKeySet.contains(entry.getKey())) { + targetNestedList.add(Arrays.asList(entry.getKey(), entry.getValue())); + } } - } - removeProcessedFields(sourceMap, recordEvent); - recordEvent.put(config.getTarget(), targetNestedList); - } else { - final List> targetList = new ArrayList<>(); - for (final Map.Entry entry : sourceMap.entrySet()) { - if (!excludeKeySet.contains(entry.getKey())) { - final Map listItem = new HashMap<>(); - listItem.put(config.getKeyName(), entry.getKey()); - listItem.put(config.getValueName(), entry.getValue()); - targetList.add(listItem); + removeProcessedFields(sourceMap, recordEvent, configEntry); + recordEvent.put(configEntry.getTarget(), targetNestedList); + } else { + final List> targetList = new ArrayList<>(); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (!excludeKeySet.contains(entry.getKey())) { + final Map listItem = new HashMap<>(); + listItem.put(configEntry.getKeyName(), entry.getKey()); + listItem.put(configEntry.getValueName(), entry.getValue()); + targetList.add(listItem); + } } + removeProcessedFields(sourceMap, recordEvent, configEntry); + recordEvent.put(configEntry.getTarget(), targetList); } - removeProcessedFields(sourceMap, recordEvent); - recordEvent.put(config.getTarget(), targetList); +/* A failed conversion for this entry is logged and the event is tagged with the tags_on_failure of the entry. */ } catch (Exception e) { + LOG.atError() + .addMarker(EVENT) + .addMarker(NOISY) + .setMessage("Fail to perform Map to List operation") + .setCause(e) + .log(); + recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure()); } - } catch (Exception e) { + } catch (final Exception e) { LOG.atError() .addMarker(EVENT) .addMarker(NOISY) - .setMessage("Fail to perform Map to List operation") + .setMessage("There was an exception while processing Event [{}]") + .addArgument(recordEvent) 
.setCause(e) .log(); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); + recordEvent.getMetadata().addTags(configEntry.getTagsOnFailure()); } - } catch (final Exception e) { - LOG.atError() - .addMarker(EVENT) - .addMarker(NOISY) - .setMessage("There was an exception while processing Event [{}]") - .addArgument(recordEvent) - .setCause(e) - .log(); - recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } } return records; } - private Map getSourceMap(Event recordEvent) throws JsonProcessingException { +/* Reads the value at the given source key from the event as a Map; the key is now supplied per entry instead of from the processor config. */ private Map getSourceMap(Event recordEvent, String source) throws JsonProcessingException { final Map sourceMap; - sourceMap = recordEvent.get(config.getSource(), Map.class); + sourceMap = recordEvent.get(source, Map.class); return sourceMap; } - private void removeProcessedFields(Map sourceMap, Event recordEvent) { - if (!config.getRemoveProcessedFields()) { +/* Returns immediately unless the entry has remove_processed_fields enabled; an empty source string is treated as the event root. */ private void removeProcessedFields(Map sourceMap, Event recordEvent, MapToListProcessorConfig.Entry configEntry) { + if (!configEntry.getRemoveProcessedFields()) { return; } - if (Objects.equals(config.getSource(), "")) { + if (Objects.equals(configEntry.getSource(), "")) { // Source is root for (final Map.Entry entry : sourceMap.entrySet()) { if (excludeKeySet.contains(entry.getKey())) { @@ -138,7 +166,7 @@ private void removeProcessedFields(Map sourceMap, Event recordEv modifiedSourceMap.put(entry.getKey(), entry.getValue()); } } - recordEvent.put(config.getSource(), modifiedSourceMap); + recordEvent.put(configEntry.getSource(), modifiedSourceMap); } } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java index b9111b6493..1ceb4f63af 100644 --- 
a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java @@ -9,14 +9,29 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import jakarta.validation.Valid; +import jakarta.validation.constraints.AssertTrue; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.annotations.ConditionalRequired; +import org.opensearch.dataprepper.model.annotations.ConditionalRequired.IfThenElse; +import org.opensearch.dataprepper.model.annotations.ConditionalRequired.SchemaProperty; import org.opensearch.dataprepper.model.annotations.ExampleValues; import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; import java.util.ArrayList; import java.util.List; +/* Declares two conditional requirements: when entries is null, source is expected, and when source is null, entries is expected. */@ConditionalRequired(value = { + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "entries", value = "null")}, + thenExpect = {@SchemaProperty(field = "source")} + ), + @IfThenElse( + ifFulfilled = {@SchemaProperty(field = "source", value = "null")}, + thenExpect = {@SchemaProperty(field = "entries")} + ) +}) @JsonPropertyOrder @JsonClassDescription("The map_to_list processor converts a map of key-value pairs to a list of objects. " + "Each object contains the key and value in separate fields.") @@ -26,14 +41,101 @@ public class MapToListProcessorConfig { private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false; - @NotNull +/* Settings for one map-to-list conversion; the fields repeat the top-level options. NOTE(review): this patch removes @NotNull from the top-level source and @NotEmpty/@NotNull from the top-level target, so a top-level source without a target no longer appears to be rejected by these annotations - confirm. */ public static class Entry { + @NotNull + @JsonProperty("source") + @JsonPropertyDescription("The source map used to perform the mapping operation. 
When set to an empty " + + "string (\"\"), it will use the root of the event as the source.") + private String source; + + @NotEmpty + @NotNull + @JsonProperty("target") + @JsonPropertyDescription("The target for the generated list.") + private String target; + + @JsonProperty(value = "key_name", defaultValue = DEFAULT_KEY_NAME) + @JsonPropertyDescription("The name of the field in which to store the original key. Default is key.") + @ExampleValues({ + @Example(value = "og_key", description = "The original key in the map is stored in 'og_key' in the list.") + }) + private String keyName = DEFAULT_KEY_NAME; + + @JsonProperty(value = "value_name", defaultValue = DEFAULT_VALUE_NAME) + @JsonPropertyDescription("The name of the field in which to store the original value. Default is value.") + @ExampleValues({ + @Example(value = "og_value", description = "The original value in the map is stored in 'og_value' in the list.") + }) + private String valueName = DEFAULT_VALUE_NAME; + + @JsonProperty("remove_processed_fields") + @JsonPropertyDescription("When true, the processor will remove the processed fields from the source map. " + + "Default is false.") + private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS; + + @JsonProperty("convert_field_to_list") + @JsonPropertyDescription("If true, the processor will convert the fields from the source map into lists and " + + "place them in fields in the target list. Default is false.") + private boolean convertFieldToList = false; + + @JsonProperty("exclude_keys") + @JsonPropertyDescription("The keys in the source map that will be excluded from processing. 
Default is an " + + "empty list ([]).") + private List excludeKeys = DEFAULT_EXCLUDE_KEYS; + + @JsonProperty("tags_on_failure") + @JsonPropertyDescription("A list of tags to add to the event metadata when the event fails to process.") + private List tagsOnFailure; + + @JsonProperty("map_to_list_when") + @JsonPropertyDescription("A conditional expression, " + + "such as /some-key == \"test\", that will be evaluated to determine whether the processor will " + + "be run on the event. By default, all events will be processed unless otherwise stated.") + @ExampleValues({ + @Example(value = "/some-key == \"test\"", description = "When the key is 'test', the processor will be applied to the event.") + }) + private String mapToListWhen; + + public String getSource() { return source; } + public String getTarget() { return target; } + public String getKeyName() { return keyName; } + public String getValueName() { return valueName; } + public boolean getRemoveProcessedFields() { return removeProcessedFields; } + public boolean getConvertFieldToList() { return convertFieldToList; } + public List getExcludeKeys() { return excludeKeys; } + public List getTagsOnFailure() { return tagsOnFailure; } + public String getMapToListWhen() { return mapToListWhen; } + + public Entry(final String source, + final String target, + final String keyName, + final String valueName, + final boolean removeProcessedFields, + final boolean convertFieldToList, + final List excludeKeys, + final List tagsOnFailure, + final String mapToListWhen) { + this.source = source; + this.target = target; + this.keyName = keyName; + this.valueName = valueName; + this.removeProcessedFields = removeProcessedFields; + this.convertFieldToList = convertFieldToList; + this.excludeKeys = excludeKeys; + this.tagsOnFailure = tagsOnFailure; + this.mapToListWhen = mapToListWhen; + } + + public Entry() { + + } + } + @JsonProperty("source") @JsonPropertyDescription("The source map used to perform the mapping operation. 
When set to an empty " + "string (\"\"), it will use the root of the event as the source.") private String source; - @NotEmpty - @NotNull @JsonProperty("target") @JsonPropertyDescription("The target for the generated list.") private String target; @@ -80,6 +182,21 @@ public class MapToListProcessorConfig { }) private String mapToListWhen; + @Valid + @JsonProperty("entries") + @JsonPropertyDescription("A list of entries to process map-to-list operations.") + private List entries; + + @AssertTrue(message = "Must use either entries configuration or individual configuration") + boolean isUsingAtLeastOneConfiguration() { + return entries != null || source != null; + } + + @AssertTrue(message = "Cannot use both entries configuration and individual configuration together") + boolean isNotUsingBothConfigurations() { + return entries == null || source == null; + } + public String getSource() { return source; } @@ -115,4 +232,8 @@ public boolean getConvertFieldToList() { public List getTagsOnFailure() { return tagsOnFailure; } + + public List getEntries() { + return entries; + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java index 6e9717c103..8325ad9d5f 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.record.Record; +import 
org.springframework.test.util.ReflectionTestUtils; import java.util.ArrayList; import java.util.Arrays; @@ -404,6 +405,115 @@ public void testConvertFieldToListSuccessWithNullValuesInMap() { )); assertSourceMapUnchangedWithNullValues(resultEvent); } +/* Entries format: an entry whose condition fails isValidExpressionStatement must make processor construction throw. */ @Test + void invalid_map_to_list_when_with_entries_format_throws_InvalidPluginConfigurationException() { + when(mockConfig.getSource()).thenReturn(null); + + final List entries = List.of( + new MapToListProcessorConfig.Entry( + "my-map", + "my-list", + "key", + "value", + false, + false, + new ArrayList<>(), + null, + "invalid_condition" + ) + ); + + when(mockConfig.getEntries()).thenReturn(entries); + when(expressionEvaluator.isValidExpressionStatement("invalid_condition")).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + +/* Two entries where only condition1 evaluates to true. NOTE(review): only target1 is asserted; consider also asserting that target2 is absent from the event. */ @Test + void testMultipleEntriesFormatWithDifferentConditions() { + when(mockConfig.getSource()).thenReturn(null); + + final List entries = Arrays.asList( + new MapToListProcessorConfig.Entry( + "my-map", + "target1", + "key", + "value", + false, + false, + new ArrayList<>(), + null, + "condition1" + ), + new MapToListProcessorConfig.Entry( + "my-map2", + "target2", + "key2", + "value2", + false, + false, + new ArrayList<>(), + null, + "condition2" + ) + ); + + when(mockConfig.getEntries()).thenReturn(entries); + when(expressionEvaluator.isValidExpressionStatement("condition1")).thenReturn(true); + when(expressionEvaluator.isValidExpressionStatement("condition2")).thenReturn(true); + + final Record testRecord = createTestRecordWithMultipleMaps(); + when(expressionEvaluator.evaluateConditional("condition1", testRecord.getData())).thenReturn(true); + when(expressionEvaluator.evaluateConditional("condition2", testRecord.getData())).thenReturn(false); + + final MapToListProcessor processor = createObjectUnderTest(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + 
assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("target1", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + Map.of("key", "key1", "value", "value1"), + Map.of("key", "key2", "value", "value2"), + Map.of("key", "key3", "value", "value3") + )); + assertSourceMapUnchanged(resultEvent); + } + +/* isNotUsingBothConfigurations must report false when both source and entries are set. */ @Test + public void test_both_configurations_used_together() { + final MapToListProcessorConfig configObjectUnderTest = new MapToListProcessorConfig(); + ReflectionTestUtils.setField(configObjectUnderTest, "source", "my-map"); + + final MapToListProcessorConfig.Entry entry = new MapToListProcessorConfig.Entry( + "my-map2", + "target2", + "key", + "value", + false, + false, + new ArrayList<>(), + null, + "condition" + ); + + ReflectionTestUtils.setField(configObjectUnderTest, "entries", List.of(entry)); + + assertThat(configObjectUnderTest.isNotUsingBothConfigurations(), is(false)); + } + +/* isUsingAtLeastOneConfiguration must report false when neither source nor entries is set. */ @Test + public void test_no_configuration_used() { + final MapToListProcessorConfig configObjectUnderTest = new MapToListProcessorConfig(); + + ReflectionTestUtils.setField(configObjectUnderTest, "source", null); + ReflectionTestUtils.setField(configObjectUnderTest, "entries", null); + + assertThat(configObjectUnderTest.isUsingAtLeastOneConfiguration(), is(false)); + } private MapToListProcessor createObjectUnderTest() { return new MapToListProcessor(pluginMetrics, mockConfig, expressionEvaluator); @@ -456,6 +566,23 @@ private Record createTestRecordWithNullValues() { return new Record<>(event); } +/* Builds a record whose event data holds two maps, my-map and my-map2, with three string pairs each. */ private Record createTestRecordWithMultipleMaps() { + final Map> data = Map.of( + "my-map", Map.of( + "key1", "value1", + "key2", "value2", + "key3", "value3"), + "my-map2", Map.of( + "key4", "value4", + "key5", "value5", + "key6", "value6")); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new 
Record<>(event); + } + private void assertSourceMapUnchanged(final Event resultEvent) { assertThat(resultEvent.containsKey("my-map"), is(true)); assertThat(resultEvent.get("my-map/key1", String.class), is("value1"));