diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/ExportPartition.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/ExportPartition.java index 328e275023..d41eefdd3c 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/ExportPartition.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/ExportPartition.java @@ -39,11 +39,11 @@ public ExportPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) } public ExportPartition(final String collection, final int partitionSize, final Instant exportTime, - final Optional state) { + final ExportProgressState state) { this.collection = collection; this.partitionSize = partitionSize; this.exportTime = exportTime; - this.state = state.orElse(null); + this.state = state; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/GlobalState.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/GlobalState.java index e6ef7d6090..ad2a6f868e 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/GlobalState.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/coordination/partition/GlobalState.java @@ -29,9 +29,9 @@ public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) { this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState()); } - public GlobalState(String stateName, Optional> state) { + public GlobalState(String stateName, Map state) { this.stateName = stateName; - this.state = state.orElse(null); + this.state = state; } diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java new file mode 100644 index 0000000000..33dea18255 --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/ExportScheduler.java @@ -0,0 +1,107 @@ +package org.opensearch.dataprepper.plugins.mongo.export; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.mongo.coordination.state.DataQueryProgressState; +import org.opensearch.dataprepper.plugins.mongo.model.LoadStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +public class ExportScheduler implements Runnable { + public static final String EXPORT_PREFIX = "EXPORT-"; 
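
The first two hunks change the partition constructors to accept the progress state directly rather than an Optional, so call sites no longer wrap or unwrap the value. A minimal sketch of construction under the new signatures; the collection name and partition size are illustrative, not taken from the diff:

```java
import java.time.Instant;

import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState;

class PartitionConstructionSketch {
    void createPartitions() {
        // State is now passed directly; previously this argument was an Optional.
        final ExportProgressState progressState = new ExportProgressState();
        final ExportPartition exportPartition =
                new ExportPartition("db.collection", 4_000, Instant.now(), progressState);

        // GlobalState likewise takes the Map itself; a null state is still accepted,
        // which is what the LeaderScheduler below relies on for the per-collection config entry.
        final GlobalState globalState = new GlobalState("db.collection", null);
    }
}
```
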
+ private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class); + private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000; + static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess"; + static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure"; + static final String EXPORT_PARTITION_QUERY_TOTAL_COUNT = "exportPartitionQueryTotal"; + static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal"; + private final PluginMetrics pluginMetrics; + private final EnhancedSourceCoordinator enhancedSourceCoordinator; + private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier; + private final Counter exportJobSuccessCounter; + private final Counter exportJobFailureCounter; + + private final Counter exportPartitionTotalCounter; + private final Counter exportRecordsTotalCounter; + + public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator, + final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier, + final PluginMetrics pluginMetrics) { + this.enhancedSourceCoordinator = enhancedSourceCoordinator; + this.mongoDBExportPartitionSupplier = mongoDBExportPartitionSupplier; + this.pluginMetrics = pluginMetrics; + + exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT); + exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT); + exportPartitionTotalCounter = pluginMetrics.counter(EXPORT_PARTITION_QUERY_TOTAL_COUNT); + exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT); + } + + @Override + public void run() { + LOG.info("Start running Export Scheduler"); + while (!Thread.currentThread().isInterrupted()) { + try { + final Optional sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE); + if (sourcePartition.isPresent()) { + final ExportPartition exportPartition = (ExportPartition) sourcePartition.get(); + LOG.info("Acquired an export partition: {}", exportPartition.getPartitionKey()); + + final List partitionIdentifiers = mongoDBExportPartitionSupplier.apply(exportPartition); + + createDataQueryPartitions(exportPartition.getPartitionKey(), Instant.now(), partitionIdentifiers); + } + try { + Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException e) { + LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } catch (final Exception e) { + LOG.error("Received an exception during export from DocumentDB, backing off and retrying", e); + try { + Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS); + } catch (final InterruptedException ex) { + LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing"); + break; + } + } + } + LOG.warn("Export scheduler interrupted, looks like shutdown has triggered"); + } + + private void createDataQueryPartitions(final String collection, + final Instant exportTime, + final List partitionIdentifiers) { + AtomicInteger totalQueries = new AtomicInteger(); + partitionIdentifiers.forEach(partitionIdentifier -> { + final DataQueryProgressState progressState = new DataQueryProgressState(); + progressState.setExecutedQueries(0); + progressState.setLoadedRecords(0); + progressState.setStartTime(exportTime.toEpochMilli()); + + totalQueries.getAndIncrement(); + final DataQueryPartition partition = new DataQueryPartition(partitionIdentifier.getPartitionKey(), progressState); + enhancedSourceCoordinator.createPartition(partition); + }); + + 
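
ExportScheduler.run() loops until its thread is interrupted, sleeping DEFAULT_TAKE_LEASE_INTERVAL_MILLIS between lease attempts and after errors. A hedged sketch of how the scheduler might be hosted, mirroring the single-thread executor pattern the unit tests below use; the 30-second termination wait is an arbitrary choice:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;

class ExportSchedulerHostSketch {
    void start(final EnhancedSourceCoordinator coordinator,
               final MongoDBExportPartitionSupplier partitionSupplier,
               final PluginMetrics pluginMetrics) throws InterruptedException {
        final ExportScheduler exportScheduler =
                new ExportScheduler(coordinator, partitionSupplier, pluginMetrics);

        final ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.submit(exportScheduler);

        // ... later, on source shutdown: shutdownNow() interrupts the sleep and
        // the loop exits via the InterruptedException branches above.
        executorService.shutdownNow();
        executorService.awaitTermination(30, TimeUnit.SECONDS);
    }
}
```
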
exportPartitionTotalCounter.increment(totalQueries.get()); + + // Currently, we need to maintain a global state to track the overall progress. + // So that we can easily tell if all the export files are loaded + final LoadStatus loadStatus = new LoadStatus(totalQueries.get(), 0); + enhancedSourceCoordinator.createPartition(new GlobalState(EXPORT_PREFIX + collection, loadStatus.toMap())); + } + +} diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplier.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplier.java new file mode 100644 index 0000000000..6d258db7ba --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplier.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.mongo.export; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import org.bson.Document; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection; +import org.opensearch.dataprepper.plugins.mongo.client.BsonHelper; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +public class MongoDBExportPartitionSupplier implements Function> { + private static final Logger LOG = LoggerFactory.getLogger(MongoDBExportPartitionSupplier.class); + private static final String MONGODB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: ||| + private static final String COLLECTION_SPLITTER = "\\."; + + private final MongoDBSourceConfig sourceConfig; + + public MongoDBExportPartitionSupplier(final MongoDBSourceConfig sourceConfig) { + this.sourceConfig = sourceConfig; + } + + private List buildPartitions(final ExportPartition exportPartition) { + final List collectionPartitions = new ArrayList<>(); + final String collectionDbName = exportPartition.getCollection(); + List collection = List.of(collectionDbName.split(COLLECTION_SPLITTER)); + if (collection.size() < 2) { + throw new IllegalArgumentException("Invalid Collection Name. 
Must be in db.collection format"); + } + try (MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) { + final MongoDatabase db = mongoClient.getDatabase(collection.get(0)); + final MongoCollection col = db.getCollection(collection.get(1)); + final int partitionSize = exportPartition.getPartitionSize(); + FindIterable startIterable = col.find() + .projection(new Document("_id", 1)) + .sort(new Document("_id", 1)) + .limit(1); + while (!Thread.currentThread().isInterrupted()) { + try (final MongoCursor startCursor = startIterable.iterator()) { + if (!startCursor.hasNext()) { + break; + } + final Document startDoc = startCursor.next(); + final Object gteValue = startDoc.get("_id"); + final String className = gteValue.getClass().getName(); + + // Get end doc + Document endDoc = startIterable.skip(partitionSize - 1).limit(1).first(); + if (endDoc == null) { + // this means we have reached the end of the doc + endDoc = col.find() + .projection(new Document("_id", 1)) + .sort(new Document("_id", -1)) + .limit(1) + .first(); + } + if (endDoc == null) { + break; + } + + final Object lteValue = endDoc.get("_id"); + final String gteValueString = BsonHelper.getPartitionStringFromMongoDBId(gteValue, className); + final String lteValueString = BsonHelper.getPartitionStringFromMongoDBId(lteValue, className); + LOG.info("Partition of " + collectionDbName + ": {gte: " + gteValueString + ", lte: " + lteValueString + "}"); + collectionPartitions.add( + PartitionIdentifier + .builder() + .withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionDbName, gteValueString, lteValueString, className)) + .build()); + + startIterable = col.find(Filters.gt("_id", lteValue)) + .projection(new Document("_id", 1)) + .sort(new Document("_id", 1)) + .limit(1); + } catch (Exception e) { + LOG.error("Failed to read start cursor when build partitions", e); + throw new RuntimeException(e); + } + } + } + return collectionPartitions; + } + + @Override + public List apply(final ExportPartition exportPartition) { + return buildPartitions(exportPartition); + } +} diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java new file mode 100644 index 0000000000..dc32946d51 --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderScheduler.java @@ -0,0 +1,172 @@ +package org.opensearch.dataprepper.plugins.mongo.leader; + +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.state.ExportProgressState; +import org.opensearch.dataprepper.plugins.mongo.coordination.state.LeaderProgressState; +import org.opensearch.dataprepper.plugins.mongo.coordination.state.StreamProgressState; +import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +public class LeaderScheduler implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderScheduler.class); + + /** + * Default duration to extend the timeout of lease + */ + private static final int DEFAULT_EXTEND_LEASE_MINUTES = 3; + + /** + * Default interval to run lease check and shard discovery + */ + private static final Duration DEFAULT_LEASE_INTERVAL = Duration.ofMinutes(1); + + private final List collectionConfigs; + + private final EnhancedSourceCoordinator coordinator; + + private final Duration leaseInterval; + + private LeaderPartition leaderPartition; + + public LeaderScheduler(EnhancedSourceCoordinator coordinator, List collectionConfigs) { + this(coordinator, collectionConfigs, DEFAULT_LEASE_INTERVAL); + } + + LeaderScheduler(EnhancedSourceCoordinator coordinator, + List collectionConfigs, + Duration leaseInterval) { + this.collectionConfigs = collectionConfigs; + this.coordinator = coordinator; + this.leaseInterval = leaseInterval; + } + + @Override + public void run() { + LOG.info("Starting Leader Scheduler for initialization and stream discovery"); + + while (!Thread.currentThread().isInterrupted()) { + try { + // Try to acquire the lease if not owned. + if (leaderPartition == null) { + final Optional sourcePartition = coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE); + LOG.info("Leader partition {}", sourcePartition); + if (sourcePartition.isPresent()) { + LOG.info("Running as a LEADER node"); + leaderPartition = (LeaderPartition) sourcePartition.get(); + } + } + // Once owned, run Normal LEADER node process. + // May want to quit this scheduler if streaming is not required + if (leaderPartition != null) { + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + if (!leaderProgressState.isInitialized()) { + LOG.info("The service is not been initialized"); + init(); + } + + } + + } catch (Exception e) { + LOG.error("Exception occurred in primary scheduling loop", e); + } finally { + if(leaderPartition != null) { + // Extend the timeout + // will always be a leader until shutdown + coordinator.saveProgressStateForPartition(leaderPartition, Duration.ofMinutes(DEFAULT_EXTEND_LEASE_MINUTES)); + } + try { + Thread.sleep(leaseInterval.toMillis()); + } catch (final InterruptedException e) { + LOG.info("InterruptedException occurred"); + break; + } + } + } + // Should Stop + LOG.warn("Quitting Leader Scheduler"); + if (leaderPartition != null) { + coordinator.giveUpPartition(leaderPartition); + } + } + + private boolean isExportRequired(final CollectionConfig.IngestionMode ingestionMode) { + return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || + ingestionMode == CollectionConfig.IngestionMode.EXPORT; + } + + private boolean isStreamRequired(final CollectionConfig.IngestionMode ingestionMode) { + return ingestionMode == CollectionConfig.IngestionMode.EXPORT_STREAM || + ingestionMode == CollectionConfig.IngestionMode.STREAM; + } + private void init() { + LOG.info("Try to initialize DocumentDB Leader Partition"); + + collectionConfigs.forEach(collectionConfig -> { + // Create a Global state in the coordination table for the configuration. + // Global State here is designed to be able to read whenever needed + // So that the jobs can refer to the configuration. 
+ coordinator.createPartition(new GlobalState(collectionConfig.getCollection(), null)); + + final Instant startTime = Instant.now(); + final boolean exportRequired = isExportRequired(collectionConfig.getIngestionMode()); + LOG.info("Ingestion mode {} for Collection {}", collectionConfig.getIngestionMode(), collectionConfig.getCollection()); + if (exportRequired) { + createExportPartition(collectionConfig, startTime); + } + + if (isStreamRequired(collectionConfig.getIngestionMode())) { + createStreamPartition(collectionConfig, startTime, exportRequired); + } + + }); + + LOG.debug("Update initialization state"); + LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get(); + leaderProgressState.setInitialized(true); + } + + + /** + * Create a partition for a stream job in the coordination table. + * + * @param collectionConfig collection configuration object containing collection details + * @param streamTime the start time for change events, any change events with creation datetime before this should be ignored. + */ + private void createStreamPartition(final CollectionConfig collectionConfig, final Instant streamTime, final boolean waitForExport) { + LOG.info("Creating stream global partition: {}", collectionConfig.getCollection()); + final StreamProgressState streamProgressState = new StreamProgressState(); + streamProgressState.setWaitForExport(waitForExport); + streamProgressState.setStartTime(streamTime.toEpochMilli()); + coordinator.createPartition(new StreamPartition(collectionConfig.getCollection(), Optional.of(streamProgressState))); + } + + /** + * Create a partition for an export job in the coordination table. + * + * @param collectionConfig collection configuration object containing collection details + * @param exportTime the start time for Export + */ + private void createExportPartition(final CollectionConfig collectionConfig, final Instant exportTime) { + LOG.info("Creating export global partition for collection: {}", collectionConfig.getCollection()); + final ExportProgressState exportProgressState = new ExportProgressState(); + exportProgressState.setCollectionName(collectionConfig.getCollectionName()); + exportProgressState.setDatabaseName(collectionConfig.getDatabaseName()); + exportProgressState.setExportTime(exportTime.toString()); // information purpose + final ExportPartition exportPartition = new ExportPartition(collectionConfig.getCollection(), + collectionConfig.getExportConfig().getItemsPerPartition(), exportTime, exportProgressState); + coordinator.createPartition(exportPartition); + } + +} diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/LoadStatus.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/LoadStatus.java new file mode 100644 index 0000000000..8e26737caf --- /dev/null +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/model/LoadStatus.java @@ -0,0 +1,58 @@ +package org.opensearch.dataprepper.plugins.mongo.model; + +import java.util.Map; + +public class LoadStatus { + + private static final String TOTAL_PARTITIONS = "totalPartitions"; + private static final String LOADED_PARTITIONS = "loadedPartitions"; + private static final String LOADED_RECORDS = "loadedRecords"; + + private long totalPartitions; + private long loadedPartitions; + private long loadedRecords; + + public LoadStatus(int totalPartitions, long loadedRecords) { + this.totalPartitions = totalPartitions; + this.loadedRecords = 
loadedRecords; + } + + public long getTotalPartitions() { + return totalPartitions; + } + + public void setLoadedPartitions(long loadedPartitions) { + this.loadedPartitions = loadedPartitions; + } + + public long getLoadedPartitions() { + return loadedPartitions; + } + + public void setTotalPartitions(long totalPartitions) { + this.totalPartitions = totalPartitions; + } + + public long getLoadedRecords() { + return loadedRecords; + } + + public void setLoadedRecords(long loadedRecords) { + this.loadedRecords = loadedRecords; + } + + public Map toMap() { + return Map.of( + TOTAL_PARTITIONS, totalPartitions, + LOADED_PARTITIONS, loadedPartitions, + LOADED_RECORDS, loadedRecords + ); + } + + public static LoadStatus fromMap(Map map) { + return new LoadStatus( + ((Number) map.get(TOTAL_PARTITIONS)).intValue(), + ((Number) map.get(LOADED_RECORDS)).longValue() + ); + } +} diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java new file mode 100644 index 0000000000..c6aa55e6ab --- /dev/null +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/ExportSchedulerTest.java @@ -0,0 +1,203 @@ +package org.opensearch.dataprepper.plugins.mongo.export; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler.EXPORT_JOB_FAILURE_COUNT; +import static org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler.EXPORT_JOB_SUCCESS_COUNT; +import static org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler.EXPORT_PARTITION_QUERY_TOTAL_COUNT; +import static org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler.EXPORT_PREFIX; +import static org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler.EXPORT_RECORDS_TOTAL_COUNT; + 
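
LoadStatus is the value the ExportScheduler stores in the EXPORT-&lt;collection&gt; GlobalState so any worker can tell when all data query partitions have been loaded. A small sketch of the round trip through the Map representation (assuming toMap() returns Map&lt;String, Object&gt;); note that fromMap only restores totalPartitions and loadedRecords, so loadedPartitions starts back at zero and is advanced as partitions complete:

```java
import java.util.Map;

import org.opensearch.dataprepper.plugins.mongo.model.LoadStatus;

class LoadStatusRoundTripSketch {
    void roundTrip() {
        // 8 total data query partitions, nothing loaded yet.
        final LoadStatus loadStatus = new LoadStatus(8, 0);
        loadStatus.setLoadedPartitions(3);
        loadStatus.setLoadedRecords(12_000);

        // Shape persisted into the GlobalState partition:
        // {totalPartitions=8, loadedPartitions=3, loadedRecords=12000}
        final Map<String, Object> persisted = loadStatus.toMap();

        // fromMap reads totalPartitions and loadedRecords only, so loadedPartitions is 0 here.
        final LoadStatus restored = LoadStatus.fromMap(persisted);
    }
}
```
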
+@ExtendWith(MockitoExtension.class) +public class ExportSchedulerTest { + @Mock + private EnhancedSourceCoordinator coordinator; + + @Mock + private MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier; + + @Mock + private PartitionIdentifier partitionIdentifier; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter exportJobSuccessCounter; + + @Mock + private Counter exportJobFailureCounter; + + @Mock + private Counter exportPartitionTotalCounter; + + @Mock + private Counter exportRecordsTotalCounter; + + private ExportScheduler exportScheduler; + private ExportPartition exportPartition; + + @BeforeEach + void setup() throws Exception { + given(pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT)).willReturn(exportJobSuccessCounter); + given(pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT)).willReturn(exportJobFailureCounter); + given(pluginMetrics.counter(EXPORT_PARTITION_QUERY_TOTAL_COUNT)).willReturn(exportPartitionTotalCounter); + given(pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT)).willReturn(exportRecordsTotalCounter); + + } + + @Test + void test_no_export_run() throws InterruptedException { + exportScheduler = new ExportScheduler(coordinator, mongoDBExportPartitionSupplier, pluginMetrics); + given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.empty()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> exportScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + verifyNoInteractions(mongoDBExportPartitionSupplier); + verify(coordinator, never()).createPartition(any()); + } + + @Test + void test_export_run() throws InterruptedException { + exportScheduler = new ExportScheduler(coordinator, mongoDBExportPartitionSupplier, pluginMetrics); + final String collection = UUID.randomUUID().toString(); + final int partitionSize = new Random().nextInt(); + final Instant exportTime = Instant.now(); + final String partitionKey = collection + "|" + UUID.randomUUID(); + final String globalPartitionKey = collection + "|" + partitionSize + "|" + exportTime.toEpochMilli(); + + exportPartition = new ExportPartition(collection, partitionSize, exportTime, null); + given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey); + given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(List.of(partitionIdentifier)); + given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> exportScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + + // Acquire the init partition + verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE)); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(EnhancedSourcePartition.class); + // Should create 1 export partition + 1 stream partitions + 1 global table state + verify(coordinator, times(2)).createPartition(argumentCaptor.capture()); + final List partitions = argumentCaptor.getAllValues(); + var dataQueryPartitions = partitions.stream() + .filter(partition -> partition instanceof DataQueryPartition) + .map(partition -> (DataQueryPartition)partition).collect(Collectors.toList()); + assertThat(dataQueryPartitions.size(), equalTo(1)); + dataQueryPartitions.forEach(dataQueryPartition -> { + assertThat(dataQueryPartition.getCollection(), equalTo(collection)); + 
assertThat(dataQueryPartition.getPartitionKey(), equalTo(partitionKey)); + assertThat(dataQueryPartition.getQuery(), equalTo(partitionKey)); + assertThat(partitions.get(0).getPartitionType(), equalTo(DataQueryPartition.PARTITION_TYPE)); + }); + + var globalStates = partitions.stream() + .filter(partition -> partition instanceof GlobalState) + .map(partition -> (GlobalState)partition).collect(Collectors.toList()); + assertThat(globalStates.size(), equalTo(1)); + globalStates.forEach(globalState -> { + assertThat(globalState.getPartitionKey(), equalTo(EXPORT_PREFIX + globalPartitionKey)); + assertThat(globalState.getProgressState().get().toString(), is(Map.of( + "totalPartitions", 1, + "loadedPartitions", 0, + "loadedRecords", 0 + ).toString())); + assertThat(globalState.getPartitionType(), equalTo(null)); + }); + verify(exportPartitionTotalCounter).increment(1); + } + + @Test + void test_export_run_multiple_partitions() throws InterruptedException { + exportScheduler = new ExportScheduler(coordinator, mongoDBExportPartitionSupplier, pluginMetrics); + final String collection = UUID.randomUUID().toString(); + final int partitionSize = new Random().nextInt(); + final Instant exportTime = Instant.now(); + final String partitionKey = collection + "|" + UUID.randomUUID(); + final String globalPartitionKey = collection + "|" + partitionSize + "|" + exportTime.toEpochMilli(); + + exportPartition = new ExportPartition(collection, partitionSize, exportTime, null); + given(partitionIdentifier.getPartitionKey()).willReturn(partitionKey); + given(mongoDBExportPartitionSupplier.apply(exportPartition)).willReturn(List.of(partitionIdentifier, partitionIdentifier, partitionIdentifier)); + given(coordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE)).willReturn(Optional.of(exportPartition)); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> exportScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + + // Acquire the init partition + verify(coordinator).acquireAvailablePartition(eq(ExportPartition.PARTITION_TYPE)); + + final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(EnhancedSourcePartition.class); + // Should create 1 export partition + 1 stream partitions + 1 global table state + verify(coordinator, times(4)).createPartition(argumentCaptor.capture()); + final List partitions = argumentCaptor.getAllValues(); + var dataQueryPartitions = partitions.stream() + .filter(partition -> partition instanceof DataQueryPartition) + .map(partition -> (DataQueryPartition)partition).collect(Collectors.toList()); + assertThat(dataQueryPartitions.size(), equalTo(3)); + dataQueryPartitions.forEach(dataQueryPartition -> { + assertThat(dataQueryPartition.getCollection(), equalTo(collection)); + assertThat(dataQueryPartition.getPartitionKey(), equalTo(partitionKey)); + assertThat(dataQueryPartition.getQuery(), equalTo(partitionKey)); + assertThat(partitions.get(0).getPartitionType(), equalTo(DataQueryPartition.PARTITION_TYPE)); + }); + + var globalStates = partitions.stream() + .filter(partition -> partition instanceof GlobalState) + .map(partition -> (GlobalState)partition).collect(Collectors.toList()); + assertThat(globalStates.size(), equalTo(1)); + globalStates.forEach(globalState -> { + assertThat(globalState.getPartitionKey(), equalTo(EXPORT_PREFIX + globalPartitionKey)); + assertThat(globalState.getProgressState().get().toString(), is(Map.of( + "totalPartitions", 3, + "loadedPartitions", 0, + "loadedRecords", 0 + 
).toString())); + assertThat(globalState.getPartitionType(), equalTo(null)); + }); + verify(exportPartitionTotalCounter).increment(3); + } +} diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplierTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplierTest.java new file mode 100644 index 0000000000..821f19a73d --- /dev/null +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/export/MongoDBExportPartitionSupplierTest.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.mongo.export; + +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier; +import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection; +import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig; +import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition; + +import java.util.Collections; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +public class MongoDBExportPartitionSupplierTest { + private static final String TEST_COLLECTION_NAME = "test.collection"; + @Mock + private MongoDBSourceConfig mongoDBConfig; + + @Mock + private CollectionConfig collectionConfig; + + @Mock + private ExportPartition exportPartition; + + private MongoDBExportPartitionSupplier testSupplier; + + @BeforeEach + public void setup() { + when(exportPartition.getCollection()).thenReturn(TEST_COLLECTION_NAME); + lenient().when(collectionConfig.getCollectionName()).thenReturn(TEST_COLLECTION_NAME); + lenient().when(mongoDBConfig.getCollections()).thenReturn(Collections.singletonList(collectionConfig)); + testSupplier = new MongoDBExportPartitionSupplier(mongoDBConfig); + } + + @Test + public void test_buildPartitionsCollection() { + try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { + // Given a collection with 5000 items which should be split to two partitions: 0-3999 and 4000-4999 + MongoClient mongoClient = mock(MongoClient.class); + MongoDatabase mongoDatabase = mock(MongoDatabase.class); + MongoCollection col = 
mock(MongoCollection.class); + FindIterable findIterable = mock(FindIterable.class); + MongoCursor cursor = mock(MongoCursor.class); + mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) + .thenReturn(mongoClient); + when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); + when(mongoDatabase.getCollection(anyString())).thenReturn(col); + when(col.find()).thenReturn(findIterable); + when(col.find(any(Bson.class))).thenReturn(findIterable); + when(findIterable.projection(any())).thenReturn(findIterable); + when(findIterable.sort(any())).thenReturn(findIterable); + when(findIterable.skip(anyInt())).thenReturn(findIterable); + when(findIterable.limit(anyInt())).thenReturn(findIterable); + when(findIterable.iterator()).thenReturn(cursor); + when(cursor.hasNext()).thenReturn(true, true, false); + // mock startDoc and endDoc returns, 0-3999, and 4000-4999 + when(cursor.next()) + .thenReturn(new Document("_id", "0")) + .thenReturn(new Document("_id", "4000")); + when(findIterable.first()) + .thenReturn(new Document("_id", "3999")) + .thenReturn(null) + .thenReturn(new Document("_id", "4999")); + // When Apply Partition create logics + List partitions = testSupplier.apply(exportPartition); + // Then dependencies are called + verify(mongoClient).getDatabase(eq("test")); + verify(mongoClient, times(1)).close(); + verify(mongoDatabase).getCollection(eq("collection")); + // And partitions are created + assertThat(partitions.size(), is(2)); + assertThat(partitions.get(0).getPartitionKey(), is("test.collection|0|3999|java.lang.String")); + assertThat(partitions.get(1).getPartitionKey(), is("test.collection|4000|4999|java.lang.String")); + } + } + + @Test + public void test_buildPartitionsForCollection_error() { + when(exportPartition.getCollection()).thenReturn("invalidDBName"); + assertThrows(IllegalArgumentException.class, () -> testSupplier.apply(exportPartition)); + } + + @Test + public void test_buildPartitions_dbException() { + try (MockedStatic mongoDBConnectionMockedStatic = mockStatic(MongoDBConnection.class)) { + mongoDBConnectionMockedStatic.when(() -> MongoDBConnection.getMongoClient(any(MongoDBSourceConfig.class))) + .thenThrow(RuntimeException.class); + assertThrows(RuntimeException.class, () -> testSupplier.apply(exportPartition)); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java new file mode 100644 index 0000000000..71b5c58f66 --- /dev/null +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/leader/LeaderSchedulerTest.java @@ -0,0 +1,133 @@ +package org.opensearch.dataprepper.plugins.mongo.leader; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig; +import org.opensearch.dataprepper.plugins.mongo.coordination.partition.LeaderPartition; + +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.UUID; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +@ExtendWith(MockitoExtension.class) +public class LeaderSchedulerTest { + @Mock + private EnhancedSourceCoordinator coordinator; + + @Mock + private CollectionConfig collectionConfig; + + @Mock + private CollectionConfig.ExportConfig exportConfig; + + private LeaderScheduler leaderScheduler; + private LeaderPartition leaderPartition; + + @Test + void test_non_leader_run() throws InterruptedException { + leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig)); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.empty()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> leaderScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + verifyNoInteractions(collectionConfig); + } + + @Test + void test_should_init() throws InterruptedException { + + leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig)); + leaderPartition = new LeaderPartition(); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.EXPORT_STREAM); + given(collectionConfig.getExportConfig()).willReturn(exportConfig); + given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); + given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> leaderScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + + // Acquire the init partition + verify(coordinator).acquireAvailablePartition(eq(LeaderPartition.PARTITION_TYPE)); + + // Should create 1 export partition + 1 stream partitions + 1 global table state + verify(coordinator, times(3)).createPartition(any(EnhancedSourcePartition.class)); + verify(coordinator).giveUpPartition(leaderPartition); + + assertThat(leaderPartition.getProgressState().get().isInitialized(), equalTo(true)); + } + + @Test + void test_should_init_export() throws InterruptedException { + + leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig)); + leaderPartition = new LeaderPartition(); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.EXPORT); + given(collectionConfig.getExportConfig()).willReturn(exportConfig); + given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt()); + given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> leaderScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + + // Acquire the init partition + verify(coordinator).acquireAvailablePartition(eq(LeaderPartition.PARTITION_TYPE)); + + // Should create 1 export 
partition + 1 stream partitions + 1 global table state + verify(coordinator, times(2)).createPartition(any(EnhancedSourcePartition.class)); + verify(coordinator).giveUpPartition(leaderPartition); + + assertThat(leaderPartition.getProgressState().get().isInitialized(), equalTo(true)); + } + + @Test + void test_should_init_stream() throws InterruptedException { + + leaderScheduler = new LeaderScheduler(coordinator, List.of(collectionConfig)); + leaderPartition = new LeaderPartition(); + given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition)); + given(collectionConfig.getIngestionMode()).willReturn(CollectionConfig.IngestionMode.STREAM); + given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString()); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.submit(() -> leaderScheduler.run()); + Thread.sleep(100); + executorService.shutdownNow(); + + + // Acquire the init partition + verify(coordinator).acquireAvailablePartition(eq(LeaderPartition.PARTITION_TYPE)); + + // Should create 1 export partition + 1 stream partitions + 1 global table state + verify(coordinator, times(2)).createPartition(any(EnhancedSourcePartition.class)); + verify(coordinator).giveUpPartition(leaderPartition); + + assertThat(leaderPartition.getProgressState().get().isInitialized(), equalTo(true)); + } +}
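
Taken together, the three LeaderScheduler tests pin down how many partitions init() creates per collection for each ingestion mode. A short sketch summarizing that expectation, with the counts taken from the times(...) verifications above:

```java
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;

class LeaderInitExpectationSketch {
    // Partitions created per collection during leader init:
    //   EXPORT_STREAM -> GlobalState + ExportPartition + StreamPartition (3 createPartition calls)
    //   EXPORT        -> GlobalState + ExportPartition                   (2 calls)
    //   STREAM        -> GlobalState + StreamPartition                   (2 calls)
    int expectedCreatePartitionCalls(final CollectionConfig.IngestionMode ingestionMode) {
        switch (ingestionMode) {
            case EXPORT_STREAM:
                return 3;
            case EXPORT:
            case STREAM:
                return 2;
            default:
                throw new IllegalArgumentException("Unexpected ingestion mode: " + ingestionMode);
        }
    }
}
```
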