-
Notifications
You must be signed in to change notification settings - Fork 214
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Jeremy Michael <[email protected]>
- Loading branch information
Jeremy Michael
committed
Dec 26, 2024
1 parent
29701d0
commit b103f7e
Showing
9 changed files
with
341 additions
and
19 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 26 additions & 0 deletions
26
...s-source/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/QueueConfigTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package org.opensearch.dataprepper.plugins.source.sqs; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import java.time.Duration; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertNull; | ||
|
||
public class QueueConfigTest { | ||
|
||
@Test | ||
void testDefaultValues() { | ||
final QueueConfig queueConfig = new QueueConfig(); | ||
|
||
assertNull(queueConfig.getUrl(), "URL should be null by default"); | ||
assertEquals(1, queueConfig.getNumWorkers(), "Number of workers should default to 1"); | ||
assertNull(queueConfig.getMaximumMessages(), "Maximum messages should be null by default"); | ||
assertEquals(Duration.ofSeconds(0), queueConfig.getPollDelay(), "Poll delay should default to 0 seconds"); | ||
assertNull(queueConfig.getVisibilityTimeout(), "Visibility timeout should be null by default"); | ||
assertFalse(queueConfig.getVisibilityDuplicateProtection(), "Visibility duplicate protection should default to false"); | ||
assertEquals(Duration.ofHours(2), queueConfig.getVisibilityDuplicateProtectionTimeout(), | ||
"Visibility duplicate protection timeout should default to 2 hours"); | ||
assertEquals(Duration.ofSeconds(20), queueConfig.getWaitTime(), "Wait time should default to 20 seconds"); | ||
} | ||
} |
62 changes: 62 additions & 0 deletions
62
...src/test/java/org/opensearch/dataprepper/plugins/source/sqs/RawSqsMessageHandlerTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package org.opensearch.dataprepper.plugins.source.sqs; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.mockito.ArgumentCaptor; | ||
import org.opensearch.dataprepper.buffer.common.BufferAccumulator; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import software.amazon.awssdk.services.sqs.model.Message; | ||
|
||
import java.util.Map; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
||
class RawSqsMessageHandlerTest { | ||
|
||
private final RawSqsMessageHandler rawSqsMessageHandler = new RawSqsMessageHandler(); | ||
private BufferAccumulator<Record<Event>> mockBufferAccumulator; | ||
|
||
@BeforeEach | ||
void setUp() { | ||
mockBufferAccumulator = mock(BufferAccumulator.class); | ||
} | ||
|
||
@Test | ||
void parseMessageBody_validJsonString_returnsJsonNode() { | ||
String validJson = "{\"key\":\"value\"}"; | ||
JsonNode result = rawSqsMessageHandler.parseMessageBody(validJson); | ||
assertTrue(result.isObject(), "Result should be a JSON object"); | ||
assertEquals("value", result.get("key").asText(), "The value of 'key' should be 'value'"); | ||
} | ||
|
||
@Test | ||
void handleMessage_callsBufferAccumulatorAddOnce() throws Exception { | ||
Message message = Message.builder().body("{\"key\":\"value\"}").build(); | ||
String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; | ||
rawSqsMessageHandler.handleMessage(message, queueUrl, mockBufferAccumulator, null); | ||
ArgumentCaptor<Record<Event>> argumentCaptor = ArgumentCaptor.forClass(Record.class); | ||
verify(mockBufferAccumulator, times(1)).add(argumentCaptor.capture()); | ||
Record<Event> capturedRecord = argumentCaptor.getValue(); | ||
assertEquals("sqs-event", capturedRecord.getData().getMetadata().getEventType(), "Event type should be 'sqs-event'"); | ||
} | ||
|
||
@Test | ||
void handleMessage_handlesInvalidJsonBodyGracefully() throws Exception { | ||
String invalidJsonBody = "Invalid JSON string"; | ||
Message message = Message.builder().body(invalidJsonBody).build(); | ||
String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; | ||
rawSqsMessageHandler.handleMessage(message, queueUrl, mockBufferAccumulator, null); | ||
ArgumentCaptor<Record<Event>> argumentCaptor = ArgumentCaptor.forClass(Record.class); | ||
verify(mockBufferAccumulator, times(1)).add(argumentCaptor.capture()); | ||
Record<Event> capturedRecord = argumentCaptor.getValue(); | ||
Map<String, Object> eventData = capturedRecord.getData().toMap(); | ||
assertEquals(invalidJsonBody, eventData.get("message"), "The message should be a text node containing the original string"); | ||
} | ||
|
||
} |
53 changes: 53 additions & 0 deletions
53
...ce/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsEventProcessorTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
package org.opensearch.dataprepper.plugins.source.sqs; | ||
|
||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
import org.mockito.Mockito; | ||
import org.opensearch.dataprepper.buffer.common.BufferAccumulator; | ||
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; | ||
import org.opensearch.dataprepper.model.event.Event; | ||
import org.opensearch.dataprepper.model.record.Record; | ||
import software.amazon.awssdk.services.sqs.model.Message; | ||
|
||
import java.io.IOException; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertThrows; | ||
import static org.mockito.Mockito.doThrow; | ||
import static org.mockito.Mockito.times; | ||
import static org.mockito.Mockito.verify; | ||
|
||
class SqsEventProcessorTest { | ||
|
||
private SqsMessageHandler mockSqsMessageHandler; | ||
private SqsEventProcessor sqsEventProcessor; | ||
private BufferAccumulator<Record<Event>> mockBufferAccumulator; | ||
private AcknowledgementSet mockAcknowledgementSet; | ||
|
||
@BeforeEach | ||
void setUp() { | ||
mockSqsMessageHandler = Mockito.mock(SqsMessageHandler.class); | ||
mockBufferAccumulator = Mockito.mock(BufferAccumulator.class); | ||
mockAcknowledgementSet = Mockito.mock(AcknowledgementSet.class); | ||
sqsEventProcessor = new SqsEventProcessor(mockSqsMessageHandler); | ||
} | ||
|
||
@Test | ||
void addSqsObject_callsHandleMessageWithCorrectParameters() throws IOException { | ||
Message message = Message.builder().body("Test Message Body").build(); | ||
String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; | ||
sqsEventProcessor.addSqsObject(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); | ||
verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); | ||
} | ||
|
||
@Test | ||
void addSqsObject_propagatesIOExceptionThrownByHandleMessage() throws IOException { | ||
Message message = Message.builder().body("Test Message Body").build(); | ||
String queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"; | ||
doThrow(new IOException("Handle message failed")).when(mockSqsMessageHandler).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); | ||
IOException thrownException = assertThrows(IOException.class, () -> | ||
sqsEventProcessor.addSqsObject(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet) | ||
); | ||
assert(thrownException.getMessage().equals("Handle message failed")); | ||
verify(mockSqsMessageHandler, times(1)).handleMessage(message, queueUrl, mockBufferAccumulator, mockAcknowledgementSet); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
...urce/src/test/java/org/opensearch/dataprepper/plugins/source/sqs/SqsSourceConfigTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package org.opensearch.dataprepper.plugins.source.sqs; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertNull; | ||
|
||
public class SqsSourceConfigTest { | ||
|
||
private final ObjectMapper objectMapper = new ObjectMapper(); | ||
|
||
@Test | ||
void testDefaultValues() { | ||
final SqsSourceConfig config = new SqsSourceConfig(); | ||
assertNull(config.getAwsAuthenticationOptions(), "AWS Authentication Options should be null by default"); | ||
assertFalse(config.getAcknowledgements(), "Acknowledgments should be false by default"); | ||
assertEquals(SqsSourceConfig.DEFAULT_BUFFER_TIMEOUT, config.getBufferTimeout(), "Buffer timeout should default to 10 seconds"); | ||
assertEquals(SqsSourceConfig.DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, config.getNumberOfRecordsToAccumulate(), | ||
"Number of records to accumulate should default to 100"); | ||
assertNull(config.getQueues(), "Queues should be null by default"); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.