diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 48e093f7b6..c5da709775 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -7,10 +7,10 @@ import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; -import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; @@ -47,6 +47,16 @@ public class ShardConsumer implements Runnable { */ private static final int GET_RECORD_INTERVAL_MILLS = 300; + /** + * Idle Time between GetRecords Reads + */ + private static final int MINIMUM_GET_RECORD_INTERVAL_MILLS = 10; + + /** + * Minimum Idle Time between GetRecords Reads + */ + private static final long GET_RECORD_DELAY_THRESHOLD_MILLS = 15_000; + /** * Default interval to check if export is completed. */ @@ -209,12 +219,15 @@ public void run() { shardIterator = response.nextShardIterator(); + int interval; + if (!response.records().isEmpty()) { // Always use the last sequence number for checkpoint sequenceNumber = response.records().get(response.records().size() - 1).dynamodb().sequenceNumber(); + Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime(); if (waitForExport) { - Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime(); + if (lastEventTime.compareTo(startTime) <= 0) { LOG.debug("Get {} events before start time, ignore...", response.records().size()); continue; @@ -230,10 +243,15 @@ public void run() { records = response.records(); } recordConverter.writeToBuffer(acknowledgementSet, records); + long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli(); + interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS; + + } else { + interval = GET_RECORD_INTERVAL_MILLS; } try { // Idle between get records call. - Thread.sleep(GET_RECORD_INTERVAL_MILLS); + Thread.sleep(interval); } catch (InterruptedException e) { throw new RuntimeException(e); }