Skip to content

Commit 10a69b7

Browse files
authored
Propagate exception to inputstream subscriber (#6569)
Within InputStreamResponseTransformer, if exceptionOccurred is invoked after we already have an InputStream subscriber, propagate the exception to that subscriber.
1 parent a37610b commit 10a69b7

File tree

3 files changed

+129
-1
lines changed

3 files changed

+129
-1
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix an issue with `AsyncResponseTransformer.toBlockingInputStream()` where `read()` operations on the stream can hang if there if the transformer encounters an error while the stream is being read. Fixes [#5755](https://github.com/aws/aws-sdk-java-v2/issues/5755)."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamResponseTransformer.java

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import java.nio.ByteBuffer;
1919
import java.util.concurrent.CompletableFuture;
20+
import org.reactivestreams.Subscriber;
21+
import org.reactivestreams.Subscription;
2022
import software.amazon.awssdk.annotations.SdkInternalApi;
2123
import software.amazon.awssdk.core.ResponseInputStream;
2224
import software.amazon.awssdk.core.SdkResponse;
@@ -35,6 +37,7 @@ public class InputStreamResponseTransformer<ResponseT extends SdkResponse>
3537

3638
private volatile CompletableFuture<ResponseInputStream<ResponseT>> future;
3739
private volatile ResponseT response;
40+
private volatile WaitForSubscribeOnErrorWrapper subscriber;
3841

3942
@Override
4043
public CompletableFuture<ResponseInputStream<ResponseT>> prepare() {
@@ -51,17 +54,74 @@ public void onResponse(ResponseT response) {
5154
@Override
5255
public void onStream(SdkPublisher<ByteBuffer> publisher) {
5356
AbortableInputStreamSubscriber inputStreamSubscriber = AbortableInputStreamSubscriber.builder().build();
54-
publisher.subscribe(inputStreamSubscriber);
57+
WaitForSubscribeOnErrorWrapper waitForSubscribeSubscriber = new WaitForSubscribeOnErrorWrapper(inputStreamSubscriber);
58+
59+
this.subscriber = waitForSubscribeSubscriber;
60+
61+
publisher.subscribe(waitForSubscribeSubscriber);
5562
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
5663
}
5764

5865
@Override
5966
public void exceptionOccurred(Throwable error) {
6067
future.completeExceptionally(error);
68+
if (subscriber != null) {
69+
this.subscriber.onError(error);
70+
}
6171
}
6272

6373
@Override
6474
public String name() {
6575
return TransformerType.STREAM.getName();
6676
}
77+
78+
// Simple wrapper subscriber that ensures we don't forward the `onError` to the delegate until onSubscribe is called, to be
79+
// compliant with the reactive streams spec. We use onError for forwarding the exception given to exceptionOccurred.
80+
private static final class WaitForSubscribeOnErrorWrapper implements Subscriber<ByteBuffer> {
81+
private final Object lock = new Object();
82+
private final AbortableInputStreamSubscriber delegate;
83+
84+
private boolean subscribed = false;
85+
private Throwable transformerException;
86+
87+
88+
private WaitForSubscribeOnErrorWrapper(AbortableInputStreamSubscriber delegate) {
89+
this.delegate = delegate;
90+
}
91+
92+
@Override
93+
public void onSubscribe(Subscription s) {
94+
synchronized (lock) {
95+
subscribed = true;
96+
delegate.onSubscribe(s);
97+
98+
if (transformerException != null) {
99+
delegate.onError(transformerException);
100+
transformerException = null;
101+
}
102+
}
103+
}
104+
105+
@Override
106+
public void onNext(ByteBuffer byteBuffer) {
107+
this.delegate.onNext(byteBuffer);
108+
}
109+
110+
@Override
111+
public void onError(Throwable t) {
112+
synchronized (lock) {
113+
if (subscribed) {
114+
delegate.onError(t);
115+
} else {
116+
// We're not subscribed yet, save the exception for until we are.
117+
transformerException = t;
118+
}
119+
}
120+
}
121+
122+
@Override
123+
public void onComplete() {
124+
this.delegate.onComplete();
125+
}
126+
}
67127
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamResponseTransformerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,20 @@
1616
package software.amazon.awssdk.core.internal.async;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatNoException;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1921

2022
import java.io.IOException;
2123
import java.io.InputStream;
2224
import java.nio.ByteBuffer;
2325
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Phaser;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2430
import org.junit.jupiter.api.BeforeEach;
2531
import org.junit.jupiter.api.Test;
32+
import org.reactivestreams.Subscription;
2633
import software.amazon.awssdk.core.ResponseInputStream;
2734
import software.amazon.awssdk.core.SdkResponse;
2835
import software.amazon.awssdk.core.async.SdkPublisher;
@@ -80,4 +87,59 @@ public void inputStreamArrayReadsAreFromPublisher() throws IOException {
8087
assertThat(data[2]).isEqualTo((byte) 2);
8188
assertThat(stream.read(data)).isEqualTo(-1);
8289
}
90+
91+
@Test
92+
void exceptionOccurred_onStreamCalled_exceptionPropagatedToRead() {
93+
ResponseInputStream<SdkResponse> inputStream = resultFuture.join();
94+
transformer.exceptionOccurred(new IOException("I/O issue"));
95+
assertThatThrownBy(inputStream::read).isInstanceOf(IOException.class);
96+
}
97+
98+
@Test
99+
void exceptionOccurred_onStreamNotCalled_doesNotFail() {
100+
InputStreamResponseTransformer<SdkResponse> responseTransformer = new InputStreamResponseTransformer<>();
101+
responseTransformer.prepare();
102+
responseTransformer.onResponse(response);
103+
assertThatNoException().isThrownBy(() -> responseTransformer.exceptionOccurred(new IOException("I/O issue")));
104+
}
105+
106+
@Test
107+
public void exceptionOccurred_onStreamCalled_onSubscribeNotCalled_doesNotThrow() throws Exception {
108+
ExecutorService exec = Executors.newSingleThreadExecutor();
109+
110+
for (int i = 0; i < 1024; ++i) {
111+
InputStreamResponseTransformer<SdkResponse> responseTransformer = new InputStreamResponseTransformer<>();
112+
CompletableFuture<ResponseInputStream<SdkResponse>> cf = responseTransformer.prepare();
113+
responseTransformer.onResponse(response);
114+
115+
AtomicBoolean onSubscribeCalled = new AtomicBoolean();
116+
117+
Phaser phaser = new Phaser(2);
118+
119+
responseTransformer.exceptionOccurred(new IOException("I/O issue"));
120+
phaser.arrive();
121+
122+
exec.submit(() -> {
123+
phaser.arriveAndAwaitAdvance();
124+
125+
responseTransformer.onStream(SdkPublisher.adapt(s -> {
126+
// the exception should not be forwarded to the inputstream subscriber if onSubscribe is not yet called
127+
128+
s.onSubscribe(new Subscription() {
129+
@Override
130+
public void request(long n) {
131+
}
132+
133+
@Override
134+
public void cancel() {
135+
}
136+
});
137+
onSubscribeCalled.set(true);
138+
}));
139+
}).get();
140+
141+
assertThatThrownBy(() -> cf.join().read()).hasMessageContaining("I/O issue");
142+
assertThat(onSubscribeCalled.get()).isTrue();
143+
}
144+
}
83145
}

0 commit comments

Comments
 (0)