
Commit

add batch size field for jira source
Signed-off-by: Maxwell Brown <mxwelwbr@amazon.com>
Galactus22625 committed Jan 22, 2025
1 parent 9c61e03 commit 5095cf1
Showing 8 changed files with 67 additions and 32 deletions.
@@ -55,7 +55,7 @@ public JiraSource(final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
Crawler crawler,
PluginExecutorServiceProvider executorServiceProvider) {
super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
super(PLUGIN_NAME, pluginMetrics, jiraSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, jiraSourceConfig.getBatchSize());
log.info("Creating Jira Source Plugin");
this.jiraSourceConfig = jiraSourceConfig;
this.jiraOauthConfig = jiraOauthConfig;
@@ -38,6 +38,12 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
@Valid
private AuthenticationConfig authenticationConfig;

/**
* Batch size for fetching tickets
*/
@JsonProperty("batch_size")
private int batchSize = 50;


/**
* Filter Config to filter what tickets get ingested
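For reference, the new field is bound from the pipeline configuration by Jackson through the @JsonProperty("batch_size") annotation, falling back to 50 when the key is omitted. The stand-alone sketch below (a hypothetical DemoSourceConfig, using a JSON string for brevity; not the plugin's actual wiring) illustrates that mapping. The accessor shown stands in for however the real config exposes the value to JiraSource, which this diff does not show.

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DemoSourceConfig {

    // Mirrors the new field: private, annotated, defaulting to 50.
    @JsonProperty("batch_size")
    private int batchSize = 50;

    public int getBatchSize() {
        return batchSize;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        DemoSourceConfig explicit = mapper.readValue("{\"batch_size\": 100}", DemoSourceConfig.class);
        DemoSourceConfig defaulted = mapper.readValue("{}", DemoSourceConfig.class);
        System.out.println(explicit.getBatchSize());   // 100
        System.out.println(defaulted.getBatchSize());  // 50
    }
}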
@@ -23,7 +23,6 @@
@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
private static final int maxItemsPerPage = 50;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics =
PluginMetrics.fromNames("sourceCrawler", "crawler");
@@ -36,14 +35,14 @@ public Crawler(CrawlerClient client) {
}

public Instant crawl(Instant lastPollTime,
EnhancedSourceCoordinator coordinator) {
EnhancedSourceCoordinator coordinator, int batchSize) {
long startTime = System.currentTimeMillis();
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
ItemInfo nextItem = itemInfoIterator.next();
if (nextItem == null) {
//we don't expect null items, but just in case, we'll skip them
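The crawl loop now pages the item iterator with the caller-supplied batchSize instead of the removed maxItemsPerPage constant. Purely as an illustration of that paging pattern (hypothetical names, a plain list standing in for the coordinator and SaasSourcePartition), a minimal sketch:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchingSketch {

    // Slice an iterator into batches of at most batchSize items, skipping nulls,
    // the way the crawl loop groups ItemInfo objects before creating a partition.
    static <T> List<List<T>> slice(Iterator<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        while (items.hasNext()) {
            List<T> batch = new ArrayList<>();
            for (int i = 0; i < batchSize && items.hasNext(); i++) {
                T next = items.next();
                if (next == null) {
                    continue;   // unexpected nulls still consume a slot but are not collected
                }
                batch.add(next);
            }
            if (!batch.isEmpty()) {
                batches.add(batch);   // in the crawler, each batch would back one partition
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 120; i++) {
            items.add(i);
        }
        System.out.println(slice(items.iterator(), 50).size());   // 3 batches: 50 + 50 + 20
    }
}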
@@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
private final CrawlerSourceConfig sourceConfig;
private final Crawler crawler;
private final String sourcePluginName;
private final int batchSize;
private EnhancedSourceCoordinator coordinator;
private Buffer<Record<Event>> buffer;

@@ -52,13 +53,15 @@ public CrawlerSourcePlugin(final String sourcePluginName,
final PluginFactory pluginFactory,
final AcknowledgementSetManager acknowledgementSetManager,
final Crawler crawler,
final PluginExecutorServiceProvider executorServiceProvider) {
final PluginExecutorServiceProvider executorServiceProvider,
final int batchSize) {
log.debug("Creating {} Source Plugin", sourcePluginName);
this.sourcePluginName = sourcePluginName;
this.pluginMetrics = pluginMetrics;
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
this.crawler = crawler;
this.batchSize = batchSize;

this.acknowledgementSetManager = acknowledgementSetManager;
this.executorService = executorServiceProvider.get();
@@ -74,7 +77,7 @@ public void start(Buffer<Record<Event>> buffer) {
boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.debug("Leader partition creation status: {}", isPartitionCreated);

Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler);
Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
this.executorService.submit(leaderScheduler);
//Register worker threads
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
@@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable {
@Setter
private Duration leaseInterval;
private LeaderPartition leaderPartition;
private final int batchSize;

public LeaderScheduler(EnhancedSourceCoordinator coordinator,
CrawlerSourcePlugin sourcePlugin,
Crawler crawler) {
Crawler crawler,
int batchSize) {
this.coordinator = coordinator;
this.leaseInterval = DEFAULT_LEASE_INTERVAL;
this.sourcePlugin = sourcePlugin;
this.crawler = crawler;
this.batchSize = batchSize;
}

@Override
@@ -65,7 +68,7 @@ public void run() {
Instant lastPollTime = leaderProgressState.getLastPollTime();

//Start crawling and create child partitions
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
@@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest {

private testCrawlerSourcePlugin saasSourcePlugin;

private final int batchSize = 50;

@BeforeEach
void setUp() {
when(executorServiceProvider.get()).thenReturn(executorService);
@@ -130,7 +132,7 @@ public testCrawlerSourcePlugin(final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager,
final Crawler crawler,
final PluginExecutorServiceProvider executorServiceProvider) {
super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
super("TestcasePlugin", pluginMetrics, sourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, batchSize);
}
}

@@ -68,20 +68,21 @@ public void executePartitionTest() {
void testCrawlWithEmptyList() {
Instant lastPollTime = Instant.ofEpochMilli(0);
when(client.listItems()).thenReturn(Collections.emptyIterator());
crawler.crawl(lastPollTime, coordinator);
int maxItemsPerPage = 50;
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));

}
@@ -90,27 +91,50 @@ void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessExcep
void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage + 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testBatchSize() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
int expectedNumberOfInvocations = 1;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));

List<ItemInfo> itemInfoList2 = new ArrayList<>();
int maxItemsPerPage2 = 25;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList2.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList2.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
// 50 items crawled with a batch size of 25 yield two more partitions, for a running total of 3
expectedNumberOfInvocations += 2;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
int maxItemsPerPage = 50;
itemInfoList.add(null);
for (int i = 0; i < maxItemsPerPage - 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
}

@@ -121,7 +145,8 @@ void testUpdatingPollTimeNullMetaData() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
int maxItemsPerPage = 50;
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
}

@@ -132,17 +157,11 @@ void testUpdatedPollTimeNiCreatedLarger() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
int maxItemsPerPage = 50;
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
assertNotEquals(lastPollTime, updatedPollTime);
}


private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException {
Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage");
maxItemsPerPageField.setAccessible(true);
return (int) maxItemsPerPageField.get(null);
}

private ItemInfo createTestItemInfo(String id) {
return new TestItemInfo(id, new HashMap<>(), Instant.now());
}
@@ -18,6 +18,7 @@
import java.util.concurrent.Executors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.atLeast;
@@ -37,9 +38,11 @@ public class LeaderSchedulerTest {
@Mock
private Crawler crawler;

private final int batchSize = 50;

@Test
void testUnableToAcquireLeaderPartition() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());

ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -66,15 +69,15 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr

@Test
void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(false);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
.thenReturn(Optional.of(leaderPartition))
.thenThrow(RuntimeException.class);
@@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class));
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
}
}
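Because crawl() gained an int parameter, every stub and verification that matched the old two-argument signature had to add anyInt(). A self-contained sketch of that mechanic (hypothetical DemoCrawler interface rather than the real classes, assuming Mockito is on the classpath):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Instant;

public class MatcherSketch {

    // Hypothetical interface mirroring the widened crawl signature.
    interface DemoCrawler {
        Instant crawl(Instant lastPollTime, Object coordinator, int batchSize);
    }

    public static void main(String[] args) {
        DemoCrawler crawler = mock(DemoCrawler.class);

        // The stub must now match three arguments; a two-argument matcher
        // no longer lines up with the new signature.
        when(crawler.crawl(any(Instant.class), any(), anyInt()))
                .thenReturn(Instant.ofEpochMilli(10));

        Instant updated = crawler.crawl(Instant.ofEpochMilli(0), new Object(), 25);
        System.out.println(updated);   // 1970-01-01T00:00:00.010Z

        // Verification likewise matches on all three parameters.
        verify(crawler).crawl(any(Instant.class), any(), anyInt());
    }
}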
