diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java index 371b2d4e..0b70258b 100644 --- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java +++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineBuilder.java @@ -37,7 +37,23 @@ public interface ReactiveMessagePipelineBuilder { /** - * Sets a handler function that processes messages one-by-one. + *

+ * Sets a handler function that processes messages one-by-one. When the message + * handler completes successfully, the message will be acknowledged. When the message + * handler emits an error, the error logger will be used to log the error and the + * message will be negatively acknowledged. If the error logger is not set, a default + * error message will be logged at the error level. + *

+ *

+ * NOTE: Be aware that negative acknowledgements on ordered subscription types such as + * Exclusive, Failover and Key_Shared typically cause failed messages to be sent to + * consumers out of their original order. Negative acknowledgements for Key_Shared + * subscriptions may also cause message delivery to be blocked on broker versions + * before Pulsar 4.0. To maintain ordered message processing, it is recommended to + * wrap the message handler with Project Reactor's native retry logic using Mono.retryWhen + * to retry processing of each message indefinitely with backoff. + *

* @param messageHandler a function that takes a message as input and returns an empty * Publisher * @return a builder for the pipeline handling messages one-by-one @@ -95,6 +111,7 @@ interface OneByOneMessagePipelineBuilder extends ReactiveMessagePipelineBuild /** * Sets a function which will be called when the message handler emits an error. + * If not set, a default message will be logged at the error level. * @param errorLogger the error logger function * @return the pipeline builder instance */ diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java index 7dc1150d..cd94f19b 100644 --- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java +++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java @@ -143,7 +143,7 @@ private Flux> createMessageConsumer(Flux> message } private Mono> handleMessage(Message message) { - return Mono.from(this.messageHandler.apply(message)) + return Mono.defer(() -> Mono.from(this.messageHandler.apply(message))) .transform(this::decorateMessageHandler) .thenReturn(MessageResult.acknowledge(message.getMessageId())) .onErrorResume((throwable) -> { diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java index 3e0ddb8d..34154819 100644 --- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java +++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java @@ -227,6 +227,28 @@ void handlingTimeout() throws Exception { } } + @Test + void negativeAcknowledgement() throws Exception { + int numMessages = 10; + TestConsumer testConsumer = new TestConsumer(numMessages); + CountDownLatch latch = new CountDownLatch(1); + testConsumer.setFinishedCallback(latch::countDown); + Function, Publisher> messageHandler = (message) -> { + if (message.getValue().equals("5")) { + throw new RuntimeException("Handling message 5 failed"); + } + return Mono.empty(); + }; + try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) { + pipeline.start(); + assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue(); + // 9 messages should have been acked + assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9); + // 1 message should have been nacked + assertThat(testConsumer.getNackedMessages()).hasSize(1); + } + } + @Test void errorLogger() throws Exception { int numMessages = 10;