diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java index 4d77edc448..99da366de3 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptions.java @@ -48,8 +48,7 @@ private Arn getArn() { try { return Arn.fromString(awsStsRoleArn); } catch (final Exception e) { - throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn)); - } + throw new IllegalArgumentException(String.format("The value provided for sts_role_arn is not a valid AWS ARN. Provided value: %s", awsStsRoleArn)); } } public String getAwsStsRoleArn() { diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java index aa8c24eb62..ca5566d6cd 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfig.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.source.sqs; import com.fasterxml.jackson.annotation.JsonProperty; @@ -14,10 +19,10 @@ public class QueueConfig { private static final Integer DEFAULT_MAXIMUM_MESSAGES = null; - private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false; + private static final boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false; private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = null; private static final Duration DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT = Duration.ofHours(2); - private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20); + private static final Duration DEFAULT_WAIT_TIME_SECONDS = null; private static final Duration DEFAULT_POLL_DELAY_SECONDS = Duration.ofSeconds(0); static final int DEFAULT_NUMBER_OF_WORKERS = 1; @@ -45,7 +50,7 @@ public class QueueConfig { @JsonProperty("visibility_duplication_protection") @NotNull - private Boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION; + private boolean visibilityDuplicateProtection = DEFAULT_VISIBILITY_DUPLICATE_PROTECTION; @JsonProperty("visibility_duplicate_protection_timeout") @DurationMin(seconds = 30) @@ -73,7 +78,7 @@ public Duration getVisibilityTimeout() { return visibilityTimeout; } - public Boolean getVisibilityDuplicateProtection() { + public boolean getVisibilityDuplicateProtection() { return visibilityDuplicateProtection; } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java index 479fae6ec4..ee31e89a5a 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandler.java @@ -1,76 +1,51 @@ package org.opensearch.dataprepper.plugins.source.sqs; -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventMetadata; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.model.Message; -import com.fasterxml.jackson.databind.node.ObjectNode; -import java.time.Instant; -import java.util.Objects; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; + +import java.util.Collections; +import java.util.Map; -/** - * Implements the SqsMessageHandler to read and parse SQS messages generically and push to buffer. - */ public class RawSqsMessageHandler implements SqsMessageHandler { private static final Logger LOG = LoggerFactory.getLogger(RawSqsMessageHandler.class); - private static final ObjectMapper objectMapper = new ObjectMapper(); - /** - * Processes the SQS message, attempting to parse it as JSON, and adds it to the buffer. - * - * @param message - the SQS message for processing - * @param url - the SQS queue url - * @param bufferAccumulator - the buffer accumulator - * @param acknowledgementSet - the acknowledgement set for end-to-end acknowledgements - */ @Override public void handleMessage(final Message message, - final String url, - final BufferAccumulator> bufferAccumulator, + final String url, + final Buffer> buffer, + final int bufferTimeoutMillis, final AcknowledgementSet acknowledgementSet) { try { - ObjectNode dataNode = objectMapper.createObjectNode(); - dataNode.set("message", parseMessageBody(message.body())); - dataNode.put("queueUrl", url); - - Instant now = Instant.now(); - int unixTimestamp = (int) now.getEpochSecond(); - dataNode.put("sentTimestamp", unixTimestamp); - - final Record event = new Record(JacksonEvent.builder() - .withEventType("sqs-event") - .withData(dataNode) - .build()); - - if (Objects.nonNull(acknowledgementSet)) { - acknowledgementSet.add(event.getData()); + final Map systemAttributes = message.attributes(); + final Map customAttributes = message.messageAttributes(); + final Event event = JacksonEvent.builder() + .withEventType("DOCUMENT") + .withData(Collections.singletonMap("message", message.body())) + .build(); + final EventMetadata eventMetadata = event.getMetadata(); + eventMetadata.setAttribute("url", url); + final String sentTimestamp = systemAttributes.get(MessageSystemAttributeName.SENT_TIMESTAMP); + eventMetadata.setAttribute("SentTimestamp", sentTimestamp); + for (Map.Entry entry : customAttributes.entrySet()) { + eventMetadata.setAttribute(entry.getKey(), entry.getValue().stringValue()); } - - bufferAccumulator.add(new Record<>(event.getData())); - + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + buffer.write(new Record<>(event), bufferTimeoutMillis); } catch (Exception e) { LOG.error("Error processing SQS message: {}", e.getMessage(), e); throw new RuntimeException(e); } } - - JsonNode parseMessageBody(String messageBody) { - try { - return objectMapper.readTree(messageBody); - } catch (Exception e) { - return objectMapper.getNodeFactory().textNode(messageBody); - } - } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java index 5801f76885..a03c485c37 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessor.java @@ -5,15 +5,13 @@ package org.opensearch.dataprepper.plugins.source.sqs; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; - import software.amazon.awssdk.services.sqs.model.Message; - import java.io.IOException; - + public class SqsEventProcessor { private final SqsMessageHandler sqsMessageHandler; SqsEventProcessor(final SqsMessageHandler sqsMessageHandler) { @@ -22,9 +20,10 @@ public class SqsEventProcessor { void addSqsObject(final Message message, final String url, - final BufferAccumulator> bufferAccumulator, + final Buffer> buffer, + final int bufferTimeoutmillis, final AcknowledgementSet acknowledgementSet) throws IOException { - sqsMessageHandler.handleMessage(message, url, bufferAccumulator, acknowledgementSet); + sqsMessageHandler.handleMessage(message, url, buffer, bufferTimeoutmillis, acknowledgementSet); } } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java index dbe041cd16..79012b5e00 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsMessageHandler.java @@ -4,8 +4,8 @@ */ package org.opensearch.dataprepper.plugins.source.sqs; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.model.Message; @@ -14,6 +14,7 @@ public interface SqsMessageHandler { void handleMessage(final Message message, final String url, - final BufferAccumulator> bufferAccumulator, + final Buffer> buffer, + final int bufferTimeoutMillis, final AcknowledgementSet acknowledgementSet) throws IOException ; } diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java index 7a14f53144..4e1f9507e6 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsRetriesExhaustedException.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.source.sqs; /** diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java index a13b81dd4a..d53f269323 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsService.java @@ -69,9 +69,11 @@ public void start() { sqsSourceConfig.getQueues().forEach(queueConfig -> { String queueUrl = queueConfig.getUrl(); + String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1); + int numWorkers = queueConfig.getNumWorkers(); ExecutorService executorService = Executors.newFixedThreadPool( - numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source-new-" + queueUrl)); + numWorkers, BackgroundThreadFactory.defaultExecutorThreadFactory("sqs-source" + queueName)); allSqsUrlExecutorServices.add(executorService); List workers = IntStream.range(0, numWorkers) .mapToObj(i -> new SqsWorker( diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java index 71951d5fe8..980e59048b 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSource.java @@ -2,6 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ + package org.opensearch.dataprepper.plugins.source.sqs; import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java index f99201c0b3..db9ee278a5 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfig.java @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.dataprepper.plugins.source.sqs; import com.fasterxml.jackson.annotation.JsonProperty; diff --git a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java index 56db71ec27..3f58906b33 100644 --- a/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java +++ b/data-prepper-plugins/sqs-source/src/main/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorker.java @@ -16,15 +16,15 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; +import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; import software.amazon.awssdk.services.sqs.model.SqsException; import software.amazon.awssdk.services.sts.model.StsException; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.buffer.Buffer; import java.time.Duration; @@ -61,7 +61,8 @@ public class SqsWorker implements Runnable { private final boolean endToEndAcknowledgementsEnabled; private final AcknowledgementSetManager acknowledgementSetManager; private volatile boolean isStopped = false; - private final BufferAccumulator> bufferAccumulator; + private final Buffer> buffer; + private final int bufferTimeoutMillis; private Map messageVisibilityTimesMap; public SqsWorker(final Buffer> buffer, @@ -79,12 +80,11 @@ public SqsWorker(final Buffer> buffer, this.acknowledgementSetManager = acknowledgementSetManager; this.standardBackoff = backoff; this.endToEndAcknowledgementsEnabled = sqsSourceConfig.getAcknowledgements(); - this.bufferAccumulator = BufferAccumulator.create(buffer, sqsSourceConfig.getNumberOfRecordsToAccumulate(), sqsSourceConfig.getBufferTimeout()); + this.buffer = buffer; + this.bufferTimeoutMillis = (int) sqsSourceConfig.getBufferTimeout().toMillis(); messageVisibilityTimesMap = new HashMap<>(); - failedAttemptCount = 0; - sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME); sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME); sqsMessagesFailedCounter = pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME); @@ -131,9 +131,11 @@ int processSqsMessages() { private List getMessagesFromSqs() { try { final ReceiveMessageRequest request = createReceiveMessageRequest(); - final List messages = sqsClient.receiveMessage(request).messages(); + final ReceiveMessageResponse response = sqsClient.receiveMessage(request); + List messages = response.messages(); failedAttemptCount = 0; return messages; + } catch (final SqsException | StsException e) { LOG.error("Error reading from SQS: {}. Retrying with exponential backoff.", e.getMessage()); applyBackoff(); @@ -160,16 +162,18 @@ private void applyBackoff() { private ReceiveMessageRequest createReceiveMessageRequest() { ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder() .queueUrl(queueConfig.getUrl()) - .waitTimeSeconds((int) queueConfig.getWaitTime().getSeconds()); + .attributeNamesWithStrings("All") + .messageAttributeNames("All"); + if (queueConfig.getWaitTime() != null) { + requestBuilder.waitTimeSeconds((int) queueConfig.getWaitTime().getSeconds()); + } if (queueConfig.getMaximumMessages() != null) { requestBuilder.maxNumberOfMessages(queueConfig.getMaximumMessages()); } - if (queueConfig.getVisibilityTimeout() != null) { requestBuilder.visibilityTimeout((int) queueConfig.getVisibilityTimeout().getSeconds()); } - return requestBuilder.build(); } @@ -244,14 +248,6 @@ private List processSqsEvents(final List processSqsObject( final Message message, final AcknowledgementSet acknowledgementSet) { try { - sqsEventProcessor.addSqsObject(message, queueConfig.getUrl(), bufferAccumulator, acknowledgementSet); + sqsEventProcessor.addSqsObject(message, queueConfig.getUrl(), buffer, bufferTimeoutMillis, acknowledgementSet); return Optional.of(buildDeleteMessageBatchRequestEntry(message)); } catch (final Exception e) { sqsMessagesFailedCounter.increment(); diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java index c90c18bb2d..77eeeb519a 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/AwsAuthenticationOptionsTest.java @@ -87,12 +87,12 @@ void validateStsRoleArn_with_invalid_format_throws_exception() throws NoSuchFiel try (final MockedStatic arnMockedStatic = mockStatic(Arn.class)) { arnMockedStatic.when(() -> Arn.fromString(invalidFormatArn)) - .thenThrow(new IllegalArgumentException("Invalid ARN format for awsStsRoleArn. Check the format of " + invalidFormatArn)); + .thenThrow(new IllegalArgumentException("The value provided for sts_role_arn is not a valid AWS ARN. Provided value: " + invalidFormatArn)); IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, () -> { awsAuthenticationOptions.validateStsRoleArn(); }); - assertThat(exception.getMessage(), equalTo("Invalid ARN format for awsStsRoleArn. Check the format of " + invalidFormatArn)); + assertThat(exception.getMessage(), equalTo("The value provided for sts_role_arn is not a valid AWS ARN. Provided value: " + invalidFormatArn)); } } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java index f33a227de8..7e62f327a1 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java @@ -21,6 +21,6 @@ void testDefaultValues() { assertFalse(queueConfig.getVisibilityDuplicateProtection(), "Visibility duplicate protection should default to false"); assertEquals(Duration.ofHours(2), queueConfig.getVisibilityDuplicateProtectionTimeout(), "Visibility duplicate protection timeout should default to 2 hours"); - assertEquals(Duration.ofSeconds(20), queueConfig.getWaitTime(), "Wait time should default to 20 seconds"); + assertNull(queueConfig.getWaitTime(), "Wait time should default to null"); } } \ No newline at end of file diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java index cfb7241459..1e6a0baa82 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java @@ -1,18 +1,15 @@ package org.opensearch.dataprepper.plugins.source.sqs; -import com.fasterxml.jackson.databind.JsonNode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.model.Message; -import java.util.Map; - import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -20,43 +17,23 @@ class RawSqsMessageHandlerTest { private final RawSqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); - private BufferAccumulator> mockBufferAccumulator; + private Buffer> mockBuffer; + private int mockBufferTimeoutMillis; @BeforeEach void setUp() { - mockBufferAccumulator = mock(BufferAccumulator.class); - } - - @Test - void parseMessageBody_validJsonString_returnsJsonNode() { - String validJson = "{\"key\":\"value\"}"; - JsonNode result = rawSqsMessageHandler.parseMessageBody(validJson); - assertTrue(result.isObject(), "Result should be a JSON object"); - assertEquals("value", result.get("key").asText(), "The value of 'key' should be 'value'"); + mockBuffer = mock(Buffer.class); + mockBufferTimeoutMillis = 10000; } @Test - void handleMessage_callsBufferAccumulatorAddOnce() throws Exception { + void handleMessage_callsBufferWriteOnce() throws Exception { Message message = Message.builder().body("{\"key\":\"value\"}").build(); String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - rawSqsMessageHandler.handleMessage(message, queueUrl, mockBufferAccumulator, null); + rawSqsMessageHandler.handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, null); ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(mockBufferAccumulator, times(1)).add(argumentCaptor.capture()); + verify(mockBuffer, times(1)).write(argumentCaptor.capture(), eq(mockBufferTimeoutMillis)); Record capturedRecord = argumentCaptor.getValue(); - assertEquals("sqs-event", capturedRecord.getData().getMetadata().getEventType(), "Event type should be 'sqs-event'"); + assertEquals("DOCUMENT", capturedRecord.getData().getMetadata().getEventType(), "Event type should be 'DOCUMENT'"); } - - @Test - void handleMessage_handlesInvalidJsonBodyGracefully() throws Exception { - String invalidJsonBody = "Invalid JSON string"; - Message message = Message.builder().body(invalidJsonBody).build(); - String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - rawSqsMessageHandler.handleMessage(message, queueUrl, mockBufferAccumulator, null); - ArgumentCaptor> argumentCaptor = ArgumentCaptor.forClass(Record.class); - verify(mockBufferAccumulator, times(1)).add(argumentCaptor.capture()); - Record capturedRecord = argumentCaptor.getValue(); - Map eventData = capturedRecord.getData().toMap(); - assertEquals(invalidJsonBody, eventData.get("message"), "The message should be a text node containing the original string"); - } - } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java index 00b5f18640..62c6385166 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java @@ -3,8 +3,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import software.amazon.awssdk.services.sqs.model.Message; @@ -20,14 +20,16 @@ class SqsEventProcessorTest { private SqsMessageHandler mockSqsMessageHandler; private SqsEventProcessor sqsEventProcessor; - private BufferAccumulator> mockBufferAccumulator; + private Buffer> mockBuffer; + private int mockBufferTimeoutMillis; private AcknowledgementSet mockAcknowledgementSet; @BeforeEach void setUp() { mockSqsMessageHandler = Mockito.mock(SqsMessageHandler.class); - mockBufferAccumulator = Mockito.mock(BufferAccumulator.class); + mockBuffer = Mockito.mock(Buffer.class); mockAcknowledgementSet = Mockito.mock(AcknowledgementSet.class); + mockBufferTimeoutMillis = 10000; sqsEventProcessor = new SqsEventProcessor(mockSqsMessageHandler); } @@ -35,19 +37,19 @@ void setUp() { void addSqsObject_callsHandleMessageWithCorrectParameters() throws IOException { Message message = Message.builder().body("Test Message Body").build(); String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - sqsEventProcessor.addSqsObject(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); - verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); + sqsEventProcessor.addSqsObject(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, mockAcknowledgementSet); + verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, mockAcknowledgementSet); } @Test void addSqsObject_propagatesIOExceptionThrownByHandleMessage() throws IOException { Message message = Message.builder().body("Test Message Body").build(); String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; - doThrow(new IOException("Handle message failed")).when(mockSqsMessageHandler).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); + doThrow(new IOException("Handle message failed")).when(mockSqsMessageHandler).handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, mockAcknowledgementSet); IOException thrownException = assertThrows(IOException.class, () -> - sqsEventProcessor.addSqsObject(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet) + sqsEventProcessor.addSqsObject(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, mockAcknowledgementSet) ); assert(thrownException.getMessage().equals("Handle message failed")); - verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); + verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBuffer, mockBufferTimeoutMillis, mockAcknowledgementSet); } } diff --git a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java index c03715a137..7bb8e082cc 100644 --- a/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java +++ b/data-prepper-plugins/sqs-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsWorkerTest.java @@ -87,6 +87,7 @@ class SqsWorkerTest { private Counter sqsVisibilityTimeoutChangedCount; @Mock private Counter sqsVisibilityTimeoutChangeFailedCount; + private int mockBufferTimeoutMillis = 10000; private SqsWorker createObjectUnderTest() { return new SqsWorker( @@ -110,7 +111,7 @@ void setUp() { when(pluginMetrics.counter(SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME)).thenReturn(sqsVisibilityTimeoutChangedCount); when(pluginMetrics.counter(SqsWorker.SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME)).thenReturn(sqsVisibilityTimeoutChangeFailedCount); when(sqsSourceConfig.getAcknowledgements()).thenReturn(false); - when(sqsSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(100); + when(sqsSourceConfig.getBufferTimeout()).thenReturn(Duration.ofSeconds(10)); when(queueConfig.getUrl()).thenReturn("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"); when(queueConfig.getWaitTime()).thenReturn(Duration.ofSeconds(1)); } @@ -163,7 +164,7 @@ void processSqsMessages_should_invoke_processSqsEvent_and_deleteSqsMessages_when int messagesProcessed = sqsWorker.processSqsMessages(); assertThat(messagesProcessed, equalTo(1)); - verify(sqsEventProcessor, times(1)).addSqsObject(eq(message), eq("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"), any(), isNull()); + verify(sqsEventProcessor, times(1)).addSqsObject(eq(message), eq("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"), eq(buffer), eq(mockBufferTimeoutMillis), isNull()); verify(sqsClient, times(1)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); verify(sqsMessagesReceivedCounter).increment(1); verify(sqsMessagesDeletedCounter).increment(1); @@ -177,7 +178,7 @@ void processSqsMessages_should_not_invoke_processSqsEvent_and_deleteSqsMessages_ SqsWorker sqsWorker = createObjectUnderTest(); int messagesProcessed = sqsWorker.processSqsMessages(); assertThat(messagesProcessed, equalTo(0)); - verify(sqsEventProcessor, never()).addSqsObject(any(), anyString(), any(), any()); + verify(sqsEventProcessor, never()).addSqsObject(any(), anyString(), any(), anyInt(), any()); verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); verify(sqsMessagesReceivedCounter, never()).increment(anyDouble()); verify(sqsMessagesDeletedCounter, never()).increment(anyDouble()); @@ -201,9 +202,10 @@ void processSqsMessages_should_not_delete_messages_if_acknowledgements_enabled_u when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(response); int messagesProcessed = createObjectUnderTest().processSqsMessages(); assertThat(messagesProcessed, equalTo(1)); - verify(sqsEventProcessor).addSqsObject(eq(message), + verify(sqsEventProcessor, times(1)).addSqsObject(eq(message), eq("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"), - any(), + eq(buffer), + eq(mockBufferTimeoutMillis), eq(acknowledgementSet)); verify(sqsMessagesReceivedCounter).increment(1); verifyNoInteractions(sqsMessagesDeletedCounter); @@ -264,7 +266,7 @@ void processSqsMessages_should_update_visibility_timeout_when_progress_changes() final int messagesProcessed = createObjectUnderTest().processSqsMessages(); assertThat(messagesProcessed, equalTo(1)); - verify(sqsEventProcessor).addSqsObject(any(), anyString(), any(), any()); + verify(sqsEventProcessor).addSqsObject(any(), anyString(), any(), anyInt(), any()); verify(acknowledgementSetManager).create(any(), any(Duration.class)); ArgumentCaptor> progressConsumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class); @@ -300,9 +302,10 @@ void increaseVisibilityTimeout_doesNothing_whenIsStopped() throws IOException { sqsWorker.stop(); int messagesProcessed = sqsWorker.processSqsMessages(); assertThat(messagesProcessed, equalTo(1)); - verify(sqsEventProcessor).addSqsObject(eq(message), + verify(sqsEventProcessor, times(1)).addSqsObject(eq(message), eq("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"), - any(), + eq(buffer), + eq(mockBufferTimeoutMillis), eq(mockAcknowledgementSet)); verify(sqsClient, never()).changeMessageVisibility(any(ChangeMessageVisibilityRequest.class)); verify(sqsVisibilityTimeoutChangeFailedCount, never()).increment(); @@ -362,7 +365,7 @@ void processSqsMessages_handlesException_correctly_when_addSqsObject_throwsExcep ); doThrow(new RuntimeException("Processing failed")).when(sqsEventProcessor) .addSqsObject(eq(message), eq("https://sqs.us-east-1.amazonaws.com/123456789012/MyQueue"), - any(), any()); + any(), anyInt(), any()); SqsWorker sqsWorker = createObjectUnderTest(); int messagesProcessed = sqsWorker.processSqsMessages(); assertThat(messagesProcessed, equalTo(1));