Skip to content

Commit

Permalink
test:refactor: AbstractSimultaneousConnectionsTest uses Vert.x as bac…
Browse files Browse the repository at this point in the history
…kend server

Signed-off-by: Marc Nuri <[email protected]>
  • Loading branch information
manusa authored Dec 23, 2024
1 parent c96ad2f commit 5fd904c
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 70 deletions.
5 changes: 5 additions & 0 deletions httpclient-jdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,12 @@ public class JdkHttpClientSimultaneousConnectionsTest extends AbstractSimultaneo
protected HttpClient.Factory getHttpClientFactory() {
return new JdkHttpClientFactory();
}

@Override
public void http1Connections() {
// NO-OP
// This test will only pass when it's run in isolation, it seems that the JDK HttpClient eventually uses a shared thread
// pool that reaches a limit and this test will effectively block any further processing after a few connections are open.
// - jdk.internal.net.http.HttpClientImpl.ASYNC_POOL
}
}
5 changes: 5 additions & 0 deletions httpclient-jetty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
5 changes: 5 additions & 0 deletions httpclient-okhttp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
5 changes: 5 additions & 0 deletions httpclient-vertx/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<!-- Required by SslTest -->
<groupId>org.bouncycastle</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public synchronized void shutdown() {
httpClose.onComplete(onComplete);
await(httpClose, "Unable to close MockWebServer");
}
await(vertx.close(), "Unable to close Vertx");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public void consumeBytesProcessesLargeBodies() throws Exception {
asyncBodyResponse.body().consume();
asyncBodyResponse.body().done().get(10L, TimeUnit.SECONDS);
assertThat(responseText.toString()).isEqualTo(largeBody);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,28 @@
*/
package io.fabric8.kubernetes.client.http;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.fabric8.kubernetes.client.RequestConfigBuilder;
import io.fabric8.mockwebserver.MockWebServer;
import io.fabric8.mockwebserver.MockWebServerListener;
import io.fabric8.mockwebserver.http.MockResponse;
import io.fabric8.mockwebserver.http.RecordedHttpConnection;
import io.fabric8.mockwebserver.http.Response;
import io.fabric8.mockwebserver.http.WebSocketListener;
import io.fabric8.mockwebserver.vertx.Protocol;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.NetServerOptions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -42,8 +45,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

Expand All @@ -59,29 +60,24 @@ public abstract class AbstractSimultaneousConnectionsTest {

private RegisteredConnections registeredConnections;
private MockWebServer mockWebServer;
private ExecutorService httpExecutor;
private HttpServer httpServer;
private Vertx vertx;

private HttpClient.Builder clientBuilder;

@BeforeEach
void prepareServerAndBuilder() throws IOException {
void prepareServerAndBuilder() {
registeredConnections = new RegisteredConnections();
mockWebServer = new MockWebServer();
mockWebServer.addListener(registeredConnections);
httpExecutor = Executors.newCachedThreadPool();
httpServer = HttpServer.create(new InetSocketAddress(0), 0);
httpServer.setExecutor(httpExecutor);
httpServer.start();
vertx = Vertx.vertx();
clientBuilder = getHttpClientFactory().newBuilder()
.connectTimeout(60, TimeUnit.SECONDS);
}

@AfterEach
void stopServer() {
mockWebServer.shutdown();
httpServer.stop(0);
httpExecutor.shutdownNow();
vertx.close();
}

protected abstract HttpClient.Factory getHttpClientFactory();
Expand All @@ -95,20 +91,21 @@ private void withHttp1() {
@DisplayName("Should be able to make 2048 simultaneous HTTP/1.x connections before processing the response")
@DisabledOnOs(OS.WINDOWS)
public void http1Connections() throws Exception {
final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_CONNECTIONS,
exchange -> {
exchange.sendResponseHeaders(204, -1);
exchange.close();
});
httpServer.createContext("/http", handler);
try (final HttpClient client = clientBuilder.build()) {
final Collection<CompletableFuture<HttpResponse<AsyncBody>>> asyncResponses = ConcurrentHashMap.newKeySet();
final HttpRequest request = client.newHttpRequestBuilder()
.uri(String.format("http://localhost:%s/http", httpServer.getAddress().getPort()))
.build();
final Collection<CompletableFuture<HttpResponse<AsyncBody>>> asyncResponses = ConcurrentHashMap.newKeySet();
try (
var server = new DelayedResponseHttp1Server(vertx, MAX_HTTP_1_CONNECTIONS);
var client = clientBuilder.tag(new RequestConfigBuilder().withRequestRetryBackoffLimit(0).build()).build()) {
for (int it = 0; it < MAX_HTTP_1_CONNECTIONS; it++) {
final HttpRequest request = client.newHttpRequestBuilder()
.uri(server.uri() + "?" + it)
.build();
asyncResponses.add(client.consumeBytes(request, (value, asyncBody) -> asyncBody.consume()));
handler.await();
}
server.await();
assertThat(server.requests)
.hasSize(MAX_HTTP_1_CONNECTIONS);
for (HttpServerRequest serverRequest : server.requests) {
serverRequest.response().setStatusCode(204).end();
}
CompletableFuture.allOf(asyncResponses.toArray(new CompletableFuture[0])).get(70, TimeUnit.SECONDS);
assertThat(asyncResponses)
Expand All @@ -126,19 +123,18 @@ public void http1Connections() throws Exception {
@DisplayName("Should be able to make 1024 simultaneous HTTP connections before upgrading to WebSocket")
@DisabledOnOs(OS.WINDOWS)
public void http1WebSocketConnectionsBeforeUpgrade() throws Exception {
final DelayedResponseHandler handler = new DelayedResponseHandler(MAX_HTTP_1_WS_CONNECTIONS,
exchange -> exchange.sendResponseHeaders(404, -1));
httpServer.createContext("/http", handler);
try (final HttpClient client = clientBuilder.build()) {
try (var server = new DelayedResponseHttp1Server(vertx, MAX_HTTP_1_WS_CONNECTIONS); var client = clientBuilder.build()) {
for (int it = 0; it < MAX_HTTP_1_WS_CONNECTIONS; it++) {
client.newWebSocketBuilder()
.uri(URI.create(String.format("http://localhost:%s/http", httpServer.getAddress().getPort())))
.uri(URI.create(server.uri()))
.buildAsync(new WebSocket.Listener() {
});
handler.await();
}
server.await();
assertThat(server.requests)
.hasSize(MAX_HTTP_1_WS_CONNECTIONS);
server.requests.forEach(request -> request.response().setStatusCode(101).end());
}
assertThat(handler.connectionCount.get(60, TimeUnit.SECONDS)).isEqualTo(MAX_HTTP_1_WS_CONNECTIONS);
}

@Test
Expand Down Expand Up @@ -192,47 +188,46 @@ public void onMessage(WebSocket webSocket, String text) {
}
}

private static class DelayedResponseHandler implements HttpHandler {

private final int requestCount;
private final CyclicBarrier barrier;
private final Set<HttpExchange> exchanges;
private final CompletableFuture<Integer> connectionCount;
private final ExecutorService executorService;

private DelayedResponseHandler(int requestCount, HttpHandler handler) {
this.requestCount = requestCount;
this.barrier = new CyclicBarrier(2);
exchanges = ConcurrentHashMap.newKeySet();
connectionCount = new CompletableFuture<>();
executorService = Executors.newFixedThreadPool(1);
connectionCount.thenRunAsync(() -> {
for (HttpExchange exchange : exchanges) {
try {
handler.handle(exchange);
} catch (IOException ignore) {
// NO OP
}
}
}, executorService)
.whenComplete((unused, throwable) -> executorService.shutdownNow());
private static class DelayedResponseHttp1Server implements AutoCloseable {

private final int connections;
private final HttpServer httpServer;
private final Collection<HttpServerRequest> requests;
private final CountDownLatch connectionLatch;

private DelayedResponseHttp1Server(Vertx vertx, int connections) throws Exception {
this.connections = connections;
requests = ConcurrentHashMap.newKeySet();
connectionLatch = new CountDownLatch(connections);
httpServer = vertx.createHttpServer(new HttpServerOptions()
.setPort(NetServerOptions.DEFAULT_PORT)
.setAlpnVersions(Collections.singletonList(HttpVersion.HTTP_1_1)));
httpServer.connectionHandler(event -> connectionLatch.countDown());
httpServer.requestHandler(requests::add);
httpServer.listen().toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS);
}

@Override
public void handle(HttpExchange exchange) {
exchanges.add(exchange);
await();
if (exchanges.size() == requestCount) {
connectionCount.complete(requestCount);
}
public void close() throws Exception {
requests.forEach(request -> request.connection().close());
requests.clear();
httpServer.close().toCompletionStage().toCompletableFuture().get(10, TimeUnit.SECONDS);
}

private String uri() {
return String.format("http://localhost:%s/http-1-connections", httpServer.actualPort());
}

public final void await() {
private void await() {
try {
barrier.await(5, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new RuntimeException("Failed to await the barrier");
if (!connectionLatch.await(10, TimeUnit.SECONDS)) {
throw new AssertionError(
"Failed to await the connection latch, remaining connections to open: " + connectionLatch.getCount());
}
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> requests.size() == connections);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Failed to await the connection latch (interrupted)", e);
}
}
}
Expand Down

0 comments on commit 5fd904c

Please sign in to comment.