Improvements to DynamoDB source
Signed-off-by: Aiden Dai <daixb@amazon.com>
daixba committed Nov 9, 2023
1 parent a1c56d0 commit 32cdb43
Showing 12 changed files with 184 additions and 135 deletions.
DynamoDBService.java
@@ -39,10 +39,10 @@
import software.amazon.awssdk.services.dynamodb.model.DescribeContinuousBackupsResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
@@ -75,9 +75,6 @@ public class DynamoDBService {

private final AcknowledgementSetManager acknowledgementSetManager;

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;


public DynamoDBService(final EnhancedSourceCoordinator coordinator,
final ClientFactory clientFactory,
@@ -180,24 +177,14 @@ public void init() {
}

if (tableInfo.getMetadata().isStreamRequired()) {
List<String> shardIds;
// start position by default is TRIM_HORIZON if not provided.
if (tableInfo.getMetadata().isExportRequired() || tableInfo.getMetadata().getStreamStartPosition() == StreamStartPosition.LATEST) {
// For a continued data extraction process that involves both export and stream
// The export must be completed and loaded before stream can start.
// Moreover, there should not be any gaps between the export time and the time start reading the stream
// The design here is to start reading from the beginning of current active shards
// and then check if the change event datetime is greater than the export time.
shardIds = shardManager.getActiveShards(tableInfo.getMetadata().getStreamArn());
shardIds.forEach(shardId -> {
createStreamPartition(tableInfo.getMetadata().getStreamArn(), shardId, startTime, tableInfo.getMetadata().isExportRequired());
});
} else {
shardIds = shardManager.getRootShardIds(tableInfo.getMetadata().getStreamArn());
shardIds.forEach(shardId -> {
createStreamPartition(tableInfo.getMetadata().getStreamArn(), shardId, null, false);
});
}
// TODO: Revisit the use of start position.
// The behaviour is the same in all cases, regardless of the configuration provided.
// Only process events from the current date time, but still traverse the shards from the beginning.
List<Shard> shards = shardManager.getRootShards(tableInfo.getMetadata().getStreamArn());
shards.forEach(shard -> {
createStreamPartition(tableInfo.getMetadata().getStreamArn(), shard, startTime, tableInfo.getMetadata().isExportRequired());
});

}
});
// Mark initialization as done, so that it won't be triggered again.
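The design described above reduces to a small filter. Below is a minimal, illustrative sketch (not part of this commit; the class and method names are made up): shards are traversed from TRIM_HORIZON, and change events created before the start time (the export time) are dropped rather than never read.

import software.amazon.awssdk.services.dynamodb.model.Record;

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch only: keep just the change events created after the start time.
final class StreamFilterSketch {
    static List<Record> eventsToProcess(final List<Record> records, final Instant startTime) {
        return records.stream()
                .filter(r -> r.dynamodb().approximateCreationDateTime().isAfter(startTime))
                .collect(Collectors.toList());
    }
}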
@@ -229,16 +216,15 @@ private void createExportPartition(String tableArn, Instant exportTime, String b
* Create a partition for a stream job in the coordination table.
*
* @param streamArn Stream Arn
* @param shardId Shard Id
* @param shard A {@link Shard}
* @param exportTime the start time for change events; any change event with a creation datetime before this should be ignored.
*/
private void createStreamPartition(String streamArn, String shardId, Instant exportTime, boolean waitForExport) {
private void createStreamPartition(String streamArn, Shard shard, Instant exportTime, boolean waitForExport) {
StreamProgressState streamProgressState = new StreamProgressState();
streamProgressState.setWaitForExport(waitForExport);
if (exportTime != null) {
streamProgressState.setStartTime(exportTime.toEpochMilli());
}
coordinator.createPartition(new StreamPartition(streamArn, shardId, Optional.of(streamProgressState)));
streamProgressState.setStartTime(exportTime.toEpochMilli());
streamProgressState.setEndingSequenceNumber(shard.sequenceNumberRange().endingSequenceNumber());
coordinator.createPartition(new StreamPartition(streamArn, shard.shardId(), Optional.of(streamProgressState)));
}

private String getContinuousBackupsStatus(String tableName) {
@@ -313,7 +299,7 @@ private TableInfo getTableInfo(TableConfig tableConfig) {
.streamArn(describeTableResult.table().latestStreamArn())
.streamRequired(tableConfig.getStreamConfig() != null)
.exportRequired(tableConfig.getExportConfig() != null)
.streamStartPosition(streamStartPosition)
.streamStartPosition(streamStartPosition) // Will be ignored
.exportBucket(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Bucket())
.exportPrefix(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3Prefix())
.exportKmsKeyId(tableConfig.getExportConfig() == null ? null : tableConfig.getExportConfig().getS3SseKmsKeyId())
StreamStartPosition.java
@@ -5,7 +5,13 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;


/**
* Always defaults to LATEST.
* Support for TRIM_HORIZON has been removed
* due to a recently raised concern that root shards may expire while being processed.
*/
public enum StreamStartPosition {
TRIM_HORIZON,
// TRIM_HORIZON,
LATEST
}
StreamProgressState.java
@@ -18,6 +18,9 @@ public class StreamProgressState {

@JsonProperty("waitForExport")
private boolean waitForExport = false;

@JsonProperty("endingSequenceNumber")
private String endingSequenceNumber;

public long getStartTime() {
return startTime;
@@ -42,4 +45,12 @@ public boolean shouldWaitForExport() {
public void setWaitForExport(boolean waitForExport) {
this.waitForExport = waitForExport;
}

public String getEndingSequenceNumber() {
return endingSequenceNumber;
}

public void setEndingSequenceNumber(String endingSequenceNumber) {
this.endingSequenceNumber = endingSequenceNumber;
}
}
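Since the progress state is persisted as JSON through Jackson, the new field round-trips alongside the existing ones. A minimal sketch, assuming a default constructor and standard Jackson bindings; the sequence number and the JSON shape shown in the comment are placeholders:

import com.fasterxml.jackson.databind.ObjectMapper;

public class ProgressStateRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        StreamProgressState state = new StreamProgressState();
        state.setWaitForExport(true);
        state.setEndingSequenceNumber("49590338271490256608559692538361571095921575989136588898"); // placeholder value

        // Persisted to the coordination store roughly as:
        // {"startTime":0,"waitForExport":true,"endingSequenceNumber":"49590..."}
        String json = mapper.writeValueAsString(state);

        StreamProgressState restored = mapper.readValue(json, StreamProgressState.class);
        System.out.println(restored.getEndingSequenceNumber());
    }
}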
ShardConsumer.java
@@ -15,7 +15,6 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
@@ -37,6 +36,11 @@ public class ShardConsumer implements Runnable {
*/
private static volatile boolean shouldStop = false;

/**
* An overlap window applied between the event creation time and the export time
*/
private static final Duration STREAM_EVENT_OVERLAP_TIME = Duration.ofMinutes(5);

/**
* Max number of items to return per GetRecords call, maximum 1000.
*/
@@ -85,6 +89,8 @@ public class ShardConsumer implements Runnable {

private String shardIterator;

private final String lastShardIterator;

private final Instant startTime;

private boolean waitForExport;
@@ -97,7 +103,9 @@ private ShardConsumer(Builder builder) {
this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient;
this.checkpointer = builder.checkpointer;
this.shardIterator = builder.shardIterator;
this.startTime = builder.startTime;
this.lastShardIterator = builder.lastShardIterator;
// Introduce an overlap
this.startTime = builder.startTime == null ? Instant.MIN : builder.startTime.minus(STREAM_EVENT_OVERLAP_TIME);
this.waitForExport = builder.waitForExport;
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
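The overlap simply shifts the effective start time backwards, trading a few re-read events at the boundary for no gaps. An illustrative calculation (the timestamps are made up, not from the commit):

import java.time.Duration;
import java.time.Instant;

public class OverlapSketch {
    public static void main(String[] args) {
        // Illustrative values: an export taken at 12:00 UTC yields an effective
        // stream start time of 11:55 UTC, so events created around the export
        // boundary are processed twice rather than lost.
        Instant exportTime = Instant.parse("2023-11-09T12:00:00Z");
        Instant effectiveStart = exportTime.minus(Duration.ofMinutes(5));
        System.out.println(effectiveStart); // 2023-11-09T11:55:00Z
    }
}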
@@ -124,6 +132,7 @@ static class Builder {

private String shardIterator;

private String lastShardIterator;

private Instant startTime;

@@ -153,6 +162,11 @@ public Builder shardIterator(String shardIterator) {
return this;
}

public Builder lastShardIterator(String lastShardIterator) {
this.lastShardIterator = lastShardIterator;
return this;
}

public Builder startTime(Instant startTime) {
this.startTime = startTime;
return this;
@@ -183,9 +197,14 @@ public ShardConsumer build() {
@Override
public void run() {
LOG.debug("Shard Consumer start to run...");
// Check whether processing should be skipped.
if (shouldSkip()) return;

long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";
int interval;
List<software.amazon.awssdk.services.dynamodb.model.Record> records;

while (!shouldStop) {
if (shardIterator == null) {
@@ -201,25 +220,7 @@ public void run() {
lastCheckpointTime = System.currentTimeMillis();
}

// Use the shard iterator to read the stream records
GetRecordsRequest req = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(MAX_GET_RECORD_ITEM_COUNT)
.build();


List<software.amazon.awssdk.services.dynamodb.model.Record> records;
GetRecordsResponse response;
try {
response = dynamoDbStreamsClient.getRecords(req);
} catch (SdkException e) {
checkpointer.checkpoint(sequenceNumber);
throw e;
}

shardIterator = response.nextShardIterator();

int interval;
GetRecordsResponse response = callGetRecords(shardIterator);

if (!response.records().isEmpty()) {
// Always use the last sequence number for checkpoint
@@ -228,7 +229,7 @@ public void run() {

if (waitForExport) {

if (lastEventTime.compareTo(startTime) <= 0) {
if (lastEventTime.isBefore(startTime)) {
LOG.debug("Get {} events before start time, ignore...", response.records().size());
continue;
}
@@ -237,7 +238,7 @@ public void run() {
waitForExport = false;

records = response.records().stream()
.filter(record -> record.dynamodb().approximateCreationDateTime().compareTo(startTime) > 0)
.filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime))
.collect(Collectors.toList());
} else {
records = response.records();
@@ -249,6 +250,7 @@ public void run() {
} else {
interval = GET_RECORD_INTERVAL_MILLS;
}
shardIterator = response.nextShardIterator();
try {
// Idle between get records call.
Thread.sleep(interval);
@@ -275,6 +277,25 @@ public void run() {
}
}

/**
* Wrapper around the GetRecords call
*/
private GetRecordsResponse callGetRecords(String shardIterator) {
// Use the shard iterator to read the stream records
GetRecordsRequest req = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(MAX_GET_RECORD_ITEM_COUNT)
.build();

try {
return dynamoDbStreamsClient.getRecords(req);
} catch (Exception e) {
// Preserve the original exception as the cause instead of discarding it.
throw new RuntimeException(e.getMessage(), e);
}
}

private void waitForExport() {
LOG.debug("Start waiting for export to be done and loaded");
int numberOfWaits = 0;
@@ -298,6 +319,34 @@ private void waitForExport() {
}
}

/**
* Skip processing only when both of the following conditions are met:
* - A last shard iterator is provided (the shard has an ending sequence number)
* - There is no last event (empty shard), or the last event timestamp is earlier than the start time
*/
private boolean shouldSkip() {
// Do skip check
if (lastShardIterator != null && !lastShardIterator.isEmpty()) {
GetRecordsResponse response = callGetRecords(lastShardIterator);
if (response.records().isEmpty()) {
// Empty shard
LOG.info("LastShardIterator is provided, but there is no Last Event Time, skip processing");
return true;
}

Instant lastEventTime = response.records().get(response.records().size() - 1).dynamodb().approximateCreationDateTime();
if (lastEventTime.isBefore(startTime)) {
LOG.info("LastShardIterator is provided, and Last Event Time is earlier than export time, skip processing");
return true;
} else {
LOG.info("LastShardIterator is provided, and Last Event Time is later than export time, start processing");
return false;
}
}

return false;
}
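Restated as a pure predicate for illustration (this sketch is not part of the commit; the names are made up), the decision table is easy to exercise:

import java.time.Instant;
import java.util.Optional;

final class SkipDecisionSketch {
    // Skip only when a last shard iterator exists (the shard is closed) and
    // the shard is empty or its last event predates the effective start time.
    static boolean shouldSkip(final boolean hasLastShardIterator,
                              final Optional<Instant> lastEventTime,
                              final Instant startTime) {
        if (!hasLastShardIterator) {
            return false;
        }
        return lastEventTime.map(t -> t.isBefore(startTime)).orElse(true);
    }

    public static void main(String[] args) {
        Instant exportTime = Instant.parse("2023-11-09T12:00:00Z");
        // Closed, empty shard: nothing to replay, skip.
        System.out.println(shouldSkip(true, Optional.empty(), exportTime)); // true
        // Closed shard whose newest event is older than the export: skip.
        System.out.println(shouldSkip(true, Optional.of(exportTime.minusSeconds(60)), exportTime)); // true
        // Closed shard with events newer than the export: process it.
        System.out.println(shouldSkip(true, Optional.of(exportTime.plusSeconds(60)), exportTime)); // false
    }
}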


/**
* Currently, this is to stop all consumers.
ShardConsumerFactory.java
@@ -30,8 +30,6 @@
public class ShardConsumerFactory {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

private static final int STREAM_TO_TABLE_OFFSET = "stream/".length();


private final DynamoDbStreamsClient streamsClient;

@@ -62,6 +60,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,
// Check and get the current state.
Optional<StreamProgressState> progressState = streamPartition.getProgressState();
String sequenceNumber = null;
String lastShardIterator = null;
Instant startTime = null;
boolean waitForExport = false;
if (progressState.isPresent()) {
@@ -71,10 +70,15 @@ public Runnable createConsumer(final StreamPartition streamPartition,
if (progressState.get().getStartTime() != 0) {
startTime = Instant.ofEpochMilli(progressState.get().getStartTime());
}
// If an ending sequence number is present, get the shard iterator for the last record
String endingSequenceNumber = progressState.get().getEndingSequenceNumber();
if (endingSequenceNumber != null && !endingSequenceNumber.isEmpty()) {
lastShardIterator = shardManager.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), endingSequenceNumber);
}
}

String shardIter = shardManager.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
if (shardIter == null) {
String shardIterator = shardManager.getShardIterator(streamPartition.getStreamArn(), streamPartition.getShardId(), sequenceNumber);
if (shardIterator == null) {
LOG.info("Unable to get a shard iterator, looks like the shard has expired");
LOG.error("Failed to start a Shard Consumer for " + streamPartition.getShardId());
return null;
@@ -84,10 +88,13 @@ public Runnable createConsumer(final StreamPartition streamPartition,
String tableArn = getTableArn(streamPartition.getStreamArn());
TableInfo tableInfo = getTableInfo(tableArn);

LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator);
LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator);
ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, buffer)
.tableInfo(tableInfo)
.checkpointer(checkpointer)
.shardIterator(shardIter)
.shardIterator(shardIterator)
.lastShardIterator(lastShardIterator)
.startTime(startTime)
.waitForExport(waitForExport)
.acknowledgmentSet(acknowledgementSet)
@@ -105,6 +112,6 @@ private TableInfo getTableInfo(String tableArn) {
private String getTableArn(String streamArn) {
// e.g. Given a stream arn: arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190
// Returns arn:aws:dynamodb:us-west-2:xxx:table/test-table
return streamArn.substring(0, streamArn.lastIndexOf('/') - STREAM_TO_TABLE_OFFSET);
return streamArn.substring(0, streamArn.lastIndexOf('/') - "stream/".length());
}
}
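A worked example of the index arithmetic, using the ARN from the comment above; this standalone sketch is runnable as-is:

public class TableArnSketch {
    public static void main(String[] args) {
        String streamArn = "arn:aws:dynamodb:us-west-2:xxx:table/test-table/stream/2023-07-31T04:59:58.190";
        // lastIndexOf('/') points at the slash before the timestamp; stepping back
        // "stream/".length() (7) characters lands on the slash before "stream",
        // which substring() then excludes.
        String tableArn = streamArn.substring(0, streamArn.lastIndexOf('/') - "stream/".length());
        System.out.println(tableArn); // arn:aws:dynamodb:us-west-2:xxx:table/test-table
    }
}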