SQS source plugin implementation #5274

Merged: 15 commits, Jan 9, 2025
Changes from 11 commits
78 changes: 15 additions & 63 deletions data-prepper-plugins/sqs-source/README.md
@@ -1,70 +1,22 @@
# SQS Source
# SQS Source

This source allows Data Prepper to use SQS as a source. It uses SQS for notifications
of which data are new and loads those messages to push out events.
This source allows Data Prepper to use SQS as a source. It reads messages from specified SQS queues and processes them into events.

### Example:
## Example Configuration

The following shows a minimal configuration for reading SQS messages and pushing out events.

```
```yaml
sqs-pipeline:
source:
sqs:
acknowledgments: true
queue_urls:
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-1
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-2
- https://sqs.us-east-1.amazonaws.com/895099421235/MyQueue-3
number_of_threads : 1
batch_size : 10
visibility_timeout: PT30S
wait_time : PT20S
queues:
- url: <SQS_QUEUE_URL_1>
batch_size: 10
workers: 1
- url: <SQS_QUEUE_URL_2>
batch_size: 10
workers: 1
aws:
sts_region: us-east-1
sts_role_arn: arn:aws:iam::895099421235:role/test-arn
```

## Configuration Options

All Duration values are a string that represents a duration. They support ISO_8601 notation string ("PT20.345S", "PT15M", etc.) as well as simple notation Strings for seconds ("60s") and milliseconds ("1500ms").
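
To illustrate the two notations described above, here is a minimal Java sketch that accepts both forms. It is not Data Prepper's actual parser: the class name `DurationNotationSketch` and its `parse` method are hypothetical, and the handling of the simple notation (whole numbers followed by `s` or `ms`) is an assumption based only on the examples in this paragraph.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the sqs-source plugin.
final class DurationNotationSketch {
    // Assumed simple notation: whole digits followed by "s" (seconds) or "ms" (milliseconds).
    private static final Pattern SIMPLE = Pattern.compile("(\\d+)(ms|s)");

    private DurationNotationSketch() {
    }

    static Duration parse(final String value) {
        final String trimmed = value.trim();
        final Matcher simple = SIMPLE.matcher(trimmed);
        if (simple.matches()) {
            final long amount = Long.parseLong(simple.group(1));
            return "ms".equals(simple.group(2))
                    ? Duration.ofMillis(amount)
                    : Duration.ofSeconds(amount);
        }
        // Everything else is treated as ISO 8601 notation, e.g. "PT20.345S" or "PT15M".
        // Duration.parse throws DateTimeParseException for input it cannot read.
        return Duration.parse(trimmed);
    }
}
```

Under these assumptions, `DurationNotationSketch.parse("60s")` and `DurationNotationSketch.parse("PT1M")` describe the same length of time, so a value such as `visibility_timeout: PT30S` could equally be written as `30s`.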

* `queue_url or queue_urls` (Required) : The SQS configuration. See [SQS Configuration](#sqs_configuration) for details.

* `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details.

* `acknowledgments` (Optional) : Enables end-to-end acknowledgments. If set to `true`, an SQS message is deleted only after all events from that message are successfully acknowledged by all sinks. Defaults to `false`.

### <a name="sqs_configuration">SQS Configuration</a>

* `number_of_threads` (Optional) : The number of threads to use for SQS queue processing. Defaults to 1.
* `batch_size` (Optional) : The batch size to use when processing SQS messages. Defaults to 10.
* `polling_frequency` (Optional) : Duration - A delay to place between reading and processing a batch of SQS messages and making a subsequent request. Defaults to 0 seconds.
* `visibility_timeout` (Optional) : Duration - The visibility timeout to apply to messages read from the SQS queue. Defaults to null
* `wait_time` (Optional) : Duration - The time to wait for long-polling on the SQS API. Defaults to null.

### <a name="aws_configuration">AWS Configuration</a>

The AWS configuration is the same for both SQS.

* `sts_region` (Optional) : The AWS region to use for credentials. Defaults to [standard SDK behavior to determine the region](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/region-selection.html).
* `sts_role_arn` (Optional) : The AWS STS role to assume for requests to SQS. Defaults to null, which will use the [standard SDK behavior for credentials](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html).
* `sts_header_overrides` (Optional) : A map of header overrides to make when assuming the IAM role for the source plugin.

## Metrics

* `sqsMessagesReceived` - The number of SQS messages received from the queue by the SQS Source.
* `sqsMessagesDeleted` - The number of SQS messages deleted from the queue by the SQS Source.
* `sqsMessagesFailed` - The number of SQS messages that the SQS Source failed to parse.
* `sqsMessagesDeleteFailed` - The number of SQS messages that the SQS Source failed to delete from the SQS queue.
* `acknowledgementSetCallbackCounter` - The number of SQS messages processed by the SQS Source and successfully acknowledged by the sink.

## Developer Guide

The integration tests for this plugin do not run as part of the Data Prepper build.

The following command runs the integration tests:

```
./gradlew :data-prepper-plugins:sqs-source:integrationTest -Dtests.sqs.source.aws.region=<your-aws-region> -Dtests.sqs.source.queue.url=<your-queue-url>
```
region: <AWS_REGION>
sts_role_arn: <IAM_ROLE_ARN>
sink:
- stdout:
45 changes: 12 additions & 33 deletions data-prepper-plugins/sqs-source/build.gradle
@@ -1,49 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

plugins {
id 'java'
}

dependencies {
implementation project(':data-prepper-api')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-sqs-common')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:common')
implementation libs.armeria.core
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:sqs'
implementation 'software.amazon.awssdk:arns'
implementation 'software.amazon.awssdk:sts'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-annotations'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'org.hibernate.validator:hibernate-validator:8.0.1.Final'
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
testImplementation project(':data-prepper-plugins:blocking-buffer')
}
test {
useJUnitPlatform()
}

sourceSets {
integrationTest {
java {
compileClasspath += main.output + test.output
runtimeClasspath += main.output + test.output
srcDir file('src/integrationTest/java')
}
resources.srcDir file('src/integrationTest/resources')
}
}

configurations {
integrationTestImplementation.extendsFrom testImplementation
integrationTestRuntime.extendsFrom testRuntime
}

task integrationTest(type: Test) {
group = 'verification'
testClassesDirs = sourceSets.integrationTest.output.classesDirs

useJUnitPlatform()

classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'tests.sqs.source.aws.region', System.getProperty('tests.sqs.source.aws.region')
systemProperty 'tests.sqs.source.queue.url', System.getProperty('tests.sqs.source.queue.url')

filter {
includeTestsMatching '*IT'
}
}

This file was deleted.

This file was deleted.

This file was deleted.

@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.sqs;

import org.opensearch.dataprepper.aws.api.AwsCredentialsOptions;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;

class AwsAuthenticationAdapter {
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final SqsSourceConfig sqsSourceConfig;


AwsAuthenticationAdapter(
final AwsCredentialsSupplier awsCredentialsSupplier,
final SqsSourceConfig sqsSourceConfig) {
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sqsSourceConfig = sqsSourceConfig;
}

AwsCredentialsProvider getCredentialsProvider() {
final AwsAuthenticationOptions awsAuthenticationOptions = sqsSourceConfig.getAwsAuthenticationOptions();

final AwsCredentialsOptions options = AwsCredentialsOptions.builder()
.withStsRoleArn(awsAuthenticationOptions.getAwsStsRoleArn())
.withRegion(awsAuthenticationOptions.getAwsRegion())
.withStsHeaderOverrides(awsAuthenticationOptions.getAwsStsHeaderOverrides())
.withStsExternalId(awsAuthenticationOptions.getAwsStsExternalId())
.build();

return awsCredentialsSupplier.getProvider(options);
}
}