Skip to content

Commit 4eef639

Browse files
authored
Publisher#flatMapConcatIterable propagate error even if no onSubscribe (#1671)
Motivation: The ReactiveStreams specification says that onSubscribe(Subscription) must be called prior to any other signals being delivered [1]. However in practice user code may throw in onSubscribe which may stop the propagation. In this case Publisher#flatMapConcatIterable has an assertion which may prevent the error from propagating downstream and result in a control flow deadlock. [1] https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9 Modifications: - Move assertion in Publisher#flatMapConcatIterable to allow for null subscription, add a runtime check before it is used. Result: More robust error propagation for Publisher#flatMapConcatIterable in the precense of Reactive Streams specification violations.
1 parent 06be29f commit 4eef639

File tree

2 files changed

+22
-2
lines changed

2 files changed

+22
-2
lines changed

servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/PublisherConcatMapIterable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ private enum ErrorHandlingStrategyInDrain {
172172
}
173173

174174
private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategyInDrain) {
175-
assert sourceSubscription != null;
176175
boolean hasNext = false;
177176
boolean thrown = false;
178177
boolean terminated = false;
@@ -241,7 +240,9 @@ private void tryDrainIterator(ErrorHandlingStrategyInDrain errorHandlingStrategy
241240
// here before we unlock emitting so visibility to other threads should be taken care of
242241
// by the write to emitting below (and later read).
243242
currentIterator = EmptyIterator.instance();
244-
sourceSubscription.request(1);
243+
if (sourceSubscription != null) {
244+
sourceSubscription.request(1);
245+
}
245246
}
246247
} finally {
247248
// The lock must be released after we interact with the subscription for thread safety

servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/PublisherConcatMapIterableTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,25 @@ class PublisherConcatMapIterableTest {
6868
private final TestPublisherSubscriber<String> subscriber = new TestPublisherSubscriber<>();
6969
private final TestSubscription subscription = new TestSubscription();
7070

71+
@Test
72+
void onSubscribeThrowsPropagatesError() throws Exception {
73+
@SuppressWarnings("unchecked")
74+
Subscriber<String> mockSubscriber = mock(Subscriber.class);
75+
CountDownLatch latchOnError = new CountDownLatch(1);
76+
AtomicReference<Throwable> causeRef = new AtomicReference<>();
77+
doAnswer(a -> {
78+
causeRef.set(a.getArgument(0));
79+
latchOnError.countDown();
80+
return null;
81+
}).when(mockSubscriber).onError(any());
82+
83+
toSource(from(singletonList("foo")).beforeOnSubscribe(subscription1 -> {
84+
throw DELIBERATE_EXCEPTION;
85+
}).flatMapConcatIterable(identity())).subscribe(mockSubscriber);
86+
latchOnError.await();
87+
assertThat(causeRef.get(), is(DELIBERATE_EXCEPTION));
88+
}
89+
7190
@Test
7291
void upstreamRecoverWithMakesProgress() throws Exception {
7392
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)