Skip to content

Commit 1d71100

Browse files
authored
Fix negative acknowledgement handling for ReactiveMessagePipeline when message handler throws an exception (#213)
* Add unit test for negative acknowledgement * Fix negative acknowledgement handling when messageHandler function throws an exception * Add javadoc * Add javadoc comment about ordered processing and negative acknowledgements
1 parent 8627a0f commit 1d71100

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,23 @@
3737
public interface ReactiveMessagePipelineBuilder<T> {
3838

3939
/**
40-
* Sets a handler function that processes messages one-by-one.
40+
* <p>
41+
* Sets a handler function that processes messages one-by-one. When the message
42+
* handler completes successfully, the message will be acknowledged. When the message
43+
* handler emits an error, the error logger will be used to log the error and the
44+
* message will be negatively acknowledged. If the error logger is not set, a default
45+
* error message will be logged at the error level.
46+
* </p>
47+
* <p>
48+
* NOTE: Be aware that negative acknowledgements on ordered subscription types such as
49+
* Exclusive, Failover and Key_Shared typically cause failed messages to be sent to
50+
* consumers out of their original order. Negative acknowledgements for Key_Shared
51+
* subscriptions may also cause message delivery to be blocked on broker versions
52+
* before Pulsar 4.0. To maintain ordered message processing, it is recommended to
53+
* wrap the message handler with Project Reactor's native retry logic using <a href=
54+
* "https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#retryWhen-reactor.util.retry.Retry-">Mono.retryWhen</a>
55+
* to retry processing of each message indefinitely with backoff.
56+
* </p>
4157
* @param messageHandler a function that takes a message as input and returns an empty
4258
* Publisher
4359
* @return a builder for the pipeline handling messages one-by-one
@@ -95,6 +111,7 @@ interface OneByOneMessagePipelineBuilder<T> extends ReactiveMessagePipelineBuild
95111

96112
/**
97113
* Sets a function which will be called when the message handler emits an error.
114+
* If not set, a default message will be logged at the error level.
98115
* @param errorLogger the error logger function
99116
* @return the pipeline builder instance
100117
*/

pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private Flux<MessageResult<Void>> createMessageConsumer(Flux<Message<T>> message
143143
}
144144

145145
private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
146-
return Mono.from(this.messageHandler.apply(message))
146+
return Mono.defer(() -> Mono.from(this.messageHandler.apply(message)))
147147
.transform(this::decorateMessageHandler)
148148
.thenReturn(MessageResult.acknowledge(message.getMessageId()))
149149
.onErrorResume((throwable) -> {

pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,28 @@ void handlingTimeout() throws Exception {
227227
}
228228
}
229229

230+
@Test
231+
void negativeAcknowledgement() throws Exception {
232+
int numMessages = 10;
233+
TestConsumer testConsumer = new TestConsumer(numMessages);
234+
CountDownLatch latch = new CountDownLatch(1);
235+
testConsumer.setFinishedCallback(latch::countDown);
236+
Function<Message<String>, Publisher<Void>> messageHandler = (message) -> {
237+
if (message.getValue().equals("5")) {
238+
throw new RuntimeException("Handling message 5 failed");
239+
}
240+
return Mono.empty();
241+
};
242+
try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) {
243+
pipeline.start();
244+
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
245+
// 9 messages should have been acked
246+
assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
247+
// 1 message should have been nacked
248+
assertThat(testConsumer.getNackedMessages()).hasSize(1);
249+
}
250+
}
251+
230252
@Test
231253
void errorLogger() throws Exception {
232254
int numMessages = 10;

0 commit comments

Comments
 (0)