From a72d8c218747389d188e0e768753eb8f1437b687 Mon Sep 17 00:00:00 2001 From: duke Date: Mon, 30 Jun 2025 14:35:06 +0000 Subject: [PATCH 1/2] Backport edf238b65e441a1d626f3a4ba06170badd05ca7c --- .../jdk/internal/net/http/Http1Exchange.java | 18 +- .../classes/jdk/internal/net/http/Stream.java | 13 +- .../common/HttpBodySubscriberWrapper.java | 225 ++++++++++++++++-- .../net/httpclient/CancelRequestTest.java | 2 +- 4 files changed, 213 insertions(+), 45 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java index 02461893d00..2daba87fb3f 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2015, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2015, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -209,24 +209,12 @@ static final class Http1ResponseBodySubscriber extends HttpBodySubscriberWrap } @Override - protected void onSubscribed() { + protected void register() { exchange.registerResponseSubscriber(this); } @Override - protected void complete(Throwable t) { - try { - exchange.unregisterResponseSubscriber(this); - } finally { - super.complete(t); - } - } - - @Override - protected void onCancel() { - // If the subscription is cancelled the - // subscriber may or may not get completed. - // Therefore we need to unregister it + protected void unregister() { exchange.unregisterResponseSubscriber(this); } } diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index a7dd61d7c9d..bd3c4cdc049 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -1692,21 +1692,12 @@ final class Http2StreamResponseSubscriber extends HttpBodySubscriberWrapper userSubscriber; - final AtomicBoolean completed = new AtomicBoolean(); - final AtomicBoolean subscribed = new AtomicBoolean(); + private volatile int state; final ReentrantLock subscriptionLock = new ReentrantLock(); volatile SubscriptionWrapper subscription; volatile Throwable withError; @@ -83,14 +88,55 @@ public void request(long n) { @Override public void cancel() { try { - subscription.cancel(); - onCancel(); + try { + subscription.cancel(); + } finally { + if (markCancelled()) { + onCancel(); + } + } } catch (Throwable t) { onError(t); } } } + private final boolean markState(final int flag) { + int state = this.state; + if ((state & flag) == flag) { + return false; + } + synchronized (this) { + state = this.state; + if ((state & flag) == flag) { + return false; + } + state = this.state = (state | flag); + } + assert (state & flag) == flag; + return true; + } + + private boolean markSubscribed() { + return markState(SUBSCRIBED); + } + + private boolean markCancelled() { + return markState(CANCELLED); + } + + private boolean markCompleted() { + return markState(COMPLETED); + } + + private boolean markRegistered() { + return markState(REGISTERED); + } + + private boolean markUnregistered() { + return markState(UNREGISTERED); + } + final long id() { return id; } @Override @@ -101,8 +147,9 @@ public boolean needsExecutor() { // propagate the error to the user subscriber, even if not // subscribed yet. private void propagateError(Throwable t) { + var state = this.state; assert t != null; - assert completed.get(); + assert (state & COMPLETED) != 0; try { // if unsubscribed at this point, it will not // get subscribed later - so do it now and @@ -111,7 +158,7 @@ private void propagateError(Throwable t) { // subscription is finished before calling onError; subscriptionLock.lock(); try { - if (subscribed.compareAndSet(false, true)) { + if (markSubscribed()) { userSubscriber.onSubscribe(NOP); } } finally { @@ -125,34 +172,139 @@ private void propagateError(Throwable t) { } } + /** + * This method attempts to mark the state of this + * object as registered, and then call the + * {@link #register()} method. + *

+ * The state will be marked as registered, and the + * {@code register()} method will be called only + * if not already registered or unregistered, + * or cancelled, or completed. + * + * @return {@code true} if {@link #register()} was called, + * false otherwise. + */ + protected final boolean tryRegister() { + subscriptionLock.lock(); + try { + int state = this.state; + if ((state & (REGISTERED | UNREGISTERED | CANCELLED | COMPLETED)) != 0) return false; + if (markRegistered()) { + register(); + return true; + } + } finally { + subscriptionLock.unlock(); + } + return false; + } + + /** + * This method attempts to mark the state of this + * object as unregistered, and then call the + * {@link #unregister()} method. + *

+ * The {@code unregister()} method will be called only + * if already registered and not yet unregistered. + * Whether {@code unregister()} is called or not, + * the state is marked as unregistered, to prevent + * {@link #tryRegister()} from calling {@link #register()} + * after {@link #tryUnregister()} has been called. + * + * @return {@code true} if {@link #unregister()} was called, + * false otherwise. + */ + protected final boolean tryUnregister() { + subscriptionLock.lock(); + try { + int state = this.state; + if ((state & REGISTERED) == 0) { + markUnregistered(); + return false; + } + if (markUnregistered()) { + unregister(); + return true; + } + } finally { + subscriptionLock.unlock(); + } + return false; + } + + /** + * This method can be implemented by subclasses + * to perform registration actions. It will not be + * called if already registered or unregistered. + * @apiNote + * This method is called while holding a subscription + * lock. + * @see #tryRegister() + */ + protected void register() { + assert subscriptionLock.isHeldByCurrentThread(); + } + + /** + * This method can be implemented by subclasses + * to perform unregistration actions. It will not be + * called if not already registered, or already unregistered. + * @apiNote + * This method is called while holding a subscription + * lock. + * @see #tryUnregister() + */ + protected void unregister() { + assert subscriptionLock.isHeldByCurrentThread(); + } + /** * Called when the subscriber cancels its subscription. * @apiNote * This method may be used by subclasses to perform cleanup * actions after a subscription has been cancelled. + * @implSpec + * This method calls {@link #tryUnregister()} */ - protected void onCancel() { } + protected void onCancel() { + // If the subscription is cancelled the + // subscriber may or may not get completed. + // Therefore we need to unregister it + tryUnregister(); + } /** * Called right before the userSubscriber::onSubscribe is called. * @apiNote * This method may be used by subclasses to perform cleanup - * related actions after a subscription has been succesfully + * related actions after a subscription has been successfully * accepted. + * This method is called while holding a subscription + * lock. + * @implSpec + * This method calls {@link #tryRegister()} */ - protected void onSubscribed() { } + protected void onSubscribed() { + tryRegister(); + } /** * Complete the subscriber, either normally or exceptionally * ensure that the subscriber is completed only once. * @param t a throwable, or {@code null} + * @implSpec + * If not {@linkplain #completed()} yet, this method + * calls {@link #tryUnregister()} */ - protected void complete(Throwable t) { - if (completed.compareAndSet(false, true)) { + public final void complete(Throwable t) { + if (markCompleted()) { + tryUnregister(); t = withError = Utils.getCompletionCause(t); if (t == null) { try { - assert subscribed.get(); + var state = this.state; + assert (state & SUBSCRIBED) != 0; userSubscriber.onComplete(); } catch (Throwable x) { // Simply propagate the error by calling @@ -179,10 +331,45 @@ protected void complete(Throwable t) { * {@return true if this subscriber has already completed, either normally * or abnormally} */ - public boolean completed() { - return completed.get(); + public final boolean completed() { + int state = this.state; + return (state & COMPLETED) != 0; } + /** + * {@return true if this subscriber has already subscribed} + */ + public final boolean subscribed() { + int state = this.state; + return (state & SUBSCRIBED) != 0; + } + + /** + * {@return true if this subscriber has already been registered} + */ + public final boolean registered() { + int state = this.state; + return (state & REGISTERED) != 0; + } + + /** + * {@return true if this subscriber has already been unregistered} + */ + public final boolean unregistered() { + int state = this.state; + return (state & UNREGISTERED) != 0; + } + + /** + * {@return true if this subscriber's subscription has already + * been cancelled} + */ + public final boolean cancelled() { + int state = this.state; + return (state & CANCELLED) != 0; + } + + @Override public CompletionStage getBody() { return userSubscriber.getBody(); @@ -194,7 +381,7 @@ public void onSubscribe(Flow.Subscription subscription) { // subscription is finished before calling onError; subscriptionLock.lock(); try { - if (subscribed.compareAndSet(false, true)) { + if (markSubscribed()) { onSubscribed(); SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription); userSubscriber.onSubscribe(this.subscription = wrapped); @@ -208,8 +395,9 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(List item) { - assert subscribed.get(); - if (completed.get()) { + var state = this.state; + assert (state & SUBSCRIBED) != 0; + if ((state & COMPLETED) != 0) { SubscriptionWrapper subscription = this.subscription; if (subscription != null) { subscription.subscription.cancel(); @@ -222,6 +410,7 @@ public void onNext(List item) { public void onError(Throwable throwable) { complete(throwable); } + @Override public void onComplete() { complete(null); diff --git a/test/jdk/java/net/httpclient/CancelRequestTest.java b/test/jdk/java/net/httpclient/CancelRequestTest.java index 28ad68bd409..f393d437e28 100644 --- a/test/jdk/java/net/httpclient/CancelRequestTest.java +++ b/test/jdk/java/net/httpclient/CancelRequestTest.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8245462 8229822 8254786 8297075 8297149 8298340 + * @bug 8245462 8229822 8254786 8297075 8297149 8298340 8302635 * @summary Tests cancelling the request. * @library /test/lib /test/jdk/java/net/httpclient/lib * @key randomness From d799544c4fb39bd8ae5bde596c10bac48074e928 Mon Sep 17 00:00:00 2001 From: Goetz Lindenmaier Date: Mon, 30 Jun 2025 16:55:55 +0200 Subject: [PATCH 2/2] Backport 575484806ce11634d4fa8bef6c0c5987e4e0a1c7 --- .../jdk/internal/net/http/Http1Exchange.java | 9 ++++---- .../jdk/internal/net/http/HttpClientImpl.java | 21 ++++++++++++++++--- .../net/http/ResponseSubscribers.java | 12 ++++++----- .../classes/jdk/internal/net/http/Stream.java | 8 +++---- .../common/HttpBodySubscriberWrapper.java | 17 +-------------- .../net/httpclient/AsyncExecutorShutdown.java | 2 +- 6 files changed, 36 insertions(+), 33 deletions(-) diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java index 2daba87fb3f..e59ef8233cf 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java @@ -266,7 +266,7 @@ private void connectFlows(HttpConnection connection) { // The Http1ResponseBodySubscriber is registered with the HttpClient // to ensure that it gets completed if the SelectorManager aborts due // to unexpected exceptions. - private void registerResponseSubscriber(Http1ResponseBodySubscriber subscriber) { + private boolean registerResponseSubscriber(Http1ResponseBodySubscriber subscriber) { Throwable failed = null; synchronized (lock) { failed = this.failed; @@ -276,13 +276,14 @@ private void registerResponseSubscriber(Http1ResponseBodySubscriber subscribe } if (failed != null) { subscriber.onError(failed); + return false; } else { - client.registerSubscriber(subscriber); + return client.registerSubscriber(subscriber); } } - private void unregisterResponseSubscriber(Http1ResponseBodySubscriber subscriber) { - client.unregisterSubscriber(subscriber); + private boolean unregisterResponseSubscriber(Http1ResponseBodySubscriber subscriber) { + return client.unregisterSubscriber(subscriber); } @Override diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java index c7643106640..96bdda7eb9c 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java @@ -542,7 +542,14 @@ private static void closeSubscribers(HttpClientImpl client, Throwable t) { client.subscribers.forEach(s -> s.onError(t)); } - public void registerSubscriber(HttpBodySubscriberWrapper subscriber) { + /** + * Adds the given subscriber to the subscribers list, or call + * its {@linkplain HttpBodySubscriberWrapper#onError onError} + * method if the client is shutting down. + * @param subscriber the subscriber + * @return true if the subscriber was added to the list. + */ + public boolean registerSubscriber(HttpBodySubscriberWrapper subscriber) { if (!selmgr.isClosed()) { synchronized (selmgr) { if (!selmgr.isClosed()) { @@ -552,20 +559,28 @@ public void registerSubscriber(HttpBodySubscriberWrapper subscriber) { debug.log("body subscriber registered: " + count); } } - return; + return true; } } } subscriber.onError(selmgr.selectorClosedException()); + return false; } - public void unregisterSubscriber(HttpBodySubscriberWrapper subscriber) { + /** + * Remove the given subscriber from the subscribers list. + * @param subscriber the subscriber + * @return true if the subscriber was found and removed from the list. + */ + public boolean unregisterSubscriber(HttpBodySubscriberWrapper subscriber) { if (subscribers.remove(subscriber)) { long count = pendingSubscribersCount.decrementAndGet(); if (debug.on()) { debug.log("body subscriber unregistered: " + count); } + return true; } + return false; } private void closeConnection(HttpConnection conn) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java index be3f73de524..372a02a224b 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java @@ -540,7 +540,7 @@ public int available() throws IOException { @Override public void onSubscribe(Flow.Subscription s) { Objects.requireNonNull(s); - if (debug.on()) debug.log("onSubscribed called"); + if (debug.on()) debug.log("onSubscribe called"); try { if (!subscribed.compareAndSet(false, true)) { if (debug.on()) debug.log("Already subscribed: canceling"); @@ -554,10 +554,12 @@ public void onSubscribe(Flow.Subscription s) { closed = this.closed; if (!closed) { this.subscription = s; - // should contain at least 2 - assert buffers.remainingCapacity() > 1 + // should contain at least 2, unless closed or failed. + assert buffers.remainingCapacity() > 1 || failed != null : "buffers capacity: " + buffers.remainingCapacity() - + " closed: " + closed + " failed: " + failed; + + ", closed: " + closed + ", terminated: " + + buffers.contains(LAST_LIST) + + ", failed: " + failed; } } if (closed) { @@ -573,7 +575,7 @@ public void onSubscribe(Flow.Subscription s) { } catch (Throwable t) { failed = t; if (debug.on()) - debug.log("onSubscribed failed", t); + debug.log("onSubscribe failed", t); try { close(); } catch (IOException x) { diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java index bd3c4cdc049..b145c1c3fc0 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java @@ -352,12 +352,12 @@ Http2StreamResponseSubscriber createResponseSubscriber(BodyHandler handler // The Http2StreamResponseSubscriber is registered with the HttpClient // to ensure that it gets completed if the SelectorManager aborts due // to unexpected exceptions. - private void registerResponseSubscriber(Http2StreamResponseSubscriber subscriber) { - client().registerSubscriber(subscriber); + private boolean registerResponseSubscriber(Http2StreamResponseSubscriber subscriber) { + return client().registerSubscriber(subscriber); } - private void unregisterResponseSubscriber(Http2StreamResponseSubscriber subscriber) { - client().unregisterSubscriber(subscriber); + private boolean unregisterResponseSubscriber(Http2StreamResponseSubscriber subscriber) { + return client().unregisterSubscriber(subscriber); } @Override diff --git a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java index 7cd0943aa60..6dc79760b0a 100644 --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/HttpBodySubscriberWrapper.java @@ -274,21 +274,6 @@ protected void onCancel() { tryUnregister(); } - /** - * Called right before the userSubscriber::onSubscribe is called. - * @apiNote - * This method may be used by subclasses to perform cleanup - * related actions after a subscription has been successfully - * accepted. - * This method is called while holding a subscription - * lock. - * @implSpec - * This method calls {@link #tryRegister()} - */ - protected void onSubscribed() { - tryRegister(); - } - /** * Complete the subscriber, either normally or exceptionally * ensure that the subscriber is completed only once. @@ -381,8 +366,8 @@ public void onSubscribe(Flow.Subscription subscription) { // subscription is finished before calling onError; subscriptionLock.lock(); try { + tryRegister(); if (markSubscribed()) { - onSubscribed(); SubscriptionWrapper wrapped = new SubscriptionWrapper(subscription); userSubscriber.onSubscribe(this.subscription = wrapped); } else { diff --git a/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java b/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java index 97037824a6a..1163c97fa3f 100644 --- a/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java +++ b/test/jdk/java/net/httpclient/AsyncExecutorShutdown.java @@ -23,7 +23,7 @@ /* * @test - * @bug 8277969 + * @bug 8277969 8299338 * @summary Test for edge case where the executor is not accepting * new tasks while the client is still running * @library /test/lib /test/jdk/java/net/httpclient/lib