Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,23 @@
public interface ReactiveMessagePipelineBuilder<T> {

/**
* Sets a handler function that processes messages one-by-one.
* <p>
* Sets a handler function that processes messages one-by-one. When the message
* handler completes successfully, the message will be acknowledged. When the message
* handler emits an error, the error logger will be used to log the error and the
* message will be negatively acknowledged. If the error logger is not set, a default
* error message will be logged at the error level.
* </p>
* <p>
* NOTE: Be aware that negative acknowledgements on ordered subscription types such as
* Exclusive, Failover and Key_Shared typically cause failed messages to be sent to
* consumers out of their original order. Negative acknowledgements for Key_Shared
* subscriptions may also cause message delivery to be blocked on broker versions
* before Pulsar 4.0. To maintain ordered message processing, it is recommended to
* wrap the message handler with Project Reactor's native retry logic using <a href=
* "https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#retryWhen-reactor.util.retry.Retry-">Mono.retryWhen</a>
* to retry processing of each message indefinitely with backoff.
* </p>
* @param messageHandler a function that takes a message as input and returns an empty
* Publisher
* @return a builder for the pipeline handling messages one-by-one
Expand Down Expand Up @@ -95,6 +111,7 @@ interface OneByOneMessagePipelineBuilder<T> extends ReactiveMessagePipelineBuild

/**
* Sets a function which will be called when the message handler emits an error.
* If not set, a default message will be logged at the error level.
* @param errorLogger the error logger function
* @return the pipeline builder instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private Flux<MessageResult<Void>> createMessageConsumer(Flux<Message<T>> message
}

private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
return Mono.from(this.messageHandler.apply(message))
return Mono.defer(() -> Mono.from(this.messageHandler.apply(message)))
.transform(this::decorateMessageHandler)
.thenReturn(MessageResult.acknowledge(message.getMessageId()))
.onErrorResume((throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,28 @@ void handlingTimeout() throws Exception {
}
}

@Test
void negativeAcknowledgement() throws Exception {
int numMessages = 10;
TestConsumer testConsumer = new TestConsumer(numMessages);
CountDownLatch latch = new CountDownLatch(1);
testConsumer.setFinishedCallback(latch::countDown);
Function<Message<String>, Publisher<Void>> messageHandler = (message) -> {
if (message.getValue().equals("5")) {
throw new RuntimeException("Handling message 5 failed");
}
return Mono.empty();
};
try (ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build()) {
pipeline.start();
assertThat(latch.await(1, TimeUnit.SECONDS)).isTrue();
// 9 messages should have been acked
assertThat(testConsumer.getAcknowledgedMessages()).hasSize(9);
// 1 message should have been nacked
assertThat(testConsumer.getNackedMessages()).hasSize(1);
}
}

@Test
void errorLogger() throws Exception {
int numMessages = 10;
Expand Down