Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into mapToList-processor-multi…
Browse files Browse the repository at this point in the history
…ple-entries
  • Loading branch information
niketan16 authored Feb 6, 2025
2 parents 3545fc3 + 137e1e7 commit 59841ce
Show file tree
Hide file tree
Showing 11 changed files with 651 additions and 349 deletions.
16 changes: 8 additions & 8 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Taylor Gray | [graytaylor0](https://github.com/graytaylor0) | Amazon |
| Dinu John | [dinujoh](https://github.com/dinujoh) | Amazon |
| Krishna Kondaka | [kkondaka](https://github.com/kkondaka) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| Karsten Schnitter | [KarstenSchnitter](https://github.com/KarstenSchnitter) | SAP |
| David Venable | [dlvenable](https://github.com/dlvenable) | Amazon |
| Hai Yan | [oeyh](https://github.com/oeyh) | Amazon |


## Emeritus

| Maintainer | GitHub ID | Affiliation |
| -------------------- | ----------------------------------------------------- | ----------- |
| Steven Bayer | [sbayer55](https://github.com/sbayer55) | Amazon |
| Christopher Manning | [cmanning09](https://github.com/cmanning09) | Amazon |
| David Powers | [dapowers87](https://github.com/dapowers87) | Amazon |
| Shivani Shukla | [sshivanii](https://github.com/sshivanii) | Amazon |
| Phill Treddenick | [treddeni-amazon](https://github.com/treddeni-amazon) | Amazon |
| Maintainer | GitHub ID | Affiliation |
| ---------------------- | ----------------------------------------------------- | ----------- |
| Steven Bayer | [sbayer55](https://github.com/sbayer55) | Amazon |
| Christopher Manning | [cmanning09](https://github.com/cmanning09) | Amazon |
| Asif Sohail Mohammed | [asifsmohammed](https://github.com/asifsmohammed) | Amazon |
| David Powers | [dapowers87](https://github.com/dapowers87) | Amazon |
| Shivani Shukla | [sshivanii](https://github.com/sshivanii) | Amazon |
| Phill Treddenick | [treddeni-amazon](https://github.com/treddeni-amazon) | Amazon |
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.sqs.common;

import com.fasterxml.jackson.annotation.JsonCreator;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public enum OnErrorOption {
DELETE_MESSAGES("delete_messages"),
RETAIN_MESSAGES("retain_messages");

private static final Map<String, OnErrorOption> OPTIONS_MAP = Arrays.stream(OnErrorOption.values())
.collect(Collectors.toMap(
value -> value.option,
value -> value
));

private final String option;

OnErrorOption(final String option) {
this.option = option;
}

@JsonCreator
static OnErrorOption fromOptionValue(final String option) {
return OPTIONS_MAP.get(option);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class SqsWorkerCommon {
public static final String SQS_VISIBILITY_TIMEOUT_CHANGED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangedCount";
public static final String SQS_VISIBILITY_TIMEOUT_CHANGE_FAILED_COUNT_METRIC_NAME = "sqsVisibilityTimeoutChangeFailedCount";

private final SqsClient sqsClient;
private final Backoff standardBackoff;
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
Expand All @@ -56,18 +55,17 @@ public class SqsWorkerCommon {
private final Counter sqsVisibilityTimeoutChangedCount;
private final Counter sqsVisibilityTimeoutChangeFailedCount;

public SqsWorkerCommon(final SqsClient sqsClient,
final Backoff standardBackoff,
public SqsWorkerCommon(final Backoff standardBackoff,
final PluginMetrics pluginMetrics,
final AcknowledgementSetManager acknowledgementSetManager) {

this.sqsClient = sqsClient;
this.standardBackoff = standardBackoff;
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.isStopped = false;
this.failedAttemptCount = 0;


sqsMessagesReceivedCounter = pluginMetrics.counter(SQS_MESSAGES_RECEIVED_METRIC_NAME);
sqsMessagesDeletedCounter = pluginMetrics.counter(SQS_MESSAGES_DELETED_METRIC_NAME);
sqsMessagesFailedCounter = pluginMetrics.counter(SQS_MESSAGES_FAILED_METRIC_NAME);
Expand All @@ -78,6 +76,7 @@ public SqsWorkerCommon(final SqsClient sqsClient,
}

public List<Message> pollSqsMessages(final String queueUrl,
final SqsClient sqsClient,
final Integer maxNumberOfMessages,
final Duration waitTime,
final Duration visibilityTimeout) {
Expand Down Expand Up @@ -134,7 +133,7 @@ public void applyBackoff() {
}
}

public void deleteSqsMessages(final String queueUrl, final List<DeleteMessageBatchRequestEntry> entries) {
public void deleteSqsMessages(final String queueUrl, final SqsClient sqsClient, final List<DeleteMessageBatchRequestEntry> entries) {
if (entries == null || entries.isEmpty() || isStopped) {
return;
}
Expand All @@ -146,7 +145,6 @@ public void deleteSqsMessages(final String queueUrl, final List<DeleteMessageBat
.build();

final DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(request);

if (response.hasSuccessful()) {
final int successCount = response.successful().size();
sqsMessagesDeletedCounter.increment(successCount);
Expand All @@ -164,6 +162,7 @@ public void deleteSqsMessages(final String queueUrl, final List<DeleteMessageBat
}

public void increaseVisibilityTimeout(final String queueUrl,
final SqsClient sqsClient,
final String receiptHandle,
final int newVisibilityTimeoutSeconds,
final String messageIdForLogging) {
Expand Down Expand Up @@ -206,6 +205,10 @@ public Counter getSqsMessagesFailedCounter() {
return sqsMessagesFailedCounter;
}

public Counter getSqsMessagesDeletedCounter() {
return sqsMessagesDeletedCounter;
}

public void stop() {
isStopped = true;
}
Expand Down
Loading

0 comments on commit 59841ce

Please sign in to comment.