Skip to content

Commit

Permalink
addressed additional PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Jeremy Michael <[email protected]>
  • Loading branch information
Jeremy Michael committed Feb 4, 2025
1 parent 769d17a commit 3f60821
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs.common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ public class SqsService {
private final PluginFactory pluginFactory;
private final AcknowledgementSetManager acknowledgementSetManager;
private final List<ExecutorService> allSqsUrlExecutorServices;
private final SqsWorkerCommon sqsWorkerCommon;
private final List<SqsWorker> sqsWorkers;
private final Buffer<Record<Event>> buffer;
private final Backoff backoff;
private final Map<String, SqsClient> sqsClientMap = new HashMap<>();
private final AwsCredentialsProvider credentialsProvider;

Expand All @@ -71,8 +69,6 @@ public SqsService(final Buffer<Record<Event>> buffer,
this.allSqsUrlExecutorServices = new ArrayList<>();
this.sqsWorkers = new ArrayList<>();
this.buffer = buffer;
backoff = SqsBackoff.createExponentialBackoff();
sqsWorkerCommon = new SqsWorkerCommon(backoff, pluginMetrics, acknowledgementSetManager);
}

public void start() {
Expand All @@ -82,8 +78,9 @@ public void start() {
String region = extractRegionFromQueueUrl(queueUrl);
SqsClient sqsClient = sqsClientMap.computeIfAbsent(region,
r -> SqsClientFactory.createSqsClient(Region.of(r), credentialsProvider));

String queueName = queueUrl.substring(queueUrl.lastIndexOf('/') + 1);
Backoff backoff = SqsBackoff.createExponentialBackoff();
SqsWorkerCommon sqsWorkerCommon = new SqsWorkerCommon(backoff, pluginMetrics, acknowledgementSetManager);
int numWorkers = queueConfig.getNumWorkers();
SqsEventProcessor sqsEventProcessor;
MessageFieldStrategy strategy;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down

0 comments on commit 3f60821

Please sign in to comment.