Skip to content

Commit

Permalink
Optimize idle item between GetRecords call (#3593)
Browse files Browse the repository at this point in the history
Signed-off-by: Aiden Dai <daixb@amazon.com>
daixba authored Nov 7, 2023
1 parent 97cd930 commit 2067523
Showing 1 changed file with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -7,10 +7,10 @@

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
@@ -47,6 +47,16 @@ public class ShardConsumer implements Runnable {
*/
private static final int GET_RECORD_INTERVAL_MILLS = 300;

/**
* Idle Time between GetRecords Reads
*/
private static final int MINIMUM_GET_RECORD_INTERVAL_MILLS = 10;

/**
* Minimum Idle Time between GetRecords Reads
*/
private static final long GET_RECORD_DELAY_THRESHOLD_MILLS = 15_000;

/**
* Default interval to check if export is completed.
*/
@@ -209,12 +219,15 @@ public void run() {

shardIterator = response.nextShardIterator();

int interval;

if (!response.records().isEmpty()) {
// Always use the last sequence number for checkpoint
sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber();
Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();

if (waitForExport) {
Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();

if (lastEventTime.compareTo(startTime) <= 0) {
LOG.debug("Get {} events before start time, ignore...", response.records().size());
continue;
@@ -230,10 +243,15 @@ public void run() {
records = response.records();
}
recordConverter.writeToBuffer(acknowledgementSet, records);
long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli();
interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS;

} else {
interval = GET_RECORD_INTERVAL_MILLS;
}
try {
// Idle between get records call.
Thread.sleep(GET_RECORD_INTERVAL_MILLS);
Thread.sleep(interval);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

0 comments on commit 2067523

Please sign in to comment.