add batch size field for jira source (#5348)
* add batch size field for jira source

Signed-off-by: Maxwell Brown <[email protected]>

* remove unused config fields

Signed-off-by: Maxwell Brown <[email protected]>

* add interface function to simplify batchSize code

Signed-off-by: Maxwell Brown <[email protected]>

* default batch size comments

Signed-off-by: Maxwell Brown <[email protected]>

---------

Signed-off-by: Maxwell Brown <[email protected]>
Signed-off-by: Maxwell Brown <[email protected]>
Galactus22625 authored Jan 23, 2025
1 parent 7f769df commit 19d9dae
Showing 9 changed files with 67 additions and 54 deletions.

JiraSourceConfig.java
@@ -17,13 +17,12 @@
 import org.opensearch.dataprepper.plugins.source.jira.configuration.FilterConfig;
 import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;
 
-import java.time.Duration;
 import java.util.List;
 
 @Getter
 public class JiraSourceConfig implements CrawlerSourceConfig {
 
-    private static final Duration DEFAULT_BACKOFF_MILLIS = Duration.ofMinutes(2);
+    private static final int DEFAULT_BATCH_SIZE = 50;
 
     /**
      * Jira account url
@@ -38,26 +37,19 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
     @Valid
     private AuthenticationConfig authenticationConfig;
 
-
     /**
-     * Filter Config to filter what tickets get ingested
+     * Batch size for fetching tickets
      */
-    @JsonProperty("filter")
-    private FilterConfig filterConfig;
+    @JsonProperty("batch_size")
+    private int batchSize = DEFAULT_BATCH_SIZE;
 
 
     /**
-     * Number of worker threads to spawn to parallel source fetching
+     * Filter Config to filter what tickets get ingested
      */
-    @JsonProperty("workers")
-    private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
+    @JsonProperty("filter")
+    private FilterConfig filterConfig;
 
-    /**
-     * Default time to wait (with exponential backOff) in the case of
-     * waiting for the source service to respond
-     */
-    @JsonProperty("backoff_time")
-    private Duration backOff = DEFAULT_BACKOFF_MILLIS;
 
     /**
      * Boolean property indicating end to end acknowledgments state
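
Note (editor's sketch, not part of the commit): the new batch_size option binds through the same Jackson @JsonProperty mechanism as the surrounding fields, so a pipeline that omits it keeps the initializer's DEFAULT_BATCH_SIZE of 50. A minimal illustration, assuming Jackson's YAML dataformat module is available, JiraSourceConfig is on the classpath, and no other fields are required at deserialization time:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    public class BatchSizeBindingSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());

            // "batch_size" in the pipeline YAML binds to the batchSize field.
            JiraSourceConfig explicit =
                    mapper.readValue("batch_size: 100", JiraSourceConfig.class);
            System.out.println(explicit.getBatchSize()); // 100

            // When the key is omitted, the field initializer keeps DEFAULT_BATCH_SIZE.
            JiraSourceConfig defaulted =
                    mapper.readValue("{}", JiraSourceConfig.class);
            System.out.println(defaulted.getBatchSize()); // 50
        }
    }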

JiraSourceConfigTest.java
@@ -23,11 +23,9 @@
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.BASIC;
 import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.OAUTH2;
-import static org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig.DEFAULT_NUMBER_OF_WORKERS;
 
 public class JiraSourceConfigTest {
     private final PluginConfigVariable accessToken = new MockPluginConfigVariableImpl("access token test");
@@ -115,11 +113,9 @@ private JiraSourceConfig createJiraSourceConfig(String authtype, boolean hasToke
     void testGetters() throws Exception {
         jiraSourceConfig = createJiraSourceConfig(BASIC, false);
         assertEquals(jiraSourceConfig.getFilterConfig().getIssueTypeConfig().getInclude(), issueTypeList);
-        assertEquals(jiraSourceConfig.getNumWorkers(), DEFAULT_NUMBER_OF_WORKERS);
         assertEquals(jiraSourceConfig.getFilterConfig().getProjectConfig().getNameConfig().getInclude(), projectList);
         assertEquals(jiraSourceConfig.getFilterConfig().getStatusConfig().getInclude(), statusList);
         assertEquals(jiraSourceConfig.getAccountUrl(), accountUrl);
-        assertNotNull(jiraSourceConfig.getBackOff());
         assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getPassword(), password);
         assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getUsername(), username);
     }

Crawler.java
@@ -24,7 +24,6 @@
 @Named
 public class Crawler {
     private static final Logger log = LoggerFactory.getLogger(Crawler.class);
-    private static final int maxItemsPerPage = 100;
     private final Timer crawlingTimer;
     private final PluginMetrics pluginMetrics =
             PluginMetrics.fromNames("sourceCrawler", "crawler");
@@ -37,14 +36,14 @@ public Crawler(CrawlerClient client) {
     }
 
     public Instant crawl(Instant lastPollTime,
-                         EnhancedSourceCoordinator coordinator) {
+                         EnhancedSourceCoordinator coordinator, int batchSize) {
         long startTime = System.currentTimeMillis();
         client.setLastPollTime(lastPollTime);
         Iterator<ItemInfo> itemInfoIterator = client.listItems();
         log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
         do {
             final List<ItemInfo> itemInfoList = new ArrayList<>();
-            for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
+            for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
                 ItemInfo nextItem = itemInfoIterator.next();
                 if (nextItem == null) {
                     //we don't expect null items, but just in case, we'll skip them
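
Note (editor's sketch): crawl() now drains the client's iterator in chunks of at most batchSize and creates one SaasSourcePartition per non-empty chunk, so N pending items yield ceil(N / batchSize) partitions. A standalone illustration of that chunking, with plain Java collections standing in for the Data Prepper types:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class BatchingSketch {

        // Drains the iterator into chunks of at most batchSize, mirroring the
        // loop in Crawler.crawl; each non-empty chunk becomes one partition.
        static <T> List<List<T>> chunk(Iterator<T> items, int batchSize) {
            List<List<T>> partitions = new ArrayList<>();
            do {
                List<T> batch = new ArrayList<>();
                for (int i = 0; i < batchSize && items.hasNext(); i++) {
                    T next = items.next();
                    if (next == null) {
                        continue; // unexpected nulls are skipped, as in the crawler
                    }
                    batch.add(next);
                }
                if (!batch.isEmpty()) {
                    partitions.add(batch); // Crawler creates a SaasSourcePartition here
                }
            } while (items.hasNext());
            return partitions;
        }

        public static void main(String[] args) {
            List<Integer> items = new ArrayList<>();
            for (int i = 0; i < 51; i++) {
                items.add(i);
            }
            // 51 items with batchSize 50 -> 2 partitions, matching testCrawlWithMultiplePartitions
            System.out.println(chunk(items.iterator(), 50).size()); // prints 2
        }
    }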

CrawlerSourceConfig.java
@@ -7,6 +7,8 @@ public interface CrawlerSourceConfig {
 
     int DEFAULT_NUMBER_OF_WORKERS = 1;
 
+    int getBatchSize();
+
     /**
      * Boolean to indicate if acknowledgments enabled for this source
      *
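
Note (editor's sketch): declaring getBatchSize() on the interface is what lets the shared crawler code read the batch size without knowing the concrete config class; on JiraSourceConfig, Lombok's @Getter generates the matching accessor from the batchSize field. The pattern, with a hypothetical FooSourceConfig and a hand-written getter standing in for Lombok:

    interface ConfigSketch {
        int getBatchSize();
    }

    // Hypothetical concrete config; in the commit, Lombok generates this
    // accessor on JiraSourceConfig from the batchSize field.
    class FooSourceConfig implements ConfigSketch {
        private static final int DEFAULT_BATCH_SIZE = 50;
        private int batchSize = DEFAULT_BATCH_SIZE;

        @Override
        public int getBatchSize() {
            return batchSize;
        }
    }

    class SharedCrawlerCode {
        static int resolveBatchSize(ConfigSketch config) {
            // Shared code needs no cast to a concrete source config.
            return config.getBatchSize();
        }
    }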

CrawlerSourcePlugin.java
@@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
     private final CrawlerSourceConfig sourceConfig;
     private final Crawler crawler;
     private final String sourcePluginName;
+    private final int batchSize;
     private EnhancedSourceCoordinator coordinator;
     private Buffer<Record<Event>> buffer;
 
@@ -59,6 +60,7 @@ public CrawlerSourcePlugin(final String sourcePluginName,
         this.sourceConfig = sourceConfig;
         this.pluginFactory = pluginFactory;
         this.crawler = crawler;
+        this.batchSize = sourceConfig.getBatchSize();
 
         this.acknowledgementSetManager = acknowledgementSetManager;
         this.executorService = executorServiceProvider.get();
@@ -74,7 +76,7 @@ public void start(Buffer<Record<Event>> buffer) {
         boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
         log.debug("Leader partition creation status: {}", isPartitionCreated);
 
-        Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler);
+        Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
         this.executorService.submit(leaderScheduler);
         //Register worker threads
         for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {

LeaderScheduler.java
@@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable {
     @Setter
     private Duration leaseInterval;
     private LeaderPartition leaderPartition;
+    private final int batchSize;
 
     public LeaderScheduler(EnhancedSourceCoordinator coordinator,
                            CrawlerSourcePlugin sourcePlugin,
-                           Crawler crawler) {
+                           Crawler crawler,
+                           int batchSize) {
         this.coordinator = coordinator;
         this.leaseInterval = DEFAULT_LEASE_INTERVAL;
         this.sourcePlugin = sourcePlugin;
         this.crawler = crawler;
+        this.batchSize = batchSize;
     }
 
     @Override
@@ -65,7 +68,7 @@ public void run() {
         Instant lastPollTime = leaderProgressState.getLastPollTime();
 
         //Start crawling and create child partitions
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
         leaderProgressState.setLastPollTime(updatedPollTime);
         leaderPartition.setLeaderProgressState(leaderProgressState);
         coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
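
Note (editor's sketch): the batch size is resolved once from the config in CrawlerSourcePlugin and then passed by value through LeaderScheduler into every crawl invocation, so the scheduler never depends on CrawlerSourceConfig. A condensed, hypothetical view of that flow, with a BiFunction standing in for the crawler and coordinator:

    import java.time.Instant;
    import java.util.function.BiFunction;

    // Hypothetical stand-in for LeaderScheduler: batchSize is injected at
    // construction and forwarded unchanged on every polling cycle.
    class SchedulerSketch {
        private final BiFunction<Instant, Integer, Instant> crawl; // (lastPollTime, batchSize) -> updatedPollTime
        private final int batchSize;
        private Instant lastPollTime = Instant.ofEpochMilli(0);

        SchedulerSketch(BiFunction<Instant, Integer, Instant> crawl, int batchSize) {
            this.crawl = crawl;
            this.batchSize = batchSize;
        }

        void pollOnce() {
            // mirrors: crawler.crawl(lastPollTime, coordinator, batchSize)
            lastPollTime = crawl.apply(lastPollTime, batchSize);
        }
    }

An alternative would be to hand LeaderScheduler the whole config and let it call getBatchSize() itself; passing the primitive keeps the constructor's contract explicit and the test wiring below simple.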

CrawlerSourcePluginTest.java
@@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest {
 
     private testCrawlerSourcePlugin saasSourcePlugin;
 
+    private final int batchSize = 50;
+
     @BeforeEach
     void setUp() {
         when(executorServiceProvider.get()).thenReturn(executorService);

CrawlerTest.java
@@ -15,7 +15,6 @@
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
 
-import java.lang.reflect.Field;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -49,6 +48,8 @@ public class CrawlerTest {
 
     private Crawler crawler;
 
+    private static final int DEFAULT_BATCH_SIZE = 50;
+
     @BeforeEach
     public void setup() {
         crawler = new Crawler(client);
@@ -69,49 +70,69 @@ public void executePartitionTest() {
     void testCrawlWithEmptyList() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         when(client.listItems()).thenReturn(Collections.emptyIterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException {
+    void testCrawlWithNonEmptyList() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
-        for (int i = 0; i < maxItemsPerPage; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
             itemInfoList.add(new TestItemInfo("itemId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
 
     }
 
     @Test
-    void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException {
+    void testCrawlWithMultiplePartitions() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
-        for (int i = 0; i < maxItemsPerPage + 1; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
     }
 
+    @Test
+    void testBatchSize() {
+        Instant lastPollTime = Instant.ofEpochMilli(0);
+        List<ItemInfo> itemInfoList = new ArrayList<>();
+        int maxItemsPerPage = 50;
+        for (int i = 0; i < maxItemsPerPage; i++) {
+            itemInfoList.add(new TestItemInfo("testId"));
+        }
+        when(client.listItems()).thenReturn(itemInfoList.iterator());
+        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
+        // 50 items with a batch size of 50 fit in a single partition
+        int expectedNumberOfInvocations = 1;
+        verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
+
+        List<ItemInfo> itemInfoList2 = new ArrayList<>();
+        int maxItemsPerPage2 = 25;
+        for (int i = 0; i < maxItemsPerPage; i++) {
+            itemInfoList2.add(new TestItemInfo("testId"));
+        }
+        when(client.listItems()).thenReturn(itemInfoList2.iterator());
+        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
+        // the same 50 items with a batch size of 25 need two more partitions
+        expectedNumberOfInvocations += 2;
+        verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
+    }
+
     @Test
     void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
-        int maxItemsPerPage = getMaxItemsPerPage();
         itemInfoList.add(null);
-        for (int i = 0; i < maxItemsPerPage - 1; i++) {
+        for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator);
+        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
        verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
     }
 
@@ -122,7 +143,7 @@ void testUpdatingPollTimeNullMetaData() {
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
     }
 
@@ -133,17 +154,10 @@ void testUpdatedPollTimeNiCreatedLarger() {
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
+        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(lastPollTime, updatedPollTime);
     }
 
-
-    private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException {
-        Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage");
-        maxItemsPerPageField.setAccessible(true);
-        return (int) maxItemsPerPageField.get(null);
-    }
-
     private ItemInfo createTestItemInfo(String id) {
         return new TestItemInfo(id, new HashMap<>(), Instant.now());
     }

LeaderSchedulerTest.java
@@ -18,6 +18,7 @@
 import java.util.concurrent.Executors;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.Mockito.atLeast;
@@ -37,9 +38,11 @@ public class LeaderSchedulerTest {
     @Mock
     private Crawler crawler;
 
+    private final int batchSize = 50;
+
     @Test
     void testUnableToAcquireLeaderPartition() throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(initializationState);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -66,15 +69,15 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
+        verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
         verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
 
     }
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(initializationState);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
@@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr
 
     @Test
     void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
-        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
+        LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
         leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(false);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
-        when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
+        when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
         when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
                 .thenReturn(Optional.of(leaderPartition))
                 .thenThrow(RuntimeException.class);
@@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class));
+        verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
     }
 }
