Skip to content

Commit

Permalink
Add ContentBasedDeduplication option when sending SQS messages (#938) (
Browse files Browse the repository at this point in the history
…#987)

Resolves #938
Closes #950
  • Loading branch information
tomazfernandes authored Dec 10, 2023
1 parent d2d4b5b commit c8e9d1b
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 48 deletions.
7 changes: 7 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,13 @@ Such attributes are available as `MessageHeaders` in received messages.
|All
|Set the message system attribute names that will be retrieved with messages on receive operations.
Such attributes are available as `MessageHeaders` in received messages.

|`contentBasedDeduplication`
|ContentBasedDeduplication
|ContentBasedDeduplication
#AUTO
|Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to.
With `ContentBasedDeduplication#AUTO`, the queue attribute value will be resolved automatically.
|===

==== Sending Messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
/**
* Base class for {@link MessagingOperations}
* @param <S> the source message type for conversion
*
* @author Tomaz Fernandes
* @since 3.0
*/
public abstract class AbstractMessagingTemplate<S> implements MessagingOperations, AsyncMessagingOperations {

Expand Down Expand Up @@ -275,35 +278,44 @@ public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointN
@Override
public <T> CompletableFuture<SendResult<T>> sendAsync(@Nullable String endpointName, Message<T> message) {
String endpointToUse = getEndpointName(endpointName);
Message<T> messageToUse = preProcessMessageForSend(endpointToUse, message);
logger.trace("Sending message {} to endpoint {}", MessageHeaderUtils.getId(message), endpointName);
return doSendAsync(endpointToUse, convertMessageToSend(messageToUse), messageToUse)
.exceptionallyCompose(
t -> CompletableFuture
.failedFuture(new MessagingOperationFailedException(
return preProcessMessageForSendAsync(endpointToUse, message).thenCompose(
messageToUse -> doSendAsync(endpointToUse, convertMessageToSend(messageToUse), messageToUse)
.exceptionallyCompose(
t -> CompletableFuture.failedFuture(new MessagingOperationFailedException(
"Message send operation failed for message %s to endpoint %s"
.formatted(MessageHeaderUtils.getId(message), endpointToUse),
endpointToUse, message, t)))
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, message, t));
.whenComplete((v, t) -> logSendMessageResult(endpointToUse, message, t)));
}

protected abstract <T> Message<T> preProcessMessageForSend(String endpointToUse, Message<T> message);

protected <T> CompletableFuture<Message<T>> preProcessMessageForSendAsync(String endpointToUse,
Message<T> message) {
return CompletableFuture.completedFuture(preProcessMessageForSend(endpointToUse, message));
}

@Override
public <T> CompletableFuture<SendResult.Batch<T>> sendManyAsync(@Nullable String endpointName,
Collection<Message<T>> messages) {
logger.trace("Sending messages {} to endpoint {}", MessageHeaderUtils.getId(messages), endpointName);
String endpointToUse = getEndpointName(endpointName);
Collection<Message<T>> messagesToUse = preProcessMessagesForSend(endpointToUse, messages);
return doSendBatchAsync(endpointToUse, convertMessagesToSend(messagesToUse), messagesToUse)
.exceptionallyCompose(t -> wrapSendException(messagesToUse, endpointToUse, t))
.thenCompose(result -> handleFailedMessages(endpointToUse, result))
.whenComplete((v, t) -> logSendMessageBatchResult(endpointToUse, messagesToUse, t));
return preProcessMessagesForSendAsync(endpointToUse, messages).thenCompose(
messagesToUse -> doSendBatchAsync(endpointToUse, convertMessagesToSend(messagesToUse), messagesToUse)
.exceptionallyCompose(t -> wrapSendException(messagesToUse, endpointToUse, t))
.thenCompose(result -> handleFailedMessages(endpointToUse, result))
.whenComplete((v, t) -> logSendMessageBatchResult(endpointToUse, messagesToUse, t)));
}

protected abstract <T> Collection<Message<T>> preProcessMessagesForSend(String endpointToUse,
Collection<Message<T>> messages);

protected <T> CompletableFuture<Collection<Message<T>>> preProcessMessagesForSendAsync(String endpointToUse,
Collection<Message<T>> messages) {
return CompletableFuture.completedFuture(preProcessMessagesForSend(endpointToUse, messages));
}

private <T> CompletableFuture<SendResult.Batch<T>> handleFailedMessages(String endpointToUse,
SendResult.Batch<T> result) {
return !result.failed().isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
Expand All @@ -68,6 +67,14 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

/**
* Sqs-specific implementation of {@link AbstractMessagingTemplate}
*
* @author Tomaz Fernandes
* @author Zhong Xi Lu
*
* @since 3.0
*/
public class SqsTemplate extends AbstractMessagingTemplate<Message> implements SqsOperations, SqsAsyncOperations {

private static final Logger logger = LoggerFactory.getLogger(SqsTemplate.class);
Expand All @@ -86,6 +93,8 @@ public class SqsTemplate extends AbstractMessagingTemplate<Message> implements S

private final Collection<String> messageSystemAttributeNames;

private final TemplateContentBasedDeduplication contentBasedDeduplication;

private SqsTemplate(SqsTemplateBuilderImpl builder) {
super(builder.messageConverter, builder.options);
SqsTemplateOptionsImpl options = builder.options;
Expand All @@ -94,6 +103,7 @@ private SqsTemplate(SqsTemplateBuilderImpl builder) {
this.queueAttributeNames = options.queueAttributeNames;
this.queueNotFoundStrategy = options.queueNotFoundStrategy;
this.messageSystemAttributeNames = options.messageSystemAttributeNames;
this.contentBasedDeduplication = options.contentBasedDeduplication;
}

/**
Expand Down Expand Up @@ -127,6 +137,7 @@ public static SqsOperations newSyncTemplate(SqsAsyncClient sqsAsyncClient) {
/**
* Create a new {@link SqsTemplate} instance with the provided {@link SqsAsyncClient}, only exposing the async
* methods contained in {@link SqsAsyncOperations}.
*
* @param sqsAsyncClient the client.
* @return the new template instance.
*/
Expand Down Expand Up @@ -247,35 +258,61 @@ private Map<String, Object> addAdditionalReceiveHeaders(SqsReceiveOptionsImpl op
@Override
protected <T> org.springframework.messaging.Message<T> preProcessMessageForSend(String endpointToUse,
org.springframework.messaging.Message<T> message) {
return FifoUtils.isFifo(endpointToUse) ? addMissingFifoSendHeaders(endpointToUse, message) : message;
return message;
}

@Override
protected <T> Collection<org.springframework.messaging.Message<T>> preProcessMessagesForSend(String endpointToUse,
Collection<org.springframework.messaging.Message<T>> messages) {
return messages;
}

@Override
protected <T> CompletableFuture<org.springframework.messaging.Message<T>> preProcessMessageForSendAsync(
String endpointToUse, org.springframework.messaging.Message<T> message) {
return FifoUtils.isFifo(endpointToUse)
? messages.stream().map(message -> addMissingFifoSendHeaders(endpointToUse, message)).toList()
: messages;
? endpointHasContentBasedDeduplicationEnabled(endpointToUse)
.thenApply(enabled -> enabled ? addMissingFifoSendHeaders(message, Map.of())
: addMissingFifoSendHeaders(message, getRandomDeduplicationIdHeader()))
: CompletableFuture.completedFuture(message);
}

private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(String endpointName,
org.springframework.messaging.Message<T> message) {
@Override
protected <T> CompletableFuture<Collection<org.springframework.messaging.Message<T>>> preProcessMessagesForSendAsync(
String endpointToUse, Collection<org.springframework.messaging.Message<T>> messages) {
return FifoUtils.isFifo(endpointToUse)
? endpointHasContentBasedDeduplicationEnabled(endpointToUse).thenApply(enabled -> messages.stream()
.map(message -> enabled ? addMissingFifoSendHeaders(message, Map.of())
: addMissingFifoSendHeaders(message, getRandomDeduplicationIdHeader()))
.toList())
: CompletableFuture.completedFuture(messages);
}

Set<QueueAttributeName> additionalAttributes = Set.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
String contentBasedDedupQueueAttribute = getQueueAttributes(endpointName, additionalAttributes).join()
.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION);
private <T> org.springframework.messaging.Message<T> addMissingFifoSendHeaders(
org.springframework.messaging.Message<T> message, Map<String, Object> additionalHeaders) {
return MessageHeaderUtils.addHeadersIfAbsent(message,
Stream.concat(additionalHeaders.entrySet().stream(), getRandomMessageGroupIdHeader().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

boolean isContentBasedDedup = Boolean.parseBoolean(contentBasedDedupQueueAttribute);
Map<String, Object> defaultHeaders;
if (isContentBasedDedup) {
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString());
}
else {
defaultHeaders = Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString(),
MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
}
private Map<String, String> getRandomMessageGroupIdHeader() {
return Map.of(MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, UUID.randomUUID().toString());
}

private Map<String, Object> getRandomDeduplicationIdHeader() {
return Map.of(MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, UUID.randomUUID().toString());
}

private CompletableFuture<Boolean> endpointHasContentBasedDeduplicationEnabled(String endpointName) {
return TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication)
? handleAutoDeduplication(endpointName)
: CompletableFuture
.completedFuture(contentBasedDeduplication.equals(TemplateContentBasedDeduplication.ENABLED));
}

return MessageHeaderUtils.addHeadersIfAbsent(message, defaultHeaders);
private CompletableFuture<Boolean> handleAutoDeduplication(String endpointName) {
return getQueueAttributes(endpointName).thenApply(attributes -> Boolean
.parseBoolean(attributes.getQueueAttribute(QueueAttributeName.CONTENT_BASED_DEDUPLICATION)));
}

@Override
Expand Down Expand Up @@ -363,16 +400,20 @@ private <T> SqsMessageConversionContext doGetSqsMessageConversionContext(String
SqsMessageConversionContext conversionContext = new SqsMessageConversionContext();
conversionContext.setSqsAsyncClient(this.sqsAsyncClient);
// At this point we'll already have retrieved and cached the queue attributes
CompletableFuture<QueueAttributes> queueAttributes = getQueueAttributes(newEndpoint);
Assert.isTrue(queueAttributes.isDone(), () -> "Queue attributes not done for " + newEndpoint);
conversionContext.setQueueAttributes(queueAttributes.join());
conversionContext.setQueueAttributes(getAttributesImmediately(newEndpoint));
if (payloadClass != null) {
conversionContext.setPayloadClass(payloadClass);
}
conversionContext.setAcknowledgementCallback(new TemplateAcknowledgementCallback<T>());
return conversionContext;
}

private QueueAttributes getAttributesImmediately(String newEndpoint) {
CompletableFuture<QueueAttributes> queueAttributes = getQueueAttributes(newEndpoint);
Assert.isTrue(queueAttributes.isDone(), () -> "Queue attributes not done for " + newEndpoint);
return queueAttributes.join();
}

private CompletableFuture<SendMessageBatchRequest> createSendMessageBatchRequest(String endpointName,
Collection<Message> messages) {
return getQueueAttributes(endpointName)
Expand Down Expand Up @@ -423,21 +464,23 @@ private boolean isSkipAttribute(MessageSystemAttributeName name) {
}

private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName) {
return getQueueAttributes(endpointName, Collections.emptySet());
return this.queueAttributesCache.computeIfAbsent(endpointName,
newName -> doGetQueueAttributes(endpointName, newName));
}

private CompletableFuture<QueueAttributes> getQueueAttributes(String endpointName,
Set<QueueAttributeName> additionalAttributes) {
return this.queueAttributesCache.computeIfAbsent(endpointName, newName -> {
// ensure we have the content based dedupe config to determine default fifo send headers
Set<QueueAttributeName> namesToRequest = new HashSet<>(queueAttributeNames);
if (additionalAttributes != null && !additionalAttributes.isEmpty()) {
namesToRequest.addAll(additionalAttributes);
}
return QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
.queueNotFoundStrategy(this.queueNotFoundStrategy).queueAttributeNames(namesToRequest).build()
.resolveQueueAttributes();
});
private CompletableFuture<QueueAttributes> doGetQueueAttributes(String endpointName, String newName) {
return QueueAttributesResolver.builder().sqsAsyncClient(this.sqsAsyncClient).queueName(newName)
.queueNotFoundStrategy(this.queueNotFoundStrategy)
.queueAttributeNames(maybeAddContentBasedDeduplicationAttribute(endpointName)).build()
.resolveQueueAttributes();
}

private Collection<QueueAttributeName> maybeAddContentBasedDeduplicationAttribute(String endpointName) {
return FifoUtils.isFifo(endpointName)
&& TemplateContentBasedDeduplication.AUTO.equals(this.contentBasedDeduplication)
? Stream.concat(queueAttributeNames.stream(),
Stream.of(QueueAttributeName.CONTENT_BASED_DEDUPLICATION)).toList()
: queueAttributeNames;
}

@Override
Expand Down Expand Up @@ -591,6 +634,8 @@ private static class SqsTemplateOptionsImpl extends AbstractMessagingTemplateOpt

private Collection<String> messageSystemAttributeNames = Collections.singletonList("All");

private TemplateContentBasedDeduplication contentBasedDeduplication = TemplateContentBasedDeduplication.AUTO;

@Override
public SqsTemplateOptions queueAttributeNames(Collection<QueueAttributeName> queueAttributeNames) {
Assert.notEmpty(queueAttributeNames, "queueAttributeNames cannot be null or empty");
Expand Down Expand Up @@ -625,6 +670,13 @@ public SqsTemplateOptions messageSystemAttributeNames(
return this;
}

@Override
public SqsTemplateOptions contentBasedDeduplication(
TemplateContentBasedDeduplication contentBasedDeduplication) {
this.contentBasedDeduplication = contentBasedDeduplication;
return this;
}

}

private static class SqsTemplateBuilderImpl implements SqsTemplateBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,13 @@ public interface SqsTemplateOptions extends MessagingTemplateOptions<SqsTemplate
*/
SqsTemplateOptions messageSystemAttributeNames(Collection<MessageSystemAttributeName> messageSystemAttributeNames);

/**
* Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to. By
* default, this is set to AUTO and the queue attribute value will be resolved automatically per queue. If set to
* ENABLED or DISABLED, the value will apply to all queues.
*
* @param contentBasedDeduplication the ContentBasedDeduplication value.
* @return the options instance.
*/
SqsTemplateOptions contentBasedDeduplication(TemplateContentBasedDeduplication contentBasedDeduplication);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.operations;

/**
* The ContentBasedDeduplication queue attribute value to be used by the {@link SqsTemplate} when sending messages to a
* FIFO queue.
*
* @author Zhong Xi Lu
* @since 3.0.4
*/
public enum TemplateContentBasedDeduplication {

/**
* The ContentBasedDeduplication queue attribute value will be resolved automatically at runtime.
*/
AUTO,

/**
* ContentBasedDeduplication is enabled on all FIFO SQS queues.
*/
ENABLED,

/**
* ContentBasedDeduplication is disabled on all FIFO SQS queues.
*/
DISABLED
}
Loading

0 comments on commit c8e9d1b

Please sign in to comment.