Skip to content

Commit

Permalink
Add truncate string processor
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Jan 8, 2024
1 parent f19de03 commit 379c6ae
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.Processor;

/**
* This processor takes in a key and changes its value to a string with the leading and trailing spaces trimmed.
* If the value is not a string, no action is performed.
*/
@DataPrepperPlugin(name = "truncate_string", pluginType = Processor.class, pluginConfigurationType = TruncateStringProcessorConfig.class)
public class TruncateStringProcessor extends AbstractStringProcessor<TruncateStringProcessorConfig.Entry> {
private final ExpressionEvaluator expressionEvaluator;

@DataPrepperPluginConstructor
public TruncateStringProcessor(final PluginMetrics pluginMetrics, final TruncateStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics, config);
this.expressionEvaluator = expressionEvaluator;
}

@Override
protected void performKeyAction(final Event recordEvent, final TruncateStringProcessorConfig.Entry entry, final String value) {
if (entry.getTruncateWhen() != null && !expressionEvaluator.evaluateConditional(entry.getTruncateWhen(), recordEvent)) {
return;
}
recordEvent.put(entry.getSource(), value.substring(0, entry.getLength()));
}

@Override
protected String getKey(final TruncateStringProcessorConfig.Entry entry) {
return entry.getSource();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;

import java.util.List;

public class TruncateStringProcessorConfig implements StringProcessorConfig<TruncateStringProcessorConfig.Entry> {
public static class Entry {

@NotEmpty
@NotNull
private String source;

@JsonProperty("length")
@NotNull
private int length;

@JsonProperty("truncate_when")
private String truncateWhen;

public String getSource() {
return source;
}

public int getLength() {
return length;
}

public String getTruncateWhen() { return truncateWhen; }

public Entry(final String source, final int length, final String truncateWhen) {
this.source = source;
this.length = length;
this.truncateWhen = truncateWhen;
}

public Entry() {}
}

@Override
@JsonIgnore
public List<Entry> getIterativeConfig() {
return entries;
}

private List<@Valid Entry> entries;

public List<Entry> getEntries() {
return entries;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class TruncateStringProcessorTests {

@Mock
private PluginMetrics pluginMetrics;

@Mock
private TruncateStringProcessorConfig config;

@Mock
private ExpressionEvaluator expressionEvaluator;

private TruncateStringProcessor createObjectUnderTest() {
return new TruncateStringProcessor(pluginMetrics, config, expressionEvaluator);
}

@ParameterizedTest
@ArgumentsSource(TruncateStringArgumentsProvider.class)
void testTruncateStringProcessor(final String message, final int truncateLength, final String truncatedMessage) {

when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", truncateLength, null)));

final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest();
final Record<Event> record = createEvent(message);
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateStringProcessor.doExecute(Collections.singletonList(record));
assertThat(truncatedRecords.get(0).getData().get("message", Object.class), notNullValue());
assertThat(truncatedRecords.get(0).getData().get("message", Object.class), equalTo(truncatedMessage));
}

public void testLengthNotDefinedThrowsError() {
when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", null, null)));
when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", null, null)));

assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest());
}

@Test
void test_event_is_the_same_when_truncateWhen_condition_returns_false() {
final String truncateWhen = UUID.randomUUID().toString();
final String message = UUID.randomUUID().toString();

when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", 5, truncateWhen)));

final TruncateStringProcessor truncateStringProcessor = createObjectUnderTest();
final Record<Event> record = createEvent(message);
when(expressionEvaluator.evaluateConditional(truncateWhen, record.getData())).thenReturn(false);
final List<Record<Event>> truncatedRecords = (List<Record<Event>>) truncateStringProcessor.doExecute(Collections.singletonList(record));

assertThat(truncatedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
}


private TruncateStringProcessorConfig.Entry createEntry(final String source, final Integer length, final String truncateWhen) {
return new TruncateStringProcessorConfig.Entry(source, length, truncateWhen);
}

private Record<Event> createEvent(final String message) {
final Map<String, Object> eventData = new HashMap<>();
eventData.put("message", message);
return new Record<>(JacksonEvent.builder()
.withEventType("event")
.withData(eventData)
.build());
}

static class TruncateStringArgumentsProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.arguments("hello,world,no-truncate", 100, "hello,world,no-truncate"),
Arguments.arguments("hello,world,truncate", 11, "hello,world"),
Arguments.arguments("hello,world", 1, "h"),
Arguments.arguments("hello", 0, "")
);
}
}

}

0 comments on commit 379c6ae

Please sign in to comment.