Add ExportScheduler and LeaderScheduler for MongoDB/DocumentDB source (#4277)

* Add ExportScheduler and LeaderScheduler for MongoDB/DocumentDB source

Signed-off-by: Dinu John <[email protected]>

* Update access modifier for static field

Signed-off-by: Dinu John <[email protected]>

* Unit test updates

Signed-off-by: Dinu John <[email protected]>

* Add MongoDB Export Partition supplier

Signed-off-by: Dinu John <[email protected]>

* Logging collection when creating scheduler

Signed-off-by: Dinu John <[email protected]>

---------

Signed-off-by: Dinu John <[email protected]>
dinujoh authored Mar 14, 2024
1 parent c705c09 commit 8827ebf
Showing 9 changed files with 902 additions and 4 deletions.
ExportPartition.java
@@ -39,11 +39,11 @@ public ExportPartition(final SourcePartitionStoreItem sourcePartitionStoreItem)
     }

     public ExportPartition(final String collection, final int partitionSize, final Instant exportTime,
-                           final Optional<ExportProgressState> state) {
+                           final ExportProgressState state) {
         this.collection = collection;
         this.partitionSize = partitionSize;
         this.exportTime = exportTime;
-        this.state = state.orElse(null);
+        this.state = state;

     }
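For context, callers of this constructor now pass the progress state (or null) directly instead of an Optional. A minimal sketch of a call site after this change, assuming ExportProgressState has a no-arg constructor; the collection name and partition size are hypothetical:

    // Hypothetical call site; "inventory.orders" and 100 are illustrative values.
    final ExportProgressState progressState = new ExportProgressState();
    final ExportPartition withState =
            new ExportPartition("inventory.orders", 100, Instant.now(), progressState);
    // Callers without state pass null, matching what Optional.orElse(null) produced before.
    final ExportPartition withoutState =
            new ExportPartition("inventory.orders", 100, Instant.now(), null);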

GlobalState.java
@@ -29,9 +29,9 @@ public GlobalState(SourcePartitionStoreItem sourcePartitionStoreItem) {
         this.state = convertStringToPartitionProgressState(null, sourcePartitionStoreItem.getPartitionProgressState());
     }

-    public GlobalState(String stateName, Optional<Map<String, Object>> state) {
+    public GlobalState(String stateName, Map<String, Object> state) {
         this.stateName = stateName;
-        this.state = state.orElse(null);
+        this.state = state;

     }
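For reference, this constructor is exercised by the new ExportScheduler below, which now passes the map directly:

    // From ExportScheduler.createDataQueryPartitions in this commit:
    enhancedSourceCoordinator.createPartition(new GlobalState(EXPORT_PREFIX + collection, loadStatus.toMap()));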

ExportScheduler.java (new file)
@@ -0,0 +1,107 @@
package org.opensearch.dataprepper.plugins.mongo.export;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.DataQueryProgressState;
import org.opensearch.dataprepper.plugins.mongo.model.LoadStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class ExportScheduler implements Runnable {
    public static final String EXPORT_PREFIX = "EXPORT-";
    private static final Logger LOG = LoggerFactory.getLogger(ExportScheduler.class);
    private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
    static final String EXPORT_JOB_SUCCESS_COUNT = "exportJobSuccess";
    static final String EXPORT_JOB_FAILURE_COUNT = "exportJobFailure";
    static final String EXPORT_PARTITION_QUERY_TOTAL_COUNT = "exportPartitionQueryTotal";
    static final String EXPORT_RECORDS_TOTAL_COUNT = "exportRecordsTotal";
    private final PluginMetrics pluginMetrics;
    private final EnhancedSourceCoordinator enhancedSourceCoordinator;
    private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
    private final Counter exportJobSuccessCounter;
    private final Counter exportJobFailureCounter;

    private final Counter exportPartitionTotalCounter;
    private final Counter exportRecordsTotalCounter;

    public ExportScheduler(final EnhancedSourceCoordinator enhancedSourceCoordinator,
                           final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier,
                           final PluginMetrics pluginMetrics) {
        this.enhancedSourceCoordinator = enhancedSourceCoordinator;
        this.mongoDBExportPartitionSupplier = mongoDBExportPartitionSupplier;
        this.pluginMetrics = pluginMetrics;

        exportJobSuccessCounter = pluginMetrics.counter(EXPORT_JOB_SUCCESS_COUNT);
        exportJobFailureCounter = pluginMetrics.counter(EXPORT_JOB_FAILURE_COUNT);
        exportPartitionTotalCounter = pluginMetrics.counter(EXPORT_PARTITION_QUERY_TOTAL_COUNT);
        exportRecordsTotalCounter = pluginMetrics.counter(EXPORT_RECORDS_TOTAL_COUNT);
    }

    @Override
    public void run() {
        LOG.info("Start running Export Scheduler");
        while (!Thread.currentThread().isInterrupted()) {
            try {
                final Optional<EnhancedSourcePartition> sourcePartition = enhancedSourceCoordinator.acquireAvailablePartition(ExportPartition.PARTITION_TYPE);
                if (sourcePartition.isPresent()) {
                    final ExportPartition exportPartition = (ExportPartition) sourcePartition.get();
                    LOG.info("Acquired an export partition: {}", exportPartition.getPartitionKey());

                    final List<PartitionIdentifier> partitionIdentifiers = mongoDBExportPartitionSupplier.apply(exportPartition);

                    createDataQueryPartitions(exportPartition.getPartitionKey(), Instant.now(), partitionIdentifiers);
                }
                try {
                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
                } catch (final InterruptedException e) {
                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
                    break;
                }
            } catch (final Exception e) {
                LOG.error("Received an exception during export from DocumentDB, backing off and retrying", e);
                try {
                    Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
                } catch (final InterruptedException ex) {
                    LOG.info("The ExportScheduler was interrupted while waiting to retry, stopping processing");
                    break;
                }
            }
        }
        LOG.warn("Export scheduler interrupted, looks like shutdown has triggered");
    }

    private void createDataQueryPartitions(final String collection,
                                           final Instant exportTime,
                                           final List<PartitionIdentifier> partitionIdentifiers) {
        final AtomicInteger totalQueries = new AtomicInteger();
        partitionIdentifiers.forEach(partitionIdentifier -> {
            final DataQueryProgressState progressState = new DataQueryProgressState();
            progressState.setExecutedQueries(0);
            progressState.setLoadedRecords(0);
            progressState.setStartTime(exportTime.toEpochMilli());

            totalQueries.getAndIncrement();
            final DataQueryPartition partition = new DataQueryPartition(partitionIdentifier.getPartitionKey(), progressState);
            enhancedSourceCoordinator.createPartition(partition);
        });

        exportPartitionTotalCounter.increment(totalQueries.get());

        // Currently, we need to maintain a global state to track the overall progress,
        // so that we can easily tell when all the export files are loaded.
        final LoadStatus loadStatus = new LoadStatus(totalQueries.get(), 0);
        enhancedSourceCoordinator.createPartition(new GlobalState(EXPORT_PREFIX + collection, loadStatus.toMap()));
    }

}
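Since ExportScheduler implements Runnable and exits when its thread is interrupted, it can be driven by a single-threaded executor. A minimal wiring sketch, assuming coordinator, partitionSupplier, and pluginMetrics are already constructed by the source plugin's setup code:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    final ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(new ExportScheduler(coordinator, partitionSupplier, pluginMetrics));
    // ... later, on source shutdown:
    executor.shutdownNow(); // interrupts the sleep/loop; the scheduler logs and stops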
MongoDBExportPartitionSupplier.java (new file)
@@ -0,0 +1,103 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.mongo.export;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.opensearch.dataprepper.model.source.coordinator.PartitionIdentifier;
import org.opensearch.dataprepper.plugins.mongo.client.BsonHelper;
import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.ExportPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class MongoDBExportPartitionSupplier implements Function<ExportPartition, List<PartitionIdentifier>> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBExportPartitionSupplier.class);
    private static final String MONGODB_PARTITION_KEY_FORMAT = "%s|%s|%s|%s"; // partition format: <db.collection>|<gte>|<lt>|<className>
    private static final String COLLECTION_SPLITTER = "\\.";

    private final MongoDBSourceConfig sourceConfig;

    public MongoDBExportPartitionSupplier(final MongoDBSourceConfig sourceConfig) {
        this.sourceConfig = sourceConfig;
    }

    private List<PartitionIdentifier> buildPartitions(final ExportPartition exportPartition) {
        final List<PartitionIdentifier> collectionPartitions = new ArrayList<>();
        final String collectionDbName = exportPartition.getCollection();
        final List<String> collection = List.of(collectionDbName.split(COLLECTION_SPLITTER));
        if (collection.size() < 2) {
            throw new IllegalArgumentException("Invalid Collection Name. Must be in db.collection format");
        }
        try (final MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) {
            final MongoDatabase db = mongoClient.getDatabase(collection.get(0));
            final MongoCollection<Document> col = db.getCollection(collection.get(1));
            final int partitionSize = exportPartition.getPartitionSize();
            FindIterable<Document> startIterable = col.find()
                    .projection(new Document("_id", 1))
                    .sort(new Document("_id", 1))
                    .limit(1);
            while (!Thread.currentThread().isInterrupted()) {
                try (final MongoCursor<Document> startCursor = startIterable.iterator()) {
                    if (!startCursor.hasNext()) {
                        break;
                    }
                    final Document startDoc = startCursor.next();
                    final Object gteValue = startDoc.get("_id");
                    final String className = gteValue.getClass().getName();

                    // Get the end document of this partition
                    Document endDoc = startIterable.skip(partitionSize - 1).limit(1).first();
                    if (endDoc == null) {
                        // This means we have reached the end of the collection
                        endDoc = col.find()
                                .projection(new Document("_id", 1))
                                .sort(new Document("_id", -1))
                                .limit(1)
                                .first();
                    }
                    if (endDoc == null) {
                        break;
                    }

                    final Object lteValue = endDoc.get("_id");
                    final String gteValueString = BsonHelper.getPartitionStringFromMongoDBId(gteValue, className);
                    final String lteValueString = BsonHelper.getPartitionStringFromMongoDBId(lteValue, className);
                    LOG.info("Partition of {}: {gte: {}, lte: {}}", collectionDbName, gteValueString, lteValueString);
                    collectionPartitions.add(
                            PartitionIdentifier
                                    .builder()
                                    .withPartitionKey(String.format(MONGODB_PARTITION_KEY_FORMAT, collectionDbName, gteValueString, lteValueString, className))
                                    .build());

                    startIterable = col.find(Filters.gt("_id", lteValue))
                            .projection(new Document("_id", 1))
                            .sort(new Document("_id", 1))
                            .limit(1);
                } catch (final Exception e) {
                    LOG.error("Failed to read start cursor when building partitions", e);
                    throw new RuntimeException(e);
                }
            }
        }
        return collectionPartitions;
    }

    @Override
    public List<PartitionIdentifier> apply(final ExportPartition exportPartition) {
        return buildPartitions(exportPartition);
    }
}
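Each partition key emitted above follows MONGODB_PARTITION_KEY_FORMAT. A hypothetical example of building and splitting such a key, assuming ObjectId _id values (the collection name and hex bounds are illustrative):

    // partition format: <db.collection>|<gte>|<lt>|<className>
    final String key = String.format("%s|%s|%s|%s",
            "inventory.orders",
            "5f0000000000000000000000",
            "5f00000000000000ffffffff",
            "org.bson.types.ObjectId");
    final String[] parts = key.split("\\|"); // [db.collection, gte, lt, className]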
