From 5095cf13e3b5442d5a0172037f68147372fbe0de Mon Sep 17 00:00:00 2001 From: Maxwell Brown Date: Wed, 22 Jan 2025 12:43:18 -0800 Subject: [PATCH] add batch size field for jira source Signed-off-by: Maxwell Brown --- .../plugins/source/jira/JiraSource.java | 2 +- .../plugins/source/jira/JiraSourceConfig.java | 6 +++ .../source/source_crawler/base/Crawler.java | 5 +- .../base/CrawlerSourcePlugin.java | 7 ++- .../scheduler/LeaderScheduler.java | 7 ++- .../base/CrawlerSourcePluginTest.java | 4 +- .../source_crawler/base/CrawlerTest.java | 51 +++++++++++++------ .../scheduler/LeaderSchedulerTest.java | 17 ++++--- 8 files changed, 67 insertions(+), 32 deletions(-) diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java index 2ffc7b3b53..887c95814f 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSource.java @@ -55,7 +55,7 @@ public JiraSource(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, Crawler crawler, PluginExecutorServiceProvider executorServiceProvider) { - super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, jiraSourceConfig.getBatchSize()); log.info("Creating Jira Source Plugin"); this.jiraSourceConfig = jiraSourceConfig; this.jiraOauthConfig = jiraOauthConfig; diff --git 
a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java index df5cd70f0b..c1a3acc5e1 100644 --- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java +++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraSourceConfig.java @@ -38,6 +38,12 @@ public class JiraSourceConfig implements CrawlerSourceConfig { @Valid private AuthenticationConfig authenticationConfig; + /** + * Batch size for fetching tickets + */ + @JsonProperty("batch_size") + private int batchSize = 50; + /** * Filter Config to filter what tickets get ingested diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java index 327fff0d89..a36fed4fb1 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java @@ -23,7 +23,6 @@ @Named public class Crawler { private static final Logger log = LoggerFactory.getLogger(Crawler.class); - private static final int maxItemsPerPage = 50; private final Timer crawlingTimer; private final PluginMetrics pluginMetrics = PluginMetrics.fromNames("sourceCrawler", "crawler"); @@ -36,14 +35,14 @@ public Crawler(CrawlerClient client) { } public Instant crawl(Instant lastPollTime, - EnhancedSourceCoordinator coordinator) { + 
EnhancedSourceCoordinator coordinator, int batchSize) { long startTime = System.currentTimeMillis(); client.setLastPollTime(lastPollTime); Iterator itemInfoIterator = client.listItems(); log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime); do { final List itemInfoList = new ArrayList<>(); - for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) { + for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) { ItemInfo nextItem = itemInfoIterator.next(); if (nextItem == null) { //we don't expect null items, but just in case, we'll skip them diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java index d959033c9c..a765039dd8 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java @@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source>, Uses private final CrawlerSourceConfig sourceConfig; private final Crawler crawler; private final String sourcePluginName; + private final int batchSize; private EnhancedSourceCoordinator coordinator; private Buffer> buffer; @@ -52,13 +53,15 @@ public CrawlerSourcePlugin(final String sourcePluginName, final PluginFactory pluginFactory, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, - final PluginExecutorServiceProvider executorServiceProvider) { + final PluginExecutorServiceProvider executorServiceProvider, + final int batchSize) { log.debug("Creating {} Source Plugin", sourcePluginName); 
this.sourcePluginName = sourcePluginName; this.pluginMetrics = pluginMetrics; this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; this.crawler = crawler; + this.batchSize = batchSize; this.acknowledgementSetManager = acknowledgementSetManager; this.executorService = executorServiceProvider.get(); @@ -74,7 +77,7 @@ public void start(Buffer> buffer) { boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition()); log.debug("Leader partition creation status: {}", isPartitionCreated); - Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler); + Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize); this.executorService.submit(leaderScheduler); //Register worker threaders for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) { diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java index 615237e8e5..79705206f1 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java @@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable { @Setter private Duration leaseInterval; private LeaderPartition leaderPartition; + private final int batchSize; public LeaderScheduler(EnhancedSourceCoordinator coordinator, CrawlerSourcePlugin sourcePlugin, - Crawler crawler) { + Crawler crawler, + int batchSize) { this.coordinator = coordinator; this.leaseInterval = DEFAULT_LEASE_INTERVAL; 
this.sourcePlugin = sourcePlugin; this.crawler = crawler; + this.batchSize = batchSize; } @Override @@ -65,7 +68,7 @@ public void run() { Instant lastPollTime = leaderProgressState.getLastPollTime(); //Start crawling and create child partitions - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize); leaderProgressState.setLastPollTime(updatedPollTime); leaderPartition.setLeaderProgressState(leaderProgressState); coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES); diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java index 8edfa66f71..09488e14bf 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePluginTest.java @@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest { private testCrawlerSourcePlugin saasSourcePlugin; + private final int batchSize = 50; + @BeforeEach void setUp() { when(executorServiceProvider.get()).thenReturn(executorService); @@ -130,7 +132,7 @@ public testCrawlerSourcePlugin(final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, final Crawler crawler, final PluginExecutorServiceProvider executorServiceProvider) { - super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider); + super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, 
acknowledgementSetManager, crawler, executorServiceProvider, batchSize); } } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java index 45d2fcb402..f516abb20a 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java +++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java @@ -68,7 +68,8 @@ public void executePartitionTest() { void testCrawlWithEmptyList() { Instant lastPollTime = Instant.ofEpochMilli(0); when(client.listItems()).thenReturn(Collections.emptyIterator()); - crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, never()).createPartition(any(SaasSourcePartition.class)); } @@ -76,12 +77,12 @@ void testCrawlWithEmptyList() { void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; for (int i = 0; i < maxItemsPerPage; i++) { itemInfoList.add(new TestItemInfo("itemId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @@ -90,27 +91,50 @@ void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessExcep void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException { Instant 
lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; for (int i = 0; i < maxItemsPerPage + 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class)); + } + @Test + void testBatchSize() { + Instant lastPollTime = Instant.ofEpochMilli(0); + List itemInfoList = new ArrayList<>(); + int maxItemsPerPage = 50; + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList.iterator()); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); + int expectedNumberOfInvocations = 1; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); + + List itemInfoList2 = new ArrayList<>(); + int maxItemsPerPage2 = 25; + for (int i = 0; i < maxItemsPerPage; i++) { + itemInfoList2.add(new TestItemInfo("testId")); + } + when(client.listItems()).thenReturn(itemInfoList2.iterator()); + crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2); + expectedNumberOfInvocations += 2; + verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class)); } @Test void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException { Instant lastPollTime = Instant.ofEpochMilli(0); List itemInfoList = new ArrayList<>(); - int maxItemsPerPage = getMaxItemsPerPage(); + int maxItemsPerPage = 50; itemInfoList.add(null); for (int i = 0; i < maxItemsPerPage - 1; i++) { itemInfoList.add(new TestItemInfo("testId")); } when(client.listItems()).thenReturn(itemInfoList.iterator()); - crawler.crawl(lastPollTime, coordinator, 
maxItemsPerPage); verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class)); } @@ -121,7 +145,8 @@ void testUpdatingPollTimeNullMetaData() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime); } @@ -132,17 +157,11 @@ void testUpdatedPollTimeNiCreatedLarger() { ItemInfo testItem = createTestItemInfo("1"); itemInfoList.add(testItem); when(client.listItems()).thenReturn(itemInfoList.iterator()); - Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator); + int maxItemsPerPage = 50; + Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage); assertNotEquals(lastPollTime, updatedPollTime); } - - private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException { - Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage"); - maxItemsPerPageField.setAccessible(true); - return (int) maxItemsPerPageField.get(null); - } - private ItemInfo createTestItemInfo(String id) { return new TestItemInfo(id, new HashMap<>(), Instant.now()); } diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java index 6390008b21..d5ef0cecde 100644 --- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java +++ 
b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast; @@ -37,9 +38,11 @@ public class LeaderSchedulerTest { @Mock private Crawler crawler; + private final int batchSize = 50; + @Test void testUnableToAcquireLeaderPartition() throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty()); ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException { @ParameterizedTest @ValueSource(booleans = {true, false}) void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(initializationState); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); @@ -66,7 +69,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte executorService.shutdownNow(); // Check if crawler was invoked and updated leader lease renewal time - verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator); + 
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize); verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class)); } @@ -74,7 +77,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte @ParameterizedTest @ValueSource(booleans = {true, false}) void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(initializationState); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); @@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr @Test void testWhileLoopRunnningAfterTheSleep() throws InterruptedException { - LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler); + LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize); leaderScheduler.setLeaseInterval(Duration.ofMillis(10)); LeaderPartition leaderPartition = new LeaderPartition(); leaderPartition.getProgressState().get().setInitialized(false); leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L)); - when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10)); + when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10)); when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)) .thenReturn(Optional.of(leaderPartition)) .thenThrow(RuntimeException.class); @@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws 
InterruptedException { executorService.shutdownNow(); // Check if crawler was invoked and updated leader lease renewal time - verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class)); + verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt()); } }