From 8a7132d0b7de8d0965dd3c4c6f7fae9900fdd570 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka <41027584+kkondaka@users.noreply.github.com> Date: Mon, 25 Mar 2024 11:09:01 -0700 Subject: [PATCH] Truncate Processor: Add support to truncate all fields in an event (#4317) Truncate Processor: Add support to truncate all fields in an event Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../processor/truncate/TruncateProcessor.java | 53 +++++++++++++------ .../truncate/TruncateProcessorConfig.java | 14 +++-- .../truncate/TruncateProcessorTests.java | 44 ++++++++++++--- 3 files changed, 85 insertions(+), 26 deletions(-) diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java index c9b71173ce..2063652a7c 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessor.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.opensearch.dataprepper.logging.DataPrepperMarkers.EVENT; @@ -41,14 +42,38 @@ public TruncateProcessor(final PluginMetrics pluginMetrics, final TruncateProces } private String getTruncatedValue(final String value, final int startIndex, final Integer length) { - String truncatedValue = - (length == null || startIndex+length >= value.length()) ? - value.substring(startIndex) : + String truncatedValue = + (length == null || startIndex+length >= value.length()) ? + value.substring(startIndex) : value.substring(startIndex, startIndex + length); return truncatedValue; } + private void truncateKey(Event event, String key, Object value, TruncateProcessorConfig.Entry entryConfig) { + final boolean recurse = entryConfig.getRecurse(); + final int startIndex = entryConfig.getStartAt() == null ? 0 : entryConfig.getStartAt(); + final Integer length = entryConfig.getLength(); + if (value instanceof String) { + event.put(key, getTruncatedValue((String) value, startIndex, length)); + } else if (value instanceof List) { + List result = new ArrayList<>(); + for (Object listItem : (List) value) { + if (listItem instanceof String) { + result.add(getTruncatedValue((String) listItem, startIndex, length)); + } else { + result.add(listItem); + } + } + event.put(key, result); + } else if (recurse && (value instanceof Map)) { + Map valueMap = (Map)value; + for (Map.Entry mapEntry: valueMap.entrySet()) { + truncateKey(event, key+"/"+mapEntry.getKey(), mapEntry.getValue(), entryConfig); + } + } + } + @Override public Collection> doExecute(final Collection> records) { for(final Record record : records) { @@ -58,30 +83,26 @@ public Collection> doExecute(final Collection> recor for (TruncateProcessorConfig.Entry entry : entries) { final List sourceKeys = entry.getSourceKeys(); final String truncateWhen = entry.getTruncateWhen(); + final boolean recurse = entry.getRecurse(); final int startIndex = entry.getStartAt() == null ? 0 : entry.getStartAt(); final Integer length = entry.getLength(); if (truncateWhen != null && !expressionEvaluator.evaluateConditional(truncateWhen, recordEvent)) { continue; } + if (sourceKeys == null) { + for (Map.Entry mapEntry: recordEvent.toMap().entrySet()) { + truncateKey(recordEvent, mapEntry.getKey(), mapEntry.getValue(), entry); + } + continue; + } + for (String sourceKey : sourceKeys) { if (!recordEvent.containsKey(sourceKey)) { continue; } final Object value = recordEvent.get(sourceKey, Object.class); - if (value instanceof String) { - recordEvent.put(sourceKey, getTruncatedValue((String) value, startIndex, length)); - } else if (value instanceof List) { - List result = new ArrayList<>(); - for (Object listItem : (List) value) { - if (listItem instanceof String) { - result.add(getTruncatedValue((String) listItem, startIndex, length)); - } else { - result.add(listItem); - } - } - recordEvent.put(sourceKey, result); - } + truncateKey(recordEvent, sourceKey, value, entry); } } } catch (final Exception e) { diff --git a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java index 1172da77b6..7fde949719 100644 --- a/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java +++ b/data-prepper-plugins/truncate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorConfig.java @@ -15,8 +15,6 @@ public class TruncateProcessorConfig { public static class Entry { - @NotEmpty - @NotNull @JsonProperty("source_keys") private List sourceKeys; @@ -26,14 +24,18 @@ public static class Entry { @JsonProperty("length") private Integer length; + @JsonProperty("recursive") + private Boolean recurse = false; + @JsonProperty("truncate_when") private String truncateWhen; - public Entry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) { + public Entry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final Boolean recurse) { this.sourceKeys = sourceKeys; this.startAt = startAt; this.length = length; this.truncateWhen = truncateWhen; + this.recurse = recurse; } public Entry() {} @@ -46,6 +48,10 @@ public Integer getStartAt() { return startAt; } + public Boolean getRecurse() { + return recurse; + } + public Integer getLength() { return length; } @@ -54,7 +60,7 @@ public String getTruncateWhen() { return truncateWhen; } - @AssertTrue(message = "source_keys must be specified and at least one of start_at or length or both must be specified and the values must be positive integers") + @AssertTrue(message = "At least one of start_at or length or both must be specified and the values must be positive integers") public boolean isValidConfig() { if (length == null && startAt == null) { return false; diff --git a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java index 7717181864..00af15ed63 100644 --- a/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java +++ b/data-prepper-plugins/truncate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/truncate/TruncateProcessorTests.java @@ -20,6 +20,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.google.common.collect.ImmutableMap; import java.util.Collections; import java.util.HashMap; @@ -53,7 +54,7 @@ private TruncateProcessor createObjectUnderTest() { @ArgumentsSource(TruncateArgumentsProvider.class) void testTruncateProcessor(final Object messageValue, final Integer startAt, final Integer truncateLength, final Object truncatedMessage) { - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), startAt, truncateLength, null, false))); final TruncateProcessor truncateProcessor = createObjectUnderTest(); final Record record = createEvent("message", messageValue); final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); @@ -64,8 +65,8 @@ void testTruncateProcessor(final Object messageValue, final Integer startAt, fin @ParameterizedTest @ArgumentsSource(MultipleTruncateArgumentsProvider.class) void testTruncateProcessorMultipleEntries(final Object messageValue, final Integer startAt1, final Integer truncateLength1, final Integer startAt2, final Integer truncateLength2, final Object truncatedMessage1, final Object truncatedMessage2) { - TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null); - TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null); + TruncateProcessorConfig.Entry entry1 = createEntry(List.of("message1"), startAt1, truncateLength1, null, false); + TruncateProcessorConfig.Entry entry2 = createEntry(List.of("message2"), startAt2, truncateLength2, null, false); when(config.getEntries()).thenReturn(List.of(entry1, entry2)); final Record record1 = createEvent("message1", messageValue); final Record record2 = createEvent("message2", messageValue); @@ -82,7 +83,7 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { final String truncateWhen = UUID.randomUUID().toString(); final String message = UUID.randomUUID().toString(); - when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen))); + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(List.of("message"), null, 5, truncateWhen, false))); final TruncateProcessor truncateProcessor = createObjectUnderTest(); final Record record = createEvent("message", message); @@ -92,8 +93,32 @@ void test_event_is_the_same_when_truncateWhen_condition_returns_false() { assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap())); } - private TruncateProcessorConfig.Entry createEntry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen) { - return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen); + @Test + void test_event_with_all_fields_truncated() { + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, false))); + final TruncateProcessor truncateProcessor = createObjectUnderTest(); + final Record record = createEventWithMultipleKeys(Map.of("key1", "aaaaa12345", "key2", "bbbbb12345", "key3", "ccccccc12345")); + final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); + Event event = truncatedRecords.get(0).getData(); + assertThat(event.get("key1", String.class), equalTo("aaaaa")); + assertThat(event.get("key2", String.class), equalTo("bbbbb")); + assertThat(event.get("key3", String.class), equalTo("ccccc")); + } + + @Test + void test_event_with_all_fields_truncated_recursively() { + when(config.getEntries()).thenReturn(Collections.singletonList(createEntry(null, null, 5, null, true))); + final TruncateProcessor truncateProcessor = createObjectUnderTest(); + final Record record = createEventWithMultipleKeys(ImmutableMap.of("key1", "aaaaa12345", "key2", ImmutableMap.of("key3", "bbbbb12345", "key4", ImmutableMap.of("key5", "ccccccc12345")))); + final List> truncatedRecords = (List>) truncateProcessor.doExecute(Collections.singletonList(record)); + Event event = truncatedRecords.get(0).getData(); + assertThat(event.get("key1", String.class), equalTo("aaaaa")); + assertThat(event.get("key2/key3", String.class), equalTo("bbbbb")); + assertThat(event.get("key2/key4/key5", String.class), equalTo("ccccc")); + } + + private TruncateProcessorConfig.Entry createEntry(final List sourceKeys, final Integer startAt, final Integer length, final String truncateWhen, final boolean recurse) { + return new TruncateProcessorConfig.Entry(sourceKeys, startAt, length, truncateWhen, recurse); } private Record createEvent(final String key, final Object value) { @@ -105,6 +130,13 @@ private Record createEvent(final String key, final Object value) { .build()); } + private Record createEventWithMultipleKeys(final Map data) { + return new Record<>(JacksonEvent.builder() + .withEventType("event") + .withData(data) + .build()); + } + static class TruncateArgumentsProvider implements ArgumentsProvider { @Override