-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implementation of sqs-common plugin and refactored sqs and s3 source #5361
base: main
Are you sure you want to change the base?
Conversation
@@ -154,8 +155,7 @@ public void test_sqsService(int numWorkers) throws IOException { | |||
} | |||
|
|||
private void clearSqsQueue() { | |||
Backoff backoff = Backoff.exponential(SqsService.INITIAL_DELAY, SqsService.MAXIMUM_DELAY).withJitter(SqsService.JITTER_RATE) | |||
.withMaxAttempts(Integer.MAX_VALUE); | |||
Backoff backoff = SqsBackoff.createExponentialBackoff(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a possibility that this Backoff strategy could be different for S3 source vs sqs source?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Backoff strategy is currently the same for both S3 and SQS sources, as both handle SQS polling. If needed, we can extend SqsBackoff in the future to support options like linear backoff.
@@ -78,24 +55,16 @@ public SqsWorker(final Buffer<Record<Event>> buffer, | |||
final PluginMetrics pluginMetrics, | |||
final SqsEventProcessor sqsEventProcessor, | |||
final Backoff backoff) { | |||
|
|||
this.sqsClient = sqsClient; | |||
this.sqsWorkerCommon = new SqsWorkerCommon(sqsClient, backoff, pluginMetrics, acknowledgementSetManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought, one instance of SqsWorkerCommon
shared between all the workers. It looks like you are instantiating SqsWorkerCommon
per worker. Is that the intention? I am not sure you want per worker an instance of SqsWorkerCommon
like for example, if you take isStopped
flag, should be shared between all the workers. You don't want each worker to have their own isStopped
state - right? I thought, that is why you made isStopped
volatile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, you're absolutely right. In a future PR, I'm planning to support multiple SQS clients for cases where we have queues from different regions. In most scenarios (when the configuration only includes queues from a single region), only one SqsWorkerCommon instance will be created. Generally, we'll have one SqsWorkerCommon per region specified in the configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SqsWorkerCommon currently handles polling from an SqsClient, which is tied to a specific AWS region. If customers need to poll from SQS queues in different regions, we’ll need multiple SqsClient instances, and therefore, multiple SqsWorkerCommon instances.
Alternatively, we could remove the SqsClient from SqsWorkerCommon and pass it dynamically to the polling methods there. This makes SqsWorkerCommon region-agnostic.
I am planning to support multiple regions in an upcoming PR, so I’m thinking it’s better to address this there instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am Ok, as long as it is addressed in the next PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for reducing the redundant code
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Signed-off-by: Jeremy Michael <[email protected]>
Description
This PR integrates the sqs-common plugin. The sqs-source has been completely refactored to leverage sqs-common, while only the SqsService has been updated for the s3-source. A full refactor of the s3-source will be addressed in a future PR, but the scope has been intentionally limited for now.
Issue Resolved
#1049
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.