Skip to content

Commit

Permalink
Upgrade to Spring Modulith 1.3 upgrade (#1317)
Browse files Browse the repository at this point in the history
* Upgrade to Spring Modulith 1.3.1.
* Add Spring Modulith 1.3's header support for externalization.
* Pass event into externalization target calculation.

To benefit from the Support for SpEL expressions using the event when calculating the routing target. See spring-projects/spring-modulith#881 for details.
---------

Co-authored-by: Maciej Walkowiak <[email protected]>
  • Loading branch information
odrotbohm and maciejwalkowiak authored Jan 17, 2025
1 parent 425b6c8 commit d902994
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 8 deletions.
2 changes: 1 addition & 1 deletion spring-cloud-aws-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
<jakarta.mail.version>2.1.0</jakarta.mail.version>
<eclipse.jakarta.mail.version>1.0.0</eclipse.jakarta.mail.version>
<bytebuddy.version>1.14.9</bytebuddy.version>
<spring-modulith.version>1.2.3</spring-modulith.version>
<spring-modulith.version>1.3.1</spring-modulith.version>
<wiremock-standalone.version>3.3.1</wiremock-standalone.version>
<amazon.s3.accessgrants>2.3.0</amazon.s3.accessgrants>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ DelegatingEventExternalizer snsEventExternalizer(EventExternalizationConfigurati
return new DelegatingEventExternalizer(configuration, (target, payload) -> {

var routing = BrokerRouting.of(target, context);
var builder = SnsNotification.builder(payload);
var builder = SnsNotification.builder(payload).headers(configuration.getHeadersFor(payload));
var key = routing.getKey(payload);

// when routing key is set, SNS topic must be a FIFO topic
if (key != null) {
builder.groupId(key);
}

operations.sendNotification(routing.getTarget(), builder.build());
operations.sendNotification(routing.getTarget(payload), builder.build());

return CompletableFuture.completedFuture(null);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
import static org.springframework.modulith.events.EventExternalizationConfiguration.*;

import java.util.Map;
import org.junit.jupiter.api.Test;
Expand All @@ -26,13 +27,16 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.modulith.events.ApplicationModuleListener;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.Externalized;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.springframework.transaction.annotation.Transactional;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

/**
Expand Down Expand Up @@ -78,6 +82,13 @@ TestPublisher testPublisher(ApplicationEventPublisher publisher) {
TestListener testListener() {
return new TestListener();
}

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {

return externalizing().select(annotatedAsExternalized())
.headers(Object.class, __ -> Map.of("testKey", "testValue")).build();
}
}

@Test // GH-344
Expand All @@ -91,15 +102,20 @@ void publishesEventToSns() {
.getQueueAttributes(r -> r.queueUrl(queueUrl).attributeNames(QueueAttributeName.QUEUE_ARN)).join()
.attributes().get(QueueAttributeName.QUEUE_ARN);

snsClient.subscribe(r -> r.topicArn(topicArn).protocol("sqs").endpoint(queueArn));
snsClient.subscribe(r -> r.attributes(Map.of("RawMessageDelivery", "true")).topicArn(topicArn).protocol("sqs")
.endpoint(queueArn));

publisher.publishEvent();

await().untilAsserted(() -> {

var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join();
var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl).messageAttributeNames("testKey"))
.join();

assertThat(response.hasMessages()).isTrue();

// Assert header added
assertThat(response.messages()).extracting(Message::messageAttributes).extracting(it -> it.get("testKey"))
.extracting(MessageAttributeValue::stringValue).containsExactly("testValue");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ DelegatingEventExternalizer sqsEventExternalizer(EventExternalizationConfigurati

return CompletableFuture.completedFuture(operations.send(sqsSendOptions -> {

var options = sqsSendOptions.queue(routing.getTarget()).payload(payload);
var options = sqsSendOptions.queue(routing.getTarget(payload))
.headers(configuration.getHeadersFor(payload)).payload(payload);

var key = routing.getKey(payload);

if (key != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
import static org.springframework.modulith.events.EventExternalizationConfiguration.*;

import java.util.Map;
import org.junit.jupiter.api.Test;
Expand All @@ -26,12 +27,15 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Bean;
import org.springframework.modulith.events.ApplicationModuleListener;
import org.springframework.modulith.events.EventExternalizationConfiguration;
import org.springframework.modulith.events.Externalized;
import org.springframework.test.context.DynamicPropertyRegistrar;
import org.springframework.transaction.annotation.Transactional;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

/**
Expand Down Expand Up @@ -76,6 +80,13 @@ TestPublisher testPublisher(ApplicationEventPublisher publisher) {
TestListener testListener() {
return new TestListener();
}

@Bean
EventExternalizationConfiguration eventExternalizationConfiguration() {

return externalizing().select(annotatedAsExternalized())
.headers(Object.class, __ -> Map.of("testKey", "testValue")).build();
}
}

@Test
Expand All @@ -86,9 +97,14 @@ void publishesEventToSqs() throws Exception {
publisher.publishEvent();

await().untilAsserted(() -> {
var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl)).join();
var response = sqsAsyncClient.receiveMessage(r -> r.queueUrl(queueUrl).messageAttributeNames("testKey"))
.join();

assertThat(response.hasMessages()).isTrue();

// Assert header added
assertThat(response.messages()).extracting(Message::messageAttributes).extracting(it -> it.get("testKey"))
.extracting(MessageAttributeValue::stringValue).containsExactly("testValue");
});
}

Expand Down

0 comments on commit d902994

Please sign in to comment.