diff --git a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java index 032bea1530..5c2ee0e8e6 100644 --- a/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java +++ b/data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorTest.java @@ -9,30 +9,13 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import static org.junit.jupiter.params.provider.Arguments.arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; import org.mockito.Mock; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import org.mockito.MockitoAnnotations; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -55,9 +38,6 @@ import org.opensearch.dataprepper.plugins.lambda.common.config.ClientOptions; import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; import org.opensearch.dataprepper.plugins.lambda.processor.exception.StrictResponseModeNotRespectedException; -import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.createLambdaConfigurationFromYaml; -import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleEventRecords; -import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleRecord; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.lambda.LambdaAsyncClient; @@ -75,6 +55,28 @@ import java.util.function.Consumer; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.createLambdaConfigurationFromYaml; +import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleEventRecords; +import static org.opensearch.dataprepper.plugins.lambda.utils.LambdaTestSetupUtil.getSampleRecord; + @MockitoSettings(strictness = Strictness.LENIENT) public class LambdaProcessorTest { @@ -129,6 +131,55 @@ public class LambdaProcessorTest { @Mock private LambdaAsyncClient lambdaAsyncClient; + private static Stream getLambdaResponseConversionSamplesForStrictAndAggregateMode() { + return Stream.of( + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "null", StrictResponseModeNotRespectedException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), StrictResponseModeNotRespectedException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), StrictResponseModeNotRespectedException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), StrictResponseModeNotRespectedException.class, 0), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), null, 2), + //Aggregate mode + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", null, 0), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), null, 0), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), null, 0), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1) + + ); + } + + private static Stream getDoExecuteSamplesForStrictAndAggregateMode() { + List> firstSample = getSampleEventRecords(1); + + List> secondSample = getSampleEventRecords(1); + List> thirdSample = getSampleEventRecords(1); + List> fourthSample = getSampleEventRecords(1); + List> fifthSample = getSampleEventRecords(1); + String fifthSampleJsonString = fifthSample.get(0).getData().toJsonString(); + fifthSampleJsonString = "[" + fifthSampleJsonString + "]"; + + + return Stream.of( + arguments("lambda-processor-success-config.yaml", firstSample, null, firstSample, true), + arguments("lambda-processor-success-config.yaml", secondSample, "null", secondSample, true), + arguments("lambda-processor-success-config.yaml", thirdSample, "random string", thirdSample, true), + arguments("lambda-processor-success-config.yaml", fourthSample, SdkBytes.fromByteArray("[]".getBytes()), fourthSample, true), + arguments("lambda-processor-success-config.yaml", fifthSample, SdkBytes.fromByteArray(fifthSampleJsonString.getBytes()), fifthSample, false)/*, + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()),Collections.emptyList()), + arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), Collections.emptyList()), + //Aggregate mode + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, Collections.emptyList()), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", Collections.emptyList()), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", Collections.emptyList()), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), Collections.emptyList()), + arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), Collections.emptyList()) +*/ + ); + } @BeforeEach public void setUp() { @@ -263,19 +314,27 @@ public void testDoExecute_UnableParseResponse(String configFileName) throws Exce int recordCount = (int) (Math.random() * 100); List> records = getSampleEventRecords(recordCount); InvokeResponse invokeResponse = mock(InvokeResponse.class); + // Setting up an invalid json that will fail at the parsing step + when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"invalid_json:\"parsing_fails\"}]")); + when(invokeResponse.statusCode()).thenReturn(200); // Success status code + // Mock the invoke method to return a completed future + CompletableFuture invokeFuture = CompletableFuture.completedFuture(invokeResponse); + when(lambdaAsyncClient.invoke(any(InvokeRequest.class))).thenReturn(invokeFuture); - - // Mock Buffer to return empty payload - when(invokeResponse.payload()).thenReturn(SdkBytes.fromUtf8String("[{\"key\": \"value\"}]")); + // Processor instant creation and executing LambdaProcessorConfig lambdaProcessorConfig = createLambdaConfigurationFromYaml(configFileName); LambdaProcessor lambdaProcessor = new LambdaProcessor(pluginFactory, pluginSetting, lambdaProcessorConfig, awsCredentialsSupplier, expressionEvaluator); + // Clearing up previous interactions in case if they have any + reset(numberOfRecordsSuccessCounter); + reset(numberOfRecordsFailedCounter); populatePrivateFields(lambdaProcessor); // Act Collection> result = lambdaProcessor.doExecute(records); // Assert - assertEquals(recordCount, result.size(), "Result should be empty due to empty Lambda response."); + assertEquals(recordCount, result.size(), + "In the case of parsing failure, original records with failure tags should come out as response"); verify(numberOfRecordsSuccessCounter, times(0)).increment(1.0); verify(numberOfRecordsFailedCounter, times(1)).increment(recordCount); } @@ -493,28 +552,6 @@ public void testConvertLambdaResponseToEvent_WithUnequalEventCounts_SuccessfulPr assertEquals(3, resultRecords.size(), "ResultRecords should contain three records."); } - - private static Stream getLambdaResponseConversionSamplesForStrictAndAggregateMode() { - return Stream.of( - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "null", StrictResponseModeNotRespectedException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), StrictResponseModeNotRespectedException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), StrictResponseModeNotRespectedException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), StrictResponseModeNotRespectedException.class, 0), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), null, 2), - //Aggregate mode - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, RuntimeException.class, 0), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", null, 0), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", JsonParseException.class, 0), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), null, 0), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), null, 0), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}]".getBytes()), null, 1) - - ); - } - @ParameterizedTest @MethodSource("getLambdaResponseConversionSamplesForStrictAndAggregateMode") public void testConvertLambdaResponseToEvent_for_strict_and_aggregate_mode(String configFile, @@ -545,35 +582,6 @@ public void testConvertLambdaResponseToEvent_for_strict_and_aggregate_mode(Strin } } - private static Stream getDoExecuteSamplesForStrictAndAggregateMode() { - List> firstSample = getSampleEventRecords(1); - - List> secondSample = getSampleEventRecords(1); - List> thirdSample = getSampleEventRecords(1); - List> fourthSample = getSampleEventRecords(1); - List> fifthSample = getSampleEventRecords(1); - String fifthSampleJsonString = fifthSample.get(0).getData().toJsonString(); - fifthSampleJsonString = "[" + fifthSampleJsonString + "]"; - - - return Stream.of( - arguments("lambda-processor-success-config.yaml", firstSample, null, firstSample, true), - arguments("lambda-processor-success-config.yaml", secondSample, "null", secondSample, true), - arguments("lambda-processor-success-config.yaml", thirdSample, "random string",thirdSample, true), - arguments("lambda-processor-success-config.yaml", fourthSample, SdkBytes.fromByteArray("[]".getBytes()), fourthSample, true), - arguments("lambda-processor-success-config.yaml", fifthSample, SdkBytes.fromByteArray(fifthSampleJsonString.getBytes()), fifthSample, false)/*, - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()),Collections.emptyList()), - arguments("lambda-processor-success-config.yaml", getSampleEventRecords(2), SdkBytes.fromByteArray("[{\"key\":\"val\"}, {\"key\":\"val\"}]".getBytes()), Collections.emptyList()), - //Aggregate mode - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), null, Collections.emptyList()), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "null", Collections.emptyList()), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), "random string", Collections.emptyList()), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("{}".getBytes()), Collections.emptyList()), - arguments("lambda-processor-aggregate-mode-config.yaml", getSampleEventRecords(1), SdkBytes.fromByteArray("[]".getBytes()), Collections.emptyList()) -*/ - ); - } - @ParameterizedTest @MethodSource("getDoExecuteSamplesForStrictAndAggregateMode") public void testDoExecute_for_strict_and_aggregate_mode(String configFile,