Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,90 @@ NOTE: The same factory can be used to create both `single message` and `batch` c

IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods.

==== Automatic Request Batching with SqsAsyncBatchManager

Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. Please note that this feature is primarily intended for use with `SqsTemplate` and is not recommended for use as a default `SqsAsyncClient` bean that could be injected into `@SqsListener` infrastructure.

IMPORTANT: This is different from the <<Batch Processing,Batch Processing>> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS.

===== Manual Configuration of the Batching Client

Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`.

====== 1. Defining the Batching Client Bean
The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity.

[source,java]
----
@Configuration
public class SqsBatchingConfiguration {

// Define a constant for the bean name to avoid typos
public static final String BATCHING_SQS_ASYNC_CLIENT = "batchingSqsAsyncClient";

@Bean(name = BATCHING_SQS_ASYNC_CLIENT)
public SqsAsyncClient batchingSqsAsyncClient(
// Inject the standard, auto-configured client to wrap it
@Qualifier("sqsAsyncClient") SqsAsyncClient standardSqsAsyncClient) {

ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5);

SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder()
.sqsAsyncClient(standardSqsAsyncClient)
.scheduledExecutor(scheduledExecutor)
.build();

return new BatchingSqsClientAdapter(batchManager);
}
}
----

====== 2. Using the Batching Client
Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`.

[source,java]
----
@Service
public class MyBatchingMessageService {

private final SqsTemplate batchingSqsTemplate;

public MyBatchingMessageService(
@Qualifier(SqsBatchingConfiguration.BATCHING_SQS_ASYNC_CLIENT) SqsAsyncClient batchingClient) {
this.batchingSqsTemplate = SqsTemplate.builder()
.sqsAsyncClient(batchingClient)
.build();
}
// ... service methods using batchingSqsTemplate
}
----

===== Important Considerations & Best Practices

WARNING: A call to `sqsTemplate.sendAsync(...)` returns a `CompletableFuture` that may complete successfully before the message is actually sent to AWS if the batch isn't yet flushed. The actual transmission happens later in a background thread, typically after the configured `sendRequestFrequency` interval. This can lead to **false positives** where the future appears to succeed prematurely. Always attach error handling to the `CompletableFuture` (e.g., via `.exceptionally()` or `.handle()`) to detect and handle real transmission failures. Note: Calling `.join()` on the future will block until the batch is flushed (after `sendRequestFrequency`), ensuring the operation is attempted, but still check for exceptions.

NOTE: The background buffering for `receiveMessage` operations is initialized **lazily**. It only starts after the first call to `sqsTemplate.receiveMessage(...)` is made on a specific queue URL. **This means that no background polling or resource consumption for receiving messages will occur until the application actively attempts to receive a message from that queue for the first time.**

[source,java]
----
CompletableFuture<SendResult<String>> future = batchingSqsTemplate.sendAsync(queueName, message);

future.whenComplete((result, ex) -> {
if (ex != null) {
// This is where you handle the actual transmission error
log.error("Failed to send message to queue {}: {}", queueName, ex.getMessage());
} else {
log.info("Message acknowledged for batch sending with ID: {}", result.messageId());
}
});
----

WARNING: **Not Recommended for `@SqsListener`**. While technically compatible, using this batching client with `@SqsListener` for receiving messages is **not recommended**. The `@SqsListener` infrastructure already performs efficient batch receiving and has a complex acknowledgment lifecycle. Adding another layer of asynchronous batching provides limited performance benefits while significantly increasing complexity. For listeners, it's best to rely on the default `SqsAsyncClient`.

IMPORTANT: **Bean Injection Safety**. By using a named bean and `@Qualifier` as shown in the configuration examples, you ensure the batching client is only used where intended. This prevents it from accidentally being injected into `@SqsListener` infrastructure, which should use the default `SqsAsyncClient`.

IMPORTANT: **AWS SDK Batching Bypass**. The `SqsAsyncBatchManager` will bypass batching for `receiveMessage` calls if certain parameters like `messageAttributeNames` are set on a per-request basis. To ensure batching works effectively, these should be configured globally on the `SqsAsyncBatchManager` builder, not on individual `receiveMessage` calls. See the `BatchingSqsClientAdapter` Javadoc for more details.

==== Container Options

Each `MessageListenerContainer` can have a different set of options.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.operations;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.springframework.util.Assert;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
import software.amazon.awssdk.services.sqs.model.*;

/**
* An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's
* {@link SqsAsyncBatchManager}.
*
* <p>
* This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple
* requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete
* message, and change message visibility.
*
* <p>
* <strong>Important - False Positives Warning:</strong> This adapter processes requests asynchronously through
* batching. Method calls may return successfully before the actual request is sent to AWS SQS. This can result in false
* positives where the operation appears to succeed locally but fails during the actual transmission to AWS.
* Applications should:
* <ul>
* <li>Always handle the returned {@link CompletableFuture} to detect actual transmission errors</li>
* <li>Implement appropriate error handling and monitoring</li>
* <li>Consider retry mechanisms for critical operations</li>
* </ul>
*
* <p>
* <strong>Batch Optimization:</strong> The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses batching
* for {@code receiveMessage} calls that include per-request configurations for certain parameters. To ensure batching
* is not bypassed, it is recommended to configure these settings globally on the {@code SqsAsyncBatchManager} builder
* instead of on each {@code ReceiveMessageRequest}. The parameters that trigger this bypass are:
* <ul>
* <li>{@code messageAttributeNames}</li>
* <li>{@code messageSystemAttributeNames}</li>
* <li>{@code messageSystemAttributeNamesWithStrings}</li>
* <li>{@code overrideConfiguration}</li>
* </ul>
* <p>
* By configuring these globally on the manager, you ensure consistent batching performance. If you require per-request
* attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be more appropriate.
* @author Heechul Kang
* @since 3.2
Copy link
Contributor

@tomazfernandes tomazfernandes Oct 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be included in the upcoming 4.0.0 release.

* @see SqsAsyncBatchManager
* @see SqsAsyncClient
*/
public class BatchingSqsClientAdapter implements SqsAsyncClient {
private final SqsAsyncBatchManager batchManager;

/**
* Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager.
*
* @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations
* @throws IllegalArgumentException if batchManager is null
*/
public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) {
Assert.notNull(batchManager, "batchManager cannot be null");
this.batchManager = batchManager;
}

@Override
public String serviceName() {
return SqsAsyncClient.SERVICE_NAME;
}

/**
* Closes the underlying batch manager and releases associated resources.
*
* <p>
* This method should be called when the adapter is no longer needed to ensure proper cleanup of threads and
* connections.
*/
@Override
public void close() {
batchManager.close();
}

/**
* Sends a message to the specified SQS queue using automatic batching.
*
* <p>
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously.
* Handle the returned {@link CompletableFuture} to detect transmission errors.
*
* @param sendMessageRequest the request containing queue URL and message details
* @return a {@link CompletableFuture} that completes with the send result
*/
@Override
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest sendMessageRequest) {
return batchManager.sendMessage(sendMessageRequest);
}

/**
* Sends a message to the specified SQS queue using automatic batching.
*
* <p>
* <strong>Important:</strong> This method returns immediately, but the actual sending is performed asynchronously.
* Handle the returned {@link CompletableFuture} to detect transmission errors.
*
* @param sendMessageRequest a consumer to configure the send message request
* @return a {@link CompletableFuture} that completes with the send result
*/
@Override
public CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
return batchManager.sendMessage(sendMessageRequest);
}

/**
* Receives messages from the specified SQS queue using automatic batching.
*
* <p>
* The batching manager may combine multiple receive requests to optimize AWS API usage.
*
* @param receiveMessageRequest the request containing queue URL and receive options
* @return a {@link CompletableFuture} that completes with the received messages
*/
@Override
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest receiveMessageRequest) {
return batchManager.receiveMessage(receiveMessageRequest);
}

/**
* Receives messages from the specified SQS queue using automatic batching.
*
* <p>
* The batching manager may combine multiple receive requests to optimize AWS API usage.
*
* @param receiveMessageRequest a consumer to configure the receive message request
* @return a {@link CompletableFuture} that completes with the received messages
*/
@Override
public CompletableFuture<ReceiveMessageResponse> receiveMessage(
Consumer<ReceiveMessageRequest.Builder> receiveMessageRequest) {
return batchManager.receiveMessage(receiveMessageRequest);
}

/**
* Deletes a message from the specified SQS queue using automatic batching.
*
* <p>
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned
* {@link CompletableFuture} to confirm successful deletion.
*
* @param deleteMessageRequest the request containing queue URL and receipt handle
* @return a {@link CompletableFuture} that completes with the deletion result
*/
@Override
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest deleteMessageRequest) {
return batchManager.deleteMessage(deleteMessageRequest);
}

/**
* Deletes a message from the specified SQS queue using automatic batching.
*
* <p>
* <strong>Important:</strong> The actual deletion may be delayed due to batching. Handle the returned
* {@link CompletableFuture} to confirm successful deletion.
*
* @param deleteMessageRequest a consumer to configure the delete message request
* @return a {@link CompletableFuture} that completes with the deletion result
*/
@Override
public CompletableFuture<DeleteMessageResponse> deleteMessage(
Consumer<DeleteMessageRequest.Builder> deleteMessageRequest) {
return batchManager.deleteMessage(deleteMessageRequest);
}

/**
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
*
* <p>
* The batching manager may combine multiple visibility change requests to optimize AWS API usage.
*
* @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout
* @return a {@link CompletableFuture} that completes with the visibility change result
*/
@Override
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
ChangeMessageVisibilityRequest changeMessageVisibilityRequest) {
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);
}

/**
* Changes the visibility timeout of a message in the specified SQS queue using automatic batching.
*
* <p>
* The batching manager may combine multiple visibility change requests to optimize AWS API usage.
*
* @param changeMessageVisibilityRequest a consumer to configure the change visibility request
* @return a {@link CompletableFuture} that completes with the visibility change result
*/
@Override
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
Consumer<ChangeMessageVisibilityRequest.Builder> changeMessageVisibilityRequest) {
return batchManager.changeMessageVisibility(changeMessageVisibilityRequest);
}
}
Loading
Loading