Skip to content

Commit

Permalink
Add max_receive_count configuration option in S3-SQS source to delete…
Browse files Browse the repository at this point in the history
… messages that have been received many times (#5408)

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Feb 7, 2025
1 parent 705c0d9 commit 1ce9dbf
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.SqsException;
import software.amazon.awssdk.services.sts.model.StsException;
Expand Down Expand Up @@ -189,6 +190,7 @@ private ReceiveMessageRequest createReceiveMessageRequest() {
.maxNumberOfMessages(sqsOptions.getMaximumMessages())
.visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds())
.waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds())
.attributeNamesWithStrings(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT.toString())
.build();
}

Expand Down Expand Up @@ -232,10 +234,20 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {
LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size());

for (ParsedMessage parsedMessage : parsedMessagesToRead) {
sqsMessageDelayTimer.record(Duration.between(
Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()),
Instant.now()
));
final int approximateReceiveCount = getApproximateReceiveCount(parsedMessage.getMessage());
if (s3SourceConfig.getSqsOptions().getMaxReceiveAttempts() != null &&
approximateReceiveCount > s3SourceConfig.getSqsOptions().getMaxReceiveAttempts()) {
deleteSqsMessages(List.of(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage())));
parsedMessage.setShouldSkipProcessing(true);
continue;
}

if (approximateReceiveCount <= 1) {
sqsMessageDelayTimer.record(Duration.between(
Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()),
Instant.now()
));
}
List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = new ArrayList<>();
List<S3ObjectReference> s3ObjectDeletionWaitingForAcknowledgments = new ArrayList<>();
AcknowledgementSet acknowledgementSet = null;
Expand Down Expand Up @@ -288,6 +300,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) {

// Use a separate loop for processing the S3 objects
for (ParsedMessage parsedMessage : parsedMessagesToRead) {
if (parsedMessage.isShouldSkipProcessing()) {
continue;
}

final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage);
final List<DeleteMessageBatchRequestEntry> waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage);
final List<S3ObjectReference> s3ObjectDeletionsWaitingForAcknowledgments = messagesWaitingForS3ObjectDeletion.get(parsedMessage);
Expand Down Expand Up @@ -428,6 +444,11 @@ private S3ObjectReference populateS3Reference(final String bucketName, final Str
.build();
}

private int getApproximateReceiveCount(final Message message) {
return message.attributes() != null && message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT) != null ?
Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) : 0;
}

void stop() {
isStopped = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.source.s3.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
Expand Down Expand Up @@ -53,6 +54,11 @@ public class SqsOptions {
@DurationMin(seconds = 0)
private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS;

@JsonPropertyDescription("Messages that contain an ApproximateReceiveCount greater than this value will be deleted")
@JsonProperty("max_receive_attempts")
@Min(1)
private Integer maxReceiveAttempts;

public String getSqsUrl() {
return sqsUrl;
}
Expand Down Expand Up @@ -80,4 +86,6 @@ public Duration getWaitTime() {
public Duration getPollDelay() {
return pollDelay;
}

public Integer getMaxReceiveAttempts() { return maxReceiveAttempts; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class ParsedMessage {
private boolean emptyNotification;
private String detailType;

private boolean shouldSkipProcessing;

public ParsedMessage(final Message message, final boolean failedParsing) {
this.message = Objects.requireNonNull(message);
this.failedParsing = failedParsing;
Expand All @@ -40,6 +42,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) {
this.eventTime = notificationRecords.get(0).getEventTime();
this.failedParsing = false;
this.emptyNotification = notificationRecords.isEmpty();
this.shouldSkipProcessing = false;
}

ParsedMessage(final Message message, final S3EventBridgeNotification eventBridgeNotification) {
Expand All @@ -49,6 +52,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) {
this.objectSize = eventBridgeNotification.getDetail().getObject().getSize();
this.detailType = eventBridgeNotification.getDetailType();
this.eventTime = eventBridgeNotification.getTime();
this.shouldSkipProcessing = false;
}

public Message getMessage() {
Expand Down Expand Up @@ -87,6 +91,12 @@ public String getDetailType() {
return detailType;
}

public boolean isShouldSkipProcessing () { return shouldSkipProcessing; }

public void setShouldSkipProcessing(final boolean shouldSkipProcessing) {
this.shouldSkipProcessing = shouldSkipProcessing;
}

@Override
public String toString() {
return "Message{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;
Expand All @@ -53,6 +54,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -202,6 +204,42 @@ void processSqsMessages_should_return_number_of_messages_processed(final String
assertThat(actualDelay, greaterThanOrEqualTo(Duration.ofHours(1).minus(Duration.ofSeconds(5))));
}

@Test
void processSqsMessages_with_max_receive_count_reached_deletes_message_and_skips_processing() throws IOException {
when(sqsOptions.getMaxReceiveAttempts()).thenReturn(4);

final String eventName = "ObjectCreated:Put";
Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS);
final Message message = mock(Message.class);
when(message.attributes()).thenReturn(Map.of(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT, "5"));
when(message.body()).thenReturn(createEventNotification(eventName, startTime));
final String testReceiptHandle = UUID.randomUUID().toString();
when(message.messageId()).thenReturn(testReceiptHandle);
when(message.receiptHandle()).thenReturn(testReceiptHandle);

final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class);
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse);
when(receiveMessageResponse.messages()).thenReturn(Collections.singletonList(message));

final int messagesProcessed = createObjectUnderTest().processSqsMessages();
final ArgumentCaptor<DeleteMessageBatchRequest> deleteMessageBatchRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
verify(sqsClient).deleteMessageBatch(deleteMessageBatchRequestArgumentCaptor.capture());
final DeleteMessageBatchRequest actualDeleteMessageBatchRequest = deleteMessageBatchRequestArgumentCaptor.getValue();

verifyNoInteractions(sqsMessageDelayTimer);

assertThat(actualDeleteMessageBatchRequest, notNullValue());
assertThat(actualDeleteMessageBatchRequest.entries().size(), equalTo(1));
assertThat(actualDeleteMessageBatchRequest.queueUrl(), equalTo(s3SourceConfig.getSqsOptions().getSqsUrl()));
assertThat(actualDeleteMessageBatchRequest.entries().get(0).id(), equalTo(message.messageId()));
assertThat(actualDeleteMessageBatchRequest.entries().get(0).receiptHandle(), equalTo(message.receiptHandle()));
assertThat(messagesProcessed, equalTo(1));
verifyNoInteractions(s3Service);
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
verify(sqsMessagesReceivedCounter).increment(1);
verify(sqsMessagesDeletedCounter).increment(1);
}

@Test
void processSqsMessages_should_not_interact_with_S3Service_and_delete_message_if_TestEvent() {
final String messageId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ void test_parsed_message_with_S3EventNotificationRecord() {
assertThat(parsedMessage.getEventTime(), equalTo(testEventTime));
assertThat(parsedMessage.isFailedParsing(), equalTo(false));
assertThat(parsedMessage.isEmptyNotification(), equalTo(false));
assertThat(parsedMessage.isShouldSkipProcessing(), equalTo(false));
}

@Test
Expand Down

0 comments on commit 1ce9dbf

Please sign in to comment.