From 5095cf13e3b5442d5a0172037f68147372fbe0de Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Wed, 22 Jan 2025 12:43:18 -0800 Subject: [PATCH 1/5] add batch size field for jira source Signed-off-by: Maxwell Brown --- .../plugins/source/jira/JiraSource.java | 2 +- .../plugins/source/jira/JiraSourceConfig.java | 6 +++ .../source/source_crawler/base/Crawler.java | 5 +- .../base/CrawlerSourcePlugin.java | 7 ++- .../scheduler/LeaderScheduler.java | 7 ++- .../base/CrawlerSourcePluginTest.java | 4 +- .../source_crawler/base/CrawlerTest.java | 51 +++++++++++++------ .../scheduler/LeaderSchedulerTest.java | 17 ++++--- 8 files changed, 67 insertions(+), 32 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java index 2ffc7b3b53..887c95814f 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java @@ -55,7 +55,7 @@ public JiraSource(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, Crawler crawler, PluginExecutorServiceProvider executorServiceProvider) { - super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, jiraSourceConfig.getBatchSize()); log.info("Creating Jira Source Plugin"); this.jiraSourceConfig = jiraSourceConfig; this.jiraOauthConfig = jiraOauthConfig; diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index df5cd70f0b..c1a3acc5e1 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -38,6 +38,12 @@ public class JiraSourceConfig implements CrawlerSourceConfig { @Valid private AuthenticationConfig authenticationConfig; + /** + * Batch size for fetching tickets + */ + @JsonProperty("batch_size") + private int batchSize = 50; + /** * Filter Config to filter what tickets get ingested diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java index 327fff0d89..a36fed4fb1 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java @@ -23,7 +23,6 @@ @Named public class Crawler { private static final Logger log = LoggerFactory.getLogger(Crawler.class); - private static final int maxItemsPerPage = 50; private final Timer crawlingTimer; private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("sourceCrawler", "crawler"); @@ -36,14 +35,14 @@ public Crawler(CrawlerClient client) { } public Instant crawl(Instant lastPollTime, - EnhancedSourceCoordinator coordinator) { + EnhancedSourceCoordinator coordinator, int batchSize) { long startTime = System.currentTimeMillis(); client.setLastPollTime(lastPollTime); Iterator itemInfoIterator = client.listItems(); log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime); do { final List itemInfoList = new ArrayList<>(); - for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) { + for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) { ItemInfo nextItem = itemInfoIterator.next(); if (nextItem == null) { //we don't expect null items, but just in case, we'll skip them diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java index d959033c9c..a765039dd8 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java @@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source>, Uses private final CrawlerSourceConfig sourceConfig; private final Crawler crawler; private final String sourcePluginName; + private final int batchSize; private EnhancedSourceCoordinator coordinator; private Buffer> buffer; @@ -52,13 +53,15 @@ public CrawlerSourcePlugin(final String sourcePluginName, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, - final PluginExecutorServiceProvider executorServiceProvider) { + final PluginExecutorServiceProvider executorServiceProvider, + final int batchSize) { log.debug("Creating {} Source Plugin", sourcePluginName); this.sourcePluginName = sourcePluginName; this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; this.crawler = crawler; + this.batchSize = batchSize; this.acknowledgementSetManager = acknowledgementSetManager; this.executorService = executorServiceProvider.get(); @@ -74,7 +77,7 @@ public void start(Buffer> buffer) { boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition()); log.debug("Leader partition creation status: {}", isPartitionCreated); - Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler); + Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize); this.executorService.submit(leaderScheduler); //Register worker threaders for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) { diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java index 615237e8e5..79705206f1 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable { @Setter private Duration leaseInterval; private LeaderPartition leaderPartition; + private final int batchSize; public LeaderScheduler(EnhancedSourceCoordinator coordinator, CrawlerSourcePlugin sourcePlugin, - Crawler crawler) { + Crawler crawler, + int batchSize) { this.coordinator = coordinator; this.leaseInterval = DEFAULT_LEASE_INTERVAL; this.sourcePlugin = sourcePlugin; this.crawler = crawler; + this.batchSize = batchSize; } @Override @@ -65,7 +68,7 @@ public void run() { Instant lastPollTime = leaderProgressState.getLastPollTime(); //Start crawling and create child partitions - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize); leaderProgressState.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(leaderProgressState); coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java index 8edfa66f71..09488e14bf 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java @@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest { private testCrawlerSourcePlugin saasSourcePlugin; + private final int batchSize = 50; + @BeforeEach void setUp() { when(executorServiceProvider.get()).thenReturn(executorService); @@ -130,7 +132,7 @@ public testCrawlerSourcePlugin(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, final PluginExecutorServiceProvider executorServiceProvider) { - super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, batchSize); } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java index 45d2fcb402..f516abb20a 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java @@ -68,7 +68,8 @@ public void executePartitionTest() { void testCrawlWithEmptyList() { Instant lastPollTime = Instant.ofEpochMilli(0); when(client.listItems()).thenReturn(Collections.emptyIterator()); - crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); } @@ -76,12 +77,12 @@ void testCrawlWithEmptyList() { void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; for (int i = 0; i < maxItemsPerPage; i++) { itemInfoList.add(new TestItemInfo("itemId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @@ -90,27 +91,50 @@ void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessExcep void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; for (int i = 0; i < maxItemsPerPage + 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); + } + @Test + void testBatchSize() { + Instant lastPollTime = Instant.ofEpochMilli(0); + List itemInfoList = new ArrayList<>(); + int maxItemsPerPage = 50; + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + int expectedNumberOfInvocations = 1; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); + + List itemInfoList2 = new ArrayList<>(); + int maxItemsPerPage2 = 25; + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList2.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2); + expectedNumberOfInvocations += 2; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); } @Test void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; itemInfoList.add(null); for (int i = 0; i < maxItemsPerPage - 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @@ -121,7 +145,8 @@ void testUpdatingPollTimeNullMetaData() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime); } @@ -132,17 +157,11 @@ void testUpdatedPollTimeNiCreatedLarger() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); assertNotEquals(lastPollTime, updatedPollTime); } - - private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException { - Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage"); - maxItemsPerPageField.setAccessible(true); - return (int) maxItemsPerPageField.get(null); - } - private ItemInfo createTestItemInfo(String id) { return new TestItemInfo(id, new HashMap<>(), Instant.now()); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index 6390008b21..d5ef0cecde 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast; @@ -37,9 +38,11 @@ public class LeaderSchedulerTest { @Mock private Crawler crawler; + private final int batchSize = 50; + @Test void testUnableToAcquireLeaderPartition() throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty()); ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException { @ParameterizedTest @ValueSource(booleans = {true, false}) void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(initializationState); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); @@ -66,7 +69,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte executorService.shutdownNow(); // Check if crawler was invoked and updated leader lease renewal time - verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator); + verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize); verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); } @@ -74,7 +77,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte @ParameterizedTest @ValueSource(booleans = {true, false}) void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(initializationState); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); @@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr @Test void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(false); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); - when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10)); + when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10)); when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) .thenReturn(Optional.of(leaderPartition)) .thenThrow(RuntimeException.class); @@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { executorService.shutdownNow(); // Check if crawler was invoked and updated leader lease renewal time - verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class)); + verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt()); } } From 4e0147852855b6343cc87491d753e611f39369f2 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Wed, 22 Jan 2025 12:50:35 -0800 Subject: [PATCH 2/5] unused import removed Signed-off-by: Maxwell Brown --- .../plugins/source/source_crawler/base/CrawlerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java index f516abb20a..baa544f87c 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java @@ -14,7 +14,6 @@ import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo; import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo; -import java.lang.reflect.Field; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; From f36a79967e2be4ecdaf222c32b5d5e2a324be3aa Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Wed, 22 Jan 2025 13:12:30 -0800 Subject: [PATCH 3/5] remove unused config fields Signed-off-by: Maxwell Brown --- .../plugins/source/jira/JiraSourceConfig.java | 13 ------------- .../plugins/source/jira/JiraSourceConfigTest.java | 2 -- 2 files changed, 15 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index c1a3acc5e1..938b77a17c 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -52,19 +52,6 @@ public class JiraSourceConfig implements CrawlerSourceConfig { private FilterConfig filterConfig; - /** - * Number of worker threads to spawn to parallel source fetching - */ - @JsonProperty("workers") - private int numWorkers = DEFAULT_NUMBER_OF_WORKERS; - - /** - * Default time to wait (with exponential backOff) in the case of - * waiting for the source service to respond - */ - @JsonProperty("backoff_time") - private Duration backOff = DEFAULT_BACKOFF_MILLIS; - public String getAccountUrl() { return this.getHosts().get(0); } diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java index b7b30af5f6..f5607410c1 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java @@ -115,11 +115,9 @@ private JiraSourceConfig createJiraSourceConfig(String authtype, boolean hasToke void testGetters() throws Exception { jiraSourceConfig = createJiraSourceConfig(BASIC, false); assertEquals(jiraSourceConfig.getFilterConfig().getIssueTypeConfig().getInclude(), issueTypeList); - assertEquals(jiraSourceConfig.getNumWorkers(), DEFAULT_NUMBER_OF_WORKERS); assertEquals(jiraSourceConfig.getFilterConfig().getProjectConfig().getNameConfig().getInclude(), projectList); assertEquals(jiraSourceConfig.getFilterConfig().getStatusConfig().getInclude(), statusList); assertEquals(jiraSourceConfig.getAccountUrl(), accountUrl); - assertNotNull(jiraSourceConfig.getBackOff()); assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getPassword(), password); assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getUsername(), username); } From 0ef2cc7897c8fdfc2162ec0bb30111074a050e32 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Wed, 22 Jan 2025 13:23:34 -0800 Subject: [PATCH 4/5] add interface function to simplify batchSize code Signed-off-by: Maxwell Brown --- .../dataprepper/plugins/source/jira/JiraSource.java | 2 +- .../plugins/source/jira/JiraSourceConfigTest.java | 2 -- .../source/source_crawler/base/CrawlerSourceConfig.java | 2 ++ .../source/source_crawler/base/CrawlerSourcePlugin.java | 5 ++--- .../source/source_crawler/base/CrawlerSourcePluginTest.java | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java index 887c95814f..2ffc7b3b53 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java @@ -55,7 +55,7 @@ public JiraSource(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, Crawler crawler, PluginExecutorServiceProvider executorServiceProvider) { - super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, jiraSourceConfig.getBatchSize()); + super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); log.info("Creating Jira Source Plugin"); this.jiraSourceConfig = jiraSourceConfig; this.jiraOauthConfig = jiraOauthConfig; diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java index f5607410c1..94aa04c554 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfigTest.java @@ -23,11 +23,9 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.BASIC; import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.OAUTH2; -import static org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig.DEFAULT_NUMBER_OF_WORKERS; public class JiraSourceConfigTest { private final PluginConfigVariable accessToken = new MockPluginConfigVariableImpl("access token test"); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java index 18649e052c..2c48911202 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourceConfig.java @@ -6,4 +6,6 @@ public interface CrawlerSourceConfig { int DEFAULT_NUMBER_OF_WORKERS = 1; + + int getBatchSize(); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java index a765039dd8..784f7610b0 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java @@ -53,15 +53,14 @@ public CrawlerSourcePlugin(final String sourcePluginName, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, - final PluginExecutorServiceProvider executorServiceProvider, - final int batchSize) { + final PluginExecutorServiceProvider executorServiceProvider) { log.debug("Creating {} Source Plugin", sourcePluginName); this.sourcePluginName = sourcePluginName; this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; this.crawler = crawler; - this.batchSize = batchSize; + this.batchSize = sourceConfig.getBatchSize(); this.acknowledgementSetManager = acknowledgementSetManager; this.executorService = executorServiceProvider.get(); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java index 09488e14bf..397078fcf4 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java @@ -132,7 +132,7 @@ public testCrawlerSourcePlugin(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, final PluginExecutorServiceProvider executorServiceProvider) { - super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, batchSize); + super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); } } From 65e1c46a0c8639d7bc46f98426b9c7e90d10be34 Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Thu, 23 Jan 2025 13:17:47 -0800 Subject: [PATCH 5/5] default batch size comments Signed-off-by: Maxwell Brown --- .../plugins/source/jira/JiraSourceConfig.java | 5 ++-- .../source_crawler/base/CrawlerTest.java | 30 ++++++++----------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index 1144191e6b..3cb7b9501c 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -17,13 +17,12 @@ import org.opensearch.dataprepper.plugins.source.jira.configuration.FilterConfig; import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig; -import java.time.Duration; import java.util.List; @Getter public class JiraSourceConfig implements CrawlerSourceConfig { - private static final Duration DEFAULT_BACKOFF_MILLIS = Duration.ofMinutes(2); + private static final int DEFAULT_BATCH_SIZE = 50; /** * Jira account url @@ -42,7 +41,7 @@ public class JiraSourceConfig implements CrawlerSourceConfig { * Batch size for fetching tickets */ @JsonProperty("batch_size") - private int batchSize = 50; + private int batchSize = DEFAULT_BATCH_SIZE; /** diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java index d90f794cbf..afac9f3e54 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java @@ -48,6 +48,8 @@ public class CrawlerTest { private Crawler crawler; + private static final int DEFAULT_BATCH_SIZE = 50; + @BeforeEach public void setup() { crawler = new Crawler(client); @@ -68,35 +70,32 @@ public void executePartitionTest() { void testCrawlWithEmptyList() { Instant lastPollTime = Instant.ofEpochMilli(0); when(client.listItems()).thenReturn(Collections.emptyIterator()); - int maxItemsPerPage = 50; - crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); } @Test - void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException { + void testCrawlWithNonEmptyList(){ Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = 50; - for (int i = 0; i < maxItemsPerPage; i++) { + for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) { itemInfoList.add(new TestItemInfo("itemId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @Test - void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException { + void testCrawlWithMultiplePartitions(){ Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = 50; - for (int i = 0; i < maxItemsPerPage + 1; i++) { + for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); } @@ -128,13 +127,12 @@ void testBatchSize() { void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = 50; itemInfoList.add(null); - for (int i = 0; i < maxItemsPerPage - 1; i++) { + for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @@ -145,8 +143,7 @@ void testUpdatingPollTimeNullMetaData() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - int maxItemsPerPage = 50; - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime); } @@ -157,8 +154,7 @@ void testUpdatedPollTimeNiCreatedLarger() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - int maxItemsPerPage = 50; - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE); assertNotEquals(lastPollTime, updatedPollTime); }