Skip to content

Commit

Permalink
Add csv_when parameter to the csv processor
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Feb 23, 2024
1 parent 1aede50 commit 0ea2ed2
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

package org.opensearch.dataprepper.plugins.processor.csv;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -42,11 +44,23 @@ public class CsvProcessor extends AbstractProcessor<Record<Event>, Record<Event>

private final CsvProcessorConfig config;

private final ExpressionEvaluator expressionEvaluator;

/**
 * Creates the processor and validates the optional {@code csv_when} condition up front.
 *
 * @param pluginMetrics passed to the superclass and used to obtain the counter identified by
 *                      {@code CSV_INVALID_EVENTS}
 * @param config the processor configuration; its {@code getCsvWhen()} value is validated here
 * @param expressionEvaluator stored on the instance and used here to check that the configured
 *                            {@code csv_when} is a valid expression statement
 * @throws InvalidPluginConfigurationException if {@code csv_when} is non-null and the evaluator
 *         reports that it is not a valid expression statement
 */
@DataPrepperPluginConstructor
// NOTE(review): the two-argument signature on the next line appears alongside the three-argument
// one that follows it — presumably the pre-change line from the diff view; confirm before compiling.
public CsvProcessor(final PluginMetrics pluginMetrics, final CsvProcessorConfig config) {
public CsvProcessor(final PluginMetrics pluginMetrics,
final CsvProcessorConfig config,
final ExpressionEvaluator expressionEvaluator) {
super(pluginMetrics);
this.csvInvalidEventsCounter = pluginMetrics.counter(CSV_INVALID_EVENTS);
this.config = config;
this.expressionEvaluator = expressionEvaluator;

// Fail at construction time rather than per event: a csv_when that is set but not a valid
// expression is a configuration error. A null csv_when is allowed and skips this check.
if (config.getCsvWhen() != null
&& !expressionEvaluator.isValidExpressionStatement(config.getCsvWhen())) {
throw new InvalidPluginConfigurationException(
String.format("csv_when value of %s is not a valid expression statement. " +
"See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", config.getCsvWhen()));
}
}

@Override
Expand All @@ -59,6 +73,11 @@ public Collection<Record<Event>> doExecute(final Collection<Record<Event>> recor
final Event event = record.getData();

try {

if (config.getCsvWhen() != null && !expressionEvaluator.evaluateConditional(config.getCsvWhen(), event)) {
continue;
}

final String message = event.get(config.getSource(), String.class);

if (Objects.isNull(message)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public class CsvProcessorConfig {
@JsonProperty("column_names")
private List<String> columnNames;

@JsonProperty("csv_when")
private String csvWhen;

/**
* The field of the Event that contains the CSV data to be processed.
*
Expand Down Expand Up @@ -93,6 +96,8 @@ public List<String> getColumnNames() {
return columnNames;
}

/**
 * The conditional expression configured through the {@code csv_when} option.
 *
 * @return the value of the {@code csv_when} field; {@code null} if it was never assigned
 */
public String getCsvWhen() {
    return csvWhen;
}

@AssertTrue(message = "delimiter must be exactly one character.")
boolean isValidDelimiter() {
return delimiter.length() == 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.dataprepper.plugins.processor.csv;

import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand All @@ -20,6 +24,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField;

@ExtendWith(MockitoExtension.class)
public class CsvProcessorIT {
private static final String PLUGIN_NAME = "csv";
private static final String TEST_PIPELINE_NAME = "test_pipeline";
Expand All @@ -28,6 +33,9 @@ public class CsvProcessorIT {
private CsvProcessor csvProcessor;
private VpcFlowLogTypeGenerator vpcFlowLogTypeGenerator;

@Mock
private ExpressionEvaluator expressionEvaluator;

@BeforeEach
void setup() {
csvProcessorConfig = new CsvProcessorConfig();
Expand All @@ -39,7 +47,7 @@ void setup() {

PluginMetrics pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME);

csvProcessor = new CsvProcessor(pluginMetrics, csvProcessorConfig);
csvProcessor = new CsvProcessor(pluginMetrics, csvProcessorConfig, expressionEvaluator);
vpcFlowLogTypeGenerator = new VpcFlowLogTypeGenerator();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,18 @@

package org.opensearch.dataprepper.plugins.processor.csv;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.record.Record;
import io.micrometer.core.instrument.Counter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -25,6 +27,7 @@

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
Expand All @@ -39,6 +42,9 @@ class CsvProcessorTest {
@Mock
private Counter csvInvalidEventsCounter;

@Mock
private ExpressionEvaluator expressionEvaluator;

private CsvProcessor csvProcessor;

@BeforeEach
Expand All @@ -57,14 +63,22 @@ void setup() {
}

// Builds the CsvProcessor under test from this test class's pluginMetrics, processorConfig
// and expressionEvaluator fields.
// NOTE(review): two return statements appear here — presumably the old (two-argument) and new
// (three-argument) lines of the diff view; confirm which one belongs in the compiled file.
private CsvProcessor createObjectUnderTest() {
return new CsvProcessor(pluginMetrics, processorConfig);
return new CsvProcessor(pluginMetrics, processorConfig, expressionEvaluator);
}

@Test
void do_nothing_when_source_is_null_value_or_does_not_exist_in_the_Event() {

final Record<Event> eventUnderTest = createMessageEvent("");

final String csvWhen = UUID.randomUUID().toString();
when(processorConfig.getCsvWhen()).thenReturn(csvWhen);
when(expressionEvaluator.isValidExpressionStatement(csvWhen)).thenReturn(true);
when(expressionEvaluator.evaluateConditional(csvWhen, eventUnderTest.getData())).thenReturn(true);
when(processorConfig.getSource()).thenReturn(UUID.randomUUID().toString());

csvProcessor = createObjectUnderTest();


final List<Record<Event>> editedEvents = (List<Record<Event>>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest));
final Event parsedEvent = getSingleEvent(editedEvents);
Expand Down Expand Up @@ -328,6 +342,35 @@ void test_when_invalidEvent_then_metricIncrementedAndNoException() {
assertThat(parsedEvent.containsKey("column3"), equalTo(false));
}

@Test
void invalid_csv_when_throws_InvalidPluginConfigurationException() {
    // Any string works as the expression: the mocked evaluator is told to reject it.
    final String rejectedExpression = UUID.randomUUID().toString();

    when(expressionEvaluator.isValidExpressionStatement(rejectedExpression)).thenReturn(false);
    when(processorConfig.getCsvWhen()).thenReturn(rejectedExpression);

    // Constructing the processor must fail because csv_when is set but reported invalid.
    assertThrows(InvalidPluginConfigurationException.class, () -> createObjectUnderTest());
}

@Test
void do_nothing_when_expression_evaluation_returns_false_for_event() {
    // Arrange: a syntactically accepted csv_when that evaluates to false for this event.
    final String condition = UUID.randomUUID().toString();
    final Record<Event> inputRecord = createMessageEvent("");
    final Event inputEvent = inputRecord.getData();

    when(processorConfig.getCsvWhen()).thenReturn(condition);
    when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true);
    when(expressionEvaluator.evaluateConditional(condition, inputEvent)).thenReturn(false);

    csvProcessor = createObjectUnderTest();

    // Act
    final List<Record<Event>> processedRecords =
            (List<Record<Event>>) csvProcessor.doExecute(Collections.singletonList(inputRecord));
    final Event resultEvent = getSingleEvent(processedRecords);

    // Assert: the event that comes out equals the event that went in.
    assertThat(resultEvent, equalTo(inputEvent));
}

private Record<Event> createMessageEvent(final String message) {
final Map<String, Object> eventData = new HashMap<>();
eventData.put("message",message);
Expand Down

0 comments on commit 0ea2ed2

Please sign in to comment.