Description
In promregator/promregator#222 we are once again tracking down a very ugly issue with JWT handling at the UAA when using cf-java-client. We are currently on version 5.7.0.
I believe I have tracked it down to the area of
It appears that if multiple CF API requests are executed in parallel right after the current access_token has expired, there is a race condition when updating the access_token/refresh_token. The issue is neither logged nor loggable.
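The following minimal, hypothetical sketch in plain Java makes the suspected mechanism concrete. It does not use cf-java-client or Reactor. It assumes that the token endpoint rotates refresh tokens, meaning each successful refresh invalidates the refresh token that was presented. The mock LoginHttpHandler in the test below behaves exactly this way. If several clients detect the expiry at the same time and all present the same refresh token, only one refresh can succeed and the others are rejected. `RotatingTokenServer` and `NaiveRefreshRace` are illustrative names, not part of any library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory token endpoint that rotates refresh tokens:
// every successful refresh invalidates the refresh token that was used.
class RotatingTokenServer {
    private String currentRefreshToken = "rt-0";
    private int generation = 0;

    synchronized String currentRefreshToken() {
        return currentRefreshToken;
    }

    /** Returns the new refresh token, or null if the presented one is stale (a 401 in the mock). */
    synchronized String refresh(String presented) {
        if (!currentRefreshToken.equals(presented)) {
            return null;
        }
        generation++;
        currentRefreshToken = "rt-" + generation;
        return currentRefreshToken;
    }
}

class NaiveRefreshRace {
    /** Lets `clients` threads refresh with the same (soon stale) token; returns the number of successes. */
    static int run(int clients) throws Exception {
        RotatingTokenServer server = new RotatingTokenServer();
        String shared = server.currentRefreshToken(); // all clients hold the same refresh token
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger successes = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        try {
            Future<?>[] futures = new Future<?>[clients];
            for (int i = 0; i < clients; i++) {
                futures[i] = pool.submit(() -> {
                    start.await(); // release all clients at (roughly) the same moment
                    if (server.refresh(shared) != null) {
                        successes.incrementAndGet();
                    }
                    return null;
                });
            }
            start.countDown();
            for (Future<?> f : futures) {
                f.get(); // propagate any failure from the worker threads
            }
        } finally {
            pool.shutdown();
        }
        return successes.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(2) + " of 2 refreshes succeeded");
    }
}
```

Regardless of thread timing, exactly one refresh succeeds. Every later attempt presents a refresh token that has already been rotated away.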
In the original reporter's setup, the access token is valid for only 20 minutes. However, several hundred requests are sent at a short cadence, and at some point (by chance) two requests are in the pipeline in parallel when the access_token has expired. Both of them need to be refreshed through the grant_type=refresh_token flow.
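For reference, the refresh_token flow is a form-encoded POST to the token endpoint, as defined in RFC 6749, section 6. The sketch below only builds such a request body with JDK classes. It does not claim to reproduce the exact body cf-java-client sends; client authentication, for instance, is left out. `RefreshGrantBody` is a hypothetical helper.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RefreshGrantBody {
    /** Builds an application/x-www-form-urlencoded body for the OAuth 2.0 refresh_token grant. */
    static String build(String refreshToken) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("grant_type", "refresh_token");
        params.put("refresh_token", refreshToken);
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    // URLEncoder.encode(String, Charset) requires Java 10+.
    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

Parameter order in a form body carries no meaning. However, the patterns in the mock LoginHttpHandler below (e.g. `&grant_type=refresh_token`) require a leading `&`, so they only match when grant_type is not the first parameter of the body.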
In production, it takes 6h+ to reproduce the issue.
I believe I have been able to reproduce the issue on my local machine within a couple of seconds using the following (very ugly and insecure; don't ever do this in production!) "unit test":
package org.cloudfoundry.promregator.cfaccessor;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.lang.ProcessBuilder.Redirect;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.security.KeyStore;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.cloudfoundry.client.v2.domains.Domains;
import org.cloudfoundry.client.v2.domains.ListDomainsRequest;
import org.cloudfoundry.client.v2.domains.ListDomainsResponse;
import org.cloudfoundry.reactor.DefaultConnectionContext;
import org.cloudfoundry.reactor.client.ReactorCloudFoundryClient;
import org.cloudfoundry.reactor.tokenprovider.PasswordGrantTokenProvider;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import com.sun.org.apache.xerces.internal.impl.dv.util.Base64;
import ch.qos.logback.classic.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import sun.net.httpserver.HttpsServerImpl;
public class AccessKeyRefreshMassTest {
private static final File KEYSTORE_FILE = new File(System.getProperty("java.io.tmpdir"), "test.jks");
private static final String KEYSTORE_PASSWORD = "pass_store";
private static final String KEY_PASSWORD = "pass_key";
/**
* Generates a new self-signed certificate in /tmp/test.jks, if it does not
* already exist. see also
* http://rememberjava.com/http/2017/04/29/simple_https_server.html
*/
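static void generateCertificate() throws Exception {
if (KEYSTORE_FILE.exists()) {
return; // reuse the keystore of a previous run, as stated in the Javadoc above
}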
File keytool = new File(System.getProperty("java.home"), "bin/keytool");
String[] genkeyCmd = new String[] { keytool.toString(), "-genkey", "-keyalg", "RSA", "-alias", "some_alias",
"-validity", "365", "-keysize", "2048", "-dname", "cn=John_Doe,ou=TestOrgUnit,o=TestOrg,c=US",
"-keystore", KEYSTORE_FILE.getAbsolutePath(), "-storepass", KEYSTORE_PASSWORD, "-keypass",
KEY_PASSWORD, "-storetype", "JKS" };
System.out.println(String.join(" ", genkeyCmd));
ProcessBuilder processBuilder = new ProcessBuilder(genkeyCmd);
processBuilder.redirectErrorStream(true);
processBuilder.redirectOutput(Redirect.INHERIT);
processBuilder.redirectError(Redirect.INHERIT);
Process exec = processBuilder.start();
exec.waitFor();
System.out.println("Exit value: " + exec.exitValue());
}
static SSLContext getSslContext() throws Exception {
KeyStore ks = KeyStore.getInstance("JKS");
try (FileInputStream keystoreStream = new FileInputStream(KEYSTORE_FILE)) {
ks.load(keystoreStream, KEYSTORE_PASSWORD.toCharArray());
}
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(ks, KEY_PASSWORD.toCharArray());
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(ks);
SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
return sslContext;
}
private class InfoHttpHandler implements HttpHandler {
private final Logger log = LoggerFactory.getLogger(InfoHttpHandler.class);
@Override
public void handle(HttpExchange he) throws IOException {
log.info("Handling info request");
String response = "{\n" + " \"api_version\": \"2.172.0\",\n"
+ " \"app_ssh_endpoint\": \"localhost:2222\",\n"
+ " \"app_ssh_host_key_fingerprint\": \"c7:1f:89:2a:62:3b:78:a9:08:c9:33:81:fb:39:26:da\",\n"
+ " \"app_ssh_oauth_client\": \"ssh-proxy\",\n"
+ " \"authorization_endpoint\": \"https://localhost:9003/login\",\n" + " \"build\": \"v1.2.3\",\n"
+ " \"description\": \"Dummy Foundry\",\n"
+ " \"doppler_logging_endpoint\": \"wss://localhost:1234\",\n" + " \"min_cli_version\": null,\n"
+ " \"min_recommended_cli_version\": null,\n" + " \"name\": \"Dummy\",\n"
+ " \"osbapi_version\": \"2.15\",\n"
+ " \"support\": \"http://localhost:1234/supportinfo\",\n"
+ " \"token_endpoint\": \"https://localhost:9004\",\n" + " \"version\": 0\n" + "}";
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
PrintWriter pw = new PrintWriter(baos);
pw.print(response);
pw.flush();
he.sendResponseHeaders(200, baos.size());
OutputStream responseBody = he.getResponseBody();
responseBody.write(baos.toByteArray());
responseBody.flush();
}
}
private class RootHttpHandler implements HttpHandler {
private final Logger log = LoggerFactory.getLogger(RootHttpHandler.class);
@Override
public void handle(HttpExchange he) throws IOException {
log.info("Handling root request against path {}", he.getRequestURI());
String response = "{\n" +
" \"links\": {\n" +
" \"self\": {\n" +
" \"href\": \"https://localhost:9002\"\n" +
" },\n" +
" \"bits_service\": null,\n" +
" \"cloud_controller_v2\": {\n" +
" \"href\": \"https://localhost:9002/v2\",\n" +
" \"meta\": {\n" +
" \"version\": \"2.172.0\"\n" +
" }\n" +
" },\n" +
" \"cloud_controller_v3\": {\n" +
" \"href\": \"https://localhost:9002/v3\",\n" +
" \"meta\": {\n" +
" \"version\": \"3.107.0\"\n" +
" }\n" +
" },\n" +
" \"network_policy_v0\": {\n" +
" \"href\": \"https://localhost:9002/networking/v0/external\"\n" +
" },\n" +
" \"network_policy_v1\": {\n" +
" \"href\": \"https://localhost:9002/networking/v1/external\"\n" +
" },\n" +
" \"login\": {\n" +
" \"href\": \"https://localhost:9003/login\"\n" +
" },\n" +
" \"uaa\": {\n" +
" \"href\": \"https://localhost:9004\"\n" +
" },\n" +
" \"credhub\": null,\n" +
" \"routing\": null,\n" +
" \"logging\": {\n" +
" \"href\": \"wss://localhost:1234\"\n" +
" },\n" +
" \"log_cache\": {\n" +
" \"href\": \"https://localhost:9005\"\n" +
" },\n" +
" \"log_stream\": {\n" +
" \"href\": \"https://localhost:9006\"\n" +
" },\n" +
" \"app_ssh\": {\n" +
" \"href\": \"localhost:2222\",\n" +
" \"meta\": {\n" +
" \"host_key_fingerprint\": \"c7:1f:89:2a:62:3b:78:a9:08:c9:33:81:fb:39:26:da\",\n" +
" \"oauth_client\": \"ssh-proxy\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}\n" +
"";
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
PrintWriter pw = new PrintWriter(baos);
pw.print(response);
pw.flush();
he.sendResponseHeaders(200, baos.size());
OutputStream responseBody = he.getResponseBody();
responseBody.write(baos.toByteArray());
responseBody.flush();
}
}
public class DomainsHttpHandler implements HttpHandler {
private final Logger log = LoggerFactory.getLogger(DomainsHttpHandler.class);
private final Pattern extractJwtPayload = Pattern.compile(".+\\.(.+)\\..+");
@Override
public void handle(HttpExchange he) throws IOException {
log.info("Handling domains request");
List<String> authorizationList = he.getRequestHeaders().get("Authorization");
if (authorizationList == null || authorizationList.isEmpty()) { // Headers.get returns null if the header is absent
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
return;
}
if (authorizationList.size() > 1) {
log.error("Multiple authentication headers received");
he.sendResponseHeaders(400, 0);
he.getResponseBody().flush();
return;
}
String authorizationValue = authorizationList.get(0);
log.info("Authorization Request Header Value: "+authorizationValue);
if (!authorizationValue.startsWith("Bearer ")) {
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
return;
}
String jwt = authorizationValue.substring("Bearer ".length()); // "Bearer " is 7 characters long
log.info("Extracted Jwt: "+jwt);
Matcher m = extractJwtPayload.matcher(jwt);
if (!m.find()) {
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
return;
}
String jwtPayload = m.group(1);
Map<String, Object> claims = new ObjectMapper().readValue(Base64.decode(jwtPayload), Map.class);
Integer exp = (Integer) claims.get("exp");
long now = Instant.now().toEpochMilli() / 1000;
log.info("JWT Expires at {}, now is {}", exp, now);
if (exp.longValue() < now) {
log.warn("JWT has expired: {} vs. {}(now)", exp, now);
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
return;
}
log.info("JWT was still valid");
String response = "{\n" +
" \"next_url\": null,\n" +
" \"prev_url\": null,\n" +
" \"resources\": [\n" +
" {\n" +
" \"entity\": {\n" +
" \"internal\": false,\n" +
" \"name\": \"some.domain.example\",\n" +
" \"router_group_guid\": null,\n" +
" \"router_group_type\": null\n" +
" },\n" +
" \"metadata\": {\n" +
" \"created_at\": \"2021-10-20T19:21:39Z\",\n" +
" \"guid\": \"42049093-13e9-4520-80a6-2d6fea6542bc\",\n" +
" \"updated_at\": \"2021-09-16T20:40:47Z\",\n" +
" \"url\": \"/v2/domains/42049093-13e9-4520-80a6-2d6fea6542bc\"\n" +
" }\n" +
" }\n" +
" ],\n" +
" \"total_pages\": 1,\n" +
" \"total_results\": 1\n" +
"}";
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
PrintWriter pw = new PrintWriter(baos);
pw.print(response);
pw.flush();
he.sendResponseHeaders(200, baos.size());
OutputStream responseBody = he.getResponseBody();
responseBody.write(baos.toByteArray());
responseBody.flush();
}
}
public class APIMockServer {
private HttpsServer server;
protected int port = 9002;
private InfoHttpHandler infoHandler;
private RootHttpHandler rootHttpHandler;
private DomainsHttpHandler domainsHttpHandler;
public APIMockServer() {
}
public void start(SSLContext sslContext) throws IOException {
InetSocketAddress bindAddress = new InetSocketAddress("127.0.0.1", this.port);
this.server = HttpsServer.create(bindAddress, 0);
this.server.createContext("/", this.getRootHttpHandler());
this.server.createContext("/v2/info", this.getInfoHttpHandler());
this.server.createContext("/v2/domains", this.getDomainsHttpHandler());
this.server.setHttpsConfigurator(new HttpsConfigurator(sslContext));
this.server.setExecutor(Executors.newFixedThreadPool(15));
this.server.start();
}
private HttpHandler getRootHttpHandler() {
if (this.rootHttpHandler == null) {
this.rootHttpHandler = new RootHttpHandler();
}
return this.rootHttpHandler;
}
private InfoHttpHandler getInfoHttpHandler() {
if (this.infoHandler != null) {
return this.infoHandler;
}
this.infoHandler = new InfoHttpHandler();
return this.infoHandler;
}
private HttpHandler getDomainsHttpHandler() {
if (this.domainsHttpHandler == null) {
this.domainsHttpHandler = new DomainsHttpHandler();
}
return this.domainsHttpHandler;
}
public void stop() {
this.server.stop(1);
}
}
public class LoginHttpHandler implements HttpHandler {
private final Logger log = LoggerFactory.getLogger(LoginHttpHandler.class);
private final Pattern patternGrantTypePassword = Pattern.compile("&grant_type=password");
private final Pattern patternGrantTypeRefreshToken = Pattern.compile("&grant_type=refresh_token");
private final Pattern patternRefreshToken = Pattern.compile("&refresh_token=([0-9]+)");
private String currentValidRefreshToken;
@Override
public void handle(HttpExchange he) throws IOException {
log.info("Login request received");
log.info("RequestHeaders:");
for (Entry<String, List<String>> entry : he.getRequestHeaders().entrySet()) {
log.info(String.format("%s: %s", entry.getKey(), entry.getValue().toString()));
}
log.info("RequestBody:");
String requestBody = new String(he.getRequestBody().readAllBytes());
log.info(requestBody);
if (patternGrantTypePassword.matcher(requestBody).find()) {
provideNewAccessAndRefreshToken(he);
} else if (patternGrantTypeRefreshToken.matcher(requestBody).find()) {
respondToGrantTypeRefreshToken(he, requestBody);
} else {
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
}
}
private synchronized void provideNewAccessAndRefreshToken(HttpExchange he) throws IOException {
Instant now = Instant.now();
String headerString = "{}";
String payloadString = String.format("{\"iat\":%d, \"exp\":%d}", now.toEpochMilli()/1000, now.plus(Duration.ofSeconds(5)).toEpochMilli()/1000);
String jwt = String.format("%s.%s.invalidsignature", Base64.encode(headerString.getBytes(Charset.defaultCharset())), Base64.encode(payloadString.getBytes(Charset.defaultCharset())));
log.info("Sending jwt: "+jwt);
String refreshToken = Instant.now().toEpochMilli()+"";
String response = String.format("{\"access_token\":\"%s\",\"expires_in\": 5, \"refresh_token\":\"%s\", \"scope\": \"openid network.write uaa.user cloud_controller.read password.write cloud_controller.write network.admin\", \"token_type\": \"Bearer\"}",
jwt, refreshToken);
log.info("Sending response body: {}", response);
this.currentValidRefreshToken = refreshToken;
ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
PrintWriter pw = new PrintWriter(baos);
pw.print(response);
pw.flush();
he.sendResponseHeaders(200, baos.size());
OutputStream responseBody = he.getResponseBody();
responseBody.write(baos.toByteArray());
responseBody.flush();
}
private synchronized void respondToGrantTypeRefreshToken(HttpExchange he, String requestBody) throws IOException {
Matcher m = patternRefreshToken.matcher(requestBody);
if (!m.find()) {
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
return;
}
String requestRefreshToken = m.group(1);
if (currentValidRefreshToken.equals(requestRefreshToken)) {
log.info("Request with current refresh token; issuing new JWT");
provideNewAccessAndRefreshToken(he);
return;
}
log.warn("Request with invalid refresh token provided; responding with error");
he.sendResponseHeaders(401, 0);
he.getResponseBody().flush();
}
}
public class LoginMockServer {
private HttpsServer server;
protected int port = 9003;
private LoginHttpHandler loginHttpHandler;
public LoginMockServer() {
}
public void start(SSLContext sslContext) throws IOException {
InetSocketAddress bindAddress = new InetSocketAddress("127.0.0.1", this.port);
this.server = HttpsServer.create(bindAddress, 0);
this.server.createContext("/login/oauth/token", this.getLoginHttpHandler());
this.server.setHttpsConfigurator(new HttpsConfigurator(sslContext));
this.server.setExecutor(Executors.newFixedThreadPool(3));
this.server.start();
}
private HttpHandler getLoginHttpHandler() {
if (this.loginHttpHandler == null) {
this.loginHttpHandler = new LoginHttpHandler();
}
return this.loginHttpHandler;
}
public void stop() {
this.server.stop(1);
}
}
@Test
public void runTest() throws Exception {
//System.setProperty("javax.net.debug", "all");
ch.qos.logback.classic.Logger tokenLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("cloudfoundry-client.token");
tokenLogger.setLevel(Level.DEBUG);
ch.qos.logback.classic.Logger rootLogger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("cloudfoundry-client");
rootLogger.setLevel(Level.DEBUG);
generateCertificate();
SSLContext sslContext = getSslContext();
APIMockServer apiMockServer = new APIMockServer();
apiMockServer.start(sslContext);
LoginMockServer loginMockServer = new LoginMockServer();
loginMockServer.start(sslContext);
final Logger log = LoggerFactory.getLogger(this.getClass());
try {
DefaultConnectionContext connectionContext = DefaultConnectionContext.builder().apiHost("localhost")
.port(9002).skipSslValidation(true).build();
PasswordGrantTokenProvider tokenProvider = PasswordGrantTokenProvider.builder().password("pw")
.username("username").build();
ReactorCloudFoundryClient cloudFoundryClient = ReactorCloudFoundryClient.builder()
.connectionContext(connectionContext).tokenProvider(tokenProvider).build();
Domains domains = cloudFoundryClient.domains();
final ListDomainsRequest request = ListDomainsRequest.builder().build();
Mono<ListDomainsResponse> listDomainsInitialMono = domains.list(request);
log.info("Stage 1: Sending initial domain list request");
listDomainsInitialMono.block(Duration.ofSeconds(50));
log.info("Waiting 7 seconds to ensure that the JWT has expired");
Thread.sleep(7000);
log.info("Stage 2: Sending serial requests");
Mono<ListDomainsResponse> listDomains1Mono = domains.list(request);
Mono<ListDomainsResponse> listDomains2Mono = domains.list(request);
log.info("Sending first request");
listDomains1Mono.block(Duration.ofSeconds(5));
log.info("Sending second request");
listDomains2Mono.block(Duration.ofSeconds(5));
log.info("Waiting 7 seconds to ensure that the JWT has expired");
Thread.sleep(7000);
log.info("Stage 3: Sending parallel requests");
Mono<ListDomainsResponse> listDomains3Mono = domains.list(request);
Mono<ListDomainsResponse> listDomains4Mono = domains.list(request);
listDomains3Mono.doOnError(e -> log.error("Received Exception for mono 1", e)).subscribe();
listDomains4Mono.doOnError(e -> log.error("Received Exception for mono 2", e)).subscribe();
log.info("Waiting 3 seconds to prevent any bad overlap");
Thread.sleep(3000);
log.info("Stage 4: Sending another isolated request");
Mono<ListDomainsResponse> listDomains5Mono = domains.list(request);
listDomains5Mono.block(Duration.ofSeconds(5));
} finally {
loginMockServer.stop();
apiMockServer.stop();
}
}
}
The code above shows the following behavior:
- Stage 1: The initial, single request goes through fine.
- Stage 2: Both sequential requests go through fine. The first request initially receives a 401, runs through the refresh_token flow and then sends the request to the domains endpoint a second time, which now succeeds with a 200. The second request gets a 200 back on the first attempt.
- Stage 3: The parallel requests are both triggered and both get back a 401 from the server (as the JWT has expired). Afterwards, both requests appear to fetch the / endpoint and the /v2/info endpoint of the API server, but never try to trigger any refresh_token flow. After 2.1s they appear to fail with a response_incomplete from netty, generating an artificial (?) 401 response.
- Stage 4: All subsequent (serial) requests that need a JWT also fail, this time even with an ugly reactor.netty.http.client.PrematureCloseException: Connection prematurely closed DURING response exception at org.cloudfoundry.reactor.tokenprovider.AbstractUaaTokenProvider.token(AbstractUaaTokenProvider.java:261).
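In stages 2 and 3, the 401 responses come from the mock DomainsHttpHandler. It decodes the JWT payload and compares its exp claim (seconds since the epoch) against the current time. The sketch below performs the same kind of check in a self-contained form, using java.util.Base64 instead of the internal xerces class. It assumes an RFC 7519-style token whose payload segment is base64url-encoded. The regex-based claim lookup is a deliberate simplification and no substitute for a JSON parser. `JwtExpiry` and `testToken` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class JwtExpiry {
    // Simplistic claim extraction, for illustration only.
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    /** Returns true if the token's exp claim lies strictly before nowEpochSeconds (same rule as the mock). */
    static boolean isExpired(String jwt, long nowEpochSeconds) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        // The JDK's URL decoder accepts base64url input with or without padding.
        String payload = new String(Base64.getUrlDecoder().decode(parts[1]), StandardCharsets.UTF_8);
        Matcher m = EXP.matcher(payload);
        if (!m.find()) {
            throw new IllegalArgumentException("no exp claim");
        }
        return Long.parseLong(m.group(1)) < nowEpochSeconds;
    }

    /** Builds an unsigned test token shaped like the one the mock login server issues. */
    static String testToken(long iat, long exp) {
        Base64.Encoder enc = Base64.getUrlEncoder().withoutPadding();
        String header = enc.encodeToString("{}".getBytes(StandardCharsets.UTF_8));
        String payload = enc.encodeToString(
                String.format("{\"iat\":%d, \"exp\":%d}", iat, exp).getBytes(StandardCharsets.UTF_8));
        return header + "." + payload + ".invalidsignature";
    }
}
```

Like the mock, this treats a token as expired only once exp is strictly less than the current second.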
For your convenience and analysis, I have taken a snapshot of the logs from one of my test runs, with various "interesting" log levels set to DEBUG. All data that appears sensitive is fake/mocked/generated; no redaction was necessary. No running CF installation is required to execute the test (it is a mocked test in full isolation).
My expectation would have been that, in the parallel case (3), both requests are held back until a valid JWT is available at the client.
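One way to get that behavior is a "single-flight" refresh. All callers that need a token share one in-flight refresh, and a 401 triggers a new refresh only if the caller's token is still the cached one. The following is a minimal sketch of that idea with CompletableFuture. It is not how cf-java-client is implemented, and `SingleFlightTokenCache`, `token` and `invalidate` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical cache in which concurrent callers share one token refresh instead of racing. */
class SingleFlightTokenCache {
    private final Supplier<String> refresher; // performs the actual refresh
    private CompletableFuture<String> current; // guarded by "this"

    SingleFlightTokenCache(Supplier<String> refresher) {
        this.refresher = refresher;
    }

    /** Returns the cached token future, starting a refresh if none exists yet. */
    synchronized CompletableFuture<String> token() {
        if (current == null) {
            current = CompletableFuture.supplyAsync(refresher);
        }
        return current;
    }

    /**
     * Called after a 401. Starts a new refresh only if `seen` is still the cached future,
     * so a second 401 for the same stale token joins the refresh already in flight.
     */
    synchronized CompletableFuture<String> invalidate(CompletableFuture<String> seen) {
        if (current == seen) {
            current = CompletableFuture.supplyAsync(refresher);
        }
        return current;
    }

    public static void main(String[] args) {
        AtomicInteger refreshes = new AtomicInteger();
        SingleFlightTokenCache cache = new SingleFlightTokenCache(() -> "token-" + refreshes.incrementAndGet());
        CompletableFuture<String> seen = cache.token();
        seen.join();
        // Two requests receive a 401 with the same token: only one new refresh is started.
        CompletableFuture<String> a = cache.invalidate(seen);
        CompletableFuture<String> b = cache.invalidate(seen);
        System.out.println(a.join() + " " + (a == b) + " refreshes=" + refreshes.get());
    }
}
```

Comparing futures by identity in `invalidate` ensures that a late 401 for a token that has already been replaced returns the new future rather than presenting the old refresh token a second time. A failed refresh stays cached as a failed future until some caller passes it to `invalidate`, which then starts a fresh attempt.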
In particular, the behavior after the failure has occurred (4) is very ugly for the consuming application: apparently all requests that the consumer wants to send fail with a UaaException over which the consumer has no control. It appears that the ConnectionContext is left in "some broken state".
Could you have a look? How can we assist you with your analysis?