Skip to content

Commit

Permalink
some fixes to jira source (opensearch-project#5203)
Browse files Browse the repository at this point in the history
* prevent polling when there is already a leader scheduler

Signed-off-by: Maxwell Brown <[email protected]>

* verify internet address everytime we invoke restapi

Signed-off-by: Maxwell Brown <[email protected]>

* reparralelize crawler call

Signed-off-by: Maxwell Brown <[email protected]>

* Test fixes and remove unused code

Signed-off-by: Maxwell Brown <[email protected]>

* remove unneccessary stubbing

Signed-off-by: Maxwell Brown <[email protected]>

* jirarestclient test

Signed-off-by: Maxwell Brown <[email protected]>

* Jira Iterator future list tests

Signed-off-by: Maxwell Brown <[email protected]>

---------

Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 authored Dec 4, 2024
1 parent 76b06ef commit 3999910
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,6 @@ public JiraItemInfoBuilder withProject(String project) {
return this;
}

public JiraItemInfoBuilder withIssueType(String issueType) {
this.issueType = issueType;
return this;
}

public JiraItemInfoBuilder withIssueBean(IssueBean issue) {
Map<String, Object> issueMetadata = new HashMap<>();
issueMetadata.put(PROJECT_KEY, issue.getProject());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.dataprepper.plugins.source.jira;


import com.google.common.annotations.VisibleForTesting;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
Expand All @@ -9,15 +10,19 @@

import javax.inject.Named;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

@Named
public class JiraIterator implements Iterator<ItemInfo> {

private static final int HAS_NEXT_TIMEOUT = 60;
private static final Logger log = LoggerFactory.getLogger(JiraIterator.class);
private final JiraSourceConfig sourceConfig;
private final JiraService service;
Expand All @@ -27,6 +32,7 @@ public class JiraIterator implements Iterator<ItemInfo> {
private Queue<ItemInfo> itemInfoQueue;
private Instant lastPollTime;
private boolean firstTime = true;
private List<Future<Boolean>> futureList = new ArrayList<>();

public JiraIterator(final JiraService service,
PluginExecutorServiceProvider executorServiceProvider,
Expand All @@ -40,12 +46,41 @@ public JiraIterator(final JiraService service,
public boolean hasNext() {
if (firstTime) {
log.trace("Crawling has been started");
itemInfoQueue = service.getJiraEntities(sourceConfig, lastPollTime);
startCrawlerThreads();
firstTime = false;
}
int timeout = HAS_NEXT_TIMEOUT;
while (isCrawlerRunning() && itemInfoQueue.isEmpty() && timeout > 0) {
try {
log.trace("Waiting for crawler queue to be filled for next {} seconds", timeout);
Thread.sleep(crawlerQWaitTimeMillis);
timeout--;
} catch (InterruptedException e) {
log.error("An exception has occurred while checking for the next document in crawling queue");
Thread.currentThread().interrupt();
}
}
return !this.itemInfoQueue.isEmpty();
}

private boolean isCrawlerRunning() {
boolean isRunning = false;
if (!futureList.isEmpty()) {
for (Future<Boolean> future : futureList) {
if (!future.isDone()) {
isRunning = true;
break;
}
}
}
return isRunning;
}

private void startCrawlerThreads() {
futureList.add(crawlerTaskExecutor.submit(() ->
service.getJiraEntities(sourceConfig, lastPollTime, itemInfoQueue), false));
}

@Override
public ItemInfo next() {
if (hasNext()) {
Expand All @@ -66,4 +101,14 @@ public void initialize(Instant jiraChangeLogToken) {
this.firstTime = true;
}

@VisibleForTesting
public List<Future<Boolean>> showFutureList() {
return futureList;
}

@VisibleForTesting
public Queue<ItemInfo> showItemInfoQueue() {
return itemInfoQueue;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,11 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.ISSUE_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.PROJECT_KEY;
import static org.opensearch.dataprepper.plugins.source.jira.utils.Constants.UPDATED;
import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.CLOSING_ROUND_BRACKET;
import static org.opensearch.dataprepper.plugins.source.jira.utils.JqlConstants.DELIMITER;
Expand Down Expand Up @@ -65,12 +61,10 @@ public JiraService(JiraSourceConfig jiraSourceConfig, JiraRestClient jiraRestCli
* @param configuration the configuration.
* @param timestamp timestamp.
*/
public Queue<ItemInfo> getJiraEntities(JiraSourceConfig configuration, Instant timestamp) {
public void getJiraEntities(JiraSourceConfig configuration, Instant timestamp, Queue<ItemInfo> itemInfoQueue) {
log.trace("Started to fetch entities");
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
searchForNewTicketsAndAddToQueue(configuration, timestamp, itemInfoQueue);
log.trace("Creating item information and adding in queue");
return itemInfoQueue;
}

public String getIssue(String issueKey) {
Expand Down Expand Up @@ -170,21 +164,4 @@ private void validateProjectFilters(JiraSourceConfig configuration) {
}
}

/**
* Method for creating Item Info.
*
* @param key Input Parameter
* @param metadata Input Parameter
* @return Item Info
*/
private ItemInfo createItemInfo(String key, Map<String, Object> metadata) {
return JiraItemInfo.builder().withEventTime(Instant.now())
.withId((String) metadata.get(ISSUE_KEY))
.withItemId(key)
.withMetadata(metadata)
.withProject((String) metadata.get(PROJECT_KEY))
.withIssueType((String) metadata.get(CONTENT_TYPE))
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
import lombok.extern.slf4j.Slf4j;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig;
import org.opensearch.dataprepper.plugins.source.jira.exception.BadRequestException;
import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException;
import org.opensearch.dataprepper.plugins.source.jira.models.SearchResults;
import org.opensearch.dataprepper.plugins.source.jira.rest.auth.JiraAuthConfig;
import org.opensearch.dataprepper.plugins.source.jira.utils.AddressValidation;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.HttpClientErrorException;
Expand Down Expand Up @@ -93,8 +95,8 @@ public String getIssue(String issueKey) {
return invokeRestApi(uri, String.class).getBody();
}

private <T> ResponseEntity<T> invokeRestApi(URI uri, Class<T> responseType) {

private <T> ResponseEntity<T> invokeRestApi(URI uri, Class<T> responseType) throws BadRequestException{
AddressValidation.validateInetAddress(AddressValidation.getInetAddress(uri.toString()));
int retryCount = 0;
while (retryCount < RETRY_ATTEMPT) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.NoSuchElementException;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -101,6 +102,38 @@ void testItemInfoQueueNotEmpty() {
assertNotNull(jiraIterator.next());
}

@Test
void testStartCrawlerThreads() {
jiraIterator = createObjectUnderTest();
jiraIterator.initialize(Instant.ofEpochSecond(0));
jiraIterator.hasNext();
jiraIterator.hasNext();
assertTrue(jiraIterator.showFutureList().size() == 1);
}

@Test
void testFuturesCompleted() throws InterruptedException {
jiraIterator = createObjectUnderTest();
List<IssueBean> mockIssues = new ArrayList<>();
IssueBean issue1 = createIssueBean(false);
mockIssues.add(issue1);
IssueBean issue2 = createIssueBean(false);
mockIssues.add(issue2);
IssueBean issue3 = createIssueBean(false);
mockIssues.add(issue3);
when(mockSearchResults.getIssues()).thenReturn(mockIssues);
when(mockSearchResults.getTotal()).thenReturn(0);
doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

jiraIterator.initialize(Instant.ofEpochSecond(0));
jiraIterator.setCrawlerQWaitTimeMillis(1);
jiraIterator.hasNext();

Thread.sleep(1);
jiraIterator.showFutureList().forEach(future -> assertTrue(future.isDone()));
assertEquals(jiraIterator.showItemInfoQueue().size(), mockIssues.size());
}

@Test
void testItemInfoQueueEmpty(){
jiraIterator = createObjectUnderTest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -141,7 +142,8 @@ public void testGetJiraEntities() throws JsonProcessingException {
doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
Queue<ItemInfo> itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp);
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue);
assertEquals(mockIssues.size(), itemInfoQueue.size());
}

Expand All @@ -166,7 +168,8 @@ public void buildIssueItemInfoMultipleFutureThreads() throws JsonProcessingExcep
doReturn(mockSearchResults).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
Queue<ItemInfo> itemInfoQueue = jiraService.getJiraEntities(jiraSourceConfig, timestamp);
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();
jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue);
assertTrue(itemInfoQueue.size() >= 100);
}

Expand All @@ -186,7 +189,9 @@ public void testBadProjectKeys() throws JsonProcessingException {
JiraService jiraService = new JiraService(jiraSourceConfig, jiraRestClient);

Instant timestamp = Instant.ofEpochSecond(0);
assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp));
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();

assertThrows(BadRequestException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue));
}

@Test
Expand All @@ -201,7 +206,9 @@ public void testGetJiraEntitiesException() throws JsonProcessingException {
doThrow(RuntimeException.class).when(jiraRestClient).getAllIssues(any(StringBuilder.class), anyInt(), any(JiraSourceConfig.class));

Instant timestamp = Instant.ofEpochSecond(0);
assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp));
Queue<ItemInfo> itemInfoQueue = new ConcurrentLinkedQueue<>();

assertThrows(RuntimeException.class, () -> jiraService.getJiraEntities(jiraSourceConfig, timestamp, itemInfoQueue));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.source.jira.JiraServiceTest;
import org.opensearch.dataprepper.plugins.source.jira.JiraSourceConfig;
import org.opensearch.dataprepper.plugins.source.jira.exception.BadRequestException;
import org.opensearch.dataprepper.plugins.source.jira.exception.UnAuthorizedException;
import org.opensearch.dataprepper.plugins.source.jira.models.SearchResults;
import org.opensearch.dataprepper.plugins.source.jira.rest.auth.JiraAuthConfig;
Expand Down Expand Up @@ -108,7 +109,7 @@ public void testGetAllIssuesOauth2() throws JsonProcessingException {
JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(OAUTH2, issueType, issueStatus, projectKey);
JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig);
SearchResults mockSearchResults = mock(SearchResults.class);
doReturn("http://mock-service.jira.com").when(authConfig).getUrl();
doReturn("http://mock-service.jira.com/").when(authConfig).getUrl();
doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class));
SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig);
assertNotNull(results);
Expand All @@ -123,10 +124,17 @@ public void testGetAllIssuesBasic() throws JsonProcessingException {
JiraSourceConfig jiraSourceConfig = JiraServiceTest.createJiraConfiguration(BASIC, issueType, issueStatus, projectKey);
JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig);
SearchResults mockSearchResults = mock(SearchResults.class);
when(authConfig.getUrl()).thenReturn("https://example.com");
when(authConfig.getUrl()).thenReturn("https://example.com/");
doReturn(new ResponseEntity<>(mockSearchResults, HttpStatus.OK)).when(restTemplate).getForEntity(any(URI.class), any(Class.class));
SearchResults results = jiraRestClient.getAllIssues(jql, 0, jiraSourceConfig);
assertNotNull(results);
}

@Test
public void testRestApiAddressValidation() throws JsonProcessingException {
when(authConfig.getUrl()).thenReturn("https://224.0.0.1/");
JiraRestClient jiraRestClient = new JiraRestClient(restTemplate, authConfig);
assertThrows(BadRequestException.class, () -> jiraRestClient.getIssue("TEST-1"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class LeaderScheduler implements Runnable {
/**
* Default duration to extend the timeout of lease
*/
private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3;
private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);

/**
* Default interval to run lease check and shard discovery
Expand Down Expand Up @@ -68,7 +68,7 @@ public void run() {
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, null);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
}

} catch (Exception e) {
Expand All @@ -78,7 +78,7 @@ public void run() {
// Extend the timeout
// will always be a leader until shutdown
try {
coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES));
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
} catch (final Exception e) {
LOG.error("Failed to save Leader partition state. This process will retry.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator);
verify(coordinator).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ void testExceptionWhileAcquiringWorkerPartition() throws InterruptedException {
@Test
void testWhenNoPartitionToWorkOn() throws InterruptedException {
WorkerScheduler workerScheduler = new WorkerScheduler(buffer, coordinator, sourceConfig, crawler);
given(coordinator.acquireAvailablePartition(SaasSourcePartition.PARTITION_TYPE)).willReturn(Optional.empty());

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(workerScheduler);
Expand Down

0 comments on commit 3999910

Please sign in to comment.