diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index cd604a1b9d..62b31ad0c5 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -136,10 +136,11 @@ public JsonNode getJsonNode() { */ @Override public void put(final String key, final Object value) { + checkArgument(!key.isEmpty(), "key cannot be an empty string for put method"); final String trimmedKey = checkAndTrimKey(key); - final LinkedList keys = new LinkedList<>(Arrays.asList(trimmedKey.split(SEPARATOR))); + final LinkedList keys = new LinkedList<>(Arrays.asList(trimmedKey.split(SEPARATOR, -1))); JsonNode parentNode = jsonNode; @@ -247,7 +248,12 @@ private List mapNodeToList(final String key, final JsonNode node, final C } private JsonPointer toJsonPointer(final String key) { - String jsonPointerExpression = SEPARATOR + key; + final String jsonPointerExpression; + if (key.isEmpty() || key.startsWith("/")) { + jsonPointerExpression = key; + } else { + jsonPointerExpression = SEPARATOR + key; + } return JsonPointer.compile(jsonPointerExpression); } @@ -259,6 +265,7 @@ private JsonPointer toJsonPointer(final String key) { @Override public void delete(final String key) { + checkArgument(!key.isEmpty(), "key cannot be an empty string for delete method"); final String trimmedKey = checkAndTrimKey(key); final int index = trimmedKey.lastIndexOf(SEPARATOR); @@ -399,24 +406,31 @@ public static boolean isValidEventKey(final String key) { } private String checkAndTrimKey(final String key) { checkKey(key); - return trimKey(key); + return trimTrailingSlashInKey(key); } private static void checkKey(final String key) { checkNotNull(key, "key cannot be null"); - checkArgument(!key.isEmpty(), "key cannot be an empty string"); + if (key.isEmpty()) { + // Empty string key is valid + return; + } if (key.length() > MAX_KEY_LENGTH) { throw new IllegalArgumentException("key cannot be longer than " + MAX_KEY_LENGTH + " characters"); } if (!isValidKey(key)) { - throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_ and must follow JsonPointer (ie. 'field/to/key')"); + throw new IllegalArgumentException("key " + key + " must contain only alphanumeric chars with .-_@/ and must follow JsonPointer (ie. 'field/to/key')"); } } private String trimKey(final String key) { final String trimmedLeadingSlash = key.startsWith(SEPARATOR) ? key.substring(1) : key; - return trimmedLeadingSlash.endsWith(SEPARATOR) ? trimmedLeadingSlash.substring(0, trimmedLeadingSlash.length() - 2) : trimmedLeadingSlash; + return trimTrailingSlashInKey(trimmedLeadingSlash); + } + + private String trimTrailingSlashInKey(final String key) { + return key.length() > 1 && key.endsWith(SEPARATOR) ? key.substring(0, key.length() - 1) : key; } private static boolean isValidKey(final String key) { diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java index bf3a320728..99ffd71259 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/event/JacksonEventTest.java @@ -23,6 +23,7 @@ import java.util.Random; import java.util.UUID; +import static org.hamcrest.CoreMatchers.containsStringIgnoringCase; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -70,7 +71,7 @@ public void testPutAndGet_withRandomString() { } @ParameterizedTest - @ValueSource(strings = {"foo", "foo-bar", "foo_bar", "foo.bar", "/foo", "/foo/", "a1K.k3-01_02"}) + @ValueSource(strings = {"/", "foo", "foo-bar", "foo_bar", "foo.bar", "/foo", "/foo/", "a1K.k3-01_02"}) void testPutAndGet_withStrings(final String key) { final UUID value = UUID.randomUUID(); @@ -81,6 +82,12 @@ void testPutAndGet_withStrings(final String key) { assertThat(result, is(equalTo(value))); } + @Test + public void testPutKeyCannotBeEmptyString() { + Throwable exception = assertThrows(IllegalArgumentException.class, () -> event.put("", "value")); + assertThat(exception.getMessage(), containsStringIgnoringCase("key cannot be an empty string")); + } + @Test public void testPutAndGet_withMultLevelKey() { final String key = "foo/bar"; @@ -126,6 +133,28 @@ public void testPutAndGet_withMultiLevelKeyWithADash() { assertThat(result, is(equalTo(value))); } + @ParameterizedTest + @ValueSource(strings = {"foo", "/foo", "/foo/", "foo/"}) + void testGetAtRootLevel(final String key) { + final String value = UUID.randomUUID().toString(); + + event.put(key, value); + final Map result = event.get("", Map.class); + + assertThat(result, is(Map.of("foo", value))); + } + + @ParameterizedTest + @ValueSource(strings = {"/foo/bar", "foo/bar", "foo/bar/"}) + void testGetAtRootLevelWithMultiLevelKey(final String key) { + final String value = UUID.randomUUID().toString(); + + event.put(key, value); + final Map result = event.get("", Map.class); + + assertThat(result, is(Map.of("foo", Map.of("bar", value)))); + } + @Test public void testPutUpdateAndGet_withPojo() { final String key = "foo/bar"; @@ -250,21 +279,9 @@ public void testOverwritingExistingKey() { assertThat(result, is(equalTo(value))); } - @Test - public void testDeletingKey() { - final String key = "foo"; - - event.put(key, UUID.randomUUID()); - event.delete(key); - final UUID result = event.get(key, UUID.class); - - assertThat(result, is(nullValue())); - } - - @Test - public void testDelete_withNestedKey() { - final String key = "foo/bar"; - + @ParameterizedTest + @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) + public void testDeleteKey(final String key) { event.put(key, UUID.randomUUID()); event.delete(key); final UUID result = event.get(key, UUID.class); @@ -272,9 +289,9 @@ public void testDelete_withNestedKey() { assertThat(result, is(nullValue())); } - @Test - public void testDelete_withNonexistentKey() { - final String key = "foo/bar"; + @ParameterizedTest + @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) + public void testDelete_withNonexistentKey(final String key) { UUID result = event.get(key, UUID.class); assertThat(result, is(nullValue())); @@ -285,19 +302,27 @@ public void testDelete_withNonexistentKey() { } @Test - public void testContainsKey_withKey() { - final String key = "foo"; - - event.put(key, UUID.randomUUID()); - assertThat(event.containsKey(key), is(true)); + public void testDeleteKeyCannotBeEmptyString() { + Throwable exception = assertThrows(IllegalArgumentException.class, () -> event.delete("")); + assertThat(exception.getMessage(), containsStringIgnoringCase("key cannot be an empty string")); } @Test - public void testContainsKey_withouthKey() { - final String key = "foo"; + public void testContainsKeyReturnsTrueForEmptyStringKey() { + assertThat(event.containsKey(""), is(true)); + } + @ParameterizedTest + @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) + public void testContainsKey_withKey(final String key) { event.put(key, UUID.randomUUID()); - assertThat(event.containsKey("bar"), is(false)); + assertThat(event.containsKey(key), is(true)); + } + + @ParameterizedTest + @ValueSource(strings = {"/", "foo", "/foo", "/foo/bar", "foo/bar", "foo/bar/", "/foo/bar/leaf/key"}) + public void testContainsKey_withouthKey(final String key) { + assertThat(event.containsKey(key), is(false)); } @Test @@ -324,7 +349,7 @@ public void testIsValueAList_withNull() { } @ParameterizedTest - @ValueSource(strings = {"", "withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars", + @ValueSource(strings = {"withSpecialChars*$%", "\\-withEscapeChars", "\\\\/withMultipleEscapeChars", "with,Comma", "with:Colon", "with[Bracket", "with|Brace"}) void testKey_withInvalidKey_throwsIllegalArgumentException(final String invalidKey) { assertThrowsForKeyCheck(IllegalArgumentException.class, invalidKey); diff --git a/data-prepper-plugins/mutate-event-processors/README.md b/data-prepper-plugins/mutate-event-processors/README.md index e22c2195df..661e8ee5dc 100644 --- a/data-prepper-plugins/mutate-event-processors/README.md +++ b/data-prepper-plugins/mutate-event-processors/README.md @@ -541,14 +541,74 @@ The processed event will have the following data: } ``` +If we enable `convert_field_to_list` option: +```yaml +... +processor: + - map_to_list: + source: "my-map" + target: "my-list" + convert_field_to_list: true +... +``` +the processed event will have the following data: +```json +{ + "my-list": [ + ["key1", "value1"], + ["key2", "value2"], + ["key3", "value3"] + ], + "my-map": { + "key1": "value1", + "key2": "value2", + "key3": "value3" + } +} +``` + +If source is set to empty string (""), it will use the event root as source. +```yaml +... +processor: + - map_to_list: + source: "" + target: "my-list" + convert_field_to_list: true +... +``` +Input data like this: +```json +{ + "key1": "value1", + "key2": "value2", + "key3": "value3" +} +``` +will end up with this after processing: +```json +{ + "my-list": [ + ["key1", "value1"], + ["key2", "value2"], + ["key3", "value3"] + ], + "key1": "value1", + "key2": "value2", + "key3": "value3" +} +``` + ### Configuration -* `source` - (required): the source map to perform the operation +* `source` - (required): the source map to perform the operation; If set to empty string (""), it will use the event root as source. * `target` - (required): the target list to put the converted list * `key_name` - (optional): the key name of the field to hold the original key, default is "key" * `value_name` - (optional): the key name of the field to hold the original value, default is "value" * `exclude_keys` - (optional): the keys in source map that will be excluded from processing, default is empty list * `remove_processed_fields` - (optional): default is false; if true, will remove processed fields from source map * `map_to_list_when` - (optional): used to configure a condition for event processing based on certain property of the incoming event. Default is null (all events will be processed). +* `convert_field_to_list` - (optional): default to false; if true, will convert fields to lists instead of objects +* `tags_on_failure` - (optional): a list of tags to add to event metadata when the event fails to process ## Developer Guide diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java index 49e74680fa..7b088f9976 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessor.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.mutateevent; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; @@ -22,11 +24,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; @DataPrepperPlugin(name = "map_to_list", pluginType = Processor.class, pluginConfigurationType = MapToListProcessorConfig.class) public class MapToListProcessor extends AbstractProcessor, Record> { private static final Logger LOG = LoggerFactory.getLogger(MapToListProcessor.class); + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final MapToListProcessorConfig config; private final ExpressionEvaluator expressionEvaluator; private final Set excludeKeySet = new HashSet<>(); @@ -49,36 +53,70 @@ public Collection> doExecute(final Collection> recor } try { - final Map sourceMap = recordEvent.get(config.getSource(), Map.class); - final List> targetList = new ArrayList<>(); - - Map modifiedSourceMap = new HashMap<>(); - for (final Map.Entry entry : sourceMap.entrySet()) { - if (excludeKeySet.contains(entry.getKey())) { - if (config.getRemoveProcessedFields()) { - modifiedSourceMap.put(entry.getKey(), entry.getValue()); + final Map sourceMap = getSourceMap(recordEvent); + + if (config.getConvertFieldToList()) { + final List> targetNestedList = new ArrayList<>(); + + for (final Map.Entry entry : sourceMap.entrySet()) { + if (!excludeKeySet.contains(entry.getKey())) { + targetNestedList.add(List.of(entry.getKey(), entry.getValue())); } - continue; - } - targetList.add(Map.of( - config.getKeyName(), entry.getKey(), - config.getValueName(), entry.getValue() - )); - } - if (config.getRemoveProcessedFields()) { - recordEvent.put(config.getSource(), modifiedSourceMap); + } + removeProcessedFields(sourceMap, recordEvent); + recordEvent.put(config.getTarget(), targetNestedList); + } else { + final List> targetList = new ArrayList<>(); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (!excludeKeySet.contains(entry.getKey())) { + targetList.add(Map.of( + config.getKeyName(), entry.getKey(), + config.getValueName(), entry.getValue() + )); + } + } + removeProcessedFields(sourceMap, recordEvent); + recordEvent.put(config.getTarget(), targetList); } - - recordEvent.put(config.getTarget(), targetList); } catch (Exception e) { LOG.error("Fail to perform Map to List operation", e); - //TODO: add tagging on failure + recordEvent.getMetadata().addTags(config.getTagsOnFailure()); } } return records; } + private Map getSourceMap(Event recordEvent) throws JsonProcessingException { + final Map sourceMap; + sourceMap = recordEvent.get(config.getSource(), Map.class); + return sourceMap; + } + + private void removeProcessedFields(Map sourceMap, Event recordEvent) { + if (!config.getRemoveProcessedFields()) { + return; + } + + if (Objects.equals(config.getSource(), "")) { + // Source is root + for (final Map.Entry entry : sourceMap.entrySet()) { + if (excludeKeySet.contains(entry.getKey())) { + continue; + } + recordEvent.delete(entry.getKey()); + } + } else { + Map modifiedSourceMap = new HashMap<>(); + for (final Map.Entry entry : sourceMap.entrySet()) { + if (excludeKeySet.contains(entry.getKey())) { + modifiedSourceMap.put(entry.getKey(), entry.getValue()); + } + } + recordEvent.put(config.getSource(), modifiedSourceMap); + } + } + @Override public void prepareForShutdown() { } diff --git a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java index 3d863ea784..46a2ec79f0 100644 --- a/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java +++ b/data-prepper-plugins/mutate-event-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorConfig.java @@ -18,7 +18,6 @@ public class MapToListProcessorConfig { private static final List DEFAULT_EXCLUDE_KEYS = new ArrayList<>(); private static final boolean DEFAULT_REMOVE_PROCESSED_FIELDS = false; - @NotEmpty @NotNull @JsonProperty("source") private String source; @@ -43,6 +42,12 @@ public class MapToListProcessorConfig { @JsonProperty("remove_processed_fields") private boolean removeProcessedFields = DEFAULT_REMOVE_PROCESSED_FIELDS; + @JsonProperty("convert_field_to_list") + private boolean convertFieldToList = false; + + @JsonProperty("tags_on_failure") + private List tagsOnFailure; + public String getSource() { return source; } @@ -70,4 +75,12 @@ public List getExcludeKeys() { public boolean getRemoveProcessedFields() { return removeProcessedFields; } + + public boolean getConvertFieldToList() { + return convertFieldToList; + } + + public List getTagsOnFailure() { + return tagsOnFailure; + } } diff --git a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java index c748e676f4..83d736ba21 100644 --- a/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java +++ b/data-prepper-plugins/mutate-event-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutateevent/MapToListProcessorTest.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.UUID; @@ -48,6 +49,8 @@ void setUp() { lenient().when(mockConfig.getMapToListWhen()).thenReturn(null); lenient().when(mockConfig.getExcludeKeys()).thenReturn(new ArrayList<>()); lenient().when(mockConfig.getRemoveProcessedFields()).thenReturn(false); + lenient().when(mockConfig.getConvertFieldToList()).thenReturn(false); + lenient().when(mockConfig.getTagsOnFailure()).thenReturn(new ArrayList<>()); } @Test @@ -72,6 +75,47 @@ void testMapToListSuccessWithDefaultOptions() { assertSourceMapUnchanged(resultEvent); } + @Test + void testMapToListSuccessWithNestedMap() { + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecordWithNestedMap(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(2)); + assertThat(resultList, containsInAnyOrder( + Map.of("key", "key1", "value", "value1"), + Map.of("key", "key2", "value", Map.of("key2-1", "value2")) + )); + } + + @Test + void testMapToListSuccessWithRootAsSource() { + when(mockConfig.getSource()).thenReturn(""); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createFlatTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + Map.of("key", "key1", "value", "value1"), + Map.of("key", "key2", "value", "value2"), + Map.of("key", "key3", "value", "value3") + )); + assertSourceMapUnchangedForFlatRecord(resultEvent); + } + @Test void testMapToListSuccessWithCustomKeyNameValueName() { final String keyName = "custom-key-name"; @@ -130,6 +174,25 @@ void testExcludedKeysAreNotProcessed() { assertSourceMapUnchanged(resultEvent); } + @Test + void testExcludedKeysAreNotProcessedWithRootAsSource() { + when(mockConfig.getSource()).thenReturn(""); + when(mockConfig.getExcludeKeys()).thenReturn(List.of("key1", "key3", "key5")); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createFlatTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(1)); + assertThat(resultList.get(0), is(Map.of("key", "key2", "value", "value2"))); + assertSourceMapUnchangedForFlatRecord(resultEvent); + } + @Test void testRemoveProcessedFields() { when(mockConfig.getExcludeKeys()).thenReturn(List.of("key1", "key3", "key5")); @@ -155,6 +218,96 @@ void testRemoveProcessedFields() { assertThat(resultEvent.get("my-map/key3", String.class), is("value3")); } + @Test + void testRemoveProcessedFieldsWithRootAsSource() { + when(mockConfig.getSource()).thenReturn(""); + when(mockConfig.getExcludeKeys()).thenReturn(List.of("key1", "key3", "key5")); + when(mockConfig.getRemoveProcessedFields()).thenReturn(true); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createFlatTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(1)); + assertThat(resultList.get(0), is(Map.of("key", "key2", "value", "value2"))); + + assertThat(resultEvent.containsKey("key1"), is(true)); + assertThat(resultEvent.get("key1", String.class), is("value1")); + assertThat(resultEvent.containsKey("key2"), is(false)); + assertThat(resultEvent.containsKey("key3"), is(true)); + assertThat(resultEvent.get("key3", String.class), is("value3")); + } + + @Test + public void testConvertFieldToListSuccess() { + when(mockConfig.getConvertFieldToList()).thenReturn(true); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + List.of("key1", "value1"), + List.of("key2", "value2"), + List.of("key3", "value3") + )); + assertSourceMapUnchanged(resultEvent); + } + + @Test + public void testConvertFieldToListSuccessWithNestedMap() { + when(mockConfig.getConvertFieldToList()).thenReturn(true); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecordWithNestedMap(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(2)); + assertThat(resultList, containsInAnyOrder( + List.of("key1", "value1"), + List.of("key2", Map.of("key2-1", "value2")) + )); + } + + @Test + public void testConvertFieldToListSuccessWithRootAsSource() { + when(mockConfig.getSource()).thenReturn(""); + when(mockConfig.getConvertFieldToList()).thenReturn(true); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createFlatTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + List> resultList = resultEvent.get("my-list", List.class); + + assertThat(resultList.size(), is(3)); + assertThat(resultList, containsInAnyOrder( + List.of("key1", "value1"), + List.of("key2", "value2"), + List.of("key3", "value3") + )); + assertSourceMapUnchangedForFlatRecord(resultEvent); + } + @Test public void testEventNotProcessedWhenTheWhenConditionIsFalse() { final String whenCondition = UUID.randomUUID().toString(); @@ -172,6 +325,25 @@ public void testEventNotProcessedWhenTheWhenConditionIsFalse() { assertSourceMapUnchanged(resultEvent); } + @Test + void testFailureTagsAreAddedWhenException() { + // non-existing source key + when(mockConfig.getSource()).thenReturn("my-other-map"); + final List testTags = List.of("tag1", "tag2"); + when(mockConfig.getTagsOnFailure()).thenReturn(testTags); + + final MapToListProcessor processor = createObjectUnderTest(); + final Record testRecord = createTestRecord(); + final List> resultRecord = (List>) processor.doExecute(Collections.singletonList(testRecord)); + + assertThat(resultRecord.size(), is(1)); + + final Event resultEvent = resultRecord.get(0).getData(); + assertThat(resultEvent.containsKey("my-list"), is(false)); + assertSourceMapUnchanged(resultEvent); + assertThat(resultEvent.getMetadata().getTags(), is(new HashSet<>(testTags))); + } + private MapToListProcessor createObjectUnderTest() { return new MapToListProcessor(pluginMetrics, mockConfig, expressionEvaluator); } @@ -188,10 +360,39 @@ private Record createTestRecord() { return new Record<>(event); } + private Record createFlatTestRecord() { + final Map data = Map.of( + "key1", "value1", + "key2", "value2", + "key3", "value3"); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } + + private Record createTestRecordWithNestedMap() { + final Map> data = Map.of("my-map", Map.of( + "key1", "value1", + "key2", Map.of("key2-1", "value2"))); + final Event event = JacksonEvent.builder() + .withData(data) + .withEventType("event") + .build(); + return new Record<>(event); + } + private void assertSourceMapUnchanged(final Event resultEvent) { assertThat(resultEvent.containsKey("my-map"), is(true)); assertThat(resultEvent.get("my-map/key1", String.class), is("value1")); assertThat(resultEvent.get("my-map/key2", String.class), is("value2")); assertThat(resultEvent.get("my-map/key3", String.class), is("value3")); } + + private void assertSourceMapUnchangedForFlatRecord(final Event resultEvent) { + assertThat(resultEvent.get("key1", String.class), is("value1")); + assertThat(resultEvent.get("key2", String.class), is("value2")); + assertThat(resultEvent.get("key3", String.class), is("value3")); + } }