Skip to content

Commit 2bcbb83

Browse files
authored
Ignore exceptions when closing consumers, producers and readers (#218)
1 parent 6170ebe commit 2bcbb83

File tree

6 files changed

+106
-8
lines changed

6 files changed

+106
-8
lines changed

pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,17 @@ private Mono<Consumer<T>> createConsumerMono() {
5454
}
5555

5656
private Mono<Void> closeConsumer(Consumer<?> consumer) {
57-
return Mono.deferContextual((contextView) -> Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
58-
this.LOG.info("Consumer closed {}", consumer);
59-
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
60-
.ifPresent((listener) -> listener.onConsumerClosed(consumer));
61-
}));
62-
57+
return Mono
58+
.deferContextual((contextView) -> AdapterImplementationFactory.adaptPulsarFuture(consumer::closeAsync)
59+
.onErrorResume((t) -> {
60+
this.LOG.debug("Error closing consumer {}", consumer, t);
61+
return Mono.empty();
62+
})
63+
.doFinally((signalType) -> {
64+
this.LOG.info("Consumer closed {}", consumer);
65+
contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
66+
.ifPresent((listener) -> listener.onConsumerClosed(consumer));
67+
}));
6368
}
6469

6570
<R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>> usingConsumerAction) {

pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveProducerAdapter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.pulsar.client.api.PulsarClient;
2929
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
3030
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133
import reactor.core.Disposable;
3234
import reactor.core.publisher.Flux;
3335
import reactor.core.publisher.Mono;
@@ -36,6 +38,8 @@
3638

3739
class ReactiveProducerAdapter<T> {
3840

41+
private static final Logger log = LoggerFactory.getLogger(ReactiveProducerAdapter.class);
42+
3943
private final ProducerCache producerCache;
4044

4145
private final Function<PulsarClient, ProducerBuilder<T>> producerBuilderFactory;
@@ -73,7 +77,10 @@ private Mono<Tuple2<ProducerCacheKey, Mono<Producer<T>>>> createCachedProducerKe
7377
}
7478

7579
private Mono<Void> closeProducer(Producer<?> producer) {
76-
return AdapterImplementationFactory.adaptPulsarFuture(producer::closeAsync);
80+
return AdapterImplementationFactory.adaptPulsarFuture(producer::closeAsync).onErrorResume((t) -> {
81+
log.debug("Error closing producer {}", producer, t);
82+
return Mono.empty();
83+
});
7784
}
7885

7986
<R> Mono<R> usingProducer(BiFunction<Producer<T>, PublisherTransformer, Mono<R>> usingProducerAction) {

pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveReaderAdapter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525
import org.apache.pulsar.client.api.PulsarClient;
2626
import org.apache.pulsar.client.api.Reader;
2727
import org.apache.pulsar.client.api.ReaderBuilder;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2830
import reactor.core.publisher.Flux;
2931
import reactor.core.publisher.Mono;
3032

3133
class ReactiveReaderAdapter<T> {
3234

35+
private static final Logger log = LoggerFactory.getLogger(ReactiveReaderAdapter.class);
36+
3337
private final Supplier<PulsarClient> pulsarClientSupplier;
3438

3539
private final Function<PulsarClient, ReaderBuilder<T>> readerBuilderFactory;
@@ -46,7 +50,10 @@ private Mono<Reader<T>> createReaderMono() {
4650
}
4751

4852
private Mono<Void> closeReader(Reader<?> reader) {
49-
return AdapterImplementationFactory.adaptPulsarFuture(reader::closeAsync);
53+
return AdapterImplementationFactory.adaptPulsarFuture(reader::closeAsync).onErrorResume((t) -> {
54+
log.debug("Error closing reader {}", reader, t);
55+
return Mono.empty();
56+
});
5057
}
5158

5259
<R> Mono<R> usingReader(Function<Reader<T>, Mono<R>> usingReaderAction) {

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.pulsar.client.api.MessageId;
4141
import org.apache.pulsar.client.api.PulsarClient;
4242
import org.apache.pulsar.client.api.PulsarClientException;
43+
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
4344
import org.apache.pulsar.client.api.RegexSubscriptionMode;
4445
import org.apache.pulsar.client.api.Schema;
4546
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -406,4 +407,26 @@ void consumePulsarException() throws Exception {
406407
.verifyError(PulsarClientException.InvalidMessageException.class);
407408
}
408409

410+
@Test
411+
void closeConsumerExceptionIsIgnored() throws Exception {
412+
PulsarClientImpl pulsarClient = spy(
413+
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
414+
415+
Consumer<String> consumer = mock(Consumer.class);
416+
doReturn(CompletableFuture.failedFuture(new AlreadyClosedException("Already closed"))).when(consumer)
417+
.closeAsync();
418+
419+
doReturn(CompletableFuture.completedFuture(consumer)).when(pulsarClient)
420+
.subscribeAsync(any(ConsumerConfigurationData.class), eq(Schema.STRING), isNull());
421+
422+
ReactiveMessageConsumer<String> reactiveConsumer = AdaptedReactivePulsarClientFactory.create(pulsarClient)
423+
.messageConsumer(Schema.STRING)
424+
.topic("my-topic")
425+
.subscriptionName("my-sub")
426+
.build();
427+
428+
StepVerifier.create(reactiveConsumer.consumeNothing()).expectComplete().verify();
429+
verify(consumer).closeAsync();
430+
}
431+
409432
}

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.pulsar.client.api.MessageId;
3131
import org.apache.pulsar.client.api.PulsarClient;
3232
import org.apache.pulsar.client.api.PulsarClientException;
33+
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
3334
import org.apache.pulsar.client.api.Range;
3435
import org.apache.pulsar.client.api.Reader;
3536
import org.apache.pulsar.client.api.Schema;
@@ -168,6 +169,29 @@ void readPulsarException() throws Exception {
168169
StepVerifier.create(reactiveReader.readMany()).verifyError(PulsarClientException.InvalidMessageException.class);
169170
}
170171

172+
@Test
173+
void closeReaderExceptionIsIgnored() throws Exception {
174+
PulsarClientImpl pulsarClient = spy(
175+
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
176+
177+
Reader<String> reader = mock(Reader.class);
178+
doReturn(CompletableFuture.failedFuture(new AlreadyClosedException("Already closed"))).when(reader)
179+
.closeAsync();
180+
doReturn(CompletableFuture.completedFuture(false)).when(reader).hasMessageAvailableAsync();
181+
182+
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient)
183+
.createReaderAsync(any(), eq(Schema.STRING));
184+
185+
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
186+
.messageReader(Schema.STRING)
187+
.endOfStreamAction(EndOfStreamAction.COMPLETE)
188+
.topic("my-topic")
189+
.build();
190+
191+
StepVerifier.create(reactiveReader.readOne()).expectComplete().verify();
192+
verify(reader).closeAsync();
193+
}
194+
171195
@Test
172196
void endOfStreamPoll() throws Exception {
173197
PulsarClientImpl pulsarClient = spy(

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.pulsar.client.api.ProducerAccessMode;
4848
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
4949
import org.apache.pulsar.client.api.PulsarClient;
50+
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
5051
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
5152
import org.apache.pulsar.client.api.Schema;
5253
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -547,4 +548,35 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
547548
}
548549
}
549550

551+
@Test
552+
void closeProducerExceptionIsIgnored() throws Exception {
553+
PulsarClientImpl pulsarClient = spy(
554+
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
555+
556+
ProducerBase<String> producer = mock(ProducerBase.class);
557+
doReturn(CompletableFuture.failedFuture(new AlreadyClosedException("Already closed"))).when(producer)
558+
.closeAsync();
559+
560+
MessageId messageId = DefaultImplementation.getDefaultImplementation().newMessageId(1, 1, 1);
561+
562+
given(producer.newMessage()).willAnswer((__) -> {
563+
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
564+
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
565+
willAnswer((___) -> CompletableFuture.completedFuture(messageId)).given(typedMessageBuilder).sendAsync();
566+
return typedMessageBuilder;
567+
});
568+
569+
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient)
570+
.createProducerAsync(any(), eq(Schema.STRING), isNull());
571+
572+
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
573+
.messageSender(Schema.STRING)
574+
.topic("my-topic")
575+
.build();
576+
577+
StepVerifier.create(reactiveSender.sendOne(MessageSpec.of("test1"))).expectNext(messageId).verifyComplete();
578+
579+
verify(producer).closeAsync();
580+
}
581+
550582
}

0 commit comments

Comments
 (0)