Skip to content

Commit 9b7b0d9

Browse files
committed
FluxMessageChannel: try.catch not onErrorContinue
It turns out that some upstream fluxes may come with the `onErrorResume()` logic. The `onErrorContinue()` here downstream in the `FluxMessageChannel.java` eliminates an `onErrorResume()` purpose. * Change the logic to `try..catch` in the `doOnNext()` instead and let that upstream `onErrorResume()` to do its job
1 parent a62a572 commit 9b7b0d9

File tree

1 file changed

+8
-2
lines changed

1 file changed

+8
-2
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,14 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
7878
Flux.from(publisher)
7979
.delaySubscription(this.subscribedSignal.filter(Boolean::booleanValue).next())
8080
.publishOn(Schedulers.boundedElastic())
81-
.doOnNext(this::send)
82-
.onErrorContinue((ex, message) -> logger.warn("Error during processing event: " + message, ex))
81+
.doOnNext((message) -> {
82+
try {
83+
send(message);
84+
}
85+
catch (Exception ex) {
86+
logger.warn("Error during processing event: " + message, ex);
87+
}
88+
})
8389
.subscribe());
8490
}
8591

0 commit comments

Comments
 (0)