diff --git a/spring-cloud-aws-dependencies/pom.xml b/spring-cloud-aws-dependencies/pom.xml
index 9bdadc626..c9e2ff83c 100644
--- a/spring-cloud-aws-dependencies/pom.xml
+++ b/spring-cloud-aws-dependencies/pom.xml
@@ -32,7 +32,7 @@
2.1.0
1.0.0
1.14.9
- 1.2.3
+ 1.3.1
3.3.1
2.3.0
diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/main/java/io/awspring/cloud/modulith/events/sns/SnsEventExternalizerConfiguration.java b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/main/java/io/awspring/cloud/modulith/events/sns/SnsEventExternalizerConfiguration.java
index 042fbc7ed..8c6f43928 100644
--- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/main/java/io/awspring/cloud/modulith/events/sns/SnsEventExternalizerConfiguration.java
+++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/main/java/io/awspring/cloud/modulith/events/sns/SnsEventExternalizerConfiguration.java
@@ -61,7 +61,7 @@ DelegatingEventExternalizer snsEventExternalizer(EventExternalizationConfigurati
return new DelegatingEventExternalizer(configuration, (target, payload) -> {
var routing = BrokerRouting.of(target, context);
- var builder = SnsNotification.builder(payload);
+ var builder = SnsNotification.builder(payload).headers(configuration.getHeadersFor(payload));
var key = routing.getKey(payload);
// when routing key is set, SNS topic must be a FIFO topic
@@ -69,7 +69,7 @@ DelegatingEventExternalizer snsEventExternalizer(EventExternalizationConfigurati
builder.groupId(key);
}
- operations.sendNotification(routing.getTarget(), builder.build());
+ operations.sendNotification(routing.getTarget(payload), builder.build());
return CompletableFuture.completedFuture(null);
});
diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/test/java/io/awspring/cloud/modulith/events/sns/SnsEventPublicationIntegrationTests.java b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/test/java/io/awspring/cloud/modulith/events/sns/SnsEventPublicationIntegrationTests.java
index 0a3344da2..10ae9a299 100644
--- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/test/java/io/awspring/cloud/modulith/events/sns/SnsEventPublicationIntegrationTests.java
+++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sns/src/test/java/io/awspring/cloud/modulith/events/sns/SnsEventPublicationIntegrationTests.java
@@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
+import static org.springframework.modulith.events.EventExternalizationConfiguration.*;
import java.util.Map;
import org.junit.jupiter.api.Test;
@@ -26,6 +27,7 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.modulith.events.ApplicationModuleListener;
+import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.Externalized;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.springframework.transaction.annotation.Transactional;
@@ -33,6 +35,8 @@
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
/**
@@ -78,6 +82,13 @@ TestPublisher testPublisher(ApplicationEventPublisher publisher) {
TestListener testListener() {
return new TestListener();
}
+
+ @Bean
+ EventExternalizationConfiguration eventExternalizationConfiguration() {
+
+ return externalizing().select(annotatedAsExternalized())
+ .headers(Object.class, __ -> Map.of("testKey", "testValue")).build();
+ }
}
@Test // GH-344
@@ -91,15 +102,20 @@ void publishesEventToSns() {
.getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)).join()
.attributes().get(QueueAttributeName.QUEUE_ARN);
- snsClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn));
+ snsClient.subscribe(r -> r.attributes(Map.of("RawMessageDelivery", "true")).topicArn(topicArn).protocol("sqs")
+ .endpoint(queueArn));
publisher.publishEvent();
await().untilAsserted(() -> {
-
- var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join();
+ var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl).messageAttributeNames("testKey"))
+ .join();
assertThat(response.hasMessages()).isTrue();
+
+ // Assert header added
+ assertThat(response.messages()).extracting(Message::messageAttributes).extracting(it -> it.get("testKey"))
+ .extracting(MessageAttributeValue::stringValue).containsExactly("testValue");
});
}
diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/main/java/io/awspring/cloud/modulith/events/sqs/SqsEventExternalizerConfiguration.java b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/main/java/io/awspring/cloud/modulith/events/sqs/SqsEventExternalizerConfiguration.java
index ec25ae6c7..ba8436163 100644
--- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/main/java/io/awspring/cloud/modulith/events/sqs/SqsEventExternalizerConfiguration.java
+++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/main/java/io/awspring/cloud/modulith/events/sqs/SqsEventExternalizerConfiguration.java
@@ -63,7 +63,9 @@ DelegatingEventExternalizer sqsEventExternalizer(EventExternalizationConfigurati
return CompletableFuture.completedFuture(operations.send(sqsSendOptions -> {
- var options = sqsSendOptions.queue(routing.getTarget()).payload(payload);
+ var options = sqsSendOptions.queue(routing.getTarget(payload))
+ .headers(configuration.getHeadersFor(payload)).payload(payload);
+
var key = routing.getKey(payload);
if (key != null) {
diff --git a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/test/java/io/awspring/cloud/modulith/events/sqs/SqsEventPublicationIntegrationTests.java b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/test/java/io/awspring/cloud/modulith/events/sqs/SqsEventPublicationIntegrationTests.java
index 56948e372..189df8846 100644
--- a/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/test/java/io/awspring/cloud/modulith/events/sqs/SqsEventPublicationIntegrationTests.java
+++ b/spring-cloud-aws-modulith/spring-cloud-aws-modulith-events-sqs/src/test/java/io/awspring/cloud/modulith/events/sqs/SqsEventPublicationIntegrationTests.java
@@ -17,6 +17,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
+import static org.springframework.modulith.events.EventExternalizationConfiguration.*;
import java.util.Map;
import org.junit.jupiter.api.Test;
@@ -26,12 +27,15 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.modulith.events.ApplicationModuleListener;
+import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.Externalized;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.springframework.transaction.annotation.Transactional;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
/**
@@ -76,6 +80,13 @@ TestPublisher testPublisher(ApplicationEventPublisher publisher) {
TestListener testListener() {
return new TestListener();
}
+
+ @Bean
+ EventExternalizationConfiguration eventExternalizationConfiguration() {
+
+ return externalizing().select(annotatedAsExternalized())
+ .headers(Object.class, __ -> Map.of("testKey", "testValue")).build();
+ }
}
@Test
@@ -86,9 +97,14 @@ void publishesEventToSqs() throws Exception {
publisher.publishEvent();
await().untilAsserted(() -> {
- var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join();
+ var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl).messageAttributeNames("testKey"))
+ .join();
assertThat(response.hasMessages()).isTrue();
+
+ // Assert header added
+ assertThat(response.messages()).extracting(Message::messageAttributes).extracting(it -> it.get("testKey"))
+ .extracting(MessageAttributeValue::stringValue).containsExactly("testValue");
});
}