Skip to content

Commit 750cf77

Browse files
artembilancppwfs
authored andcommitted
Fix nullability in the FluxAggregatorMessageHandler
* This PR is not ready for merge. There are failing tests, but wanted to capture where it is at. * Also still having issue with the .apply() in FluxAggregatorMessageHandler * Make `windowSizeFunction` as `Function<Message<?>, @nullable Integer>` because `sequenceSizeHeader()` may return `null` from message headers * Extract local `subscriptionToDispose` in the `stop()` to satisfy null check context * Use `Objects.requireNonNull(signal.get())` to satisfy `Function.apply()` contract. The `if (signal.hasValue()) {` does the trick for us, but currently that is not visible for that `signal.get()` * Remove `@NullUnmarked` since we have just mitigated all the null problems Updated tests so that they work with nullability changes Update the tests so that they will pass with nullify changes
1 parent 564b1c1 commit 750cf77

28 files changed

+484
-127
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandler.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.integration.aggregator;
1818

1919
import java.time.Duration;
20-
import java.util.Objects;
2120
import java.util.concurrent.atomic.AtomicBoolean;
2221
import java.util.function.Function;
2322
import java.util.function.Predicate;
@@ -203,7 +202,7 @@ public void setWindowSize(int windowSize) {
203202
* @see Flux#window(int)
204203
* @see Flux#windowTimeout(int, Duration)
205204
*/
206-
public void setWindowSizeFunction(Function<Message<?>, @Nullable Integer> windowSizeFunction) {
205+
public void setWindowSizeFunction(Function<Message<?>, @Nullable Integer> windowSizeFunction) {
207206
Assert.notNull(windowSizeFunction, "'windowSizeFunction' must not be null");
208207
this.windowSizeFunction = windowSizeFunction;
209208
}
@@ -255,8 +254,9 @@ public void start() {
255254

256255
@Override
257256
public void stop() {
258-
if (this.subscribed.compareAndSet(true, false) && this.subscription != null) {
259-
this.subscription.dispose();
257+
Disposable subscriptionToDispose = this.subscription;
258+
if (this.subscribed.compareAndSet(true, false) && subscriptionToDispose != null) {
259+
subscriptionToDispose.dispose();
260260
}
261261
}
262262

@@ -289,9 +289,7 @@ private Mono<Message<?>> messageForWindowFlux(Flux<Message<?>> messageFlux) {
289289
.build());
290290
}
291291

292-
293-
private static Integer sequenceSizeHeader(Message<?> message) {
294-
return Objects.requireNonNull(message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class));
292+
private static @Nullable Integer sequenceSizeHeader(Message<?> message) {
293+
return message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, Integer.class);
295294
}
296-
297295
}

spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageListProcessor.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import java.util.Collection;
2020

21-
import org.jspecify.annotations.Nullable;
22-
2321
import org.springframework.messaging.Message;
2422

2523
/**

spring-integration-core/src/test/java/org/springframework/integration/aggregator/FluxAggregatorMessageHandlerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2024 the original author or authors.
2+
* Copyright 2019-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,13 +29,19 @@
2929
import reactor.core.publisher.Flux;
3030
import reactor.test.StepVerifier;
3131

32+
import org.springframework.beans.factory.BeanFactory;
3233
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3334
import org.springframework.integration.channel.QueueChannel;
3435
import org.springframework.integration.support.MessageBuilder;
3536
import org.springframework.messaging.Message;
3637
import org.springframework.messaging.support.GenericMessage;
38+
import org.springframework.scheduling.TaskScheduler;
3739

3840
import static org.assertj.core.api.Assertions.assertThat;
41+
import static org.mockito.ArgumentMatchers.any;
42+
import static org.mockito.ArgumentMatchers.eq;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.when;
3945

4046
/**
4147
* @author Artem Bilan
@@ -159,6 +165,7 @@ void testCustomCombineFunction() {
159165
void testWindowTimespan() {
160166
QueueChannel resultChannel = new QueueChannel();
161167
FluxAggregatorMessageHandler fluxAggregatorMessageHandler = new FluxAggregatorMessageHandler();
168+
fluxAggregatorMessageHandler.setBeanFactory(getBeanFactory());
162169
fluxAggregatorMessageHandler.setOutputChannel(resultChannel);
163170
fluxAggregatorMessageHandler.setWindowTimespan(Duration.ofMillis(100));
164171
fluxAggregatorMessageHandler.start();
@@ -213,6 +220,15 @@ void testWindowTimespan() {
213220
executorService.shutdown();
214221
}
215222

223+
private BeanFactory getBeanFactory() {
224+
BeanFactory beanFactory = mock(BeanFactory.class);
225+
TaskScheduler taskScheduler = mock(TaskScheduler.class);
226+
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
227+
.thenReturn(taskScheduler);
228+
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
229+
return beanFactory;
230+
}
231+
216232
@Test
217233
void testBoundaryTrigger() {
218234
QueueChannel resultChannel = new QueueChannel();

spring-integration-core/src/test/java/org/springframework/integration/channel/registry/HeaderChannelRegistryTests.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2024 the original author or authors.
2+
* Copyright 2013-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,6 +46,8 @@
4646

4747
import static org.assertj.core.api.Assertions.assertThat;
4848
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
49+
import static org.mockito.ArgumentMatchers.any;
50+
import static org.mockito.ArgumentMatchers.eq;
4951
import static org.mockito.Mockito.doAnswer;
5052
import static org.mockito.Mockito.mock;
5153
import static org.mockito.Mockito.when;
@@ -238,6 +240,7 @@ public void testBFCRNoRegistry() {
238240
@Test
239241
public void testRemoveOnGet() {
240242
DefaultHeaderChannelRegistry registry = new DefaultHeaderChannelRegistry();
243+
registry.setBeanFactory(getBeanFactory());
241244
MessageChannel channel = new DirectChannel();
242245
String foo = (String) registry.channelToChannelName(channel);
243246
Map<?, ?> map = TestUtils.getPropertyValue(registry, "channels", Map.class);
@@ -249,6 +252,15 @@ public void testRemoveOnGet() {
249252
assertThat(map.size()).isEqualTo(0);
250253
}
251254

255+
private BeanFactory getBeanFactory() {
256+
BeanFactory beanFactory = mock(BeanFactory.class);
257+
TaskScheduler taskScheduler = mock(TaskScheduler.class);
258+
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
259+
.thenReturn(taskScheduler);
260+
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
261+
return beanFactory;
262+
}
263+
252264
public static class Foo extends AbstractReplyProducingMessageHandler {
253265

254266
@Override

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import reactor.core.scheduler.Schedulers;
3737
import reactor.test.StepVerifier;
3838

39+
import org.springframework.beans.factory.BeanFactory;
3940
import org.springframework.beans.factory.annotation.Autowired;
4041
import org.springframework.beans.factory.annotation.Qualifier;
4142
import org.springframework.context.ConfigurableApplicationContext;
@@ -57,10 +58,15 @@
5758
import org.springframework.messaging.Message;
5859
import org.springframework.messaging.MessageChannel;
5960
import org.springframework.messaging.support.GenericMessage;
61+
import org.springframework.scheduling.TaskScheduler;
6062
import org.springframework.test.annotation.DirtiesContext;
6163
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6264

6365
import static org.assertj.core.api.Assertions.assertThat;
66+
import static org.mockito.ArgumentMatchers.any;
67+
import static org.mockito.ArgumentMatchers.eq;
68+
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.when;
6470

6571
/**
6672
* @author Artem Bilan
@@ -130,6 +136,7 @@ void testReactiveFlow() throws Exception {
130136
@Test
131137
void testPollableReactiveFlow() throws Exception {
132138
assertThat(this.reactiveTransformer).isInstanceOf(ReactiveStreamsConsumer.class);
139+
this.reactiveTransformer.setBeanFactory(getBeanFactory());
133140
this.inputChannel.send(new GenericMessage<>("1,2,3,4,5"));
134141

135142
CountDownLatch latch = new CountDownLatch(6);
@@ -165,6 +172,15 @@ void testPollableReactiveFlow() throws Exception {
165172
exec.shutdownNow();
166173
}
167174

175+
private BeanFactory getBeanFactory() {
176+
BeanFactory beanFactory = mock(BeanFactory.class);
177+
TaskScheduler taskScheduler = mock(TaskScheduler.class);
178+
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
179+
.thenReturn(taskScheduler);
180+
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
181+
return beanFactory;
182+
}
183+
168184
@Test
169185
void testFromPublisher() {
170186
Flux<Message<?>> messageFlux =

spring-integration-core/src/test/java/org/springframework/integration/gateway/AsyncGatewayTests.java

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,11 +42,15 @@
4242
import org.springframework.messaging.support.ChannelInterceptor;
4343
import org.springframework.messaging.support.GenericMessage;
4444
import org.springframework.messaging.support.MessageBuilder;
45+
import org.springframework.scheduling.TaskScheduler;
4546
import org.springframework.util.ReflectionUtils;
4647

4748
import static org.assertj.core.api.Assertions.assertThat;
4849
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
50+
import static org.mockito.ArgumentMatchers.any;
51+
import static org.mockito.ArgumentMatchers.eq;
4952
import static org.mockito.Mockito.mock;
53+
import static org.mockito.Mockito.when;
5054

5155
/**
5256
* @author Mark Fisher
@@ -65,7 +69,7 @@ public void futureWithMessageReturned() throws Exception {
6569
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
6670
proxyFactory.setDefaultRequestChannel(requestChannel);
6771
proxyFactory.setBeanName("testGateway");
68-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
72+
proxyFactory.setBeanFactory(getBeanFactory());
6973
proxyFactory.afterPropertiesSet();
7074
TestEchoService service = proxyFactory.getObject();
7175
Future<Message<?>> f = service.returnMessage("foo");
@@ -88,7 +92,7 @@ protected boolean doSend(Message<?> message, long timeout) {
8892
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
8993
proxyFactory.setDefaultRequestChannel(channel);
9094
proxyFactory.setBeanName("testGateway");
91-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
95+
proxyFactory.setBeanFactory(getBeanFactory());
9296
proxyFactory.afterPropertiesSet();
9397
TestEchoService service = proxyFactory.getObject();
9498
Future<Message<?>> f = service.returnMessage("foo");
@@ -106,7 +110,7 @@ public void listenableFutureWithMessageReturned() throws Exception {
106110
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
107111
proxyFactory.setDefaultRequestChannel(requestChannel);
108112
proxyFactory.setBeanName("testGateway");
109-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
113+
proxyFactory.setBeanFactory(getBeanFactory());
110114
proxyFactory.afterPropertiesSet();
111115
TestEchoService service = proxyFactory.getObject();
112116
CompletableFuture<Message<?>> f = service.returnMessageListenable("foo");
@@ -132,7 +136,7 @@ public void customFutureReturned() {
132136
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
133137
proxyFactory.setDefaultRequestChannel(requestChannel);
134138
proxyFactory.setBeanName("testGateway");
135-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
139+
proxyFactory.setBeanFactory(getBeanFactory());
136140
proxyFactory.afterPropertiesSet();
137141
TestEchoService service = proxyFactory.getObject();
138142
CustomFuture f = service.returnCustomFuture("foo");
@@ -149,7 +153,7 @@ public void nonAsyncFutureReturned() {
149153
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
150154
proxyFactory.setDefaultRequestChannel(requestChannel);
151155
proxyFactory.setBeanName("testGateway");
152-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
156+
proxyFactory.setBeanFactory(getBeanFactory());
153157

154158
proxyFactory.setAsyncExecutor(null); // Not async - user flow returns Future<?>
155159

@@ -181,7 +185,7 @@ public void futureWithPayloadReturned() throws Exception {
181185
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
182186
proxyFactory.setDefaultRequestChannel(requestChannel);
183187
proxyFactory.setBeanName("testGateway");
184-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
188+
proxyFactory.setBeanFactory(getBeanFactory());
185189
proxyFactory.afterPropertiesSet();
186190
TestEchoService service = proxyFactory.getObject();
187191
Future<String> f = service.returnString("foo");
@@ -197,7 +201,7 @@ public void futureWithWildcardReturned() throws Exception {
197201
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
198202
proxyFactory.setDefaultRequestChannel(requestChannel);
199203
proxyFactory.setBeanName("testGateway");
200-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
204+
proxyFactory.setBeanFactory(getBeanFactory());
201205
proxyFactory.afterPropertiesSet();
202206
TestEchoService service = proxyFactory.getObject();
203207
Future<?> f = service.returnSomething("foo");
@@ -211,7 +215,7 @@ public void futureVoid() throws Exception {
211215
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
212216
proxyFactory.setDefaultRequestChannel(new NullChannel());
213217
proxyFactory.setBeanName("testGateway");
214-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
218+
proxyFactory.setBeanFactory(getBeanFactory());
215219
proxyFactory.afterPropertiesSet();
216220
TestEchoService service = proxyFactory.getObject();
217221
Future<Void> f = service.asyncSendAndForget("test1");
@@ -251,7 +255,7 @@ public void futureVoidReply() throws Exception {
251255
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
252256
proxyFactory.setDefaultRequestChannel(requestChannel);
253257
proxyFactory.setBeanName("testGateway");
254-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
258+
proxyFactory.setBeanFactory(getBeanFactory());
255259
proxyFactory.setAsyncExecutor(null);
256260
proxyFactory.afterPropertiesSet();
257261
TestEchoService service = proxyFactory.getObject();
@@ -267,7 +271,7 @@ public void monoWithMessageReturned() {
267271
startResponder(requestChannel);
268272
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
269273
proxyFactory.setDefaultRequestChannel(requestChannel);
270-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
274+
proxyFactory.setBeanFactory(getBeanFactory());
271275
proxyFactory.setBeanName("testGateway");
272276
proxyFactory.afterPropertiesSet();
273277
TestEchoService service = proxyFactory.getObject();
@@ -282,7 +286,7 @@ public void monoWithPayloadReturned() {
282286
startResponder(requestChannel);
283287
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
284288
proxyFactory.setDefaultRequestChannel(requestChannel);
285-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
289+
proxyFactory.setBeanFactory(getBeanFactory());
286290
proxyFactory.setBeanName("testGateway");
287291
proxyFactory.afterPropertiesSet();
288292
TestEchoService service = proxyFactory.getObject();
@@ -297,7 +301,7 @@ public void monoWithWildcardReturned() {
297301
startResponder(requestChannel);
298302
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
299303
proxyFactory.setDefaultRequestChannel(requestChannel);
300-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
304+
proxyFactory.setBeanFactory(getBeanFactory());
301305
proxyFactory.setBeanName("testGateway");
302306
proxyFactory.afterPropertiesSet();
303307
TestEchoService service = proxyFactory.getObject();
@@ -313,7 +317,7 @@ public void monoWithConsumer() {
313317
startResponder(requestChannel);
314318
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
315319
proxyFactory.setDefaultRequestChannel(requestChannel);
316-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
320+
proxyFactory.setBeanFactory(getBeanFactory());
317321
proxyFactory.setBeanName("testGateway");
318322
proxyFactory.afterPropertiesSet();
319323
TestEchoService service = proxyFactory.getObject();
@@ -329,7 +333,7 @@ public void monoVoid() throws InterruptedException {
329333
GatewayProxyFactoryBean<TestEchoService> proxyFactory = new GatewayProxyFactoryBean<>(TestEchoService.class);
330334
proxyFactory.setDefaultRequestChannel(new NullChannel());
331335
proxyFactory.setBeanName("testGateway");
332-
proxyFactory.setBeanFactory(mock(BeanFactory.class));
336+
proxyFactory.setBeanFactory(getBeanFactory());
333337
proxyFactory.afterPropertiesSet();
334338
TestEchoService service = proxyFactory.getObject();
335339
Mono<Void> mono = service.monoVoid("test1");
@@ -372,6 +376,15 @@ private static void startResponder(final PollableChannel requestChannel) {
372376
}).start();
373377
}
374378

379+
private BeanFactory getBeanFactory() {
380+
BeanFactory beanFactory = mock(BeanFactory.class);
381+
TaskScheduler taskScheduler = mock(TaskScheduler.class);
382+
when(beanFactory.getBean(eq("taskScheduler"), any(Class.class)))
383+
.thenReturn(taskScheduler);
384+
when(beanFactory.containsBean("taskScheduler")).thenReturn(true);
385+
return beanFactory;
386+
}
387+
375388
private interface TestEchoService {
376389

377390
Future<String> returnString(String s);

spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayInterfaceTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -89,6 +89,7 @@
8989
import org.springframework.messaging.handler.annotation.Payload;
9090
import org.springframework.messaging.support.ChannelInterceptor;
9191
import org.springframework.messaging.support.MessageHeaderAccessor;
92+
import org.springframework.scheduling.TaskScheduler;
9293
import org.springframework.stereotype.Component;
9394
import org.springframework.test.annotation.DirtiesContext;
9495
import org.springframework.test.context.ActiveProfiles;
@@ -340,6 +341,7 @@ public void testWithServiceEquals() {
340341
bf.registerSingleton("requestChannelBar", channel);
341342
bf.registerSingleton("requestChannelBaz", channel);
342343
bf.registerSingleton("requestChannelFoo", channel);
344+
bf.registerSingleton("taskScheduler", mock(TaskScheduler.class));
343345
fb.setBeanFactory(bf);
344346
fb.afterPropertiesSet();
345347
assertThat(fb.getObject()).isNotSameAs(bar);

0 commit comments

Comments
 (0)