From 2865c799778251abb67357f685123bede6f3b474 Mon Sep 17 00:00:00 2001 From: Kondaka Date: Thu, 31 Oct 2024 13:22:33 -0700 Subject: [PATCH] Refactor InMemorySource/Sink to a separate test framework package Signed-off-by: Kondaka --- data-prepper-core/build.gradle | 1 + .../Connected_SingleExtraSinkIT.java | 10 ++- .../integration/Connected_SingleIT.java | 10 ++- .../integration/CoreHttpServerIT.java | 6 +- .../integration/MinimalPipelineIT.java | 10 ++- .../integration/MultiWorkerPipelineIT.java | 10 ++- .../integration/PipelinesWithAcksIT.java | 76 +++++++++++++------ .../integration/ProcessorPipelineIT.java | 10 ++- .../integration/Router_SingleRouteIT.java | 11 ++- .../Router_ThreeRoutesDefaultIT.java | 11 ++- .../integration/Router_ThreeRoutesIT.java | 11 ++- .../DefaultAcknowledgementSet.java | 2 + data-prepper-test-framework/build.gradle | 26 +++++++ .../test/framework/DataPrepperTestRunner.java | 9 +-- .../test/framework}/InMemoryConfig.java | 2 +- .../test/framework}/InMemorySink.java | 2 +- .../test/framework}/InMemorySinkAccessor.java | 2 +- .../test/framework}/InMemorySource.java | 6 +- .../framework}/InMemorySourceAccessor.java | 36 +-------- settings.gradle | 1 + 20 files changed, 159 insertions(+), 93 deletions(-) create mode 100644 data-prepper-test-framework/build.gradle rename {data-prepper-core/src/integrationTest => data-prepper-test-framework/src/main}/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java (92%) rename {data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins => data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework}/InMemoryConfig.java (91%) rename {data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins => data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework}/InMemorySink.java (97%) rename {data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins => data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework}/InMemorySinkAccessor.java (97%) rename {data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins => data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework}/InMemorySource.java (97%) rename {data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins => data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework}/InMemorySourceAccessor.java (65%) diff --git a/data-prepper-core/build.gradle b/data-prepper-core/build.gradle index c939129a1c..175b3cbd82 100644 --- a/data-prepper-core/build.gradle +++ b/data-prepper-core/build.gradle @@ -62,6 +62,7 @@ dependencies { testImplementation libs.commons.lang3 testImplementation project(':data-prepper-test-event') testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-test-framework') testImplementation project(':data-prepper-api').sourceSets.test.output } diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleExtraSinkIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleExtraSinkIT.java index 718f0cf6ea..51797c6f05 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleExtraSinkIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleExtraSinkIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.List; @@ -29,9 +29,12 @@ public class Connected_SingleExtraSinkIT { private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleExtraSinkIT"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/connected/"; private static final String IN_MEMORY_IDENTIFIER_ENTRY_SINK = IN_MEMORY_IDENTIFIER + "_Entry"; private static final String IN_MEMORY_IDENTIFIER_EXIT_SINK = IN_MEMORY_IDENTIFIER + "_Exit"; - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection-extra-sink.yaml"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "single-connection-extra-sink.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -40,6 +43,7 @@ public class Connected_SingleExtraSinkIT { void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleIT.java index 15588165f6..00b59d091d 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Connected_SingleIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.Collections; @@ -31,7 +31,10 @@ public class Connected_SingleIT { private static final String IN_MEMORY_IDENTIFIER = "Connected_SingleIT"; - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "connected/single-connection.yaml"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/connected/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "single-connection.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -40,6 +43,7 @@ public class Connected_SingleIT { void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/CoreHttpServerIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/CoreHttpServerIT.java index 66ea915e71..e8dbdb5160 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/CoreHttpServerIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/CoreHttpServerIT.java @@ -23,13 +23,17 @@ class CoreHttpServerIT { private static final Logger log = LoggerFactory.getLogger(CoreHttpServerIT.class); - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "minimal-pipeline.yaml"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "minimal-pipeline.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; @BeforeEach void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MinimalPipelineIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MinimalPipelineIT.java index 966d577131..788226ed90 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MinimalPipelineIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MinimalPipelineIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.Collections; @@ -32,7 +32,10 @@ class MinimalPipelineIT { private static final String IN_MEMORY_IDENTIFIER = "MinimalPipelineIT"; - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "minimal-pipeline.yaml"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "minimal-pipeline.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -41,6 +44,7 @@ class MinimalPipelineIT { void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MultiWorkerPipelineIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MultiWorkerPipelineIT.java index eeb5abfc18..df86682f8a 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MultiWorkerPipelineIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/MultiWorkerPipelineIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.Collections; @@ -34,7 +34,10 @@ class MultiWorkerPipelineIT { private static final String IN_MEMORY_IDENTIFIER = "MultiWorkerPipelineIT"; - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "multi-worker.yaml"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "multi-worker.yaml"; private static final int WORKER_THREADS = 4; private static final int BATCH_SIZE = 10; private DataPrepperTestRunner dataPrepperTestRunner; @@ -45,6 +48,7 @@ class MultiWorkerPipelineIT { void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java index e1893dffa7..b262dd40f9 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/PipelinesWithAcksIT.java @@ -9,15 +9,19 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Instant; +import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; @@ -32,17 +36,20 @@ @FixMethodOrder() class PipelinesWithAcksIT { private static final Logger LOG = LoggerFactory.getLogger(PipelinesWithAcksIT.class); + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String PIPELINE_BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/pipeline/acknowledgements/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; private static final String IN_MEMORY_IDENTIFIER = "PipelinesWithAcksIT"; - private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = "acknowledgements/simple-test.yaml"; - private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-pipelines-test.yaml"; - private static final String TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/two-parallel-pipelines-test.yaml"; - private static final String THREE_PIPELINES_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test.yaml"; - private static final String THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-test.yaml"; - private static final String THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-unrouted-test.yaml"; - private static final String THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipeline-route-default-test.yaml"; - private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = "acknowledgements/three-pipelines-test-multi-sink.yaml"; - private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-three-sinks.yaml"; - private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = "acknowledgements/one-pipeline-ack-expiry-test.yaml"; + private static final String SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "simple-test.yaml"; + private static final String TWO_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "two-pipelines-test.yaml"; + private static final String TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "two-parallel-pipelines-test.yaml"; + private static final String THREE_PIPELINES_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipelines-test.yaml"; + private static final String THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-route-test.yaml"; + private static final String THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-unrouted-test.yaml"; + private static final String THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipeline-route-default-test.yaml"; + private static final String THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-pipelines-test-multi-sink.yaml"; + private static final String ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "one-pipeline-three-sinks.yaml"; + private static final String ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "one-pipeline-ack-expiry-test.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -50,6 +57,7 @@ class PipelinesWithAcksIT { void setUp(String configFile) { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(configFile) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); LOG.info("PipelinesWithAcksIT with config file {} started at {}", configFile, Instant.now()); @@ -64,11 +72,29 @@ void tearDown() { dataPrepperTestRunner.stop(); } + private List> createRecords(int numRecords, boolean withStatus) { + List> records = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + final int max = 600; + final int min = 100; + int status = (int)(Math.random() * (max - min + 1) + min); + Map eventMap = (withStatus) ? + Map.of("message", UUID.randomUUID().toString(), "status", status) : + Map.of("message", UUID.randomUUID().toString()); + final Event event = JacksonEvent.builder() + .withEventType("event") + .withData(eventMap) + .build(); + records.add(new Record<>(event)); + } + return records; + } + @Test void simple_pipeline_with_single_record() { setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST); final int numRecords = 1; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -84,7 +110,7 @@ void simple_pipeline_with_single_record() { void simple_pipeline_with_multiple_records() { setUp(SIMPLE_PIPELINE_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -99,7 +125,7 @@ void simple_pipeline_with_multiple_records() { void two_pipelines_with_multiple_records() { setUp(TWO_PIPELINES_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -114,7 +140,7 @@ void two_pipelines_with_multiple_records() { void three_pipelines_with_multiple_records() { setUp(THREE_PIPELINES_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -129,7 +155,7 @@ void three_pipelines_with_multiple_records() { void three_pipelines_with_all_unrouted_records() { setUp(THREE_PIPELINES_WITH_UNROUTED_CONFIGURATION_UNDER_TEST); final int numRecords = 2; - inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -144,8 +170,8 @@ void three_pipelines_with_all_unrouted_records() { @Test void three_pipelines_with_route_and_multiple_records() { setUp(THREE_PIPELINES_WITH_ROUTE_CONFIGURATION_UNDER_TEST); - final int numRecords = 100; - inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); + final int numRecords = 10; + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, true)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -161,7 +187,7 @@ void three_pipelines_with_default_route_and_multiple_records() { setUp(THREE_PIPELINES_WITH_DEFAULT_ROUTE_CONFIGURATION_UNDER_TEST); final int numRecords = 10; - inMemorySourceAccessor.submitWithStatus(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, true)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -176,7 +202,7 @@ void three_pipelines_with_default_route_and_multiple_records() { void two_parallel_pipelines_multiple_records() { setUp(TWO_PARALLEL_PIPELINES_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -191,7 +217,7 @@ void two_parallel_pipelines_multiple_records() { void three_pipelines_multi_sink_multiple_records() { setUp(THREE_PIPELINES_MULTI_SINK_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -206,7 +232,7 @@ void three_pipelines_multi_sink_multiple_records() { void one_pipeline_three_sinks_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -221,7 +247,7 @@ void one_pipeline_three_sinks_multiple_records() { void one_pipeline_ack_expiry_multiple_records() { setUp(ONE_PIPELINE_ACK_EXPIRY_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); await().atMost(40000, TimeUnit.MILLISECONDS) .untilAsserted(() -> { @@ -236,7 +262,7 @@ void one_pipeline_ack_expiry_multiple_records() { void one_pipeline_three_sinks_negative_ack_multiple_records() { setUp(ONE_PIPELINE_THREE_SINKS_CONFIGURATION_UNDER_TEST); final int numRecords = 100; - inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, numRecords); + inMemorySourceAccessor.submit(IN_MEMORY_IDENTIFIER, createRecords(numRecords, false)); inMemorySinkAccessor.setResult(false); await().atMost(40000, TimeUnit.MILLISECONDS) diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorPipelineIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorPipelineIT.java index 8673fd9f21..1adb6529ea 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorPipelineIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/ProcessorPipelineIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,10 @@ class ProcessorPipelineIT { private static final Logger LOG = LoggerFactory.getLogger(ProcessorPipelineIT.class); private static final String IN_MEMORY_IDENTIFIER = "ProcessorPipelineIT"; - private static final String PIPELINE_CONFIGURATION_UNDER_TEST = "processor-pipeline.yaml"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "processor-pipeline.yaml"; private DataPrepperTestRunner dataPrepperTestRunner; private InMemorySourceAccessor inMemorySourceAccessor; private InMemorySinkAccessor inMemorySinkAccessor; @@ -43,6 +46,7 @@ class ProcessorPipelineIT { void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); inMemorySourceAccessor = dataPrepperTestRunner.getInMemorySourceAccessor(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_SingleRouteIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_SingleRouteIT.java index 66cb93214b..3c7dd1396c 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_SingleRouteIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_SingleRouteIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.ArrayList; @@ -33,6 +33,10 @@ class Router_SingleRouteIT { private static final String TESTING_KEY = "ConditionalRoutingIT"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/route/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "single-route.yaml"; private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha"; private static final String KNOWN_CONDITIONAL_KEY = "value"; private static final String ALPHA_VALUE = "a"; @@ -43,7 +47,8 @@ class Router_SingleRouteIT { @BeforeEach void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() - .withPipelinesDirectoryOrFile("route/single-route.yaml") + .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java index fbc61053a5..dfa2486786 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java @@ -11,8 +11,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.ArrayList; @@ -33,6 +33,10 @@ class Router_ThreeRoutesDefaultIT { private static final String TESTING_KEY = "ConditionalRoutingIT"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/route/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-route-with-default-route.yaml"; private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all"; private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha"; private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta"; @@ -51,7 +55,8 @@ class Router_ThreeRoutesDefaultIT { @BeforeEach void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() - .withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml") + .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesIT.java b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesIT.java index e8c1c08563..0adf9c639b 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesIT.java +++ b/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesIT.java @@ -13,8 +13,8 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner; import java.util.ArrayList; @@ -35,6 +35,10 @@ class Router_ThreeRoutesIT { private static final String TESTING_KEY = "ConditionalRoutingIT"; + private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper/"; + private static final String DATA_PREPPER_CONFIG_FILE = BASE_PATH + "configuration/data-prepper-config.yaml"; + private static final String PIPELINE_BASE_PATH = BASE_PATH + "pipeline/route/"; + private static final String PIPELINE_CONFIGURATION_UNDER_TEST = PIPELINE_BASE_PATH + "three-route.yaml"; private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all"; private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha"; private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta"; @@ -50,7 +54,8 @@ class Router_ThreeRoutesIT { @BeforeEach void setUp() { dataPrepperTestRunner = DataPrepperTestRunner.builder() - .withPipelinesDirectoryOrFile("route/three-route.yaml") + .withPipelinesDirectoryOrFile(PIPELINE_CONFIGURATION_UNDER_TEST) + .withDataPrepperConfigFile(DATA_PREPPER_CONFIG_FILE) .build(); dataPrepperTestRunner.start(); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java index b52399646f..8e33bf0dc2 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java @@ -73,6 +73,7 @@ public void checkProgress() { @Override public void add(EventHandle eventHandle) { + System.out.println("============add===="+eventHandle); lock.lock(); try { InternalEventHandle internalEventHandle = (InternalEventHandle)eventHandle; @@ -145,6 +146,7 @@ public void complete() { @Override public boolean release(final EventHandle eventHandle, final boolean result) { + System.out.println("============rel===="+eventHandle+"...."+result); lock.lock(); // Result indicates negative or positive acknowledgement. Even if one of the // events in the set report negative acknowledgement, then the end result diff --git a/data-prepper-test-framework/build.gradle b/data-prepper-test-framework/build.gradle new file mode 100644 index 0000000000..4dfad90701 --- /dev/null +++ b/data-prepper-test-framework/build.gradle @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +plugins { + id 'java' + id 'data-prepper.publish' +} + +group = 'org.opensearch.dataprepper.test' + +dependencies { + implementation 'com.fasterxml.jackson.core:jackson-databind' + implementation project(':data-prepper-api') + implementation project(':data-prepper-core') + implementation(libs.spring.context) + implementation(libs.spring.core) + implementation testLibs.hamcrest + testRuntimeOnly testLibs.junit.engine +} + +test { + useJUnitPlatform() +} + diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java similarity index 92% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java index 6f38971d47..5a4881c8d1 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/DataPrepperTestRunner.java @@ -8,8 +8,8 @@ import org.opensearch.dataprepper.AbstractContextManager; import org.opensearch.dataprepper.core.DataPrepper; import org.opensearch.dataprepper.core.parser.config.FileStructurePathProvider; -import org.opensearch.dataprepper.plugins.InMemorySinkAccessor; -import org.opensearch.dataprepper.plugins.InMemorySourceAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySinkAccessor; +import org.opensearch.dataprepper.plugins.test.framework.InMemorySourceAccessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -25,7 +25,6 @@ */ public class DataPrepperTestRunner { private static final Logger LOG = LoggerFactory.getLogger(DataPrepperTestRunner.class); - private static final String BASE_PATH = "src/integrationTest/resources/org/opensearch/dataprepper"; private final String dataPrepperConfigFile; private final String pipelinesDirectoryOrFile; private final InMemorySourceAccessor inMemorySourceAccessor; @@ -93,13 +92,13 @@ private class TestFileStructurePathProvider implements FileStructurePathProvider @Override public String getPipelineConfigFileLocation() { - return BASE_PATH + "/pipeline/" + pipelinesDirectoryOrFile; + return pipelinesDirectoryOrFile; } @Nullable @Override public String getDataPrepperConfigFileLocation() { - return BASE_PATH + "/configuration/" + dataPrepperConfigFile; + return dataPrepperConfigFile; } } diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemoryConfig.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemoryConfig.java similarity index 91% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemoryConfig.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemoryConfig.java index 204dd77b29..3fa3b4f795 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemoryConfig.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemoryConfig.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.test.framework; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySink.java similarity index 97% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySink.java index a98b1c56b8..eabe066e9c 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySink.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySink.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.test.framework; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySinkAccessor.java similarity index 97% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySinkAccessor.java index 21f359f361..3303055a0a 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySinkAccessor.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySinkAccessor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.test.framework; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySource.java similarity index 97% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySource.java index cb363ce2c8..4a1c3297e8 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySource.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySource.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.test.framework; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -69,7 +69,7 @@ public boolean areAcknowledgementsEnabled() { public void stop() { isStopped = true; try { - runningThread.join(1000); + runningThread.join(20000); } catch (final InterruptedException e) { runningThread.interrupt(); } @@ -127,7 +127,7 @@ public void run() { { inMemorySourceAccessor.setAckReceived(result); }, - Duration.ofSeconds(15)); + Duration.ofSeconds(30)); records.stream().forEach((record) -> { ackSet.add(record.getData()); }); ackSet.complete(); writeToBuffer(records); diff --git a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySourceAccessor.java b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySourceAccessor.java similarity index 65% rename from data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySourceAccessor.java rename to data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySourceAccessor.java index 3957d259a9..047cd0a785 100644 --- a/data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/plugins/InMemorySourceAccessor.java +++ b/data-prepper-test-framework/src/main/java/org/opensearch/dataprepper/test/framework/InMemorySourceAccessor.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins; +package org.opensearch.dataprepper.plugins.test.framework; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventBuilder; @@ -53,37 +53,6 @@ public Boolean getAckReceived() { return ackReceived.get(); } - public void submit(final String testingKey, int numRecords) { - if (eventFactory == null) { - return; - } - List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - Map eventMap = Map.of("message", UUID.randomUUID().toString()); - EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withData(eventMap); - Event event = eventBuilder.build(); - records.add(new Record<>(event)); - } - submit(testingKey, records); - } - - public void submitWithStatus(final String testingKey, int numRecords) { - if (eventFactory == null) { - return; - } - List> records = new ArrayList<>(); - for (int i = 0; i < numRecords; i++) { - final int max = 600; - final int min = 100; - int status = (int)(Math.random() * (max - min + 1) + min); - Map eventMap = Map.of("message", UUID.randomUUID().toString(), "status", status); - EventBuilder eventBuilder = (EventBuilder) eventFactory.eventBuilder(EventBuilder.class).withData(eventMap); - Event event = eventBuilder.build(); - records.add(new Record<>(event)); - } - submit(testingKey, records); - } - /** * Submits records to the in_memory source. These will be available to the source * for reading. @@ -92,6 +61,9 @@ public void submitWithStatus(final String testingKey, int numRecords) { * @param newRecords New records to add. */ public void submit(final String testingKey, final List> newRecords) { + if (eventFactory == null) { + return; + } lock.lock(); try { recordsMap.computeIfAbsent(testingKey, i -> new ArrayList<>()) diff --git a/settings.gradle b/settings.gradle index f8fc52a4d3..89e12b13eb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -135,6 +135,7 @@ include 'e2e-test' include 'e2e-test:trace' include 'e2e-test:log' include 'data-prepper-test-common' +include 'data-prepper-test-framework' include 'performance-test' include 'data-prepper-plugins:date-processor' include 'data-prepper-expression'