From f42e019a9cb1b2b1270dc5fa9931beabe3500c57 Mon Sep 17 00:00:00 2001 From: heechul Date: Sun, 17 Aug 2025 15:47:32 +0900 Subject: [PATCH 1/7] Refactor: Focus on BatchingSqsClientAdapter; --- docs/src/main/asciidoc/sqs.adoc | 125 ++++++++ .../autoconfigure/sqs/SqsProperties.java | 161 ++++++++++ .../sqs/SqsAutoConfigurationTest.java | 162 ++++++++++ .../source/AbstractSqsMessageSource.java | 9 +- .../operations/BatchingSqsClientAdapter.java | 228 +++++++++++++ .../cloud/sqs/operations/SqsTemplate.java | 8 +- ...chingSqsClientAdapterIntegrationTests.java | 300 ++++++++++++++++++ .../BatchingSqsClientAdapterTests.java | 284 +++++++++++++++++ 8 files changed, 1272 insertions(+), 5 deletions(-) create mode 100644 spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java create mode 100644 spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java create mode 100644 spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index e045d0911..6d2ba76b1 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -806,6 +806,131 @@ NOTE: The same factory can be used to create both `single message` and `batch` c IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods. +==== Automatic Batching with AWS SDK + +Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. + +IMPORTANT: This is different from the <> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. + +===== Enabling Automatic Batching + +To enable automatic batching, set the following property: + +[source,properties] +---- +spring.cloud.aws.sqs.batch.enabled=true +---- + +When enabled, Spring Cloud AWS will automatically wrap the `SqsAsyncClient` with a `BatchingSqsClientAdapter` that uses `SqsAsyncBatchManager` internally. + +===== Configuration Properties + +The following properties can be used to configure the batching behavior: + +[source,properties] +---- +# Enable automatic batching (default: false) +spring.cloud.aws.sqs.batch.enabled=true + +# Maximum number of messages in a batch (default: AWS SDK default, max: 10) +spring.cloud.aws.sqs.batch.max-number-of-messages=10 + +# Frequency at which batched requests are sent (default: AWS SDK default) +spring.cloud.aws.sqs.batch.send-batch-frequency=PT0.2S + +# Visibility timeout for received messages (default: queue default) +spring.cloud.aws.sqs.batch.visibility-timeout=PT30S + +# Wait time for receiveMessage requests (default: AWS SDK default) +spring.cloud.aws.sqs.batch.wait-time-seconds=PT5S + +# System attributes to request for receiveMessage calls +spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount + +# Message attributes to request for receiveMessage calls +spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId +---- + +===== Important Considerations + +WARNING: When using automatic batching, operations are processed asynchronously by the AWS SDK. This means that a method call may return successfully, but the actual request to AWS SQS might fail later during the batching process. This can result in **false positives** where operations appear to succeed locally but fail during transmission. + +Applications should: + +- **Always handle the returned `CompletableFuture`** to detect actual transmission errors +- **Implement appropriate error handling and monitoring** to detect delayed failures +- **Consider retry mechanisms** for critical operations + +IMPORTANT: **Batch Manager Bypass**: The AWS SDK batch manager will bypass batching and send regular asynchronous requests when `receiveMessage` is called with any of the following parameters: + +- `messageAttributeNames` +- `messageSystemAttributeNames` +- `messageSystemAttributeNamesWithStrings` (not used in Spring Cloud AWS `ReceiveMessageRequest`) +- `overrideConfiguration` (not used in Spring Cloud AWS `ReceiveMessageRequest`) + +When these parameters are used, the performance benefits of batching are lost for those specific requests. + +**Note**: When using Spring Cloud AWS's automatic batching feature, `SqsTemplate` automatically excludes `messageAttributeNames` and `messageSystemAttributeNames` from individual `receiveMessage` requests to maintain batching efficiency. These attributes should be configured globally in the batch configuration instead: + +[source,properties] +---- +# Configure globally for batched requests +spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount +spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId +---- + +If you need to use different attribute configurations per request, consider disabling automatic batching and using the standard `SqsAsyncClient` instead. + +Example of proper error handling: + +[source,java] +---- +@Service +public class MessageService { + + private final SqsTemplate sqsTemplate; + + public MessageService(SqsTemplate sqsTemplate) { + this.sqsTemplate = sqsTemplate; + } + + public void sendMessage(String queueName, String message) { + CompletableFuture> future = sqsTemplate.sendAsync(queueName, message); + + future.whenComplete((result, throwable) -> { + if (throwable != null) { + // Handle actual transmission error + log.error("Failed to send message to queue {}: {}", queueName, throwable.getMessage()); + // Implement retry or alternative handling logic + } else { + // Message sent successfully + log.info("Message sent successfully with ID: {}", result.messageId()); + } + }); + } +} +---- + +===== Performance Benefits + +Automatic batching provides several benefits: + +- **Reduced API calls**: Multiple operations are combined into single API calls +- **Lower costs**: Fewer API calls result in reduced AWS charges +- **Improved throughput**: Batching reduces network overhead and latency +- **Better resource utilization**: More efficient use of network and AWS resources + +===== Compatibility + +Automatic batching is compatible with: + +- `SqsTemplate` for sending and receiving messages +- `@SqsListener` methods for message processing +- Both standard and FIFO queues +- All message conversion and error handling features + +The batching is transparent to application code - existing `SqsTemplate` and `@SqsListener` code continues to work without changes. + ==== Container Options Each `MessageListenerContainer` can have a different set of options. diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java index aafbe9daa..079680856 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java @@ -18,8 +18,10 @@ import io.awspring.cloud.autoconfigure.AwsClientProperties; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; import java.time.Duration; +import java.util.List; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.lang.Nullable; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; /** * Properties related to AWS SQS. @@ -38,6 +40,8 @@ public class SqsProperties extends AwsClientProperties { private Listener listener = new Listener(); + private Batch batch = new Batch(); + public Listener getListener() { return this.listener; } @@ -46,6 +50,14 @@ public void setListener(Listener listener) { this.listener = listener; } + public Batch getBatch() { + return batch; + } + + public void setBatch(Batch batch) { + this.batch = batch; + } + @Nullable private QueueNotFoundStrategy queueNotFoundStrategy; @@ -153,6 +165,155 @@ public Boolean getAutoStartup() { public void setAutoStartup(Boolean autoStartup) { this.autoStartup = autoStartup; } + + } + + /** + * Configuration properties for SQS automatic batching using AWS SDK's {@code SqsAsyncBatchManager}. + * + *

+ * Automatic batching improves performance and reduces costs by combining multiple SQS requests into fewer AWS API + * calls. When enabled, Spring Cloud AWS will use a {@code BatchingSqsClientAdapter} that wraps the standard + * {@code SqsAsyncClient} with batching capabilities. + * + *

+ * Important: Batched operations are processed asynchronously, which may result in false positives + * where method calls appear to succeed locally but fail during actual transmission to AWS. Applications should + * handle the returned {@code CompletableFuture} objects to detect actual transmission errors. + * + * @since 3.2 + */ + public static class Batch { + + /** + * Enables SQS automatic batching using AWS SDK's SqsAsyncBatchManager. + * + *

+ * When set to {@code true}, the {@code SqsAsyncClient} bean will be wrapped with a + * {@code BatchingSqsClientAdapter} that automatically batches requests to improve performance and reduce AWS + * API calls. + * + *

+ * Default is {@code false}. + */ + private boolean enabled = false; + + /** + * The maximum number of messages that can be processed in a single batch. The maximum is 10. + */ + @Nullable + private Integer maxNumberOfMessages; + + /** + * The frequency at which requests are sent to SQS when processing messages in a batch. + */ + @Nullable + private Duration sendBatchFrequency; + + /** + * The visibility timeout to set for messages received in a batch. If unset, the queue default is used. + */ + @Nullable + private Duration visibilityTimeout; + + /** + * The minimum wait duration for a receiveMessage request in a batch. To avoid unnecessary CPU usage, do not set + * this value to 0. + */ + @Nullable + private Duration waitTimeSeconds; + + /** + * The list of system attribute names to request for receiveMessage calls. + */ + @Nullable + private List systemAttributeNames; + + /** + * The list of attribute names to request for receiveMessage calls. + */ + @Nullable + private List attributeNames; + + /** + * The size of the scheduled thread pool used for batching operations. This thread pool handles periodic batch + * sending and other scheduled tasks. + * + *

+ * Default is {@code 5}. + */ + private int scheduledExecutorPoolSize = 5; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + @Nullable + public Integer getMaxNumberOfMessages() { + return maxNumberOfMessages; + } + + public void setMaxNumberOfMessages(Integer maxNumberOfMessages) { + this.maxNumberOfMessages = maxNumberOfMessages; + } + + @Nullable + public Duration getSendBatchFrequency() { + return sendBatchFrequency; + } + + public void setSendBatchFrequency(Duration sendBatchFrequency) { + this.sendBatchFrequency = sendBatchFrequency; + } + + @Nullable + public Duration getVisibilityTimeout() { + return visibilityTimeout; + } + + public void setVisibilityTimeout(Duration visibilityTimeout) { + this.visibilityTimeout = visibilityTimeout; + } + + @Nullable + public Duration getWaitTimeSeconds() { + return waitTimeSeconds; + } + + public void setWaitTimeSeconds(Duration waitTimeSeconds) { + this.waitTimeSeconds = waitTimeSeconds; + } + + @Nullable + public List getSystemAttributeNames() { + return systemAttributeNames; + } + + public void setSystemAttributeNames(List systemAttributeNames) { + this.systemAttributeNames = systemAttributeNames; + } + + @Nullable + public List getAttributeNames() { + return attributeNames; + } + + public void setAttributeNames(List attributeNames) { + this.attributeNames = attributeNames; + } + + public int getScheduledExecutorPoolSize() { + return scheduledExecutorPoolSize; + } + + public void setScheduledExecutorPoolSize(int scheduledExecutorPoolSize) { + this.scheduledExecutorPoolSize = scheduledExecutorPoolSize; + } + } } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index 83a4fbec5..ec605f789 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -33,6 +33,7 @@ import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; +import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; @@ -60,6 +61,7 @@ * * @author Tomaz Fernandes * @author Wei Jiang + * @author Heechul Kang */ class SqsAutoConfigurationTest { @@ -303,6 +305,166 @@ void configuresMessageConverter() { }); } + @Test + void sqsBatchAutoConfigurationIsDisabledByDefault() { + this.contextRunner.run(context -> { + assertThat(context).hasSingleBean(SqsAsyncClient.class); + SqsAsyncClient client = context.getBean(SqsAsyncClient.class); + assertThat(client).isNotInstanceOf(BatchingSqsClientAdapter.class); + }); + } + + @Test + void sqsBatchAutoConfigurationIsEnabled() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true").run(context -> { + assertThat(context.getBeansOfType(SqsAsyncClient.class)).hasSize(2); + + SqsAsyncClient primary = context.getBean(SqsAsyncClient.class); + assertThat(primary).isInstanceOf(BatchingSqsClientAdapter.class); + + assertThat(context).hasBean("sqsAsyncClient"); + assertThat(context).hasBean("batchSqsAsyncClient"); + }); + } + + @Test + void sqsBatchConfigurationProperties() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.max-number-of-messages:5", + "spring.cloud.aws.sqs.batch.send-batch-frequency:PT0.5S").run(context -> { + SqsAsyncClient client = context.getBean(SqsAsyncClient.class); + assertThat(client).isInstanceOf(BatchingSqsClientAdapter.class); + }); + } + + @Test + void sqsBatchConfigurationPropertiesWithAllSettings() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.max-number-of-messages:8", + "spring.cloud.aws.sqs.batch.send-batch-frequency:PT1S", + "spring.cloud.aws.sqs.batch.visibility-timeout:PT30S", + "spring.cloud.aws.sqs.batch.wait-time-seconds:PT5S", + "spring.cloud.aws.sqs.batch.system-attribute-names:SentTimestamp,ApproximateReceiveCount", + "spring.cloud.aws.sqs.batch.attribute-names:attr1,attr2").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getMaxNumberOfMessages()).isEqualTo(8); + assertThat(batchConfig.getSendBatchFrequency()).isEqualTo(Duration.ofSeconds(1)); + assertThat(batchConfig.getVisibilityTimeout()).isEqualTo(Duration.ofSeconds(30)); + assertThat(batchConfig.getWaitTimeSeconds()).isEqualTo(Duration.ofSeconds(5)); + assertThat(batchConfig.getSystemAttributeNames()).containsExactly( + software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP, + software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT); + assertThat(batchConfig.getAttributeNames()).containsExactly("attr1", "attr2"); + + SqsAsyncClient client = context.getBean(SqsAsyncClient.class); + assertThat(client).isInstanceOf(BatchingSqsClientAdapter.class); + }); + } + + @Test + void sqsBatchConfigurationPropertiesWithDefaults() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:false").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isFalse(); + assertThat(batchConfig.getMaxNumberOfMessages()).isNull(); + assertThat(batchConfig.getSendBatchFrequency()).isNull(); + assertThat(batchConfig.getVisibilityTimeout()).isNull(); + assertThat(batchConfig.getWaitTimeSeconds()).isNull(); + assertThat(batchConfig.getSystemAttributeNames()).isNull(); + assertThat(batchConfig.getAttributeNames()).isNull(); + assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5); + + assertThat(context).hasSingleBean(SqsAsyncClient.class); + SqsAsyncClient client = context.getBean(SqsAsyncClient.class); + assertThat(client).isNotInstanceOf(BatchingSqsClientAdapter.class); + }); + } + + @Test + void sqsBatchConfigurationWithVisibilityTimeout() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.visibility-timeout:PT60S").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getVisibilityTimeout()).isEqualTo(Duration.ofSeconds(60)); + }); + } + + @Test + void sqsBatchConfigurationWithWaitTimeSeconds() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.wait-time-seconds:PT20S").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getWaitTimeSeconds()).isEqualTo(Duration.ofSeconds(20)); + }); + } + + @Test + void sqsBatchConfigurationWithAttributeNames() { + this.contextRunner + .withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.attribute-names:MessageGroupId,MessageDeduplicationId") + .run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getAttributeNames()).containsExactly("MessageGroupId", + "MessageDeduplicationId"); + }); + } + + @Test + void sqsBatchConfigurationWithDefaultScheduledExecutorPoolSize() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5); + + assertThat(context).hasBean("sqsBatchingScheduledExecutor"); + }); + } + + @Test + void sqsBatchConfigurationWithCustomScheduledExecutorPoolSize() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", + "spring.cloud.aws.sqs.batch.scheduled-executor-pool-size:10").run(context -> { + assertThat(context).hasSingleBean(SqsProperties.class); + SqsProperties sqsProperties = context.getBean(SqsProperties.class); + SqsProperties.Batch batchConfig = sqsProperties.getBatch(); + + assertThat(batchConfig.isEnabled()).isTrue(); + assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(10); + + assertThat(context).hasBean("sqsBatchingScheduledExecutor"); + }); + } + + @Test + void sqsBatchConfigurationWithBatchDisabledDoesNotCreateScheduledExecutor() { + this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:false").run(context -> { + assertThat(context).doesNotHaveBean("sqsBatchingScheduledExecutor"); + }); + } + @Configuration(proxyBeanMethods = false) static class CustomComponentsConfiguration { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java index 86f3e5ce1..3a04900d8 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java @@ -33,6 +33,8 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; + +import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.lang.Nullable; @@ -61,6 +63,7 @@ * @param the {@link Message} payload type. * * @author Tomaz Fernandes + * @author Heechul Kang * @since 3.0 */ public abstract class AbstractSqsMessageSource extends AbstractPollingMessageSource @@ -177,13 +180,15 @@ private ReceiveMessageRequest createRequest(int maxNumberOfMessages) { .builder() .queueUrl(this.queueUrl) .maxNumberOfMessages(maxNumberOfMessages) - .attributeNamesWithStrings(this.messageSystemAttributeNames) - .messageAttributeNames(this.messageAttributeNames) .waitTimeSeconds(this.pollTimeout); customizeRequest(builder); if (this.messageVisibility >= 0) { builder.visibilityTimeout(this.messageVisibility); } + if (!(this.sqsAsyncClient instanceof BatchingSqsClientAdapter)) { + builder.messageAttributeNames(this.messageAttributeNames) + .attributeNamesWithStrings(this.messageSystemAttributeNames); + } return builder.build(); } // @formatter:on diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java new file mode 100644 index 000000000..c58ea2fb7 --- /dev/null +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java @@ -0,0 +1,228 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.operations; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.springframework.util.Assert; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * An {@link SqsAsyncClient} adapter that provides automatic batching capabilities using AWS SDK's + * {@link SqsAsyncBatchManager}. + * + *

+ * This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple + * requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete + * message, and change message visibility. + * + *

+ * Important - False Positives Warning: This adapter processes requests asynchronously through + * batching. Method calls may return successfully before the actual request is sent to AWS SQS. This can result in false + * positives where the operation appears to succeed locally but fails during the actual transmission to AWS. + * Applications should: + *

    + *
  • Always handle the returned {@link CompletableFuture} to detect actual transmission errors
  • + *
  • Implement appropriate error handling and monitoring
  • + *
  • Consider retry mechanisms for critical operations
  • + *
+ * + *

+ * Batch Optimization: The AWS SDK bypasses batching when {@code receiveMessage} is called with any of + * the following parameters: {@code messageAttributeNames}, {@code messageSystemAttributeNames}, + * {@code messageSystemAttributeNamesWithStrings}, or {@code overrideConfiguration}. To maintain consistent batching + * performance, Spring Cloud AWS handles these parameters as follows: + *

    + *
  • {@code messageAttributeNames} - excluded from per-request, configured globally via + * {@code spring.cloud.aws.sqs.batch.attribute-names}
  • + *
  • {@code messageSystemAttributeNames} - excluded from per-request, configured globally via + * {@code spring.cloud.aws.sqs.batch.system-attribute-names}
  • + *
  • {@code messageSystemAttributeNamesWithStrings} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}
  • + *
  • {@code overrideConfiguration} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}
  • + *
+ *

+ * This design prevents batch bypass and ensures optimal performance. If per-request attribute configuration is + * required, consider disabling automatic batching. + * + *

+ * This adapter is automatically configured by Spring Cloud AWS when automatic batching is enabled. Users do not need to + * create instances directly - instead, enable batching through configuration: + * + *

+ * spring.cloud.aws.sqs.batch.enabled = true
+ * 
+ * + *

+ * Once enabled, all {@code SqsTemplate} operations will automatically use batching transparently. + * + * @author Heechul Kang + * @since 3.2 + * @see SqsAsyncBatchManager + * @see SqsAsyncClient + */ +public class BatchingSqsClientAdapter implements SqsAsyncClient { + private final SqsAsyncBatchManager batchManager; + + /** + * Creates a new {@code BatchingSqsClientAdapter} with the specified batch manager. + * + * @param batchManager the {@link SqsAsyncBatchManager} to use for batching operations + * @throws IllegalArgumentException if batchManager is null + */ + public BatchingSqsClientAdapter(SqsAsyncBatchManager batchManager) { + Assert.notNull(batchManager, "batchManager cannot be null"); + this.batchManager = batchManager; + } + + @Override + public String serviceName() { + return SqsAsyncClient.SERVICE_NAME; + } + + /** + * Closes the underlying batch manager and releases associated resources. + * + *

+ * This method should be called when the adapter is no longer needed to ensure proper cleanup of threads and + * connections. + */ + @Override + public void close() { + batchManager.close(); + } + + /** + * Sends a message to the specified SQS queue using automatic batching. + * + *

+ * Important: This method returns immediately, but the actual sending is performed asynchronously. + * Handle the returned {@link CompletableFuture} to detect transmission errors. + * + * @param sendMessageRequest the request containing queue URL and message details + * @return a {@link CompletableFuture} that completes with the send result + */ + @Override + public CompletableFuture sendMessage(SendMessageRequest sendMessageRequest) { + return batchManager.sendMessage(sendMessageRequest); + } + + /** + * Sends a message to the specified SQS queue using automatic batching. + * + *

+ * Important: This method returns immediately, but the actual sending is performed asynchronously. + * Handle the returned {@link CompletableFuture} to detect transmission errors. + * + * @param sendMessageRequest a consumer to configure the send message request + * @return a {@link CompletableFuture} that completes with the send result + */ + @Override + public CompletableFuture sendMessage(Consumer sendMessageRequest) { + return batchManager.sendMessage(sendMessageRequest); + } + + /** + * Receives messages from the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple receive requests to optimize AWS API usage. + * + * @param receiveMessageRequest the request containing queue URL and receive options + * @return a {@link CompletableFuture} that completes with the received messages + */ + @Override + public CompletableFuture receiveMessage(ReceiveMessageRequest receiveMessageRequest) { + return batchManager.receiveMessage(receiveMessageRequest); + } + + /** + * Receives messages from the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple receive requests to optimize AWS API usage. + * + * @param receiveMessageRequest a consumer to configure the receive message request + * @return a {@link CompletableFuture} that completes with the received messages + */ + @Override + public CompletableFuture receiveMessage( + Consumer receiveMessageRequest) { + return batchManager.receiveMessage(receiveMessageRequest); + } + + /** + * Deletes a message from the specified SQS queue using automatic batching. + * + *

+ * Important: The actual deletion may be delayed due to batching. Handle the returned + * {@link CompletableFuture} to confirm successful deletion. + * + * @param deleteMessageRequest the request containing queue URL and receipt handle + * @return a {@link CompletableFuture} that completes with the deletion result + */ + @Override + public CompletableFuture deleteMessage(DeleteMessageRequest deleteMessageRequest) { + return batchManager.deleteMessage(deleteMessageRequest); + } + + /** + * Deletes a message from the specified SQS queue using automatic batching. + * + *

+ * Important: The actual deletion may be delayed due to batching. Handle the returned + * {@link CompletableFuture} to confirm successful deletion. + * + * @param deleteMessageRequest a consumer to configure the delete message request + * @return a {@link CompletableFuture} that completes with the deletion result + */ + @Override + public CompletableFuture deleteMessage( + Consumer deleteMessageRequest) { + return batchManager.deleteMessage(deleteMessageRequest); + } + + /** + * Changes the visibility timeout of a message in the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple visibility change requests to optimize AWS API usage. + * + * @param changeMessageVisibilityRequest the request containing queue URL, receipt handle, and new timeout + * @return a {@link CompletableFuture} that completes with the visibility change result + */ + @Override + public CompletableFuture changeMessageVisibility( + ChangeMessageVisibilityRequest changeMessageVisibilityRequest) { + return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); + } + + /** + * Changes the visibility timeout of a message in the specified SQS queue using automatic batching. + * + *

+ * The batching manager may combine multiple visibility change requests to optimize AWS API usage. + * + * @param changeMessageVisibilityRequest a consumer to configure the change visibility request + * @return a {@link CompletableFuture} that completes with the visibility change result + */ + @Override + public CompletableFuture changeMessageVisibility( + Consumer changeMessageVisibilityRequest) { + return batchManager.changeMessageVisibility(changeMessageVisibilityRequest); + } +} diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java index 901affd03..f803f6742 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java @@ -605,9 +605,7 @@ private CompletableFuture createReceiveMessageRequest(Str private ReceiveMessageRequest doCreateReceiveMessageRequest(Duration pollTimeout, Integer maxNumberOfMessages, QueueAttributes attributes, Map additionalHeaders) { ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder().queueUrl(attributes.getQueueUrl()) - .maxNumberOfMessages(maxNumberOfMessages).messageAttributeNames(this.messageAttributeNames) - .attributeNamesWithStrings(this.messageSystemAttributeNames) - .waitTimeSeconds(toInt(pollTimeout.toSeconds())); + .maxNumberOfMessages(maxNumberOfMessages).waitTimeSeconds(toInt(pollTimeout.toSeconds())); if (additionalHeaders.containsKey(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER)) { builder.visibilityTimeout( toInt(getValueAs(additionalHeaders, SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Duration.class) @@ -618,6 +616,10 @@ private ReceiveMessageRequest doCreateReceiveMessageRequest(Duration pollTimeout getValueAs(additionalHeaders, SqsHeaders.SQS_RECEIVE_REQUEST_ATTEMPT_ID_HEADER, UUID.class) .toString()); } + if (!(this.sqsAsyncClient instanceof BatchingSqsClientAdapter)) { + builder.messageAttributeNames(this.messageAttributeNames) + .attributeNamesWithStrings(this.messageSystemAttributeNames); + } return builder.build(); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java new file mode 100644 index 000000000..031cd74a8 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java @@ -0,0 +1,300 @@ +/* + * Copyright 2013-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.integration; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * Integration tests for the Sqs Batching Client Adapter. + * + * @author Heechul Kang + */ +@SpringBootTest +public class BatchingSqsClientAdapterIntegrationTests extends BaseSqsIntegrationTest { + + private static final String BASE_QUEUE_NAME = "batching-test-queue"; + + @Autowired + private SqsAsyncClient asyncClient; + + @Test + void shouldReturnCorrectServiceName() { + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String serviceName = adapter.serviceName(); + assertThat(serviceName).isEqualTo(SqsAsyncClient.SERVICE_NAME); + } + } + + @Test + void shouldSendMessageThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for batching"; + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody) + .build(); + + SendMessageResponse response = adapter.sendMessage(request).join(); + + assertThat(response.messageId()).isNotNull(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + assertThat(received.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldSendMessageWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message with consumer"; + + SendMessageResponse response = adapter + .sendMessage(builder -> builder.queueUrl(queueName).messageBody(messageBody)).join(); + + assertThat(response.messageId()).isNotNull(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + assertThat(received.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldReceiveMessageThroughBatchManager() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for receiving"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + Thread.sleep(200); + + ReceiveMessageResponse response = adapter + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(response.messages()).hasSize(1); + assertThat(response.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldReceiveMessageWithConsumer() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for receiving with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + Thread.sleep(200); + + ReceiveMessageResponse response = adapter + .receiveMessage(builder -> builder.queueUrl(queueName).maxNumberOfMessages(1)).join(); + + assertThat(response.messages()).hasSize(1); + assertThat(response.messages().get(0).body()).isEqualTo(messageBody); + } + } + + @Test + void shouldDeleteMessageThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for deletion"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + assertThat(received.messages()).hasSize(1); + String receiptHandle = received.messages().get(0).receiptHandle(); + + DeleteMessageResponse deleteResponse = adapter + .deleteMessage( + DeleteMessageRequest.builder().queueUrl(queueName).receiptHandle(receiptHandle).build()) + .join(); + + assertThat(deleteResponse).isNotNull(); + + ReceiveMessageResponse afterDelete = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).waitTimeSeconds(1).build()).join(); + + assertThat(afterDelete.messages()).isEmpty(); + } + } + + @Test + void shouldDeleteMessageWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for deletion with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) + .join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + DeleteMessageResponse deleteResponse = adapter + .deleteMessage(builder -> builder.queueUrl(queueName).receiptHandle(receiptHandle)).join(); + + assertThat(deleteResponse).isNotNull(); + } + } + + @Test + void shouldChangeMessageVisibilityThroughBatchManager() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for visibility change"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + ChangeMessageVisibilityResponse response = adapter.changeMessageVisibility(ChangeMessageVisibilityRequest + .builder().queueUrl(queueName).receiptHandle(receiptHandle).visibilityTimeout(30).build()).join(); + + assertThat(response).isNotNull(); + } + } + + @Test + void shouldChangeMessageVisibilityWithConsumer() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + String messageBody = "Test message for visibility change with consumer"; + this.asyncClient + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + + String receiptHandle = received.messages().get(0).receiptHandle(); + + ChangeMessageVisibilityResponse response = adapter + .changeMessageVisibility( + builder -> builder.queueUrl(queueName).receiptHandle(receiptHandle).visibilityTimeout(30)) + .join(); + + assertThat(response).isNotNull(); + } + } + + @Test + void shouldHandleBatchingEfficiently() throws InterruptedException { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { + int messageCount = 5; + String messageBodyPrefix = "Batch test message "; + + CompletableFuture[] futures = new CompletableFuture[messageCount]; + + for (int i = 0; i < messageCount; i++) { + futures[i] = adapter.sendMessage( + SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBodyPrefix + i).build()); + } + + CompletableFuture.allOf(futures).join(); + + for (CompletableFuture future : futures) { + assertThat(future.join().messageId()).isNotNull(); + } + + Thread.sleep(200); + + ReceiveMessageResponse received = this.asyncClient + .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(10).build()) + .join(); + + assertThat(received.messages()).hasSize(messageCount); + } + } + + private String createUniqueQueueName() { + return BASE_QUEUE_NAME + "-" + UUID.randomUUID().toString().substring(0, 8); + } + + private BatchingSqsClientAdapter createBatchingAdapter() { + SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder().client(this.asyncClient) + .scheduledExecutor(Executors.newScheduledThreadPool(2)) + .overrideConfiguration(builder -> builder.maxBatchSize(10).sendRequestFrequency(Duration.ofMillis(100))) + .build(); + + return new BatchingSqsClientAdapter(batchManager); + } + + @Configuration + static class SQSConfiguration { + + @Bean + SqsAsyncClient client() { + return createAsyncClient(); + } + } +} \ No newline at end of file diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java new file mode 100644 index 000000000..9acc4fa89 --- /dev/null +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapterTests.java @@ -0,0 +1,284 @@ +/* + * Copyright 2013-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.awspring.cloud.sqs.operations; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.services.sqs.SqsAsyncClient; +import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager; +import software.amazon.awssdk.services.sqs.model.*; + +/** + * @author Heechul Kang + */ +@ExtendWith(MockitoExtension.class) +class BatchingSqsClientAdapterTests { + SqsAsyncBatchManager mockBatchManager; + + BatchingSqsClientAdapter mockAdapter; + + @BeforeEach + void beforeEach() { + mockBatchManager = mock(SqsAsyncBatchManager.class); + mockAdapter = new BatchingSqsClientAdapter(mockBatchManager); + } + + @Test + void shouldThrowExceptionWhenBatchManagerIsNull() { + assertThatThrownBy(() -> new BatchingSqsClientAdapter(null)).isInstanceOf(IllegalArgumentException.class) + .hasMessage("batchManager cannot be null"); + } + + @Test + void shouldReturnCorrectServiceName() { + String serviceName = mockAdapter.serviceName(); + assertThat(serviceName).isEqualTo(SqsAsyncClient.SERVICE_NAME); + } + + @Test + void shouldDelegateBatchManagerClose() { + mockAdapter.close(); + then(mockBatchManager).should().close(); + } + + @Test + void shouldDelegateSendMessageWithRequest() { + String queueUrl = "test-queue"; + String messageBody = "test-message"; + + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build(); + SendMessageResponse expectedResponse = SendMessageResponse.builder().messageId(UUID.randomUUID().toString()) + .build(); + given(mockBatchManager.sendMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.sendMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().sendMessage(request); + } + + @Test + void shouldDelegateSendMessageWithConsumer() { + String queueUrl = "test-queue"; + String messageBody = "test-message"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .messageBody(messageBody); + SendMessageResponse expectedResponse = SendMessageResponse.builder().messageId(UUID.randomUUID().toString()) + .build(); + given(mockBatchManager.sendMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.sendMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().sendMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateReceiveMessageWithRequest() { + String queueUrl = "test-queue"; + String body = "test-body"; + + ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(queueUrl).maxNumberOfMessages(10) + .build(); + Message message = Message.builder().messageId(UUID.randomUUID().toString()).body(body).build(); + ReceiveMessageResponse expectedResponse = ReceiveMessageResponse.builder().messages(message).build(); + given(mockBatchManager.receiveMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.receiveMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().receiveMessage(request); + } + + @Test + void shouldDelegateReceiveMessageWithConsumer() { + String queueUrl = "test-queue"; + String body = "test-body"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .maxNumberOfMessages(10); + Message message = Message.builder().messageId(UUID.randomUUID().toString()).body(body).build(); + ReceiveMessageResponse expectedResponse = ReceiveMessageResponse.builder().messages(message).build(); + given(mockBatchManager.receiveMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.receiveMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().receiveMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateDeleteMessageWithRequest() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + DeleteMessageRequest request = DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle) + .build(); + DeleteMessageResponse expectedResponse = DeleteMessageResponse.builder().build(); + given(mockBatchManager.deleteMessage(request)).willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.deleteMessage(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().deleteMessage(request); + } + + @Test + void shouldDelegateDeleteMessageWithConsumer() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .receiptHandle(receiptHandle); + DeleteMessageResponse expectedResponse = DeleteMessageResponse.builder().build(); + given(mockBatchManager.deleteMessage(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.deleteMessage(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Consumer.class); + then(mockBatchManager).should().deleteMessage(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldDelegateChangeMessageVisibilityWithRequest() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + ChangeMessageVisibilityRequest request = ChangeMessageVisibilityRequest.builder().queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30).build(); + ChangeMessageVisibilityResponse expectedResponse = ChangeMessageVisibilityResponse.builder().build(); + given(mockBatchManager.changeMessageVisibility(request)) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter.changeMessageVisibility(request); + + assertThat(result).isCompletedWithValue(expectedResponse); + then(mockBatchManager).should().changeMessageVisibility(request); + } + + @Test + void shouldDelegateChangeMessageVisibilityWithConsumer() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + Consumer requestConsumer = builder -> builder.queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30); + ChangeMessageVisibilityResponse expectedResponse = ChangeMessageVisibilityResponse.builder().build(); + given(mockBatchManager.changeMessageVisibility(any(Consumer.class))) + .willReturn(CompletableFuture.completedFuture(expectedResponse)); + + CompletableFuture result = mockAdapter + .changeMessageVisibility(requestConsumer); + + assertThat(result).isCompletedWithValue(expectedResponse); + ArgumentCaptor> captor = ArgumentCaptor + .forClass(Consumer.class); + then(mockBatchManager).should().changeMessageVisibility(captor.capture()); + assertThat(captor.getValue()).isEqualTo(requestConsumer); + } + + @Test + void shouldHandleExceptionalCompletionInSendMessage() { + String queueUrl = "test-queue"; + String body = "test-message"; + + SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueUrl).messageBody(body).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.sendMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.sendMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().sendMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInReceiveMessage() { + String queueUrl = "test-queue"; + + ReceiveMessageRequest request = ReceiveMessageRequest.builder().queueUrl(queueUrl).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.receiveMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.receiveMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().receiveMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInDeleteMessage() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + DeleteMessageRequest request = DeleteMessageRequest.builder().queueUrl(queueUrl).receiptHandle(receiptHandle) + .build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.deleteMessage(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.deleteMessage(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().deleteMessage(request); + } + + @Test + void shouldHandleExceptionalCompletionInChangeMessageVisibility() { + String queueUrl = "test-queue"; + String receiptHandle = "test-receipt-handle"; + + ChangeMessageVisibilityRequest request = ChangeMessageVisibilityRequest.builder().queueUrl(queueUrl) + .receiptHandle(receiptHandle).visibilityTimeout(30).build(); + RuntimeException exception = new RuntimeException("Batch manager error"); + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(exception); + given(mockBatchManager.changeMessageVisibility(request)).willReturn(failedFuture); + + CompletableFuture result = mockAdapter.changeMessageVisibility(request); + + assertThat(result).isCompletedExceptionally(); + then(mockBatchManager).should().changeMessageVisibility(request); + } +} \ No newline at end of file From 9f043a3e69fcee50825a2c20e7cd00d0c1b10a53 Mon Sep 17 00:00:00 2001 From: khc41 Date: Tue, 16 Sep 2025 21:34:48 +0900 Subject: [PATCH 2/7] Remove SQS batch configuration support and simplify `ReceiveMessageRequest` handling. --- .../autoconfigure/sqs/SqsProperties.java | 161 ----------------- .../sqs/SqsAutoConfigurationTest.java | 162 ------------------ .../source/AbstractSqsMessageSource.java | 9 +- .../cloud/sqs/operations/SqsTemplate.java | 8 +- 4 files changed, 5 insertions(+), 335 deletions(-) diff --git a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java index 079680856..aafbe9daa 100644 --- a/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java +++ b/spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java @@ -18,10 +18,8 @@ import io.awspring.cloud.autoconfigure.AwsClientProperties; import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; import java.time.Duration; -import java.util.List; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.lang.Nullable; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; /** * Properties related to AWS SQS. @@ -40,8 +38,6 @@ public class SqsProperties extends AwsClientProperties { private Listener listener = new Listener(); - private Batch batch = new Batch(); - public Listener getListener() { return this.listener; } @@ -50,14 +46,6 @@ public void setListener(Listener listener) { this.listener = listener; } - public Batch getBatch() { - return batch; - } - - public void setBatch(Batch batch) { - this.batch = batch; - } - @Nullable private QueueNotFoundStrategy queueNotFoundStrategy; @@ -165,155 +153,6 @@ public Boolean getAutoStartup() { public void setAutoStartup(Boolean autoStartup) { this.autoStartup = autoStartup; } - - } - - /** - * Configuration properties for SQS automatic batching using AWS SDK's {@code SqsAsyncBatchManager}. - * - *

- * Automatic batching improves performance and reduces costs by combining multiple SQS requests into fewer AWS API - * calls. When enabled, Spring Cloud AWS will use a {@code BatchingSqsClientAdapter} that wraps the standard - * {@code SqsAsyncClient} with batching capabilities. - * - *

- * Important: Batched operations are processed asynchronously, which may result in false positives - * where method calls appear to succeed locally but fail during actual transmission to AWS. Applications should - * handle the returned {@code CompletableFuture} objects to detect actual transmission errors. - * - * @since 3.2 - */ - public static class Batch { - - /** - * Enables SQS automatic batching using AWS SDK's SqsAsyncBatchManager. - * - *

- * When set to {@code true}, the {@code SqsAsyncClient} bean will be wrapped with a - * {@code BatchingSqsClientAdapter} that automatically batches requests to improve performance and reduce AWS - * API calls. - * - *

- * Default is {@code false}. - */ - private boolean enabled = false; - - /** - * The maximum number of messages that can be processed in a single batch. The maximum is 10. - */ - @Nullable - private Integer maxNumberOfMessages; - - /** - * The frequency at which requests are sent to SQS when processing messages in a batch. - */ - @Nullable - private Duration sendBatchFrequency; - - /** - * The visibility timeout to set for messages received in a batch. If unset, the queue default is used. - */ - @Nullable - private Duration visibilityTimeout; - - /** - * The minimum wait duration for a receiveMessage request in a batch. To avoid unnecessary CPU usage, do not set - * this value to 0. - */ - @Nullable - private Duration waitTimeSeconds; - - /** - * The list of system attribute names to request for receiveMessage calls. - */ - @Nullable - private List systemAttributeNames; - - /** - * The list of attribute names to request for receiveMessage calls. - */ - @Nullable - private List attributeNames; - - /** - * The size of the scheduled thread pool used for batching operations. This thread pool handles periodic batch - * sending and other scheduled tasks. - * - *

- * Default is {@code 5}. - */ - private int scheduledExecutorPoolSize = 5; - - public boolean isEnabled() { - return enabled; - } - - public void setEnabled(boolean enabled) { - this.enabled = enabled; - } - - @Nullable - public Integer getMaxNumberOfMessages() { - return maxNumberOfMessages; - } - - public void setMaxNumberOfMessages(Integer maxNumberOfMessages) { - this.maxNumberOfMessages = maxNumberOfMessages; - } - - @Nullable - public Duration getSendBatchFrequency() { - return sendBatchFrequency; - } - - public void setSendBatchFrequency(Duration sendBatchFrequency) { - this.sendBatchFrequency = sendBatchFrequency; - } - - @Nullable - public Duration getVisibilityTimeout() { - return visibilityTimeout; - } - - public void setVisibilityTimeout(Duration visibilityTimeout) { - this.visibilityTimeout = visibilityTimeout; - } - - @Nullable - public Duration getWaitTimeSeconds() { - return waitTimeSeconds; - } - - public void setWaitTimeSeconds(Duration waitTimeSeconds) { - this.waitTimeSeconds = waitTimeSeconds; - } - - @Nullable - public List getSystemAttributeNames() { - return systemAttributeNames; - } - - public void setSystemAttributeNames(List systemAttributeNames) { - this.systemAttributeNames = systemAttributeNames; - } - - @Nullable - public List getAttributeNames() { - return attributeNames; - } - - public void setAttributeNames(List attributeNames) { - this.attributeNames = attributeNames; - } - - public int getScheduledExecutorPoolSize() { - return scheduledExecutorPoolSize; - } - - public void setScheduledExecutorPoolSize(int scheduledExecutorPoolSize) { - this.scheduledExecutorPoolSize = scheduledExecutorPoolSize; - } - } } diff --git a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java index ec605f789..83a4fbec5 100644 --- a/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java +++ b/spring-cloud-aws-autoconfigure/src/test/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfigurationTest.java @@ -33,7 +33,6 @@ import io.awspring.cloud.sqs.listener.QueueNotFoundStrategy; import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler; import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor; -import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; import io.awspring.cloud.sqs.operations.SqsTemplate; import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter; import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter; @@ -61,7 +60,6 @@ * * @author Tomaz Fernandes * @author Wei Jiang - * @author Heechul Kang */ class SqsAutoConfigurationTest { @@ -305,166 +303,6 @@ void configuresMessageConverter() { }); } - @Test - void sqsBatchAutoConfigurationIsDisabledByDefault() { - this.contextRunner.run(context -> { - assertThat(context).hasSingleBean(SqsAsyncClient.class); - SqsAsyncClient client = context.getBean(SqsAsyncClient.class); - assertThat(client).isNotInstanceOf(BatchingSqsClientAdapter.class); - }); - } - - @Test - void sqsBatchAutoConfigurationIsEnabled() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true").run(context -> { - assertThat(context.getBeansOfType(SqsAsyncClient.class)).hasSize(2); - - SqsAsyncClient primary = context.getBean(SqsAsyncClient.class); - assertThat(primary).isInstanceOf(BatchingSqsClientAdapter.class); - - assertThat(context).hasBean("sqsAsyncClient"); - assertThat(context).hasBean("batchSqsAsyncClient"); - }); - } - - @Test - void sqsBatchConfigurationProperties() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.max-number-of-messages:5", - "spring.cloud.aws.sqs.batch.send-batch-frequency:PT0.5S").run(context -> { - SqsAsyncClient client = context.getBean(SqsAsyncClient.class); - assertThat(client).isInstanceOf(BatchingSqsClientAdapter.class); - }); - } - - @Test - void sqsBatchConfigurationPropertiesWithAllSettings() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.max-number-of-messages:8", - "spring.cloud.aws.sqs.batch.send-batch-frequency:PT1S", - "spring.cloud.aws.sqs.batch.visibility-timeout:PT30S", - "spring.cloud.aws.sqs.batch.wait-time-seconds:PT5S", - "spring.cloud.aws.sqs.batch.system-attribute-names:SentTimestamp,ApproximateReceiveCount", - "spring.cloud.aws.sqs.batch.attribute-names:attr1,attr2").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getMaxNumberOfMessages()).isEqualTo(8); - assertThat(batchConfig.getSendBatchFrequency()).isEqualTo(Duration.ofSeconds(1)); - assertThat(batchConfig.getVisibilityTimeout()).isEqualTo(Duration.ofSeconds(30)); - assertThat(batchConfig.getWaitTimeSeconds()).isEqualTo(Duration.ofSeconds(5)); - assertThat(batchConfig.getSystemAttributeNames()).containsExactly( - software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.SENT_TIMESTAMP, - software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT); - assertThat(batchConfig.getAttributeNames()).containsExactly("attr1", "attr2"); - - SqsAsyncClient client = context.getBean(SqsAsyncClient.class); - assertThat(client).isInstanceOf(BatchingSqsClientAdapter.class); - }); - } - - @Test - void sqsBatchConfigurationPropertiesWithDefaults() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:false").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isFalse(); - assertThat(batchConfig.getMaxNumberOfMessages()).isNull(); - assertThat(batchConfig.getSendBatchFrequency()).isNull(); - assertThat(batchConfig.getVisibilityTimeout()).isNull(); - assertThat(batchConfig.getWaitTimeSeconds()).isNull(); - assertThat(batchConfig.getSystemAttributeNames()).isNull(); - assertThat(batchConfig.getAttributeNames()).isNull(); - assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5); - - assertThat(context).hasSingleBean(SqsAsyncClient.class); - SqsAsyncClient client = context.getBean(SqsAsyncClient.class); - assertThat(client).isNotInstanceOf(BatchingSqsClientAdapter.class); - }); - } - - @Test - void sqsBatchConfigurationWithVisibilityTimeout() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.visibility-timeout:PT60S").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getVisibilityTimeout()).isEqualTo(Duration.ofSeconds(60)); - }); - } - - @Test - void sqsBatchConfigurationWithWaitTimeSeconds() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.wait-time-seconds:PT20S").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getWaitTimeSeconds()).isEqualTo(Duration.ofSeconds(20)); - }); - } - - @Test - void sqsBatchConfigurationWithAttributeNames() { - this.contextRunner - .withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.attribute-names:MessageGroupId,MessageDeduplicationId") - .run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getAttributeNames()).containsExactly("MessageGroupId", - "MessageDeduplicationId"); - }); - } - - @Test - void sqsBatchConfigurationWithDefaultScheduledExecutorPoolSize() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(5); - - assertThat(context).hasBean("sqsBatchingScheduledExecutor"); - }); - } - - @Test - void sqsBatchConfigurationWithCustomScheduledExecutorPoolSize() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:true", - "spring.cloud.aws.sqs.batch.scheduled-executor-pool-size:10").run(context -> { - assertThat(context).hasSingleBean(SqsProperties.class); - SqsProperties sqsProperties = context.getBean(SqsProperties.class); - SqsProperties.Batch batchConfig = sqsProperties.getBatch(); - - assertThat(batchConfig.isEnabled()).isTrue(); - assertThat(batchConfig.getScheduledExecutorPoolSize()).isEqualTo(10); - - assertThat(context).hasBean("sqsBatchingScheduledExecutor"); - }); - } - - @Test - void sqsBatchConfigurationWithBatchDisabledDoesNotCreateScheduledExecutor() { - this.contextRunner.withPropertyValues("spring.cloud.aws.sqs.batch.enabled:false").run(context -> { - assertThat(context).doesNotHaveBean("sqsBatchingScheduledExecutor"); - }); - } - @Configuration(proxyBeanMethods = false) static class CustomComponentsConfiguration { diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java index 3a04900d8..86f3e5ce1 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/listener/source/AbstractSqsMessageSource.java @@ -33,8 +33,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.IntStream; - -import io.awspring.cloud.sqs.operations.BatchingSqsClientAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.lang.Nullable; @@ -63,7 +61,6 @@ * @param the {@link Message} payload type. * * @author Tomaz Fernandes - * @author Heechul Kang * @since 3.0 */ public abstract class AbstractSqsMessageSource extends AbstractPollingMessageSource @@ -180,15 +177,13 @@ private ReceiveMessageRequest createRequest(int maxNumberOfMessages) { .builder() .queueUrl(this.queueUrl) .maxNumberOfMessages(maxNumberOfMessages) + .attributeNamesWithStrings(this.messageSystemAttributeNames) + .messageAttributeNames(this.messageAttributeNames) .waitTimeSeconds(this.pollTimeout); customizeRequest(builder); if (this.messageVisibility >= 0) { builder.visibilityTimeout(this.messageVisibility); } - if (!(this.sqsAsyncClient instanceof BatchingSqsClientAdapter)) { - builder.messageAttributeNames(this.messageAttributeNames) - .attributeNamesWithStrings(this.messageSystemAttributeNames); - } return builder.build(); } // @formatter:on diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java index f803f6742..901affd03 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java @@ -605,7 +605,9 @@ private CompletableFuture createReceiveMessageRequest(Str private ReceiveMessageRequest doCreateReceiveMessageRequest(Duration pollTimeout, Integer maxNumberOfMessages, QueueAttributes attributes, Map additionalHeaders) { ReceiveMessageRequest.Builder builder = ReceiveMessageRequest.builder().queueUrl(attributes.getQueueUrl()) - .maxNumberOfMessages(maxNumberOfMessages).waitTimeSeconds(toInt(pollTimeout.toSeconds())); + .maxNumberOfMessages(maxNumberOfMessages).messageAttributeNames(this.messageAttributeNames) + .attributeNamesWithStrings(this.messageSystemAttributeNames) + .waitTimeSeconds(toInt(pollTimeout.toSeconds())); if (additionalHeaders.containsKey(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER)) { builder.visibilityTimeout( toInt(getValueAs(additionalHeaders, SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Duration.class) @@ -616,10 +618,6 @@ private ReceiveMessageRequest doCreateReceiveMessageRequest(Duration pollTimeout getValueAs(additionalHeaders, SqsHeaders.SQS_RECEIVE_REQUEST_ATTEMPT_ID_HEADER, UUID.class) .toString()); } - if (!(this.sqsAsyncClient instanceof BatchingSqsClientAdapter)) { - builder.messageAttributeNames(this.messageAttributeNames) - .attributeNamesWithStrings(this.messageSystemAttributeNames); - } return builder.build(); } From 5e3ea665e2dd31e552ebb56a8dc26d96e907d055 Mon Sep 17 00:00:00 2001 From: khc41 Date: Sun, 21 Sep 2025 16:58:47 +0900 Subject: [PATCH 3/7] Update documentation to include manual configuration instructions for SQS automatic request batching. --- docs/src/main/asciidoc/sqs.adoc | 139 +++++++----------- .../operations/BatchingSqsClientAdapter.java | 36 ++--- 2 files changed, 63 insertions(+), 112 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index 6d2ba76b1..a9df9aa17 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -812,124 +812,87 @@ Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBa IMPORTANT: This is different from the <> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. -===== Enabling Automatic Batching +==== Automatic Request Batching with SqsAsyncBatchManager -To enable automatic batching, set the following property: +Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. -[source,properties] ----- -spring.cloud.aws.sqs.batch.enabled=true ----- - -When enabled, Spring Cloud AWS will automatically wrap the `SqsAsyncClient` with a `BatchingSqsClientAdapter` that uses `SqsAsyncBatchManager` internally. - -===== Configuration Properties - -The following properties can be used to configure the batching behavior: - -[source,properties] ----- -# Enable automatic batching (default: false) -spring.cloud.aws.sqs.batch.enabled=true - -# Maximum number of messages in a batch (default: AWS SDK default, max: 10) -spring.cloud.aws.sqs.batch.max-number-of-messages=10 +IMPORTANT: This is different from the <> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS. -# Frequency at which batched requests are sent (default: AWS SDK default) -spring.cloud.aws.sqs.batch.send-batch-frequency=PT0.2S +===== Manual Configuration of the Batching Client -# Visibility timeout for received messages (default: queue default) -spring.cloud.aws.sqs.batch.visibility-timeout=PT30S +Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`. -# Wait time for receiveMessage requests (default: AWS SDK default) -spring.cloud.aws.sqs.batch.wait-time-seconds=PT5S +###### 1. Defining the Batching Client Bean +The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity. -# System attributes to request for receiveMessage calls -spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount - -# Message attributes to request for receiveMessage calls -spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId +[source,java] ---- +@Configuration +public class SqsBatchingConfiguration { -===== Important Considerations - -WARNING: When using automatic batching, operations are processed asynchronously by the AWS SDK. This means that a method call may return successfully, but the actual request to AWS SQS might fail later during the batching process. This can result in **false positives** where operations appear to succeed locally but fail during transmission. - -Applications should: - -- **Always handle the returned `CompletableFuture`** to detect actual transmission errors -- **Implement appropriate error handling and monitoring** to detect delayed failures -- **Consider retry mechanisms** for critical operations - -IMPORTANT: **Batch Manager Bypass**: The AWS SDK batch manager will bypass batching and send regular asynchronous requests when `receiveMessage` is called with any of the following parameters: + // Define a constant for the bean name to avoid typos + public static final String BATCHING_SQS_ASYNC_CLIENT = "batchingSqsAsyncClient"; -- `messageAttributeNames` -- `messageSystemAttributeNames` -- `messageSystemAttributeNamesWithStrings` (not used in Spring Cloud AWS `ReceiveMessageRequest`) -- `overrideConfiguration` (not used in Spring Cloud AWS `ReceiveMessageRequest`) + @Bean(name = BATCHING_SQS_ASYNC_CLIENT) + public SqsAsyncClient batchingSqsAsyncClient( + // Inject the standard, auto-configured client to wrap it + @Qualifier("sqsAsyncClient") SqsAsyncClient standardSqsAsyncClient) { -When these parameters are used, the performance benefits of batching are lost for those specific requests. + ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5); -**Note**: When using Spring Cloud AWS's automatic batching feature, `SqsTemplate` automatically excludes `messageAttributeNames` and `messageSystemAttributeNames` from individual `receiveMessage` requests to maintain batching efficiency. These attributes should be configured globally in the batch configuration instead: + SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder() + .sqsAsyncClient(standardSqsAsyncClient) + .scheduledExecutor(scheduledExecutor) + .build(); -[source,properties] ----- -# Configure globally for batched requests -spring.cloud.aws.sqs.batch.system-attribute-names=SentTimestamp,ApproximateReceiveCount -spring.cloud.aws.sqs.batch.attribute-names=MessageGroupId,MessageDeduplicationId + return new BatchingSqsClientAdapter(batchManager); + } +} ---- -If you need to use different attribute configurations per request, consider disabling automatic batching and using the standard `SqsAsyncClient` instead. - -Example of proper error handling: +###### 2. Using the Batching Client +Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`. [source,java] ---- @Service -public class MessageService { - - private final SqsTemplate sqsTemplate; +public class MyBatchingMessageService { - public MessageService(SqsTemplate sqsTemplate) { - this.sqsTemplate = sqsTemplate; - } + private final SqsTemplate batchingSqsTemplate; - public void sendMessage(String queueName, String message) { - CompletableFuture> future = sqsTemplate.sendAsync(queueName, message); - - future.whenComplete((result, throwable) -> { - if (throwable != null) { - // Handle actual transmission error - log.error("Failed to send message to queue {}: {}", queueName, throwable.getMessage()); - // Implement retry or alternative handling logic - } else { - // Message sent successfully - log.info("Message sent successfully with ID: {}", result.messageId()); - } - }); + public MyBatchingMessageService( + @Qualifier(SqsBatchingConfiguration.BATCHING_SQS_ASYNC_CLIENT) SqsAsyncClient batchingClient) { + this.batchingSqsTemplate = SqsTemplate.builder() + .sqsAsyncClient(batchingClient) + .build(); } + // ... service methods using batchingSqsTemplate } ---- -===== Performance Benefits +===== Important Considerations & Best Practices -Automatic batching provides several benefits: +WARNING: **Asynchronous Operations and False Positives**. The batching client processes operations asynchronously. A call to `sqsTemplate.sendAsync(...)` might return a `CompletableFuture` that completes successfully before the message is actually sent to AWS. The actual transmission happens later in a background thread. This can lead to **false positives**. Always attach error handling to the `CompletableFuture` to detect and handle real transmission failures. -- **Reduced API calls**: Multiple operations are combined into single API calls -- **Lower costs**: Fewer API calls result in reduced AWS charges -- **Improved throughput**: Batching reduces network overhead and latency -- **Better resource utilization**: More efficient use of network and AWS resources +[source,java] +---- +CompletableFuture> future = batchingSqsTemplate.sendAsync(queueName, message); -===== Compatibility +future.whenComplete((result, ex) -> { + if (ex != null) { + // This is where you handle the actual transmission error + log.error("Failed to send message to queue {}: {}", queueName, ex.getMessage()); + } else { + log.info("Message acknowledged for batch sending with ID: {}", result.messageId()); + } +}); +---- -Automatic batching is compatible with: +WARNING: **Not Recommended for `@SqsListener`**. While technically compatible, using this batching client with `@SqsListener` for receiving messages is **not recommended**. The `@SqsListener` infrastructure already performs efficient batch receiving and has a complex acknowledgment lifecycle. Adding another layer of asynchronous batching provides limited performance benefits while significantly increasing complexity. For listeners, it's best to rely on the default `SqsAsyncClient`. -- `SqsTemplate` for sending and receiving messages -- `@SqsListener` methods for message processing -- Both standard and FIFO queues -- All message conversion and error handling features +IMPORTANT: **Bean Injection Safety**. By using a named bean and `@Qualifier` as shown in the configuration examples, you ensure the batching client is only used where intended. This prevents it from accidentally being injected into `@SqsListener` infrastructure, which should use the default `SqsAsyncClient`. -The batching is transparent to application code - existing `SqsTemplate` and `@SqsListener` code continues to work without changes. +IMPORTANT: **AWS SDK Batching Bypass**. The `SqsAsyncBatchManager` will bypass batching for `receiveMessage` calls if certain parameters like `messageAttributeNames` are set on a per-request basis. To ensure batching works effectively, these should be configured globally on the `SqsAsyncBatchManager` builder, not on individual `receiveMessage` calls. See the `BatchingSqsClientAdapter` Javadoc for more details. ==== Container Options diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java index c58ea2fb7..165381246 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java @@ -43,33 +43,21 @@ * * *

- * Batch Optimization: The AWS SDK bypasses batching when {@code receiveMessage} is called with any of - * the following parameters: {@code messageAttributeNames}, {@code messageSystemAttributeNames}, - * {@code messageSystemAttributeNamesWithStrings}, or {@code overrideConfiguration}. To maintain consistent batching - * performance, Spring Cloud AWS handles these parameters as follows: + * Batch Optimization: The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses + * batching for {@code receiveMessage} calls that include per-request configurations for certain parameters. + * To ensure batching is not bypassed, it is recommended to configure these settings globally on the + * {@code SqsAsyncBatchManager} builder instead of on each {@code ReceiveMessageRequest}. + * The parameters that trigger this bypass are: *

    - *
  • {@code messageAttributeNames} - excluded from per-request, configured globally via - * {@code spring.cloud.aws.sqs.batch.attribute-names}
  • - *
  • {@code messageSystemAttributeNames} - excluded from per-request, configured globally via - * {@code spring.cloud.aws.sqs.batch.system-attribute-names}
  • - *
  • {@code messageSystemAttributeNamesWithStrings} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}
  • - *
  • {@code overrideConfiguration} - not used in Spring Cloud AWS {@code ReceiveMessageRequest}
  • + *
  • {@code messageAttributeNames}
  • + *
  • {@code messageSystemAttributeNames}
  • + *
  • {@code messageSystemAttributeNamesWithStrings}
  • + *
  • {@code overrideConfiguration}
  • *
*

- * This design prevents batch bypass and ensures optimal performance. If per-request attribute configuration is - * required, consider disabling automatic batching. - * - *

- * This adapter is automatically configured by Spring Cloud AWS when automatic batching is enabled. Users do not need to - * create instances directly - instead, enable batching through configuration: - * - *

- * spring.cloud.aws.sqs.batch.enabled = true
- * 
- * - *

- * Once enabled, all {@code SqsTemplate} operations will automatically use batching transparently. - * + * By configuring these globally on the manager, you ensure consistent batching performance. If you require + * per-request attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be + * more appropriate. * @author Heechul Kang * @since 3.2 * @see SqsAsyncBatchManager From 1fc52346e109a83ed2841b00b0e6fcd5cdc3b8a0 Mon Sep 17 00:00:00 2001 From: khc41 Date: Thu, 9 Oct 2025 23:23:47 +0900 Subject: [PATCH 4/7] Enhance BatchingSqsClientAdapter: improve documentation on batch optimization and add integration test for message send frequency --- docs/src/main/asciidoc/sqs.adoc | 2 +- .../operations/BatchingSqsClientAdapter.java | 14 ++++----- ...chingSqsClientAdapterIntegrationTests.java | 31 ++++++++++++++++--- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index a9df9aa17..fb6e176ed 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -808,7 +808,7 @@ IMPORTANT: In case the same factory is shared by both delivery methods, any supp ==== Automatic Batching with AWS SDK -Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. +Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. Please note that this feature is primarily intended for use with `SqsTemplate` and is not recommended for use with `@SqsListener` due to the complexities involved in message acknowledgment and processing. IMPORTANT: This is different from the <> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java index 165381246..78f5ec035 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java @@ -43,11 +43,10 @@ * * *

- * Batch Optimization: The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses - * batching for {@code receiveMessage} calls that include per-request configurations for certain parameters. - * To ensure batching is not bypassed, it is recommended to configure these settings globally on the - * {@code SqsAsyncBatchManager} builder instead of on each {@code ReceiveMessageRequest}. - * The parameters that trigger this bypass are: + * Batch Optimization: The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses batching + * for {@code receiveMessage} calls that include per-request configurations for certain parameters. To ensure batching + * is not bypassed, it is recommended to configure these settings globally on the {@code SqsAsyncBatchManager} builder + * instead of on each {@code ReceiveMessageRequest}. The parameters that trigger this bypass are: *

    *
  • {@code messageAttributeNames}
  • *
  • {@code messageSystemAttributeNames}
  • @@ -55,9 +54,8 @@ *
  • {@code overrideConfiguration}
  • *
*

- * By configuring these globally on the manager, you ensure consistent batching performance. If you require - * per-request attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be - * more appropriate. + * By configuring these globally on the manager, you ensure consistent batching performance. If you require per-request + * attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be more appropriate. * @author Heechul Kang * @since 3.2 * @see SqsAsyncBatchManager diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java index 031cd74a8..3b6f3d9d6 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java @@ -59,10 +59,10 @@ void shouldSendMessageThroughBatchManager() { try (BatchingSqsClientAdapter adapter = createBatchingAdapter()) { String messageBody = "Test message for batching"; - SendMessageRequest request = SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody) - .build(); - SendMessageResponse response = adapter.sendMessage(request).join(); + SendMessageResponse response = adapter + .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); assertThat(response.messageId()).isNotNull(); @@ -276,14 +276,37 @@ void shouldHandleBatchingEfficiently() throws InterruptedException { } } + @Test + void shouldSendMessageJoinCompletesAfterFrequency() { + String queueName = createUniqueQueueName(); + createQueue(this.asyncClient, queueName).join(); + + try (BatchingSqsClientAdapter adapter = createBatchingAdapterWithFrequency(Duration.ofSeconds(3))) { + long startTime = System.nanoTime(); + + String messageBody = "Test message for join frequency"; + + adapter.sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) + .join(); + + long elapsedMillis = (System.nanoTime() - startTime) / 1_000_000; + + assertThat(elapsedMillis).isGreaterThanOrEqualTo(3000L); + } + } + private String createUniqueQueueName() { return BASE_QUEUE_NAME + "-" + UUID.randomUUID().toString().substring(0, 8); } private BatchingSqsClientAdapter createBatchingAdapter() { + return createBatchingAdapterWithFrequency(Duration.ofMillis(100)); + } + + private BatchingSqsClientAdapter createBatchingAdapterWithFrequency(Duration sendRequestFrequency) { SqsAsyncBatchManager batchManager = SqsAsyncBatchManager.builder().client(this.asyncClient) .scheduledExecutor(Executors.newScheduledThreadPool(2)) - .overrideConfiguration(builder -> builder.maxBatchSize(10).sendRequestFrequency(Duration.ofMillis(100))) + .overrideConfiguration(builder -> builder.maxBatchSize(10).sendRequestFrequency(sendRequestFrequency)) .build(); return new BatchingSqsClientAdapter(batchManager); From b09a9171352fa87abeddb22afee8e0e4caf93abf Mon Sep 17 00:00:00 2001 From: khc41 Date: Fri, 10 Oct 2025 13:37:40 +0900 Subject: [PATCH 5/7] Update documentation for automatic request batching: clarify usage with SqsAsyncClient and enhance warnings about asynchronous operations --- docs/src/main/asciidoc/sqs.adoc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/docs/src/main/asciidoc/sqs.adoc b/docs/src/main/asciidoc/sqs.adoc index fb6e176ed..dbad0e769 100644 --- a/docs/src/main/asciidoc/sqs.adoc +++ b/docs/src/main/asciidoc/sqs.adoc @@ -806,15 +806,9 @@ NOTE: The same factory can be used to create both `single message` and `batch` c IMPORTANT: In case the same factory is shared by both delivery methods, any supplied `ErrorHandler`, `MessageInterceptor` or `MessageListener` should implement the proper methods. -==== Automatic Batching with AWS SDK - -Spring Cloud AWS supports automatic message batching using AWS SDK's `SqsAsyncBatchManager`. This feature optimizes SQS operations by automatically batching requests under the hood to improve performance and reduce AWS API calls. Please note that this feature is primarily intended for use with `SqsTemplate` and is not recommended for use with `@SqsListener` due to the complexities involved in message acknowledgment and processing. - -IMPORTANT: This is different from the <> feature described above. Batch Processing refers to processing multiple messages in a single listener method call, while Automatic Batching refers to the AWS SDK automatically combining multiple SQS API calls into batched requests for efficiency. - ==== Automatic Request Batching with SqsAsyncBatchManager -Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. +Spring Cloud AWS allows you to leverage the AWS SDK's `SqsAsyncBatchManager` for automatic request batching. This feature can significantly improve performance and reduce costs by transparently combining multiple SQS API calls (`sendMessage`, `deleteMessage`, etc.) into single batch requests. Please note that this feature is primarily intended for use with `SqsTemplate` and is not recommended for use as a default `SqsAsyncClient` bean that could be injected into `@SqsListener` infrastructure. IMPORTANT: This is different from the <> feature for `@SqsListener`. Listener batch processing deals with handling multiple messages within a single listener invocation, whereas automatic request batching optimizes the underlying API calls to AWS. @@ -822,7 +816,7 @@ IMPORTANT: This is different from the <> feat Since automatic batching is a powerful feature with specific trade-offs, Spring Cloud AWS does not auto-configure it. You can enable it by creating your own `SqsAsyncClient` bean using the provided `BatchingSqsClientAdapter`. -###### 1. Defining the Batching Client Bean +====== 1. Defining the Batching Client Bean The following example shows how to define a bean named `batchingSqsAsyncClient`. Notice the use of `@Qualifier("sqsAsyncClient")` in the method parameter. This is crucial to explicitly inject the standard, auto-configured `SqsAsyncClient` and avoid ambiguity. [source,java] @@ -850,7 +844,7 @@ public class SqsBatchingConfiguration { } ---- -###### 2. Using the Batching Client +====== 2. Using the Batching Client Now, use `@Qualifier` to inject your named bean. The most common use case is configuring a dedicated `SqsTemplate`. [source,java] @@ -872,7 +866,9 @@ public class MyBatchingMessageService { ===== Important Considerations & Best Practices -WARNING: **Asynchronous Operations and False Positives**. The batching client processes operations asynchronously. A call to `sqsTemplate.sendAsync(...)` might return a `CompletableFuture` that completes successfully before the message is actually sent to AWS. The actual transmission happens later in a background thread. This can lead to **false positives**. Always attach error handling to the `CompletableFuture` to detect and handle real transmission failures. +WARNING: A call to `sqsTemplate.sendAsync(...)` returns a `CompletableFuture` that may complete successfully before the message is actually sent to AWS if the batch isn't yet flushed. The actual transmission happens later in a background thread, typically after the configured `sendRequestFrequency` interval. This can lead to **false positives** where the future appears to succeed prematurely. Always attach error handling to the `CompletableFuture` (e.g., via `.exceptionally()` or `.handle()`) to detect and handle real transmission failures. Note: Calling `.join()` on the future will block until the batch is flushed (after `sendRequestFrequency`), ensuring the operation is attempted, but still check for exceptions. + +NOTE: The background buffering for `receiveMessage` operations is initialized **lazily**. It only starts after the first call to `sqsTemplate.receiveMessage(...)` is made on a specific queue URL. **This means that no background polling or resource consumption for receiving messages will occur until the application actively attempts to receive a message from that queue for the first time.** [source,java] ---- From ec8d6cc664aa500d679b323a8bd3421138985e02 Mon Sep 17 00:00:00 2001 From: khc41 Date: Mon, 20 Oct 2025 20:21:04 +0900 Subject: [PATCH 6/7] Update BatchingSqsClientAdapter documentation: clarify asynchronous behavior and enhance error handling instructions --- .../operations/BatchingSqsClientAdapter.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java index 78f5ec035..986bbc41f 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/BatchingSqsClientAdapter.java @@ -30,18 +30,20 @@ * This adapter automatically batches SQS operations to improve performance and reduce costs by combining multiple * requests into fewer AWS API calls. All standard SQS operations are supported: send message, receive message, delete * message, and change message visibility. - * + * *

- * Important - False Positives Warning: This adapter processes requests asynchronously through - * batching. Method calls may return successfully before the actual request is sent to AWS SQS. This can result in false - * positives where the operation appears to succeed locally but fails during the actual transmission to AWS. - * Applications should: + * Important - Asynchronous Behavior: This adapter processes requests asynchronously through + * batching. The returned {@link CompletableFuture} reflects the batching operation, + * not the final transmission to AWS SQS. This can lead to false positives where the operation appears successful locally but fails during actual transmission. + * The actual transmission happens in a background thread, up to the configured {@code sendRequestFrequency} after enqueuing. + * Applications must: *

    - *
  • Always handle the returned {@link CompletableFuture} to detect actual transmission errors
  • - *
  • Implement appropriate error handling and monitoring
  • - *
  • Consider retry mechanisms for critical operations
  • + *
  • Handle the returned {@link CompletableFuture} to detect transmission errors. + * Calling {@code .join()} will block until the batch is sent (up to {@code sendRequestFrequency}), + * while {@code .exceptionally()} or {@code .handle()} are required for non-blocking error handling.
  • + *
  • Implement appropriate error handling, monitoring, and retry mechanisms for critical operations.
  • *
- * + * *

* Batch Optimization: The underlying {@code SqsAsyncBatchManager} from the AWS SDK bypasses batching * for {@code receiveMessage} calls that include per-request configurations for certain parameters. To ensure batching @@ -57,7 +59,7 @@ * By configuring these globally on the manager, you ensure consistent batching performance. If you require per-request * attribute configurations, using the standard {@code SqsAsyncClient} without this adapter may be more appropriate. * @author Heechul Kang - * @since 3.2 + * @since 4.0.0 * @see SqsAsyncBatchManager * @see SqsAsyncClient */ From ea99930a28726330b61d70490fb3eee83a71f277 Mon Sep 17 00:00:00 2001 From: khc41 Date: Sat, 1 Nov 2025 18:03:03 +0900 Subject: [PATCH 7/7] Refactor BatchingSqsClientAdapterIntegrationTests: remove unnecessary sleep and add wait time for message reception --- ...chingSqsClientAdapterIntegrationTests.java | 41 ++++++++----------- 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java index 3b6f3d9d6..1e6c52d1b 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/BatchingSqsClientAdapterIntegrationTests.java @@ -98,7 +98,7 @@ void shouldSendMessageWithConsumer() { } @Test - void shouldReceiveMessageThroughBatchManager() throws InterruptedException { + void shouldReceiveMessageThroughBatchManager() { String queueName = createUniqueQueueName(); createQueue(this.asyncClient, queueName).join(); @@ -108,11 +108,8 @@ void shouldReceiveMessageThroughBatchManager() throws InterruptedException { .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) .join(); - Thread.sleep(200); - - ReceiveMessageResponse response = adapter - .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) - .join(); + ReceiveMessageResponse response = adapter.receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName) + .maxNumberOfMessages(1).waitTimeSeconds(10).build()).join(); assertThat(response.messages()).hasSize(1); assertThat(response.messages().get(0).body()).isEqualTo(messageBody); @@ -120,7 +117,7 @@ void shouldReceiveMessageThroughBatchManager() throws InterruptedException { } @Test - void shouldReceiveMessageWithConsumer() throws InterruptedException { + void shouldReceiveMessageWithConsumer() { String queueName = createUniqueQueueName(); createQueue(this.asyncClient, queueName).join(); @@ -130,10 +127,9 @@ void shouldReceiveMessageWithConsumer() throws InterruptedException { .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) .join(); - Thread.sleep(200); - ReceiveMessageResponse response = adapter - .receiveMessage(builder -> builder.queueUrl(queueName).maxNumberOfMessages(1)).join(); + .receiveMessage(builder -> builder.queueUrl(queueName).maxNumberOfMessages(1).waitTimeSeconds(10)) + .join(); assertThat(response.messages()).hasSize(1); assertThat(response.messages().get(0).body()).isEqualTo(messageBody); @@ -151,9 +147,8 @@ void shouldDeleteMessageThroughBatchManager() { .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) .join(); - ReceiveMessageResponse received = this.asyncClient - .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) - .join(); + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).waitTimeSeconds(10).build()).join(); assertThat(received.messages()).hasSize(1); String receiptHandle = received.messages().get(0).receiptHandle(); @@ -183,9 +178,8 @@ void shouldDeleteMessageWithConsumer() { .sendMessage(SendMessageRequest.builder().queueUrl(queueName).messageBody(messageBody).build()) .join(); - ReceiveMessageResponse received = this.asyncClient - .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(1).build()) - .join(); + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(1).waitTimeSeconds(10).build()).join(); String receiptHandle = received.messages().get(0).receiptHandle(); @@ -208,7 +202,8 @@ void shouldChangeMessageVisibilityThroughBatchManager() { .join(); ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() - .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).waitTimeSeconds(10).build()) + .join(); String receiptHandle = received.messages().get(0).receiptHandle(); @@ -231,7 +226,8 @@ void shouldChangeMessageVisibilityWithConsumer() { .join(); ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() - .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).build()).join(); + .queueUrl(queueName).maxNumberOfMessages(1).visibilityTimeout(5).waitTimeSeconds(10).build()) + .join(); String receiptHandle = received.messages().get(0).receiptHandle(); @@ -245,7 +241,7 @@ void shouldChangeMessageVisibilityWithConsumer() { } @Test - void shouldHandleBatchingEfficiently() throws InterruptedException { + void shouldHandleBatchingEfficiently() { String queueName = createUniqueQueueName(); createQueue(this.asyncClient, queueName).join(); @@ -266,11 +262,8 @@ void shouldHandleBatchingEfficiently() throws InterruptedException { assertThat(future.join().messageId()).isNotNull(); } - Thread.sleep(200); - - ReceiveMessageResponse received = this.asyncClient - .receiveMessage(ReceiveMessageRequest.builder().queueUrl(queueName).maxNumberOfMessages(10).build()) - .join(); + ReceiveMessageResponse received = this.asyncClient.receiveMessage(ReceiveMessageRequest.builder() + .queueUrl(queueName).maxNumberOfMessages(10).waitTimeSeconds(10).build()).join(); assertThat(received.messages()).hasSize(messageCount); }