Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add batch size field for jira source #5348

Merged
merged 6 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
import org.opensearch.dataprepper.plugins.source.jira.configuration.FilterConfig;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig;

import java.time.Duration;
import java.util.List;

@Getter
public class JiraSourceConfig implements CrawlerSourceConfig {

private static final Duration DEFAULT_BACKOFF_MILLIS = Duration.ofMinutes(2);
private static final int DEFAULT_BATCH_SIZE = 50;

/**
* Jira account url
Expand All @@ -38,26 +37,19 @@ public class JiraSourceConfig implements CrawlerSourceConfig {
@Valid
private AuthenticationConfig authenticationConfig;


/**
* Filter Config to filter what tickets get ingested
* Batch size for fetching tickets
*/
@JsonProperty("filter")
private FilterConfig filterConfig;
@JsonProperty("batch_size")
private int batchSize = DEFAULT_BATCH_SIZE;


/**
* Number of worker threads to spawn to parallel source fetching
* Filter Config to filter what tickets get ingested
*/
@JsonProperty("workers")
private int numWorkers = DEFAULT_NUMBER_OF_WORKERS;
@JsonProperty("filter")
private FilterConfig filterConfig;

/**
* Default time to wait (with exponential backOff) in the case of
* waiting for the source service to respond
*/
@JsonProperty("backoff_time")
private Duration backOff = DEFAULT_BACKOFF_MILLIS;

/**
* Boolean property indicating end to end acknowledgments state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.BASIC;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.OAUTH2;
import static org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourceConfig.DEFAULT_NUMBER_OF_WORKERS;

public class JiraSourceConfigTest {
private final PluginConfigVariable accessToken = new MockPluginConfigVariableImpl("access token test");
Expand Down Expand Up @@ -115,11 +113,9 @@ private JiraSourceConfig createJiraSourceConfig(String authtype, boolean hasToke
void testGetters() throws Exception {
jiraSourceConfig = createJiraSourceConfig(BASIC, false);
assertEquals(jiraSourceConfig.getFilterConfig().getIssueTypeConfig().getInclude(), issueTypeList);
assertEquals(jiraSourceConfig.getNumWorkers(), DEFAULT_NUMBER_OF_WORKERS);
assertEquals(jiraSourceConfig.getFilterConfig().getProjectConfig().getNameConfig().getInclude(), projectList);
assertEquals(jiraSourceConfig.getFilterConfig().getStatusConfig().getInclude(), statusList);
assertEquals(jiraSourceConfig.getAccountUrl(), accountUrl);
assertNotNull(jiraSourceConfig.getBackOff());
assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getPassword(), password);
assertEquals(jiraSourceConfig.getAuthenticationConfig().getBasicConfig().getUsername(), username);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
private static final int maxItemsPerPage = 100;
private final Timer crawlingTimer;
private final PluginMetrics pluginMetrics =
PluginMetrics.fromNames("sourceCrawler", "crawler");
Expand All @@ -37,14 +36,14 @@ public Crawler(CrawlerClient client) {
}

public Instant crawl(Instant lastPollTime,
EnhancedSourceCoordinator coordinator) {
EnhancedSourceCoordinator coordinator, int batchSize) {
long startTime = System.currentTimeMillis();
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < maxItemsPerPage && itemInfoIterator.hasNext(); i++) {
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {
ItemInfo nextItem = itemInfoIterator.next();
if (nextItem == null) {
//we don't expect null items, but just in case, we'll skip them
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ public interface CrawlerSourceConfig {

int DEFAULT_NUMBER_OF_WORKERS = 1;

int getBatchSize();

/**
* Boolean to indicate if acknowledgments enabled for this source
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
private final CrawlerSourceConfig sourceConfig;
private final Crawler crawler;
private final String sourcePluginName;
private final int batchSize;
private EnhancedSourceCoordinator coordinator;
private Buffer<Record<Event>> buffer;

Expand All @@ -59,6 +60,7 @@ public CrawlerSourcePlugin(final String sourcePluginName,
this.sourceConfig = sourceConfig;
this.pluginFactory = pluginFactory;
this.crawler = crawler;
this.batchSize = sourceConfig.getBatchSize();

this.acknowledgementSetManager = acknowledgementSetManager;
this.executorService = executorServiceProvider.get();
Expand All @@ -74,7 +76,7 @@ public void start(Buffer<Record<Event>> buffer) {
boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
log.debug("Leader partition creation status: {}", isPartitionCreated);

Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler);
Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
this.executorService.submit(leaderScheduler);
//Register worker threads
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ public class LeaderScheduler implements Runnable {
@Setter
private Duration leaseInterval;
private LeaderPartition leaderPartition;
private final int batchSize;

public LeaderScheduler(EnhancedSourceCoordinator coordinator,
CrawlerSourcePlugin sourcePlugin,
Crawler crawler) {
Crawler crawler,
int batchSize) {
this.coordinator = coordinator;
this.leaseInterval = DEFAULT_LEASE_INTERVAL;
this.sourcePlugin = sourcePlugin;
this.crawler = crawler;
this.batchSize = batchSize;
}

@Override
Expand All @@ -65,7 +68,7 @@ public void run() {
Instant lastPollTime = leaderProgressState.getLastPollTime();

//Start crawling and create child partitions
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class CrawlerSourcePluginTest {

private testCrawlerSourcePlugin saasSourcePlugin;

private final int batchSize = 50;

@BeforeEach
void setUp() {
when(executorServiceProvider.get()).thenReturn(executorService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;

import java.lang.reflect.Field;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -49,6 +48,8 @@ public class CrawlerTest {

private Crawler crawler;

private static final int DEFAULT_BATCH_SIZE = 50;

@BeforeEach
public void setup() {
crawler = new Crawler(client);
Expand All @@ -69,49 +70,69 @@ public void executePartitionTest() {
void testCrawlWithEmptyList() {
Instant lastPollTime = Instant.ofEpochMilli(0);
when(client.listItems()).thenReturn(Collections.emptyIterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNonEmptyList() throws NoSuchFieldException, IllegalAccessException {
void testCrawlWithNonEmptyList() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
for (int i = 0; i < maxItemsPerPage; i++) {
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));

}

@Test
void testCrawlWithMultiplePartitions() throws NoSuchFieldException, IllegalAccessException {
void testCrawlWithMultiplePartitions() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
for (int i = 0; i < maxItemsPerPage + 1; i++) {
for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testBatchSize() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
int expectedNumberOfInvocations = 1;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));

List<ItemInfo> itemInfoList2 = new ArrayList<>();
int maxItemsPerPage2 = 25;
for (int i = 0; i < maxItemsPerPage2; i++) {
itemInfoList2.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList2.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
expectedNumberOfInvocations += 2;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = getMaxItemsPerPage();
itemInfoList.add(null);
for (int i = 0; i < maxItemsPerPage - 1; i++) {
for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator);
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
}

Expand All @@ -122,7 +143,7 @@ void testUpdatingPollTimeNullMetaData() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
}

Expand All @@ -133,17 +154,10 @@ void testUpdatedPollTimeNiCreatedLarger() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(lastPollTime, updatedPollTime);
}


private int getMaxItemsPerPage() throws NoSuchFieldException, IllegalAccessException {
Field maxItemsPerPageField = Crawler.class.getDeclaredField("maxItemsPerPage");
maxItemsPerPageField.setAccessible(true);
return (int) maxItemsPerPageField.get(null);
}

private ItemInfo createTestItemInfo(String id) {
return new TestItemInfo(id, new HashMap<>(), Instant.now());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Executors;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.atLeast;
Expand All @@ -37,9 +38,11 @@ public class LeaderSchedulerTest {
@Mock
private Crawler crawler;

private final int batchSize = 50;

@Test
void testUnableToAcquireLeaderPartition() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty());

ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -52,7 +55,7 @@ void testUnableToAcquireLeaderPartition() throws InterruptedException {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testLeaderPartitionsCreation(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
Expand All @@ -66,15 +69,15 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(initializationState);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
Expand All @@ -92,12 +95,12 @@ void testExceptionWhileAcquiringLeaderPartition(boolean initializationState) thr

@Test
void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler);
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
leaderScheduler.setLeaseInterval(Duration.ofMillis(10));
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(false);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class))).thenReturn(Instant.ofEpochMilli(10));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
.thenReturn(Optional.of(leaderPartition))
.thenThrow(RuntimeException.class);
Expand All @@ -110,6 +113,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class));
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
}
}
Loading