Skip to content

Commit 047565b

Browse files
committed
Resolve second set of test failures
* Remove Nullunmarked from FluxAggregatorMessageHandler * Add TaskScheuler to tests that are failing because of nullability * In the past the TaskScheduler attribute in the testscould be set to null and the tests would succeed. But because of nullability additions these need to be populated
1 parent 750cf77 commit 047565b

File tree

25 files changed

+119
-368
lines changed

25 files changed

+119
-368
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
package org.springframework.integration.aggregator;
1818

1919
import java.time.Duration;
20+
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicBoolean;
2122
import java.util.function.Function;
2223
import java.util.function.Predicate;
2324

24-
import org.jspecify.annotations.NullUnmarked;
2525
import org.jspecify.annotations.Nullable;
2626
import reactor.core.Disposable;
2727
import reactor.core.publisher.Flux;
@@ -108,7 +108,6 @@ private Flux<Message<?>> releaseBy(Flux<Message<?>> groupFlux) {
108108
.flatMap((windowFlux) -> windowFlux.transform(this.combineFunction));
109109
}
110110

111-
@NullUnmarked
112111
private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
113112
if (this.boundaryTrigger != null) {
114113
return groupFlux.windowUntil(this.boundaryTrigger);
@@ -117,7 +116,7 @@ private Flux<Flux<Message<?>>> applyWindowOptions(Flux<Message<?>> groupFlux) {
117116
.switchOnFirst((signal, group) -> {
118117
if (signal.hasValue()) {
119118
Assert.notNull(this.windowSizeFunction, "'windowSizeFunction' must not be null");
120-
Integer maxSize = this.windowSizeFunction.apply(signal.get());
119+
Integer maxSize = this.windowSizeFunction.apply(Objects.requireNonNull(signal.get()));
121120
if (maxSize != null) {
122121
if (this.windowTimespan != null) {
123122
return group.windowTimeout(maxSize, this.windowTimespan);
@@ -196,7 +195,7 @@ public void setWindowSize(int windowSize) {
196195
/**
197196
* Specify a {@link Function} to determine a size for windows to close against the first message in group.
198197
* Tne result of the function can be combined with the {@link #setWindowTimespan(Duration)}.
199-
* By default an {@link IntegrationMessageHeaderAccessor#SEQUENCE_SIZE} header is consulted.
198+
* By default, an {@link IntegrationMessageHeaderAccessor#SEQUENCE_SIZE} header is consulted.
200199
* @param windowSizeFunction the {@link Function} to use to determine a window size
201200
* against a first message in the group.
202201
* @see Flux#window(int)

spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ public final void setPrimaryExpression(Expression expression) {
224224
}
225225

226226
@Override
227-
@SuppressWarnings("NullAway.Init")
227+
@SuppressWarnings("NullAway")
228228
public final void afterPropertiesSet() {
229229
this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
230230
if (this.messageBuilderFactory == null) {

spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,14 @@
2929
import reactor.core.publisher.Flux;
3030
import reactor.test.StepVerifier;
3131

32-
import org.springframework.beans.factory.BeanFactory;
3332
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3433
import org.springframework.integration.channel.QueueChannel;
3534
import org.springframework.integration.support.MessageBuilder;
3635
import org.springframework.messaging.Message;
3736
import org.springframework.messaging.support.GenericMessage;
38-
import org.springframework.scheduling.TaskScheduler;
3937

4038
import static org.assertj.core.api.Assertions.assertThat;
41-
import static org.mockito.ArgumentMatchers.any;
42-
import static org.mockito.ArgumentMatchers.eq;
4339
import static org.mockito.Mockito.mock;
44-
import static org.mockito.Mockito.when;
4540

4641
/**
4742
* @author Artem Bilan
@@ -165,7 +160,7 @@ void testCustomCombineFunction() {
165160
void testWindowTimespan() {
166161
QueueChannel resultChannel = new QueueChannel();
167162
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
168-
fluxAggregatorMessageHandler.setBeanFactory(getBeanFactory());
163+
fluxAggregatorMessageHandler.setTaskScheduler(mock());
169164
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
170165
fluxAggregatorMessageHandler.setWindowTimespan(Duration.ofMillis(100));
171166
fluxAggregatorMessageHandler.start();
@@ -220,15 +215,6 @@ void testWindowTimespan() {
220215
executorService.shutdown();
221216
}
222217

223-
private BeanFactory getBeanFactory() {
224-
BeanFactory beanFactory = mock(BeanFactory.class);
225-
TaskScheduler taskScheduler = mock(TaskScheduler.class);
226-
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
227-
.thenReturn(taskScheduler);
228-
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
229-
return beanFactory;
230-
}
231-
232218
@Test
233219
void testBoundaryTrigger() {
234220
QueueChannel resultChannel = new QueueChannel();

spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,12 @@
4141
import org.springframework.messaging.support.ErrorMessage;
4242
import org.springframework.messaging.support.GenericMessage;
4343
import org.springframework.scheduling.TaskScheduler;
44+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
4445
import org.springframework.test.annotation.DirtiesContext;
4546
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4647

4748
import static org.assertj.core.api.Assertions.assertThat;
4849
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
49-
import static org.mockito.ArgumentMatchers.any;
50-
import static org.mockito.ArgumentMatchers.eq;
5150
import static org.mockito.Mockito.doAnswer;
5251
import static org.mockito.Mockito.mock;
5352
import static org.mockito.Mockito.when;
@@ -240,7 +239,7 @@ public void testBFCRNoRegistry() {
240239
@Test
241240
public void testRemoveOnGet() {
242241
DefaultHeaderChannelRegistry registry = new DefaultHeaderChannelRegistry();
243-
registry.setBeanFactory(getBeanFactory());
242+
registry.setTaskScheduler(new SimpleAsyncTaskScheduler());
244243
MessageChannel channel = new DirectChannel();
245244
String foo = (String) registry.channelToChannelName(channel);
246245
Map<?, ?> map = TestUtils.getPropertyValue(registry, "channels", Map.class);
@@ -252,15 +251,6 @@ public void testRemoveOnGet() {
252251
assertThat(map.size()).isEqualTo(0);
253252
}
254253

255-
private BeanFactory getBeanFactory() {
256-
BeanFactory beanFactory = mock(BeanFactory.class);
257-
TaskScheduler taskScheduler = mock(TaskScheduler.class);
258-
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
259-
.thenReturn(taskScheduler);
260-
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
261-
return beanFactory;
262-
}
263-
264254
public static class Foo extends AbstractReplyProducingMessageHandler {
265255

266256
@Override

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import reactor.core.scheduler.Schedulers;
3737
import reactor.test.StepVerifier;
3838

39-
import org.springframework.beans.factory.BeanFactory;
4039
import org.springframework.beans.factory.annotation.Autowired;
4140
import org.springframework.beans.factory.annotation.Qualifier;
4241
import org.springframework.context.ConfigurableApplicationContext;
@@ -58,15 +57,11 @@
5857
import org.springframework.messaging.Message;
5958
import org.springframework.messaging.MessageChannel;
6059
import org.springframework.messaging.support.GenericMessage;
61-
import org.springframework.scheduling.TaskScheduler;
60+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
6261
import org.springframework.test.annotation.DirtiesContext;
6362
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6463

6564
import static org.assertj.core.api.Assertions.assertThat;
66-
import static org.mockito.ArgumentMatchers.any;
67-
import static org.mockito.ArgumentMatchers.eq;
68-
import static org.mockito.Mockito.mock;
69-
import static org.mockito.Mockito.when;
7065

7166
/**
7267
* @author Artem Bilan
@@ -136,7 +131,7 @@ void testReactiveFlow() throws Exception {
136131
@Test
137132
void testPollableReactiveFlow() throws Exception {
138133
assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
139-
this.reactiveTransformer.setBeanFactory(getBeanFactory());
134+
this.reactiveTransformer.setTaskScheduler(new SimpleAsyncTaskScheduler());
140135
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
141136

142137
CountDownLatch latch = new CountDownLatch(6);
@@ -172,15 +167,6 @@ void testPollableReactiveFlow() throws Exception {
172167
exec.shutdownNow();
173168
}
174169

175-
private BeanFactory getBeanFactory() {
176-
BeanFactory beanFactory = mock(BeanFactory.class);
177-
TaskScheduler taskScheduler = mock(TaskScheduler.class);
178-
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
179-
.thenReturn(taskScheduler);
180-
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
181-
return beanFactory;
182-
}
183-
184170
@Test
185171
void testFromPublisher() {
186172
Flux<Message<?>> messageFlux =

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 22 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import reactor.test.StepVerifier;
3030

3131
import org.springframework.beans.DirectFieldAccessor;
32-
import org.springframework.beans.factory.BeanFactory;
3332
import org.springframework.integration.MessageDispatchingException;
3433
import org.springframework.integration.annotation.Gateway;
3534
import org.springframework.integration.annotation.GatewayHeader;
@@ -42,15 +41,12 @@
4241
import org.springframework.messaging.support.ChannelInterceptor;
4342
import org.springframework.messaging.support.GenericMessage;
4443
import org.springframework.messaging.support.MessageBuilder;
45-
import org.springframework.scheduling.TaskScheduler;
44+
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
4645
import org.springframework.util.ReflectionUtils;
4746

4847
import static org.assertj.core.api.Assertions.assertThat;
4948
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
50-
import static org.mockito.ArgumentMatchers.any;
51-
import static org.mockito.ArgumentMatchers.eq;
5249
import static org.mockito.Mockito.mock;
53-
import static org.mockito.Mockito.when;
5450

5551
/**
5652
* @author Mark Fisher
@@ -67,9 +63,7 @@ public void futureWithMessageReturned() throws Exception {
6763
QueueChannel requestChannel = new QueueChannel();
6864
startResponder(requestChannel);
6965
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
70-
proxyFactory.setDefaultRequestChannel(requestChannel);
71-
proxyFactory.setBeanName("testGateway");
72-
proxyFactory.setBeanFactory(getBeanFactory());
66+
setupProxyFactory(requestChannel, proxyFactory);
7367
proxyFactory.afterPropertiesSet();
7468
TestEchoService service = proxyFactory.getObject();
7569
Future<Message<?>> f = service.returnMessage("foo");
@@ -90,9 +84,7 @@ protected boolean doSend(Message<?> message, long timeout) {
9084

9185
};
9286
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
93-
proxyFactory.setDefaultRequestChannel(channel);
94-
proxyFactory.setBeanName("testGateway");
95-
proxyFactory.setBeanFactory(getBeanFactory());
87+
setupProxyFactory(channel, proxyFactory);
9688
proxyFactory.afterPropertiesSet();
9789
TestEchoService service = proxyFactory.getObject();
9890
Future<Message<?>> f = service.returnMessage("foo");
@@ -108,9 +100,7 @@ public void listenableFutureWithMessageReturned() throws Exception {
108100
addThreadEnricher(requestChannel);
109101
startResponder(requestChannel);
110102
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
111-
proxyFactory.setDefaultRequestChannel(requestChannel);
112-
proxyFactory.setBeanName("testGateway");
113-
proxyFactory.setBeanFactory(getBeanFactory());
103+
setupProxyFactory(requestChannel, proxyFactory);
114104
proxyFactory.afterPropertiesSet();
115105
TestEchoService service = proxyFactory.getObject();
116106
CompletableFuture<Message<?>> f = service.returnMessageListenable("foo");
@@ -134,9 +124,7 @@ public void customFutureReturned() {
134124
addThreadEnricher(requestChannel);
135125
startResponder(requestChannel);
136126
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
137-
proxyFactory.setDefaultRequestChannel(requestChannel);
138-
proxyFactory.setBeanName("testGateway");
139-
proxyFactory.setBeanFactory(getBeanFactory());
127+
setupProxyFactory(requestChannel, proxyFactory);
140128
proxyFactory.afterPropertiesSet();
141129
TestEchoService service = proxyFactory.getObject();
142130
CustomFuture f = service.returnCustomFuture("foo");
@@ -151,10 +139,7 @@ public void nonAsyncFutureReturned() {
151139
addThreadEnricher(requestChannel);
152140
startResponder(requestChannel);
153141
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
154-
proxyFactory.setDefaultRequestChannel(requestChannel);
155-
proxyFactory.setBeanName("testGateway");
156-
proxyFactory.setBeanFactory(getBeanFactory());
157-
142+
setupProxyFactory(requestChannel, proxyFactory);
158143
proxyFactory.setAsyncExecutor(null); // Not async - user flow returns Future<?>
159144

160145
proxyFactory.afterPropertiesSet();
@@ -183,9 +168,7 @@ public void futureWithPayloadReturned() throws Exception {
183168
QueueChannel requestChannel = new QueueChannel();
184169
startResponder(requestChannel);
185170
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
186-
proxyFactory.setDefaultRequestChannel(requestChannel);
187-
proxyFactory.setBeanName("testGateway");
188-
proxyFactory.setBeanFactory(getBeanFactory());
171+
setupProxyFactory(requestChannel, proxyFactory);
189172
proxyFactory.afterPropertiesSet();
190173
TestEchoService service = proxyFactory.getObject();
191174
Future<String> f = service.returnString("foo");
@@ -199,9 +182,7 @@ public void futureWithWildcardReturned() throws Exception {
199182
QueueChannel requestChannel = new QueueChannel();
200183
startResponder(requestChannel);
201184
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
202-
proxyFactory.setDefaultRequestChannel(requestChannel);
203-
proxyFactory.setBeanName("testGateway");
204-
proxyFactory.setBeanFactory(getBeanFactory());
185+
setupProxyFactory(requestChannel, proxyFactory);
205186
proxyFactory.afterPropertiesSet();
206187
TestEchoService service = proxyFactory.getObject();
207188
Future<?> f = service.returnSomething("foo");
@@ -213,9 +194,7 @@ public void futureWithWildcardReturned() throws Exception {
213194
@Test
214195
public void futureVoid() throws Exception {
215196
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
216-
proxyFactory.setDefaultRequestChannel(new NullChannel());
217-
proxyFactory.setBeanName("testGateway");
218-
proxyFactory.setBeanFactory(getBeanFactory());
197+
setupProxyFactory(new NullChannel(), proxyFactory);
219198
proxyFactory.afterPropertiesSet();
220199
TestEchoService service = proxyFactory.getObject();
221200
Future<Void> f = service.asyncSendAndForget("test1");
@@ -253,9 +232,7 @@ public void futureVoidReply() throws Exception {
253232
}
254233
}).start();
255234
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
256-
proxyFactory.setDefaultRequestChannel(requestChannel);
257-
proxyFactory.setBeanName("testGateway");
258-
proxyFactory.setBeanFactory(getBeanFactory());
235+
setupProxyFactory(requestChannel, proxyFactory);
259236
proxyFactory.setAsyncExecutor(null);
260237
proxyFactory.afterPropertiesSet();
261238
TestEchoService service = proxyFactory.getObject();
@@ -270,9 +247,7 @@ public void monoWithMessageReturned() {
270247
QueueChannel requestChannel = new QueueChannel();
271248
startResponder(requestChannel);
272249
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
273-
proxyFactory.setDefaultRequestChannel(requestChannel);
274-
proxyFactory.setBeanFactory(getBeanFactory());
275-
proxyFactory.setBeanName("testGateway");
250+
setupProxyFactory(requestChannel, proxyFactory);
276251
proxyFactory.afterPropertiesSet();
277252
TestEchoService service = proxyFactory.getObject();
278253
Mono<Message<?>> mono = service.returnMessagePromise("foo");
@@ -285,9 +260,7 @@ public void monoWithPayloadReturned() {
285260
QueueChannel requestChannel = new QueueChannel();
286261
startResponder(requestChannel);
287262
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
288-
proxyFactory.setDefaultRequestChannel(requestChannel);
289-
proxyFactory.setBeanFactory(getBeanFactory());
290-
proxyFactory.setBeanName("testGateway");
263+
setupProxyFactory(requestChannel, proxyFactory);
291264
proxyFactory.afterPropertiesSet();
292265
TestEchoService service = proxyFactory.getObject();
293266
Mono<String> mono = service.returnStringPromise("foo");
@@ -300,9 +273,7 @@ public void monoWithWildcardReturned() {
300273
QueueChannel requestChannel = new QueueChannel();
301274
startResponder(requestChannel);
302275
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
303-
proxyFactory.setDefaultRequestChannel(requestChannel);
304-
proxyFactory.setBeanFactory(getBeanFactory());
305-
proxyFactory.setBeanName("testGateway");
276+
setupProxyFactory(requestChannel, proxyFactory);
306277
proxyFactory.afterPropertiesSet();
307278
TestEchoService service = proxyFactory.getObject();
308279
Mono<?> mono = service.returnSomethingPromise("foo");
@@ -316,9 +287,7 @@ public void monoWithConsumer() {
316287
QueueChannel requestChannel = new QueueChannel();
317288
startResponder(requestChannel);
318289
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
319-
proxyFactory.setDefaultRequestChannel(requestChannel);
320-
proxyFactory.setBeanFactory(getBeanFactory());
321-
proxyFactory.setBeanName("testGateway");
290+
setupProxyFactory(requestChannel, proxyFactory);
322291
proxyFactory.afterPropertiesSet();
323292
TestEchoService service = proxyFactory.getObject();
324293
Mono<String> mono = service.returnStringPromise("foo");
@@ -331,9 +300,7 @@ public void monoWithConsumer() {
331300
@Test
332301
public void monoVoid() throws InterruptedException {
333302
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
334-
proxyFactory.setDefaultRequestChannel(new NullChannel());
335-
proxyFactory.setBeanName("testGateway");
336-
proxyFactory.setBeanFactory(getBeanFactory());
303+
setupProxyFactory(new NullChannel(), proxyFactory);
337304
proxyFactory.afterPropertiesSet();
338305
TestEchoService service = proxyFactory.getObject();
339306
Mono<Void> mono = service.monoVoid("test1");
@@ -376,15 +343,6 @@ private static void startResponder(final PollableChannel requestChannel) {
376343
}).start();
377344
}
378345

379-
private BeanFactory getBeanFactory() {
380-
BeanFactory beanFactory = mock(BeanFactory.class);
381-
TaskScheduler taskScheduler = mock(TaskScheduler.class);
382-
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
383-
.thenReturn(taskScheduler);
384-
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
385-
return beanFactory;
386-
}
387-
388346
private interface TestEchoService {
389347

390348
Future<String> returnString(String s);
@@ -415,6 +373,13 @@ private interface TestEchoService {
415373

416374
}
417375

376+
private static void setupProxyFactory(MessageChannel messageChannel, GatewayProxyFactoryBean proxyFactory) {
377+
proxyFactory.setDefaultRequestChannel(messageChannel);
378+
proxyFactory.setBeanName("testGateway");
379+
proxyFactory.setTaskScheduler(new SimpleAsyncTaskScheduler());
380+
proxyFactory.setBeanFactory(mock());
381+
}
382+
418383
private record CustomFuture(String result, Thread thread) implements Future<String> {
419384

420385
@Override

0 commit comments

Comments
 (0)