Skip to content

8299338: AssertionError in ResponseSubscribers$HttpResponseInputStream::onSubscribe #3695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void connectFlows(HttpConnection connection) {
// The Http1ResponseBodySubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
private void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
private boolean registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
Throwable failed = null;
synchronized (lock) {
failed = this.failed;
Expand All @@ -276,13 +276,14 @@ private void registerResponseSubscriber(Http1ResponseBodySubscriber<T> subscribe
}
if (failed != null) {
subscriber.onError(failed);
return false;
} else {
client.registerSubscriber(subscriber);
return client.registerSubscriber(subscriber);
}
}

private void unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
client.unregisterSubscriber(subscriber);
private boolean unregisterResponseSubscriber(Http1ResponseBodySubscriber<T> subscriber) {
return client.unregisterSubscriber(subscriber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,14 @@ private static void closeSubscribers(HttpClientImpl client, Throwable t) {
client.subscribers.forEach(s -> s.onError(t));
}

public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
/**
* Adds the given subscriber to the subscribers list, or call
* its {@linkplain HttpBodySubscriberWrapper#onError onError}
* method if the client is shutting down.
* @param subscriber the subscriber
* @return true if the subscriber was added to the list.
*/
public boolean registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (!selmgr.isClosed()) {
synchronized (selmgr) {
if (!selmgr.isClosed()) {
Expand All @@ -552,20 +559,28 @@ public void registerSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
debug.log("body subscriber registered: " + count);
}
}
return;
return true;
}
}
}
subscriber.onError(selmgr.selectorClosedException());
return false;
}

public void unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
/**
* Remove the given subscriber from the subscribers list.
* @param subscriber the subscriber
* @return true if the subscriber was found and removed from the list.
*/
public boolean unregisterSubscriber(HttpBodySubscriberWrapper<?> subscriber) {
if (subscribers.remove(subscriber)) {
long count = pendingSubscribersCount.decrementAndGet();
if (debug.on()) {
debug.log("body subscriber unregistered: " + count);
}
return true;
}
return false;
}

private void closeConnection(HttpConnection conn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ public int available() throws IOException {
@Override
public void onSubscribe(Flow.Subscription s) {
Objects.requireNonNull(s);
if (debug.on()) debug.log("onSubscribed called");
if (debug.on()) debug.log("onSubscribe called");
try {
if (!subscribed.compareAndSet(false, true)) {
if (debug.on()) debug.log("Already subscribed: canceling");
Expand All @@ -554,10 +554,12 @@ public void onSubscribe(Flow.Subscription s) {
closed = this.closed;
if (!closed) {
this.subscription = s;
// should contain at least 2
assert buffers.remainingCapacity() > 1
// should contain at least 2, unless closed or failed.
assert buffers.remainingCapacity() > 1 || failed != null
: "buffers capacity: " + buffers.remainingCapacity()
+ " closed: " + closed + " failed: " + failed;
+ ", closed: " + closed + ", terminated: "
+ buffers.contains(LAST_LIST)
+ ", failed: " + failed;
}
}
if (closed) {
Expand All @@ -573,7 +575,7 @@ public void onSubscribe(Flow.Subscription s) {
} catch (Throwable t) {
failed = t;
if (debug.on())
debug.log("onSubscribed failed", t);
debug.log("onSubscribe failed", t);
try {
close();
} catch (IOException x) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,12 @@ Http2StreamResponseSubscriber<T> createResponseSubscriber(BodyHandler<T> handler
// The Http2StreamResponseSubscriber is registered with the HttpClient
// to ensure that it gets completed if the SelectorManager aborts due
// to unexpected exceptions.
private void registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().registerSubscriber(subscriber);
private boolean registerResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
return client().registerSubscriber(subscriber);
}

private void unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
client().unregisterSubscriber(subscriber);
private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber<?> subscriber) {
return client().unregisterSubscriber(subscriber);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,21 +274,6 @@ protected void onCancel() {
tryUnregister();
}

/**
* Called right before the userSubscriber::onSubscribe is called.
* @apiNote
* This method may be used by subclasses to perform cleanup
* related actions after a subscription has been successfully
* accepted.
* This method is called while holding a subscription
* lock.
* @implSpec
* This method calls {@link #tryRegister()}
*/
protected void onSubscribed() {
tryRegister();
}

/**
* Complete the subscriber, either normally or exceptionally
* ensure that the subscriber is completed only once.
Expand Down Expand Up @@ -381,8 +366,8 @@ public void onSubscribe(Flow.Subscription subscription) {
// subscription is finished before calling onError;
subscriptionLock.lock();
try {
tryRegister();
if (markSubscribed()) {
onSubscribed();
SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription);
userSubscriber.onSubscribe(this.subscription = wrapped);
} else {
Expand Down
2 changes: 1 addition & 1 deletion test/jdk/java/net/httpclient/AsyncExecutorShutdown.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

/*
* @test
* @bug 8277969
* @bug 8277969 8299338
* @summary Test for edge case where the executor is not accepting
* new tasks while the client is still running
* @library /test/lib /test/jdk/java/net/httpclient/lib
Expand Down