From 0ea2ed2dca164fd54c31438254b58962d5db806e Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 23 Feb 2024 10:18:42 -0600 Subject: [PATCH] Add csv_when parameter to the csv processor Signed-off-by: Taylor Gray --- .../plugins/processor/csv/CsvProcessor.java | 21 +++++++- .../processor/csv/CsvProcessorConfig.java | 5 ++ .../plugins/processor/csv/CsvProcessorIT.java | 10 +++- .../processor/csv/CsvProcessorTest.java | 53 +++++++++++++++++-- 4 files changed, 82 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java index a0bb90b860..a9f99e5862 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessor.java @@ -5,10 +5,12 @@ package org.opensearch.dataprepper.plugins.processor.csv; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; @@ -42,11 +44,23 @@ public class CsvProcessor extends AbstractProcessor, Record private final CsvProcessorConfig config; + private final ExpressionEvaluator expressionEvaluator; + @DataPrepperPluginConstructor - public CsvProcessor(final PluginMetrics pluginMetrics, final CsvProcessorConfig config) { + public CsvProcessor(final PluginMetrics pluginMetrics, + final CsvProcessorConfig config, + final ExpressionEvaluator expressionEvaluator) { super(pluginMetrics); this.csvInvalidEventsCounter = pluginMetrics.counter(CSV_INVALID_EVENTS); this.config = config; + this.expressionEvaluator = expressionEvaluator; + + if (config.getCsvWhen() != null + && !expressionEvaluator.isValidExpressionStatement(config.getCsvWhen())) { + throw new InvalidPluginConfigurationException( + String.format("csv_when value of %s is not a valid expression statement. " + + "See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax.", config.getCsvWhen())); + } } @Override @@ -59,6 +73,11 @@ public Collection> doExecute(final Collection> recor final Event event = record.getData(); try { + + if (config.getCsvWhen() != null && !expressionEvaluator.evaluateConditional(config.getCsvWhen(), event)) { + continue; + } + final String message = event.get(config.getSource(), String.class); if (Objects.isNull(message)) { diff --git a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java index 4e6500b675..ec5d685b7e 100644 --- a/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java +++ b/data-prepper-plugins/csv-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorConfig.java @@ -37,6 +37,9 @@ public class CsvProcessorConfig { @JsonProperty("column_names") private List columnNames; + @JsonProperty("csv_when") + private String csvWhen; + /** * The field of the Event that contains the CSV data to be processed. * @@ -93,6 +96,8 @@ public List getColumnNames() { return columnNames; } + public String getCsvWhen() { return csvWhen; } + @AssertTrue(message = "delimiter must be exactly one character.") boolean isValidDelimiter() { return delimiter.length() == 1; diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorIT.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorIT.java index 33e54c5952..417a997e12 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorIT.java +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorIT.java @@ -5,6 +5,10 @@ package org.opensearch.dataprepper.plugins.processor.csv; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; @@ -20,6 +24,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +@ExtendWith(MockitoExtension.class) public class CsvProcessorIT { private static final String PLUGIN_NAME = "csv"; private static final String TEST_PIPELINE_NAME = "test_pipeline"; @@ -28,6 +33,9 @@ public class CsvProcessorIT { private CsvProcessor csvProcessor; private VpcFlowLogTypeGenerator vpcFlowLogTypeGenerator; + @Mock + private ExpressionEvaluator expressionEvaluator; + @BeforeEach void setup() { csvProcessorConfig = new CsvProcessorConfig(); @@ -39,7 +47,7 @@ void setup() { PluginMetrics pluginMetrics = PluginMetrics.fromNames(PLUGIN_NAME, TEST_PIPELINE_NAME); - csvProcessor = new CsvProcessor(pluginMetrics, csvProcessorConfig); + csvProcessor = new CsvProcessor(pluginMetrics, csvProcessorConfig, expressionEvaluator); vpcFlowLogTypeGenerator = new VpcFlowLogTypeGenerator(); } diff --git a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorTest.java b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorTest.java index be6da7e80f..5239679fab 100644 --- a/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorTest.java +++ b/data-prepper-plugins/csv-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/csv/CsvProcessorTest.java @@ -5,16 +5,18 @@ package org.opensearch.dataprepper.plugins.processor.csv; -import org.opensearch.dataprepper.metrics.PluginMetrics; -import org.opensearch.dataprepper.model.event.Event; -import org.opensearch.dataprepper.model.event.JacksonEvent; -import org.opensearch.dataprepper.model.record.Record; import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.record.Record; import java.util.Arrays; import java.util.Collections; @@ -25,6 +27,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; @@ -39,6 +42,9 @@ class CsvProcessorTest { @Mock private Counter csvInvalidEventsCounter; + @Mock + private ExpressionEvaluator expressionEvaluator; + private CsvProcessor csvProcessor; @BeforeEach @@ -57,14 +63,22 @@ void setup() { } private CsvProcessor createObjectUnderTest() { - return new CsvProcessor(pluginMetrics, processorConfig); + return new CsvProcessor(pluginMetrics, processorConfig, expressionEvaluator); } @Test void do_nothing_when_source_is_null_value_or_does_not_exist_in_the_Event() { + final Record eventUnderTest = createMessageEvent(""); + + final String csvWhen = UUID.randomUUID().toString(); + when(processorConfig.getCsvWhen()).thenReturn(csvWhen); + when(expressionEvaluator.isValidExpressionStatement(csvWhen)).thenReturn(true); + when(expressionEvaluator.evaluateConditional(csvWhen, eventUnderTest.getData())).thenReturn(true); when(processorConfig.getSource()).thenReturn(UUID.randomUUID().toString()); + csvProcessor = createObjectUnderTest(); + final List> editedEvents = (List>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest)); final Event parsedEvent = getSingleEvent(editedEvents); @@ -328,6 +342,35 @@ void test_when_invalidEvent_then_metricIncrementedAndNoException() { assertThat(parsedEvent.containsKey("column3"), equalTo(false)); } + @Test + void invalid_csv_when_throws_InvalidPluginConfigurationException() { + final String csvWhen = UUID.randomUUID().toString(); + + when(processorConfig.getCsvWhen()).thenReturn(csvWhen); + when(expressionEvaluator.isValidExpressionStatement(csvWhen)).thenReturn(false); + + assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest); + } + + @Test + void do_nothing_when_expression_evaluation_returns_false_for_event() { + final String csvWhen = UUID.randomUUID().toString(); + + when(processorConfig.getCsvWhen()).thenReturn(csvWhen); + when(expressionEvaluator.isValidExpressionStatement(csvWhen)).thenReturn(true); + + final Record eventUnderTest = createMessageEvent(""); + when(expressionEvaluator.evaluateConditional(csvWhen, eventUnderTest.getData())).thenReturn(false); + + csvProcessor = createObjectUnderTest(); + + + final List> editedEvents = (List>) csvProcessor.doExecute(Collections.singletonList(eventUnderTest)); + final Event parsedEvent = getSingleEvent(editedEvents); + + assertThat(parsedEvent, equalTo(eventUnderTest.getData())); + } + private Record createMessageEvent(final String message) { final Map eventData = new HashMap<>(); eventData.put("message",message);