
SQS source plugin implementation #5274

Merged · 15 commits · Jan 9, 2025

Conversation

jmsusanto (Contributor) commented Dec 18, 2024

Description

Added the SQS source plugin.

Issues Resolved

#1049

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Jeremy Michael and others added 9 commits December 18, 2024 14:58
Signed-off-by: Jeremy Michael <[email protected]>
@jmsusanto jmsusanto changed the title SQS source: initial working implementation SQS source plugin implementation Dec 26, 2024

@JsonProperty("sts_role_arn")
@Size(min = 20, max = 2048, message = "awsStsRoleArn length should be between 1 and 2048 characters")
protected String awsStsRoleArn;
Collaborator:

Is there a reason to make these protected? Will there be a possible child class or something?

Contributor Author:

Great catch; there's no reason for awsStsRoleArn to be protected. I think IntelliJ originally changed it for a test. Changing this to private.


void addSqsObject(final Message message,
final String url,
final BufferAccumulator<Record<Event>> bufferAccumulator,
Collaborator:

BufferAccumulator is not thread-safe. I see that you are creating a BufferAccumulator in each SqsWorker thread, which is good. I'm just not sure whether the event processor (or the message handler) runs on another thread. If not, then we are good; if it does, you need to make sure it doesn't create any synchronization issues here.

Contributor Author:

As you've said, I create each BufferAccumulator in its own SqsWorker thread. Within each SqsWorker, no new threads are created, so concurrency shouldn't be an issue here. However, please let me know if my understanding is incorrect.
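The thread-confinement pattern being discussed here can be sketched generically. The class below is an illustrative stand-in, not Data Prepper's actual BufferAccumulator or SqsWorker API: each worker builds up records in its own non-thread-safe collection and hands them off once, so no locking on the accumulator is needed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerWorkerBuffering {

    // Each worker owns its own non-thread-safe accumulator (standing in for
    // BufferAccumulator) and never shares it, so no synchronization is needed
    // as long as the worker itself spawns no extra threads.
    static List<String> runWorkers(final int workers, final int messagesPerWorker) {
        final List<String> sink = Collections.synchronizedList(new ArrayList<>());
        final ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            final int workerId = w;
            pool.submit(() -> {
                final List<String> accumulator = new ArrayList<>(); // per-worker, thread-confined
                for (int m = 0; m < messagesPerWorker; m++) {
                    accumulator.add("worker-" + workerId + "-msg-" + m);
                }
                sink.addAll(accumulator); // single hand-off at the end
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }
}
```

The safety argument is exactly the one in the thread: the accumulator is only ever touched by the thread that created it, so thread-safety of the accumulator class itself is irrelevant.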

kkondaka (Collaborator) left a comment:

Please write an integration Test like https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsServiceIT.java and https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java

private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false;
private static final Duration DEFAULT_VISIBILITY_TIMEOUT_SECONDS = null;
private static final Duration DEFAULT_VISIBILITY_DUPLICATE_PROTECTION_TIMEOUT = Duration.ofHours(2);
private static final Duration DEFAULT_WAIT_TIME_SECONDS = Duration.ofSeconds(20);
Member:

This default should also be null so that we use the SQS queue default value.

Contributor Author:

The SQS queue default is 0 seconds, which results in short polling. Is this desirable? There are many benefits to long polling (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-short-and-long-polling.html#sqs-long-polling).

Member:

I agree that long polling is more desirable. So maybe it would make more sense to use a default of 20 seconds. The main reason I suggested using the queue default is that it would be consistent with the other behaviors.

But, maybe because long polling is a little different from the other configurations, this would be better to use a Data Prepper-defined default.

@kkondaka , Any thoughts on this behavior?
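For reference, SQS accepts a WaitTimeSeconds between 0 (short polling) and 20 (the maximum long-polling wait). A Data Prepper-defined default along the lines discussed above could look like this sketch; resolveWaitTimeSeconds is a hypothetical helper for illustration, not the plugin's actual code.

```java
import java.time.Duration;

public class WaitTimeDefaults {
    // SQS allows WaitTimeSeconds from 0 (short polling) to 20 (max long polling).
    // A null config falls back to a source-defined 20-second default that
    // favors long polling, as suggested in the thread above.
    static int resolveWaitTimeSeconds(final Duration configured) {
        if (configured == null) {
            return 20;
        }
        final long seconds = configured.getSeconds();
        if (seconds < 0 || seconds > 20) {
            throw new IllegalArgumentException(
                    "wait_time must be between 0 and 20 seconds, got " + seconds);
        }
        return (int) seconds;
    }
}
```

With this approach, omitting wait_time gives long polling by default, while users who explicitly want short polling can still set it to 0.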

public class QueueConfig {

private static final Integer DEFAULT_MAXIMUM_MESSAGES = null;
private static final Boolean DEFAULT_VISIBILITY_DUPLICATE_PROTECTION = false;
Member:

I think we should use this by default. It provides a lot of benefit and the main downside is that it requires an additional permission.

Contributor Author:

The S3 source currently defaults to not using this, and I think it's more significant in the S3 case, since a single message can refer to a large S3 object and hence take a lot of time to process.

On the other hand, I don't see a reason not to use this given the benefits.

Member:

We'd like to make the S3 source use this as the default in Data Prepper 3.0. See #3679.

try {
return Arn.fromString(awsStsRoleArn);
} catch (final Exception e) {
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
Member:

Suggested change
throw new IllegalArgumentException(String.format("Invalid ARN format for awsStsRoleArn. Check the format of %s", awsStsRoleArn));
throw new IllegalArgumentException(String.format("The value provided for sts_role_arn is not a valid AWS ARN. Provided value: %s", awsStsRoleArn));

jmsusanto (Contributor Author) commented Jan 2, 2025:

We should make this change in the S3 source's AWS config as well for consistency.

Member:

I agree. We should also consolidate this code.

import java.util.Optional;


public class SqsWorker implements Runnable {
Member:

This code duplicates much of the work from the S3 source. Having duplicate classes is going to be very difficult to maintain as we fix bugs. We should consolidate this into a library.

@Size(max = 5, message = "sts_header_overrides supports a maximum of 5 headers to override")
private Map<String, String> awsStsHeaderOverrides;

void validateStsRoleArn() {
Member:

Please use @AssertTrue here.

Something like:

@AssertTrue(message = "The sts_role_arn must be a valid IAM Role ARN.")

Contributor Author:

Similar to the above, we should change this in the S3 source too for consistency.

Contributor Author:

Another note: this method isn't used by either the S3 source or SQS at the moment.
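An @AssertTrue-style check ultimately reduces to a boolean predicate over the configured string. A rough sketch of such a predicate is below; the regex and method name are illustrative assumptions, and this check is deliberately narrower than Arn.fromString, which accepts any well-formed ARN rather than only IAM role ARNs.

```java
import java.util.regex.Pattern;

public class StsRoleArnCheck {
    // Rough shape of an IAM role ARN: arn:<partition>:iam::<account-id>:role/<name>.
    // Illustrative only; not the plugin's actual validation logic.
    private static final Pattern IAM_ROLE_ARN =
            Pattern.compile("^arn:aws[a-zA-Z-]*:iam::\\d{12}:role/.+$");

    // A predicate like this is what an @AssertTrue-annotated method would return.
    static boolean isValidStsRoleArn(final String arn) {
        return arn != null && IAM_ROLE_ARN.matcher(arn).matches();
    }
}
```

Wiring this into Bean Validation would mean annotating a boolean getter in the config class with @AssertTrue so that validation failures surface through the same mechanism as the @Size constraints.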

dlvenable (Member):

> Please write an integration Test like https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsServiceIT.java and https://github.com/opensearch-project/data-prepper/blob/main/data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java

I completely agree, but I am also ok with this coming in a follow-on PR in order to reduce the PR scope.

dlvenable (Member) left a comment:

Thank you @jmsusanto ! Just a few more smaller comments now.

@@ -1,76 +1,51 @@
package org.opensearch.dataprepper.plugins.source.sqs;
Member:

This file appears to be missing the copyright header.

Contributor Author:

Great catch. I thought I had already looked over every file, my bad.

@@ -160,16 +162,18 @@ private void applyBackoff() {
private ReceiveMessageRequest createReceiveMessageRequest() {
    ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
            .queueUrl(queueConfig.getUrl())
            .waitTimeSeconds((int) queueConfig.getWaitTime().getSeconds())
            .attributeNamesWithStrings("All")
Member:

We should make this configurable because it probably affects performance. But, I think that would be appropriate for a follow-on PR. Using All as the default makes sense.

Contributor Author:

We can have the customer specify the metadata they want to ingest in the config. This definitely has performance and cost implications.

.withData(Collections.singletonMap("message", message.body()))
.build();
final EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute("url", url);
Member:

Let's use metadata keys that match the terminology from SQS. So keep this as queueUrl as you had previously.

Contributor Author:

good idea

final EventMetadata eventMetadata = event.getMetadata();
eventMetadata.setAttribute("url", url);
final String sentTimestamp = systemAttributes.get(MessageSystemAttributeName.SENT_TIMESTAMP);
eventMetadata.setAttribute("SentTimestamp", sentTimestamp);
Member:

For consistency with other Data Prepper metadata, we should probably use lowerCamelCase here - sentTimestamp.

Contributor Author:

missed that, thanks

final String sentTimestamp = systemAttributes.get(MessageSystemAttributeName.SENT_TIMESTAMP);
eventMetadata.setAttribute("SentTimestamp", sentTimestamp);
for (Map.Entry<String, MessageAttributeValue> entry : customAttributes.entrySet()) {
eventMetadata.setAttribute(entry.getKey(), entry.getValue().stringValue());
Member:

Can you use this for SentTimestamp as well?

I think it may make sense to lowercase the first letter of the key for consistency with other Data Prepper conventions. So we'd have keys like senderId and messageDeduplicationId.

@kkondaka , Any thoughts on this?

Collaborator:

@dlvenable I agree. Let's make it consistent with lowercase "s"

Contributor Author:

Do you mean to lowercase the first letter of all custom keys? I think this may confuse the customer, since they specify their own custom keys when sending the SQS message. They'll probably expect the exact same key name when referencing it later in the pipeline.

Member:

Yes, that is what we mean. I think this may help with consistency.

Contributor Author:

got it, changed
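The key normalization agreed on above (lowercasing only the first letter, so SenderId becomes senderId) is a one-line transformation; this helper is a hypothetical illustration, not the plugin's actual code.

```java
public class MetadataKeys {
    // Lowercase only the first character of a key, leaving the rest untouched:
    // "SenderId" -> "senderId", "SentTimestamp" -> "sentTimestamp".
    static String toLowerCamel(final String key) {
        if (key == null || key.isEmpty()) {
            return key;
        }
        return Character.toLowerCase(key.charAt(0)) + key.substring(1);
    }
}
```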

@@ -0,0 +1,26 @@
package org.opensearch.dataprepper.plugins.source.sqs;
Member:

Please add the copyright header to all the tests.

Contributor Author:

done

kkondaka (Collaborator) commented Jan 7, 2025:

@jmsusanto, following up from the email chain with "cyberark": I think we should parse the SQS message to create multiple events if it contains an array of events. Can you check with the CyberArk folks to see how exactly their SQS message with multiple events would look, and whether that can be addressed in the SQS source?
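Splitting a JSON-array message body into one event per element could look roughly like the sketch below. A real implementation would use a proper JSON parser (e.g. Jackson); this depth-counting version ignores brackets and commas inside strings and is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class MessageSplitter {
    // If the body is a top-level JSON array, return one string per element;
    // otherwise return the body as a single event. Naive: does not handle
    // brackets or commas inside JSON string values.
    static List<String> splitTopLevelArray(final String body) {
        final String trimmed = body.trim();
        final List<String> events = new ArrayList<>();
        if (!trimmed.startsWith("[")) {
            events.add(trimmed); // not an array: a single event
            return events;
        }
        final String inner = trimmed.substring(1, trimmed.length() - 1);
        int depth = 0;
        final StringBuilder current = new StringBuilder();
        for (final char c : inner.toCharArray()) {
            if (c == '{' || c == '[') depth++;
            if (c == '}' || c == ']') depth--;
            if (c == ',' && depth == 0) {
                events.add(current.toString().trim()); // element boundary
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (!current.toString().trim().isEmpty()) {
            events.add(current.toString().trim());
        }
        return events;
    }
}
```

In the source, each returned element would then be turned into its own Record<Event> before being handed to the buffer accumulator.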

dlvenable (Member) left a comment:

Thank you @jmsusanto for this contribution and the improvements toward it!

@kkondaka kkondaka merged commit 2aa376e into opensearch-project:main Jan 9, 2025
72 of 79 checks passed