diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 81c7f9ca9c..14451be31a 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -30,6 +30,7 @@ import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.services.sts.model.StsException; @@ -189,6 +190,7 @@ private ReceiveMessageRequest createReceiveMessageRequest() { .maxNumberOfMessages(sqsOptions.getMaximumMessages()) .visibilityTimeout((int) sqsOptions.getVisibilityTimeout().getSeconds()) .waitTimeSeconds((int) sqsOptions.getWaitTime().getSeconds()) + .attributeNamesWithStrings(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT.toString()) .build(); } @@ -232,10 +234,20 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); for (ParsedMessage parsedMessage : parsedMessagesToRead) { - sqsMessageDelayTimer.record(Duration.between( - Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()), - Instant.now() - )); + final int approximateReceiveCount = getApproximateReceiveCount(parsedMessage.getMessage()); + if (s3SourceConfig.getSqsOptions().getMaxReceiveAttempts() != null && + approximateReceiveCount > s3SourceConfig.getSqsOptions().getMaxReceiveAttempts()) { + deleteSqsMessages(List.of(buildDeleteMessageBatchRequestEntry(parsedMessage.getMessage()))); + parsedMessage.setShouldSkipProcessing(true); + continue; + } + + if (approximateReceiveCount <= 1) { + sqsMessageDelayTimer.record(Duration.between( + Instant.ofEpochMilli(parsedMessage.getEventTime().toInstant().getMillis()), + Instant.now() + )); + } List waitingForAcknowledgements = new ArrayList<>(); List s3ObjectDeletionWaitingForAcknowledgments = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; @@ -288,6 +300,10 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { // Use a separate loop for processing the S3 objects for (ParsedMessage parsedMessage : parsedMessagesToRead) { + if (parsedMessage.isShouldSkipProcessing()) { + continue; + } + final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage); final List waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage); final List s3ObjectDeletionsWaitingForAcknowledgments = messagesWaitingForS3ObjectDeletion.get(parsedMessage); @@ -428,6 +444,11 @@ private S3ObjectReference populateS3Reference(final String bucketName, final Str .build(); } + private int getApproximateReceiveCount(final Message message) { + return message.attributes() != null && message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT) != null ? + Integer.parseInt(message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT)) : 0; + } + void stop() { isStopped = true; } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java index 1242a6525b..8d4eebdd08 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/configuration/SqsOptions.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.source.s3.configuration; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotBlank; @@ -53,6 +54,11 @@ public class SqsOptions { @DurationMin(seconds = 0) private Duration pollDelay = DEFAULT_POLL_DELAY_SECONDS; + @JsonPropertyDescription("Messages that contain an ApproximateReceiveCount greater than this value will be deleted") + @JsonProperty("max_receive_attempts") + @Min(1) + private Integer maxReceiveAttempts; + public String getSqsUrl() { return sqsUrl; } @@ -80,4 +86,6 @@ public Duration getWaitTime() { public Duration getPollDelay() { return pollDelay; } + + public Integer getMaxReceiveAttempts() { return maxReceiveAttempts; } } diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessage.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessage.java index ed68dff063..095e244f1d 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessage.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessage.java @@ -24,6 +24,8 @@ public class ParsedMessage { private boolean emptyNotification; private String detailType; + private boolean shouldSkipProcessing; + public ParsedMessage(final Message message, final boolean failedParsing) { this.message = Objects.requireNonNull(message); this.failedParsing = failedParsing; @@ -40,6 +42,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) { this.eventTime = notificationRecords.get(0).getEventTime(); this.failedParsing = false; this.emptyNotification = notificationRecords.isEmpty(); + this.shouldSkipProcessing = false; } ParsedMessage(final Message message, final S3EventBridgeNotification eventBridgeNotification) { @@ -49,6 +52,7 @@ public ParsedMessage(final Message message, final boolean failedParsing) { this.objectSize = eventBridgeNotification.getDetail().getObject().getSize(); this.detailType = eventBridgeNotification.getDetailType(); this.eventTime = eventBridgeNotification.getTime(); + this.shouldSkipProcessing = false; } public Message getMessage() { @@ -87,6 +91,12 @@ public String getDetailType() { return detailType; } + public boolean isShouldSkipProcessing () { return shouldSkipProcessing; } + + public void setShouldSkipProcessing(final boolean shouldSkipProcessing) { + this.shouldSkipProcessing = shouldSkipProcessing; + } + @Override public String toString() { return "Message{" + diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java index 8c0522619e..c506d295c3 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerTest.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResultEntry; import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SqsException; @@ -53,6 +54,7 @@ import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -202,6 +204,42 @@ void processSqsMessages_should_return_number_of_messages_processed(final String assertThat(actualDelay, greaterThanOrEqualTo(Duration.ofHours(1).minus(Duration.ofSeconds(5)))); } + @Test + void processSqsMessages_with_max_receive_count_reached_deletes_message_and_skips_processing() throws IOException { + when(sqsOptions.getMaxReceiveAttempts()).thenReturn(4); + + final String eventName = "ObjectCreated:Put"; + Instant startTime = Instant.now().minus(1, ChronoUnit.HOURS); + final Message message = mock(Message.class); + when(message.attributes()).thenReturn(Map.of(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT, "5")); + when(message.body()).thenReturn(createEventNotification(eventName, startTime)); + final String testReceiptHandle = UUID.randomUUID().toString(); + when(message.messageId()).thenReturn(testReceiptHandle); + when(message.receiptHandle()).thenReturn(testReceiptHandle); + + final ReceiveMessageResponse receiveMessageResponse = mock(ReceiveMessageResponse.class); + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResponse); + when(receiveMessageResponse.messages()).thenReturn(Collections.singletonList(message)); + + final int messagesProcessed = createObjectUnderTest().processSqsMessages(); + final ArgumentCaptor deleteMessageBatchRequestArgumentCaptor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(sqsClient).deleteMessageBatch(deleteMessageBatchRequestArgumentCaptor.capture()); + final DeleteMessageBatchRequest actualDeleteMessageBatchRequest = deleteMessageBatchRequestArgumentCaptor.getValue(); + + verifyNoInteractions(sqsMessageDelayTimer); + + assertThat(actualDeleteMessageBatchRequest, notNullValue()); + assertThat(actualDeleteMessageBatchRequest.entries().size(), equalTo(1)); + assertThat(actualDeleteMessageBatchRequest.queueUrl(), equalTo(s3SourceConfig.getSqsOptions().getSqsUrl())); + assertThat(actualDeleteMessageBatchRequest.entries().get(0).id(), equalTo(message.messageId())); + assertThat(actualDeleteMessageBatchRequest.entries().get(0).receiptHandle(), equalTo(message.receiptHandle())); + assertThat(messagesProcessed, equalTo(1)); + verifyNoInteractions(s3Service); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + verify(sqsMessagesReceivedCounter).increment(1); + verify(sqsMessagesDeletedCounter).increment(1); + } + @Test void processSqsMessages_should_not_interact_with_S3Service_and_delete_message_if_TestEvent() { final String messageId = UUID.randomUUID().toString(); diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessageTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessageTest.java index 51f3abad06..4a33f8f12a 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessageTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/parser/ParsedMessageTest.java @@ -120,6 +120,7 @@ void test_parsed_message_with_S3EventNotificationRecord() { assertThat(parsedMessage.getEventTime(), equalTo(testEventTime)); assertThat(parsedMessage.isFailedParsing(), equalTo(false)); assertThat(parsedMessage.isEmptyNotification(), equalTo(false)); + assertThat(parsedMessage.isShouldSkipProcessing(), equalTo(false)); } @Test