diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java
index 349ab7bccb..9631840972 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/main/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfo.java
@@ -76,12 +76,28 @@ public Map<String, Object> getKeyAttributes() {
 
     @Override
     public Instant getLastModifiedAt() {
-        long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0"));
-        long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0"));
+        long updatedAtMillis = getMetadataField(Constants.UPDATED);
+        long createdAtMillis = getMetadataField(Constants.CREATED);
         return createdAtMillis > updatedAtMillis ?
                 Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
     }
 
+    private Long getMetadataField(String fieldName) {
+        Object value = this.metadata.get(fieldName);
+        if (value == null) {
+            return 0L;
+        } else if (value instanceof Long) {
+            return (Long) value;
+        } else if (value instanceof String) {
+            try {
+                return Long.parseLong((String) value);
+            } catch (Exception e) {
+                return 0L;
+            }
+        }
+        return 0L;
+    }
+
     public static class JiraItemInfoBuilder {
         private Map<String, Object> metadata;
         private Instant eventTime;
diff --git a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfoTest.java b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfoTest.java
index f5e6b0906b..1b614ab64a 100644
--- a/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfoTest.java
+++ b/data-prepper-plugins/saas-source-plugins/jira-source/src/test/java/org/opensearch/dataprepper/plugins/source/jira/JiraItemInfoTest.java
@@ -15,6 +15,7 @@
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.plugins.source.jira.utils.Constants;
 
 import java.time.Instant;
 import java.util.Map;
@@ -95,12 +96,12 @@ void testGetPartitionKey() {
 
     @Test
     void testGetLastModifiedAt() {
-        when(metadata.getOrDefault("updated", "0")).thenReturn("5");
-        when(metadata.getOrDefault("created", "0")).thenReturn("0");
+        when(metadata.get(Constants.UPDATED)).thenReturn("5");
+        when(metadata.get(Constants.CREATED)).thenReturn("0");
         assertEquals(Instant.ofEpochMilli(5), jiraItemInfo.getLastModifiedAt());
 
-        when(metadata.getOrDefault("updated", "0")).thenReturn("5");
-        when(metadata.getOrDefault("created", "0")).thenReturn("7");
+        when(metadata.get(Constants.UPDATED)).thenReturn("5");
+        when(metadata.get(Constants.CREATED)).thenReturn("7");
         assertEquals(Instant.ofEpochMilli(7), jiraItemInfo.getLastModifiedAt());
     }
 
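The `getMetadataField` helper added above replaces an unconditional `(String)` cast, so `getLastModifiedAt` now tolerates timestamps stored either as epoch-millisecond `Long`s or as numeric `String`s; the old code would throw a `ClassCastException` the moment a connector put a `Long` into the metadata map. A minimal standalone sketch of the same fallback ladder, using hypothetical values rather than anything from the patch:

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class MetadataFieldSketch {
    // Same ladder as the patched getMetadataField: a Long passes through,
    // a String is parsed, and null, an unexpected type, or a parse failure
    // all collapse to 0L.
    static long toEpochMillis(Object value) {
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof String) {
            try {
                return Long.parseLong((String) value);
            } catch (NumberFormatException e) {
                return 0L;
            }
        }
        return 0L; // covers null and any other type
    }

    public static void main(String[] args) {
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("updated", 1700000000000L);  // stored as a Long
        metadata.put("created", "1690000000000"); // stored as a String
        long updated = toEpochMillis(metadata.get("updated"));
        long created = toEpochMillis(metadata.get("created"));
        // getLastModifiedAt() keeps whichever of the two is later
        System.out.println(Instant.ofEpochMilli(Math.max(created, updated)));
    }
}
```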
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
index ea834eba3b..7a8c4f2275 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java
@@ -7,13 +7,16 @@
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Named;
+import java.time.Duration;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -21,6 +24,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES;
+
 @Named
 public class Crawler {
     private static final Logger log = LoggerFactory.getLogger(Crawler.class);
@@ -35,11 +40,15 @@ public Crawler(CrawlerClient client) {
         this.crawlingTimer = pluginMetrics.timer("crawlingTime");
     }
 
-    public Instant crawl(Instant lastPollTime,
+    public Instant crawl(LeaderPartition leaderPartition,
                          EnhancedSourceCoordinator coordinator, int batchSize) {
         long startTime = System.currentTimeMillis();
+        Instant lastLeaderSavedInstant = Instant.now();
+        LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
+        Instant lastPollTime = leaderProgressState.getLastPollTime();
         client.setLastPollTime(lastPollTime);
         Iterator<ItemInfo> itemInfoIterator = client.listItems();
+        Instant latestModifiedTime = Instant.from(lastPollTime);
         log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
         do {
             final List<ItemInfo> itemInfoList = new ArrayList<>();
@@ -51,20 +60,40 @@ public Instant crawl(Instant lastPollTime,
                     continue;
                 }
                 itemInfoList.add(nextItem);
+                if (nextItem.getLastModifiedAt().isAfter(latestModifiedTime)) {
+                    latestModifiedTime = nextItem.getLastModifiedAt();
+                }
             }
             createPartition(itemInfoList, coordinator);
-            // intermediate updates to master partition state is required here
+
+            // Check point leader progress state at every minute interval.
+            Instant currentTimeInstance = Instant.now();
+            if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) {
+                // intermediate updates to master partition state
+                updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator);
+                lastLeaderSavedInstant = currentTimeInstance;
+            }
+
         } while (itemInfoIterator.hasNext());
+        Instant startTimeInstant = Instant.ofEpochMilli(startTime);
+        updateLeaderProgressState(leaderPartition, startTimeInstant, coordinator);
         long crawlTimeMillis = System.currentTimeMillis() - startTime;
         log.debug("Crawling completed in {} ms", crawlTimeMillis);
         crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
-        return Instant.ofEpochMilli(startTime);
+        return startTimeInstant;
     }
 
     public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
         client.executePartition(state, buffer, acknowledgementSet);
     }
 
+    private void updateLeaderProgressState(LeaderPartition leaderPartition, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) {
+        LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
+        leaderProgressState.setLastPollTime(updatedPollTime);
+        leaderPartition.setLeaderProgressState(leaderProgressState);
+        coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
+    }
+
     private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
         if (itemInfoList.isEmpty()) {
             return;
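The reworked `crawl` loop above checkpoints the leader partition's `lastPollTime` at most once per minute while the crawl is in flight, then saves the crawl's start time as the final value, so a leader that dies mid-crawl resumes from the newest modification time it had already persisted instead of repeating the whole poll window. A self-contained sketch of just that cadence, with the coordinator reduced to a hypothetical `Consumer<Instant>` callback:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class CheckpointCadenceSketch {
    // Mirrors the shape of the patched Crawler.crawl(): track the newest
    // modification time seen, persist it at most once per minute, and
    // finish by persisting the time the crawl started.
    static Instant crawl(Iterator<Instant> modifiedTimes, Consumer<Instant> saveCheckpoint) {
        final Instant crawlStart = Instant.now();
        Instant lastSaved = crawlStart;
        Instant latestModified = Instant.EPOCH; // stands in for the stored lastPollTime
        while (modifiedTimes.hasNext()) {
            Instant modified = modifiedTimes.next();
            if (modified.isAfter(latestModified)) {
                latestModified = modified;
            }
            Instant now = Instant.now();
            if (Duration.between(lastSaved, now).toMinutes() >= 1) {
                saveCheckpoint.accept(latestModified); // intermediate save
                lastSaved = now;
            }
        }
        saveCheckpoint.accept(crawlStart); // final save, like startTimeInstant in crawl()
        return crawlStart;
    }

    public static void main(String[] args) {
        Iterator<Instant> items = List.of(Instant.ofEpochMilli(5), Instant.ofEpochMilli(7)).iterator();
        crawl(items, checkpoint -> System.out.println("saved " + checkpoint));
    }
}
```

Persisting the start time rather than the newest item timestamp means anything modified while the crawl was running still falls after the saved `lastPollTime` and is picked up again on the next pass, trading a little re-processing for no missed items.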
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
index 79705206f1..44196c906a 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderScheduler.java
@@ -6,23 +6,19 @@
 import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
 import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
-import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Optional;
 
 public class LeaderScheduler implements Runnable {
-    private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
-
     /**
      * Default duration to extend the timeout of lease
     */
-    private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);
-
+    public static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
     /**
      * Default interval to run lease check and shard discovery
     */
@@ -64,14 +60,8 @@ public void run() {
                 // Once owned, run Normal LEADER node process.
                 // May want to quit this scheduler if we don't want to monitor future changes
                 if (leaderPartition != null) {
-                    LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
-                    Instant lastPollTime = leaderProgressState.getLastPollTime();
-
-                    //Start crawling and create child partitions
-                    Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
-                    leaderProgressState.setLastPollTime(updatedPollTime);
-                    leaderPartition.setLeaderProgressState(leaderProgressState);
-                    coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
+                    //Start crawling, create child partitions and also continue to update leader partition state
+                    crawler.crawl(leaderPartition, coordinator, batchSize);
                 }
             } catch (Exception e) {
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
index afac9f3e54..d88cd7aef1 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerTest.java
@@ -10,7 +10,9 @@
 import org.opensearch.dataprepper.model.event.Event;
 import org.opensearch.dataprepper.model.record.Record;
 import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
+import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
 import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
 import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
@@ -20,48 +22,50 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 @ExtendWith(MockitoExtension.class)
 public class CrawlerTest {
+    private static final int DEFAULT_BATCH_SIZE = 50;
     @Mock
     private AcknowledgementSet acknowledgementSet;
-
     @Mock
     private EnhancedSourceCoordinator coordinator;
-
     @Mock
     private Buffer<Record<Event>> buffer;
-
     @Mock
     private CrawlerClient client;
-
     @Mock
     private SaasWorkerProgressState state;
-
+    @Mock
+    private LeaderPartition leaderPartition;
     private Crawler crawler;
-
-    private static final int DEFAULT_BATCH_SIZE = 50;
+    private final Instant lastPollTime = Instant.ofEpochMilli(0);
 
     @BeforeEach
     public void setup() {
         crawler = new Crawler(client);
+        when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
     }
 
     @Test
     public void crawlerConstructionTest() {
+        reset(leaderPartition);
         assertNotNull(crawler);
     }
 
     @Test
     public void executePartitionTest() {
+        reset(leaderPartition);
         crawler.executePartition(state, buffer, acknowledgementSet);
         verify(client).executePartition(state, buffer, acknowledgementSet);
     }
@@ -70,45 +74,43 @@ public void executePartitionTest() {
     void testCrawlWithEmptyList() {
         Instant lastPollTime = Instant.ofEpochMilli(0);
         when(client.listItems()).thenReturn(Collections.emptyIterator());
-        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
+        crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithNonEmptyList(){
-        Instant lastPollTime = Instant.ofEpochMilli(0);
+    void testCrawlWithNonEmptyList() {
         List<ItemInfo> itemInfoList = new ArrayList<>();
         for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
             itemInfoList.add(new TestItemInfo("itemId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithMultiplePartitions(){
-        Instant lastPollTime = Instant.ofEpochMilli(0);
+    void testCrawlWithMultiplePartitions() {
         List<ItemInfo> itemInfoList = new ArrayList<>();
         for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
     void testBatchSize() {
-        Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
         int maxItemsPerPage = 50;
         for (int i = 0; i < maxItemsPerPage; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
+        crawler.crawl(leaderPartition, coordinator, maxItemsPerPage);
         int expectedNumberOfInvocations = 1;
         verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
 
@@ -117,33 +119,31 @@ void testBatchSize() {
         for (int i = 0; i < maxItemsPerPage; i++) {
             itemInfoList2.add(new TestItemInfo("testId"));
         }
-        when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
+        when(client.listItems()).thenReturn(itemInfoList2.iterator());
+        crawler.crawl(leaderPartition, coordinator, maxItemsPerPage2);
         expectedNumberOfInvocations += 2;
         verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
-    void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
-        Instant lastPollTime = Instant.ofEpochMilli(0);
+    void testCrawlWithNullItemsInList() {
         List<ItemInfo> itemInfoList = new ArrayList<>();
         itemInfoList.add(null);
         for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
             itemInfoList.add(new TestItemInfo("testId"));
         }
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
     }
 
     @Test
     void testUpdatingPollTimeNullMetaData() {
-        Instant lastPollTime = Instant.ofEpochMilli(0);
         List<ItemInfo> itemInfoList = new ArrayList<>();
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
     }
 
@@ -154,7 +154,7 @@ void testUpdatedPollTimeNiCreatedLarger() {
         ItemInfo testItem = createTestItemInfo("1");
         itemInfoList.add(testItem);
         when(client.listItems()).thenReturn(itemInfoList.iterator());
-        Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
+        Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
         assertNotEquals(lastPollTime, updatedPollTime);
     }
 
diff --git a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
index d5ef0cecde..e0db0304aa 100644
--- a/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
+++ b/data-prepper-plugins/saas-source-plugins/source-crawler/src/test/java/org/opensearch/dataprepper/plugins/source/source_crawler/coordination/scheduler/LeaderSchedulerTest.java
@@ -31,6 +31,7 @@
 
 @ExtendWith(MockitoExtension.class)
 public class LeaderSchedulerTest {
+    private final int batchSize = 50;
     @Mock
     private EnhancedSourceCoordinator coordinator;
     @Mock
@@ -38,8 +39,6 @@ public class LeaderSchedulerTest {
     @Mock
     private Crawler crawler;
 
-    private final int batchSize = 50;
-
     @Test
     void testUnableToAcquireLeaderPartition() throws InterruptedException {
         LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
@@ -69,8 +68,8 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
-        verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
+        verify(crawler, times(1)).crawl(leaderPartition, coordinator, batchSize);
+        verify(coordinator, times(1)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
     }
 
@@ -100,7 +99,7 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
         LeaderPartition leaderPartition = new LeaderPartition();
         leaderPartition.getProgressState().get().setInitialized(false);
         leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
-        when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
+        when(crawler.crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
         when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
                 .thenReturn(Optional.of(leaderPartition))
                 .thenThrow(RuntimeException.class);
@@ -113,6 +112,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
         executorService.shutdownNow();
 
         // Check if crawler was invoked and updated leader lease renewal time
-        verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
+        verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt());
     }
 }
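A note on the test changes: `MockitoExtension` runs with strict stubbing by default, so the `leaderPartition.getProgressState()` stub created in `CrawlerTest`'s `@BeforeEach` would make any test that never consumes it fail with an `UnnecessaryStubbingException`; that appears to be why `crawlerConstructionTest` and `executePartitionTest` now call `reset(leaderPartition)`. A minimal, hypothetical illustration of the pattern (not part of the patch):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;

import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

// Under MockitoExtension's default strict stubbing, a stub set up in
// @BeforeEach must either be consumed by the test or discarded with
// reset(); otherwise Mockito reports UnnecessaryStubbingException.
@ExtendWith(MockitoExtension.class)
class StrictStubbingExampleTest {
    @Mock
    private List<String> items; // hypothetical mock, standing in for leaderPartition

    @BeforeEach
    void setup() {
        when(items.size()).thenReturn(1); // shared stub most tests rely on
    }

    @Test
    void usesTheSharedStub() {
        assertEquals(1, items.size()); // stub consumed, strict stubbing satisfied
    }

    @Test
    void neverTouchesTheStub() {
        reset(items); // drop the unused stub so strict stubbing does not complain
        assertEquals(2, 1 + 1); // test logic that never touches the mock
    }
}
```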