Skip to content

Commit

Permalink
Checkpointing Leader Scheduler State (#5352)
Browse files Browse the repository at this point in the history
* Handling end to end acknowledgement

Signed-off-by: Santhosh Gandhe <[email protected]>

* Checkpointing leader state every minute

Signed-off-by: Santhosh Gandhe <[email protected]>

* corresponding test cases fix

Signed-off-by: Santhosh Gandhe <[email protected]>


---------

Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 authored Jan 28, 2025
1 parent f217b24 commit 440fb2d
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,28 @@ public Map<String, Object> getKeyAttributes() {

@Override
public Instant getLastModifiedAt() {
long updatedAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.UPDATED, "0"));
long createdAtMillis = Long.parseLong((String) this.metadata.getOrDefault(Constants.CREATED, "0"));
long updatedAtMillis = getMetadataField(Constants.UPDATED);
long createdAtMillis = getMetadataField(Constants.CREATED);
return createdAtMillis > updatedAtMillis ?
Instant.ofEpochMilli(createdAtMillis) : Instant.ofEpochMilli(updatedAtMillis);
}

/**
 * Reads a numeric entry from {@code metadata} as a {@code long}.
 *
 * <p>The entry may be stored as any {@link Number} (for example {@code Long} or
 * {@code Integer}) or as a decimal {@code String}. Anything else is treated as absent.
 *
 * @param fieldName key to look up in {@code metadata}
 * @return the entry's value as a long, or {@code 0L} when the entry is missing, has an
 *         unsupported type, or is a string that is not a valid long
 */
private Long getMetadataField(String fieldName) {
    final Object value = this.metadata.get(fieldName);
    if (value instanceof Number) {
        // Accept every numeric box, not only Long, so an Integer entry is not silently
        // reported as 0. Fractional types are truncated by longValue().
        return ((Number) value).longValue();
    }
    if (value instanceof String) {
        try {
            return Long.parseLong((String) value);
        } catch (NumberFormatException ignored) {
            // Malformed number: deliberately fall through to the default below.
        }
    }
    // null (instanceof is false for null) and unsupported types end up here.
    return 0L;
}

public static class JiraItemInfoBuilder {
private Map<String, Object> metadata;
private Instant eventTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.source.jira.utils.Constants;

import java.time.Instant;
import java.util.Map;
Expand Down Expand Up @@ -95,12 +96,12 @@ void testGetPartitionKey() {

@Test
void testGetLastModifiedAt() {
when(metadata.getOrDefault("updated", "0")).thenReturn("5");
when(metadata.getOrDefault("created", "0")).thenReturn("0");
when(metadata.get(Constants.UPDATED)).thenReturn("5");
when(metadata.get(Constants.CREATED)).thenReturn("0");
assertEquals(Instant.ofEpochMilli(5), jiraItemInfo.getLastModifiedAt());

when(metadata.getOrDefault("updated", "0")).thenReturn("5");
when(metadata.getOrDefault("created", "0")).thenReturn("7");
when(metadata.get(Constants.UPDATED)).thenReturn("5");
when(metadata.get(Constants.CREATED)).thenReturn("7");
assertEquals(Instant.ofEpochMilli(7), jiraItemInfo.getLastModifiedAt());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,25 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Named;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.dataprepper.plugins.source.source_crawler.coordination.scheduler.LeaderScheduler.DEFAULT_EXTEND_LEASE_MINUTES;

@Named
public class Crawler {
private static final Logger log = LoggerFactory.getLogger(Crawler.class);
Expand All @@ -35,11 +40,15 @@ public Crawler(CrawlerClient client) {
this.crawlingTimer = pluginMetrics.timer("crawlingTime");
}

public Instant crawl(Instant lastPollTime,
public Instant crawl(LeaderPartition leaderPartition,
EnhancedSourceCoordinator coordinator, int batchSize) {
long startTime = System.currentTimeMillis();
Instant lastLeaderSavedInstant = Instant.now();
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
Instant lastPollTime = leaderProgressState.getLastPollTime();
client.setLastPollTime(lastPollTime);
Iterator<ItemInfo> itemInfoIterator = client.listItems();
Instant latestModifiedTime = Instant.from(lastPollTime);
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
do {
final List<ItemInfo> itemInfoList = new ArrayList<>();
Expand All @@ -51,20 +60,40 @@ public Instant crawl(Instant lastPollTime,
continue;
}
itemInfoList.add(nextItem);
if (nextItem.getLastModifiedAt().isAfter(latestModifiedTime)) {
latestModifiedTime = nextItem.getLastModifiedAt();
}
}
createPartition(itemInfoList, coordinator);
// intermediate updates to master partition state is required here

// Check point leader progress state at every minute interval.
Instant currentTimeInstance = Instant.now();
if (Duration.between(lastLeaderSavedInstant, currentTimeInstance).toMinutes() >= 1) {
// intermediate updates to master partition state
updateLeaderProgressState(leaderPartition, latestModifiedTime, coordinator);
lastLeaderSavedInstant = currentTimeInstance;
}

} while (itemInfoIterator.hasNext());
Instant startTimeInstant = Instant.ofEpochMilli(startTime);
updateLeaderProgressState(leaderPartition, startTimeInstant, coordinator);
long crawlTimeMillis = System.currentTimeMillis() - startTime;
log.debug("Crawling completed in {} ms", crawlTimeMillis);
crawlingTimer.record(crawlTimeMillis, TimeUnit.MILLISECONDS);
return Instant.ofEpochMilli(startTime);
return startTimeInstant;
}

/**
 * Executes a worker partition by handing it straight to the underlying client.
 *
 * <p>This method adds no logic of its own; all three arguments are forwarded unchanged to
 * {@code client.executePartition}.
 *
 * @param state              progress state of the worker partition to execute
 * @param buffer             buffer passed through to the client
 * @param acknowledgementSet acknowledgement set passed through to the client
 */
public void executePartition(SaasWorkerProgressState state, Buffer<Record<Event>> buffer, AcknowledgementSet acknowledgementSet) {
client.executePartition(state, buffer, acknowledgementSet);
}

/**
 * Stores a new last-poll time on the leader partition and saves it through the coordinator.
 *
 * <p>The partition's current progress state is fetched, its last poll time is overwritten
 * with {@code updatedPollTime}, the state is set back on the partition, and the partition
 * is saved with {@code DEFAULT_EXTEND_LEASE_MINUTES}.
 *
 * @param leaderPartition partition whose progress state is updated; its progress state
 *                        must be present, otherwise {@code Optional.get()} throws
 * @param updatedPollTime value to record as the last poll time
 * @param coordinator     coordinator used to save the partition's progress state
 */
private void updateLeaderProgressState(LeaderPartition leaderPartition, Instant updatedPollTime, EnhancedSourceCoordinator coordinator) {
    final LeaderProgressState progress = leaderPartition.getProgressState().get();
    progress.setLastPollTime(updatedPollTime);
    leaderPartition.setLeaderProgressState(progress);
    coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
}

private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordinator coordinator) {
if (itemInfoList.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,19 @@
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
import org.opensearch.dataprepper.plugins.source.source_crawler.base.CrawlerSourcePlugin;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

public class LeaderScheduler implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);

/**
* Default duration to extend the timeout of lease
*/
private static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);

public static final Duration DEFAULT_EXTEND_LEASE_MINUTES = Duration.ofMinutes(3);
private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class);
/**
* Default interval to run lease check and shard discovery
*/
Expand Down Expand Up @@ -64,14 +60,8 @@ public void run() {
// Once owned, run Normal LEADER node process.
// May want to quit this scheduler if we don't want to monitor future changes
if (leaderPartition != null) {
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
Instant lastPollTime = leaderProgressState.getLastPollTime();

//Start crawling and create child partitions
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, batchSize);
leaderProgressState.setLastPollTime(updatedPollTime);
leaderPartition.setLeaderProgressState(leaderProgressState);
coordinator.saveProgressStateForPartition(leaderPartition, DEFAULT_EXTEND_LEASE_MINUTES);
//Start crawling, create child partitions and also continue to update leader partition state
crawler.crawl(leaderPartition, coordinator, batchSize);
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.partition.SaasSourcePartition;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state.SaasWorkerProgressState;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.ItemInfo;
import org.opensearch.dataprepper.plugins.source.source_crawler.model.TestItemInfo;
Expand All @@ -20,48 +22,50 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

@ExtendWith(MockitoExtension.class)
public class CrawlerTest {
private static final int DEFAULT_BATCH_SIZE = 50;
@Mock
private AcknowledgementSet acknowledgementSet;

@Mock
private EnhancedSourceCoordinator coordinator;

@Mock
private Buffer<Record<Event>> buffer;

@Mock
private CrawlerClient client;

@Mock
private SaasWorkerProgressState state;

@Mock
private LeaderPartition leaderPartition;
private Crawler crawler;

private static final int DEFAULT_BATCH_SIZE = 50;
private final Instant lastPollTime = Instant.ofEpochMilli(0);

/**
 * Creates the {@code Crawler} under test around the mocked client and stubs the mocked
 * leader partition to return a progress state built from {@code lastPollTime}.
 */
@BeforeEach
public void setup() {
crawler = new Crawler(client);
// crawl() reads the last poll time from the partition's progress state, so give the mock one.
when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
}

/**
 * Verifies that the crawler created in {@code setup()} is non-null.
 */
@Test
public void crawlerConstructionTest() {
// Clears the getProgressState() stubbing from setup(); this test never calls crawl().
// NOTE(review): presumably needed so Mockito's strict-stubs check does not flag the
// unused stubbing — confirm.
reset(leaderPartition);
assertNotNull(crawler);
}

/**
 * Verifies that {@code Crawler.executePartition} forwards its three arguments to the client.
 */
@Test
public void executePartitionTest() {
// Clears the getProgressState() stubbing from setup(); the leader partition is not used here.
// NOTE(review): presumably avoids an unused-stubbing failure under MockitoExtension — confirm.
reset(leaderPartition);
crawler.executePartition(state, buffer, acknowledgementSet);
verify(client).executePartition(state, buffer, acknowledgementSet);
}
Expand All @@ -70,45 +74,43 @@ public void executePartitionTest() {
void testCrawlWithEmptyList() {
Instant lastPollTime = Instant.ofEpochMilli(0);
when(client.listItems()).thenReturn(Collections.emptyIterator());
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
when(leaderPartition.getProgressState()).thenReturn(Optional.of(new LeaderProgressState(lastPollTime)));
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, never()).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNonEmptyList(){
Instant lastPollTime = Instant.ofEpochMilli(0);
void testCrawlWithNonEmptyList() {
List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < DEFAULT_BATCH_SIZE; i++) {
itemInfoList.add(new TestItemInfo("itemId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));

}

@Test
void testCrawlWithMultiplePartitions(){
Instant lastPollTime = Instant.ofEpochMilli(0);
void testCrawlWithMultiplePartitions() {
List<ItemInfo> itemInfoList = new ArrayList<>();
for (int i = 0; i < DEFAULT_BATCH_SIZE + 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(2)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testBatchSize() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
int maxItemsPerPage = 50;
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage);
crawler.crawl(leaderPartition, coordinator, maxItemsPerPage);
int expectedNumberOfInvocations = 1;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));

Expand All @@ -117,33 +119,31 @@ void testBatchSize() {
for (int i = 0; i < maxItemsPerPage; i++) {
itemInfoList2.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, maxItemsPerPage2);
when(client.listItems()).thenReturn(itemInfoList2.iterator());
crawler.crawl(leaderPartition, coordinator, maxItemsPerPage2);
expectedNumberOfInvocations += 2;
verify(coordinator, times(expectedNumberOfInvocations)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testCrawlWithNullItemsInList() throws NoSuchFieldException, IllegalAccessException {
Instant lastPollTime = Instant.ofEpochMilli(0);
void testCrawlWithNullItemsInList() {
List<ItemInfo> itemInfoList = new ArrayList<>();
itemInfoList.add(null);
for (int i = 0; i < DEFAULT_BATCH_SIZE - 1; i++) {
itemInfoList.add(new TestItemInfo("testId"));
}
when(client.listItems()).thenReturn(itemInfoList.iterator());
crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
verify(coordinator, times(1)).createPartition(any(SaasSourcePartition.class));
}

@Test
void testUpdatingPollTimeNullMetaData() {
Instant lastPollTime = Instant.ofEpochMilli(0);
List<ItemInfo> itemInfoList = new ArrayList<>();
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(Instant.ofEpochMilli(0), updatedPollTime);
}

Expand All @@ -154,7 +154,7 @@ void testUpdatedPollTimeNiCreatedLarger() {
ItemInfo testItem = createTestItemInfo("1");
itemInfoList.add(testItem);
when(client.listItems()).thenReturn(itemInfoList.iterator());
Instant updatedPollTime = crawler.crawl(lastPollTime, coordinator, DEFAULT_BATCH_SIZE);
Instant updatedPollTime = crawler.crawl(leaderPartition, coordinator, DEFAULT_BATCH_SIZE);
assertNotEquals(lastPollTime, updatedPollTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
@ExtendWith(MockitoExtension.class)
public class LeaderSchedulerTest {

private final int batchSize = 50;
@Mock
private EnhancedSourceCoordinator coordinator;
@Mock
private CrawlerSourcePlugin saasSourcePlugin;
@Mock
private Crawler crawler;

private final int batchSize = 50;

@Test
void testUnableToAcquireLeaderPartition() throws InterruptedException {
LeaderScheduler leaderScheduler = new LeaderScheduler(coordinator, saasSourcePlugin, crawler, batchSize);
Expand Down Expand Up @@ -69,8 +68,8 @@ void testLeaderPartitionsCreation(boolean initializationState) throws Interrupte
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, times(1)).crawl(Instant.ofEpochMilli(0L), coordinator, batchSize);
verify(coordinator, times(2)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));
verify(crawler, times(1)).crawl(leaderPartition, coordinator, batchSize);
verify(coordinator, times(1)).saveProgressStateForPartition(eq(leaderPartition), any(Duration.class));

}

Expand Down Expand Up @@ -100,7 +99,7 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
LeaderPartition leaderPartition = new LeaderPartition();
leaderPartition.getProgressState().get().setInitialized(false);
leaderPartition.getProgressState().get().setLastPollTime(Instant.ofEpochMilli(0L));
when(crawler.crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(crawler.crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt())).thenReturn(Instant.ofEpochMilli(10));
when(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE))
.thenReturn(Optional.of(leaderPartition))
.thenThrow(RuntimeException.class);
Expand All @@ -113,6 +112,6 @@ void testWhileLoopRunnningAfterTheSleep() throws InterruptedException {
executorService.shutdownNow();

// Check if crawler was invoked and updated leader lease renewal time
verify(crawler, atLeast(2)).crawl(any(Instant.class), any(EnhancedSourceCoordinator.class), anyInt());
verify(crawler, atLeast(2)).crawl(any(LeaderPartition.class), any(EnhancedSourceCoordinator.class), anyInt());
}
}

0 comments on commit 440fb2d

Please sign in to comment.