
Conversation

@vroyer (Contributor) commented May 5, 2025

Here is a proposal to provide a ReactiveMessagePipeline implementation that allows negatively acknowledging messages, and to provide a RedeliveryBackoff to the ReactiveMessageSender. This way you can fully benefit from Pulsar's redelivery support.

@lhotari (Member) left a comment

Thanks for the contribution and for putting effort into addressing the issue with negative acknowledgements, @vroyer.

Would it be possible to split this PR into the part that configures negativeAckRedeliveryBackoff and ackTimeoutRedeliveryBackoff? That could be handled separately before the other parts.
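
For context, here's roughly how these backoffs look on the plain Pulsar ConsumerBuilder; a rough sketch (the service URL, topic and subscription are placeholders, and how the reactive consumer spec would expose these settings is exactly what the split-out part of this PR would define):

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;

    PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

    Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("my-topic")                   // placeholder topic
        .subscriptionName("my-subscription") // placeholder subscription
        // Backoff applied when a message is negatively acknowledged
        .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
            .minDelayMs(1_000)
            .maxDelayMs(60_000)
            .multiplier(2)
            .build())
        // Backoff applied when the ack timeout expires (requires an ackTimeout to be set)
        .ackTimeout(30, TimeUnit.SECONDS)
        .ackTimeoutRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
            .minDelayMs(1_000)
            .maxDelayMs(60_000)
            .multiplier(2)
            .build())
        .subscribe();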

Another thought is that negative acknowledgement is already supported, but only when an error (Mono.error) is returned from the publisher that handles the message:

private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
    return Mono.from(this.messageHandler.apply(message))
        .transform(this::decorateMessageHandler)
        .thenReturn(MessageResult.acknowledge(message.getMessageId()))
        .onErrorResume((throwable) -> {
            if (this.errorLogger != null) {
                try {
                    this.errorLogger.accept(message, throwable);
                }
                catch (Exception ex) {
                    LOG.error("Error in calling error logger", ex);
                }
            }
            else {
                LOG.error("Message handling for message id {} failed.", message.getMessageId(), throwable);
            }
            // TODO: nack doesn't work for batch messages due to Pulsar bugs
            return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
        });
}

It looks like there are some gaps in the existing implementation's error handling. Just by looking at the code, it seems that Mono.from(this.messageHandler.apply(message)) should be wrapped with Mono.defer. Perhaps that's the reason why negative acknowledgement hasn't been working for your use case? Do you have a failing test case for your use case? That type of test would also be a great addition to the pulsar-client-reactive test cases.
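
Roughly, the fix I have in mind only changes that one line (the rest of handleMessage stays the same):

    private Mono<MessageResult<Void>> handleMessage(Message<T> message) {
        // Wrapping the handler invocation in Mono.defer turns a handler that throws
        // synchronously into an onError signal of this Mono, so it is handled by
        // onErrorResume (nack) instead of escaping to the pipeline retry.
        return Mono.defer(() -> Mono.from(this.messageHandler.apply(message)))
            .transform(this::decorateMessageHandler)
            .thenReturn(MessageResult.acknowledge(message.getMessageId()))
            .onErrorResume((throwable) -> {
                if (this.errorLogger != null) {
                    try {
                        this.errorLogger.accept(message, throwable);
                    }
                    catch (Exception ex) {
                        LOG.error("Error in calling error logger", ex);
                    }
                }
                else {
                    LOG.error("Message handling for message id {} failed.", message.getMessageId(), throwable);
                }
                return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
            });
    }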

Assuming that negative acknowledgements should work and are supported with the current interfaces, it would be better to focus on fixing that first before adding messageHandlerWithResult, which shouldn't be needed.

@vroyer (Contributor, Author) commented May 5, 2025

Hi Lari,

With the current implementation, the only way to nack for a business reason is to throw a RuntimeException, and the retry is managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for business reasons and rely on Pulsar redelivery to reprocess it. To avoid a breaking change to the current auto-acknowledging messageHandler, the best solution seems to be introducing another "messageHandler2" returning a MessageResult, where you can ack or nack messages depending on your use case, as sketched below?
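
Something along these lines is what I have in mind; the handler name and builder method are only illustrative, and isReadyToProcess/process stand in for business calls:

    // Illustrative only: a handler variant returning a MessageResult so the handler
    // itself decides whether to ack or nack; "messageHandler2" is a placeholder name.
    pipelineBuilder.messageHandler2((Message<String> message) -> {
        if (!isReadyToProcess(message)) {
            // Business-level retry: nack and let Pulsar redelivery (with its
            // RedeliveryBackoff) bring the message back later.
            return Mono.just(MessageResult.negativeAcknowledge(message.getMessageId()));
        }
        return process(message) // placeholder business call returning Mono<Void>
            .thenReturn(MessageResult.acknowledge(message.getMessageId()));
    });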

Regards,
Vincent.

@lhotari (Member) commented May 6, 2025

> With the current implementation, the only way to nack for a business reason is to throw a RuntimeException, and the retry is managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

That's a bug in the current implementation.

> The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for business reasons and rely on Pulsar redelivery to reprocess it.

Yes, that's a bug and it can be fixed without interface changes. The intention has been that when the Publisher signals an error, the message would be nacked. Do you see any problem with that in your use case?

One of the bugs in the existing framework code around the message handler is the missing Mono.defer. As a result, errors bubble up to the "pipeline", which is why the pipeline retry kicks in. At least, that's my assumption about the bug without having validated it. We don't currently have a test case that ensures the correct behavior, and the documentation of the expected behavior is also missing.
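
To make that assumption concrete, here is a small stand-alone Reactor snippet (not the framework code) showing why the missing Mono.defer matters:

    import reactor.core.publisher.Mono;

    public class DeferIllustration {

        static Mono<String> failingHandler() {
            // Simulates a message handler that throws synchronously instead of
            // returning an error Mono
            throw new IllegalStateException("handler failure");
        }

        public static void main(String[] args) {
            // Eager: failingHandler() throws before a Mono even exists, so the
            // onErrorReturn below is never reached and the exception propagates to
            // the caller (in the pipeline this is where pipelineRetrySpec kicks in)
            try {
                Mono.from(failingHandler()).onErrorReturn("nack").block();
            }
            catch (IllegalStateException expected) {
                System.out.println("error escaped to the caller: " + expected.getMessage());
            }

            // Deferred: the same throw happens inside the Mono on subscription and
            // surfaces as an onError signal that the error operator can turn into a nack
            String result = Mono.defer(DeferIllustration::failingHandler)
                .onErrorReturn("nack")
                .block();
            System.out.println(result); // prints "nack"
        }

    }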

@vroyer (Contributor, Author) commented May 7, 2025

>> With the current implementation, the only way to nack for a business reason is to throw a RuntimeException, and the retry is managed by Reactor (see pipelineRetrySpec). In this case, the Pulsar RedeliveryBackoff is ineffective, which is why I went further with the second commit.

> That's a bug in the current implementation.

>> The pipeline retry should occur when the handler unexpectedly fails, not when you need to nack a message for business reasons and rely on Pulsar redelivery to reprocess it.

> Yes, that's a bug and it can be fixed without interface changes. The intention has been that when the Publisher signals an error, the message would be nacked. Do you see any problem with that in your use case?

I just wonder why you don't have the same approach for both the consumer and the pipeline. The consumer returns a MessageResult to ack/nack, while the pipeline signals an error. In the latter case, you need to carefully distinguish Publisher errors from PulsarClientException (due to connection issues) so that you don't nack because of an ack issue.

> One of the bugs in the existing framework code around the message handler is the missing Mono.defer. As a result, errors bubble up to the "pipeline", which is why the pipeline retry kicks in. At least, that's my assumption about the bug without having validated it. We don't currently have a test case that ensures the correct behavior, and the documentation of the expected behavior is also missing.

I can't speak to this assumption. According to my tests with the proposed PR, the pipeline retry properly handles Pulsar reconnection.

@lhotari (Member) commented May 7, 2025

> I just wonder why you don't have the same approach for both the consumer and the pipeline. The consumer returns a MessageResult to ack/nack, while the pipeline signals an error. In the latter case, you need to carefully distinguish Publisher errors from PulsarClientException (due to connection issues) so that you don't nack because of an ack issue.

The original purpose of the messageHandler is to acknowledge the message when the subscription to the publisher completes successfully and to nack it when it errors. We should fix that bug first before adding another interface.
The interface is designed like this so that the user can't misuse the API and return acknowledgements for messages that were received earlier. For pipelining, it's necessary to rely on the framework-provided options or use the streamingMessageHandler (see the sketch below).
I don't currently see a need to complicate the interface, since a publisher can properly signal the acknowledgement or negative acknowledgement of a message. Obviously the bugs in the current implementation should be fixed so that this works properly.
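
For example, roughly like this with the existing streamingMessageHandler (builder wiring from memory, and process stands in for a business call returning Mono<Void>):

    ReactiveMessagePipeline pipeline = messageConsumer
        .messagePipeline()
        .streamingMessageHandler(messageFlux -> messageFlux
            // one MessageResult per message decides ack vs. nack explicitly
            .concatMap(message -> process(message)
                .thenReturn(MessageResult.acknowledge(message.getMessageId()))
                .onErrorReturn(MessageResult.negativeAcknowledge(message.getMessageId()))))
        .build();
    pipeline.start();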

>> One of the bugs in the existing framework code around the message handler is the missing Mono.defer. As a result, errors bubble up to the "pipeline", which is why the pipeline retry kicks in. At least, that's my assumption about the bug without having validated it. We don't currently have a test case that ensures the correct behavior, and the documentation of the expected behavior is also missing.

> I can't speak to this assumption. According to my tests with the proposed PR, the pipeline retry properly handles Pulsar reconnection.

We'll find out with test cases.

@vroyer (Contributor, Author) commented May 7, 2025

OK, you can just pick up the first commit for the RedeliveryBackoff and fix the pipeline to nack without retrying unless there is a PulsarClientException. That will also be fine.

@lhotari (Member) commented May 19, 2025

> OK, you can just pick up the first commit for the RedeliveryBackoff and fix the pipeline to nack without retrying unless there is a PulsarClientException. That will also be fine.

@vroyer There's a fix for the negative acknowledgement bug in #213. I'll close this PR. Please open a separate PR for the RedeliveryBackoff changes.

@lhotari closed this May 19, 2025