Skip to content

Commit 34f92aa

Browse files
committed
Replace order of given/willAnswer for spied objects
The cause of the test hang was that the test was incorrectly setting up the spy on the type message builder impl. In previous Pulsar version of TypedMessageBuilderImpl, the fact that the method sendAsync was being called at mock setup time was not causing an issue. However, in the latest impl it did not like that and was throwing things off. Spied objects should always use the `doReturn|Answer|Throw()` family as described in https://javadoc.io/doc/org .mockito/mockito-core/latest/org/mockito/Mockito.html#important-gotcha-on-spying-real-objects--heading
1 parent fb53ad9 commit 34f92aa

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTests.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.mockito.ArgumentMatchers.eq;
7878
import static org.mockito.ArgumentMatchers.isNull;
7979
import static org.mockito.BDDMockito.given;
80+
import static org.mockito.BDDMockito.willAnswer;
8081
import static org.mockito.Mockito.doReturn;
8182
import static org.mockito.Mockito.mock;
8283
import static org.mockito.Mockito.spy;
@@ -195,11 +196,11 @@ void sendOnePulsarException() throws Exception {
195196
given(producer.newMessage()).willAnswer((__) -> {
196197
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
197198
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
198-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
199+
willAnswer((___) -> {
199200
CompletableFuture<MessageId> failed = new CompletableFuture<>();
200201
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
201202
return failed;
202-
});
203+
}).given(typedMessageBuilder).sendAsync();
203204
return typedMessageBuilder;
204205
});
205206

@@ -231,7 +232,7 @@ void sendManyStopOnError() throws Exception {
231232
given(producer.newMessage()).willAnswer((__) -> {
232233
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
233234
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
234-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
235+
willAnswer((___) -> {
235236
if (entryId.get() == 1) {
236237
CompletableFuture<MessageId> failed = new CompletableFuture<>();
237238
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
@@ -241,7 +242,7 @@ void sendManyStopOnError() throws Exception {
241242
.newMessageId(1, entryId.incrementAndGet(), 1);
242243
messageIds.add(messageId);
243244
return CompletableFuture.completedFuture(messageId);
244-
});
245+
}).given(typedMessageBuilder).sendAsync();
245246
return typedMessageBuilder;
246247
});
247248

@@ -279,7 +280,7 @@ void sendMany() throws Exception {
279280
given(producer.newMessage()).willAnswer((__) -> {
280281
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
281282
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
282-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
283+
willAnswer((___) -> {
283284
if (entryId.get() == 2) {
284285
CompletableFuture<MessageId> failed = new CompletableFuture<>();
285286
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
@@ -289,7 +290,7 @@ void sendMany() throws Exception {
289290
.newMessageId(1, entryId.incrementAndGet(), 1);
290291
messageIds.add(messageId);
291292
return CompletableFuture.completedFuture(messageId);
292-
});
293+
}).given(typedMessageBuilder).sendAsync();
293294
return typedMessageBuilder;
294295
});
295296

@@ -498,7 +499,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
498499
given(producer.newMessage()).willAnswer((__) -> {
499500
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
500501
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
501-
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
502+
willAnswer((___) -> {
502503
CompletableFuture<MessageId> messageSender = new CompletableFuture<>();
503504
finalExecutorService.execute(() -> {
504505
long current = totalRequests.incrementAndGet();
@@ -512,7 +513,7 @@ void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>,
512513
DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1));
513514
}, 100, TimeUnit.MILLISECONDS);
514515
return messageSender;
515-
});
516+
}).given(typedMessageBuilder).sendAsync();
516517
return typedMessageBuilder;
517518
});
518519

0 commit comments

Comments
 (0)