3434import io .awspring .cloud .sqs .support .observation .SqsListenerObservation ;
3535import io .awspring .cloud .sqs .support .observation .SqsTemplateObservation ;
3636import io .micrometer .observation .ObservationRegistry ;
37+ import java .util .concurrent .Executors ;
3738import org .springframework .beans .factory .ObjectProvider ;
3839import org .springframework .boot .autoconfigure .AutoConfiguration ;
3940import org .springframework .boot .autoconfigure .AutoConfigureAfter ;
5253import software .amazon .awssdk .services .sqs .batchmanager .SqsAsyncBatchManager ;
5354import software .amazon .awssdk .services .sqs .model .Message ;
5455
55- import java .util .concurrent .Executors ;
56-
5756/**
5857 * {@link EnableAutoConfiguration Auto-configuration} for SQS integration.
5958 *
@@ -84,10 +83,10 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
8483 ObjectProvider <AwsConnectionDetails > connectionDetails ,
8584 ObjectProvider <SqsAsyncClientCustomizer > sqsAsyncClientCustomizers ,
8685 ObjectProvider <AwsAsyncClientCustomizer > awsAsyncClientCustomizers ) {
87- return awsClientBuilderConfigurer .configureAsyncClient (SqsAsyncClient .builder (), this .sqsProperties ,
88- connectionDetails .getIfAvailable (), configurer .getIfAvailable (),
89- sqsAsyncClientCustomizers .orderedStream (), awsAsyncClientCustomizers .orderedStream ()).build ();
90- }
86+ return awsClientBuilderConfigurer .configureAsyncClient (SqsAsyncClient .builder (), this .sqsProperties ,
87+ connectionDetails .getIfAvailable (), configurer .getIfAvailable (),
88+ sqsAsyncClientCustomizers .orderedStream (), awsAsyncClientCustomizers .orderedStream ()).build ();
89+ }
9190
9291 @ ConditionalOnProperty (name = "spring.cloud.aws.sqs.batch.enabled" , havingValue = "true" )
9392 @ Bean
@@ -98,11 +97,9 @@ public SqsAsyncClient batchSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
9897 }
9998
10099 private SqsAsyncBatchManager createBatchManager (SqsAsyncClient sqsAsyncClient ) {
101- return SqsAsyncBatchManager .builder ()
102- .client (sqsAsyncClient )
103- .scheduledExecutor (Executors .newScheduledThreadPool (5 ))
104- .overrideConfiguration (this ::configurationProperties )
105- .build ();
100+ return SqsAsyncBatchManager .builder ().client (sqsAsyncClient )
101+ .scheduledExecutor (Executors .newScheduledThreadPool (5 ))
102+ .overrideConfiguration (this ::configurationProperties ).build ();
106103 }
107104
108105 private void configurationProperties (BatchOverrideConfiguration .Builder options ) {
@@ -112,7 +109,7 @@ private void configurationProperties(BatchOverrideConfiguration.Builder options)
112109 mapper .from (this .sqsProperties .getBatch ().getWaitTimeSeconds ()).to (options ::receiveMessageMinWaitDuration );
113110 mapper .from (this .sqsProperties .getBatch ().getVisibilityTimeout ()).to (options ::receiveMessageVisibilityTimeout );
114111 mapper .from (this .sqsProperties .getBatch ().getSystemAttributeNames ())
115- .to (options ::receiveMessageSystemAttributeNames );
112+ .to (options ::receiveMessageSystemAttributeNames );
116113 mapper .from (this .sqsProperties .getBatch ().getAttributeNames ()).to (options ::receiveMessageAttributeNames );
117114 }
118115
@@ -122,15 +119,15 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<Obj
122119 ObjectProvider <ObservationRegistry > observationRegistryProvider ,
123120 ObjectProvider <SqsTemplateObservation .Convention > observationConventionProvider ,
124121 MessagingMessageConverter <Message > messageConverter ) {
125- SqsTemplateBuilder builder = SqsTemplate .builder ().sqsAsyncClient (sqsAsyncClient )
126- .messageConverter (messageConverter );
122+ SqsTemplateBuilder builder = SqsTemplate .builder ().sqsAsyncClient (sqsAsyncClient )
123+ .messageConverter (messageConverter );
127124 objectMapperProvider .ifAvailable (om -> setMapperToConverter (messageConverter , om ));
128- if (this .sqsProperties .isObservationEnabled ()) {
129- observationRegistryProvider
130- .ifAvailable (registry -> builder .configure (options -> options .observationRegistry (registry )));
131- observationConventionProvider
132- .ifAvailable (convention -> builder .configure (options -> options .observationConvention (convention )));
133- }
125+ if (this .sqsProperties .isObservationEnabled ()) {
126+ observationRegistryProvider
127+ .ifAvailable (registry -> builder .configure (options -> options .observationRegistry (registry )));
128+ observationConventionProvider
129+ .ifAvailable (convention -> builder .configure (options -> options .observationConvention (convention )));
130+ }
134131 if (sqsProperties .getQueueNotFoundStrategy () != null ) {
135132 builder .configure ((options ) -> options .queueNotFoundStrategy (sqsProperties .getQueueNotFoundStrategy ()));
136133 }
@@ -156,12 +153,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
156153 interceptors .forEach (factory ::addMessageInterceptor );
157154 asyncInterceptors .forEach (factory ::addMessageInterceptor );
158155 objectMapperProvider .ifAvailable (om -> setMapperToConverter (messagingMessageConverter , om ));
159- if (this .sqsProperties .isObservationEnabled ()) {
160- observationRegistry
161- .ifAvailable (registry -> factory .configure (options -> options .observationRegistry (registry )));
162- observationConventionProvider
163- .ifAvailable (convention -> factory .configure (options -> options .observationConvention (convention )));
164- }
156+ if (this .sqsProperties .isObservationEnabled ()) {
157+ observationRegistry
158+ .ifAvailable (registry -> factory .configure (options -> options .observationRegistry (registry )));
159+ observationConventionProvider
160+ .ifAvailable (convention -> factory .configure (options -> options .observationConvention (convention )));
161+ }
165162 factory .configure (options -> options .messageConverter (messagingMessageConverter ));
166163 return factory ;
167164 }
0 commit comments