Skip to content

Commit 052baff

Browse files
committed
Resubscribe on Token Expiration II
Well, it turns out that my first attempt at this was only partially correct. The code did re-subscribe appropriately on an invalid token, but the subscription ended up being lost within a callback that returned a Mono.just(); This change forces the subscription chain out of that callback and to the request/response pair itself, resulting in a proper transparent re-request. A minor change was also made to how the trigger for resubscribe happens. Signed-off-by: Ben Hale <[email protected]>
1 parent 5f69c68 commit 052baff

File tree

8 files changed

+97
-80
lines changed

8 files changed

+97
-80
lines changed

.idea/encodings.xml

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, F
108108
.parseBody(responseType));
109109
}
110110

111+
protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Consumer<HttpHeaders> headersTransformer,
112+
Function<HttpHeaders, Mono<? extends HttpHeaders>> headersWhenTransformer) {
113+
return createOperator()
114+
.flatMap(operator -> operator.headers(headers -> addHeaders(headers, requestPayload, headersTransformer)).headersWhen(headersWhenTransformer)
115+
.post()
116+
.uri(queryTransformer(requestPayload).andThen(uriTransformer))
117+
.send(requestPayload)
118+
.response()
119+
.parseBody(responseType));
120+
}
121+
111122
protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
112123
return createOperator()
113124
.flatMap(operator -> operator.headers(headers -> addHeaders(headers, requestPayload))

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/tokens/ReactorTokens.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ public ReactorTokens(ConnectionContext connectionContext, Mono<String> root, Tok
7070

7171
@Override
7272
public Mono<CheckTokenResponse> check(CheckTokenRequest request) {
73-
return post(request, CheckTokenResponse.class, builder -> builder.pathSegment("check_token"), outbound -> {
74-
String encoded = Base64.getEncoder().encodeToString(new AsciiString(request.getClientId()).concat(":").concat(request.getClientSecret()).toByteArray());
75-
outbound.set(AUTHORIZATION, BASIC_PREAMBLE + encoded);
76-
})
73+
return post(request, CheckTokenResponse.class, builder -> builder.pathSegment("check_token"),
74+
outbound -> {
75+
},
76+
outbound -> {
77+
String encoded = Base64.getEncoder().encodeToString(new AsciiString(request.getClientId()).concat(":").concat(request.getClientSecret()).toByteArray());
78+
return Mono.just(outbound.set(AUTHORIZATION, BASIC_PREAMBLE + encoded));
79+
})
7780
.checkpoint();
7881
}
7982

@@ -82,10 +85,8 @@ public Mono<GetTokenByAuthorizationCodeResponse> getByAuthorizationCode(GetToken
8285
return post(request, GetTokenByAuthorizationCodeResponse.class, builder -> builder.pathSegment("oauth", "token")
8386
.queryParam("grant_type", AUTHORIZATION_CODE)
8487
.queryParam("response_type", ResponseType.TOKEN),
85-
outbound -> {
86-
ReactorTokens.removeAuthorization(outbound);
87-
ReactorTokens.setUrlEncoded(outbound);
88-
})
88+
ReactorTokens::setUrlEncoded,
89+
ReactorTokens::removeAuthorization)
8990
.checkpoint();
9091
}
9192

@@ -94,10 +95,8 @@ public Mono<GetTokenByClientCredentialsResponse> getByClientCredentials(GetToken
9495
return post(request, GetTokenByClientCredentialsResponse.class, builder -> builder.pathSegment("oauth", "token")
9596
.queryParam("grant_type", CLIENT_CREDENTIALS)
9697
.queryParam("response_type", ResponseType.TOKEN),
97-
outbound -> {
98-
ReactorTokens.removeAuthorization(outbound);
99-
ReactorTokens.setUrlEncoded(outbound);
100-
})
98+
ReactorTokens::setUrlEncoded,
99+
ReactorTokens::removeAuthorization)
101100
.checkpoint();
102101
}
103102

@@ -106,10 +105,8 @@ public Mono<GetTokenByOneTimePasscodeResponse> getByOneTimePasscode(GetTokenByOn
106105
return post(request, GetTokenByOneTimePasscodeResponse.class, builder -> builder.pathSegment("oauth", "token")
107106
.queryParam("grant_type", PASSWORD)
108107
.queryParam("response_type", ResponseType.TOKEN),
109-
outbound -> {
110-
ReactorTokens.removeAuthorization(outbound);
111-
ReactorTokens.setUrlEncoded(outbound);
112-
})
108+
ReactorTokens::setUrlEncoded,
109+
ReactorTokens::removeAuthorization)
113110
.checkpoint();
114111
}
115112

@@ -118,10 +115,8 @@ public Mono<GetTokenByOpenIdResponse> getByOpenId(GetTokenByOpenIdRequest reques
118115
return post(request, GetTokenByOpenIdResponse.class, builder -> builder.pathSegment("oauth", "token")
119116
.queryParam("grant_type", AUTHORIZATION_CODE)
120117
.queryParam("response_type", ResponseType.ID_TOKEN),
121-
outbound -> {
122-
ReactorTokens.removeAuthorization(outbound);
123-
ReactorTokens.setUrlEncoded(outbound);
124-
})
118+
ReactorTokens::setUrlEncoded,
119+
ReactorTokens::removeAuthorization)
125120
.checkpoint();
126121
}
127122

@@ -130,10 +125,8 @@ public Mono<GetTokenByPasswordResponse> getByPassword(GetTokenByPasswordRequest
130125
return post(request, GetTokenByPasswordResponse.class, builder -> builder.pathSegment("oauth", "token")
131126
.queryParam("grant_type", PASSWORD)
132127
.queryParam("response_type", ResponseType.TOKEN),
133-
outbound -> {
134-
ReactorTokens.removeAuthorization(outbound);
135-
ReactorTokens.setUrlEncoded(outbound);
136-
})
128+
ReactorTokens::setUrlEncoded,
129+
ReactorTokens::removeAuthorization)
137130
.checkpoint();
138131
}
139132

@@ -153,15 +146,13 @@ public Mono<ListTokenKeysResponse> listKeys(ListTokenKeysRequest request) {
153146
public Mono<RefreshTokenResponse> refresh(RefreshTokenRequest request) {
154147
return post(request, RefreshTokenResponse.class, builder -> builder.pathSegment("oauth", "token")
155148
.queryParam("grant_type", REFRESH_TOKEN),
156-
outbound -> {
157-
ReactorTokens.removeAuthorization(outbound);
158-
ReactorTokens.setUrlEncoded(outbound);
159-
})
149+
ReactorTokens::setUrlEncoded,
150+
ReactorTokens::removeAuthorization)
160151
.checkpoint();
161152
}
162153

163-
private static void removeAuthorization(HttpHeaders request) {
164-
request.remove(AUTHORIZATION);
154+
private static Mono<HttpHeaders> removeAuthorization(HttpHeaders request) {
155+
return Mono.just(request.remove(AUTHORIZATION));
165156
}
166157

167158
private static void setUrlEncoded(HttpHeaders request) {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19+
import io.netty.handler.codec.http.HttpHeaders;
1920
import org.cloudfoundry.reactor.ConnectionContext;
2021
import org.cloudfoundry.reactor.TokenProvider;
2122
import reactor.core.publisher.Mono;
@@ -44,8 +45,18 @@ protected Mono<Operator> createOperator() {
4445

4546
return this.root.map(this::buildOperatorContext)
4647
.map(context -> new Operator(context, httpClient))
47-
.flatMap(operator -> this.tokenProvider.getToken(this.connectionContext)
48-
.map(token -> setHeaders(operator, token)));
48+
.map(operator -> operator.headers(this::addHeaders))
49+
.map(operator -> operator.headersWhen(this::addHeadersWhen));
50+
}
51+
52+
private void addHeaders(HttpHeaders httpHeaders) {
53+
UserAgent.setUserAgent(httpHeaders);
54+
JsonCodec.setDecodeHeaders(httpHeaders);
55+
}
56+
57+
private Mono<? extends HttpHeaders> addHeadersWhen(HttpHeaders httpHeaders) {
58+
return this.tokenProvider.getToken(this.connectionContext)
59+
.map(token -> httpHeaders.set(AUTHORIZATION, token));
4960
}
5061

5162
private OperatorContext buildOperatorContext(String root) {
@@ -56,12 +67,4 @@ private OperatorContext buildOperatorContext(String root) {
5667
.build();
5768
}
5869

59-
private Operator setHeaders(Operator operator, String token) {
60-
return operator.headers(httpHeaders -> {
61-
httpHeaders.set(AUTHORIZATION, token);
62-
UserAgent.setUserAgent(httpHeaders);
63-
JsonCodec.setDecodeHeaders(httpHeaders);
64-
});
65-
}
66-
6770
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMapper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19-
import java.util.function.Function;
20-
2119
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
20+
import reactor.core.publisher.Flux;
2221

23-
import reactor.core.publisher.Mono;
22+
import java.util.function.Function;
2423

25-
public interface ErrorPayloadMapper extends Function<Mono<HttpClientResponseWithBody>, Mono<HttpClientResponseWithBody>> {
24+
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithBody>, Flux<HttpClientResponseWithBody>> {
2625

2726
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/JsonCodec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import reactor.netty.ByteBufFlux;
3030
import reactor.netty.NettyOutbound;
3131
import reactor.netty.http.client.HttpClientRequest;
32-
import reactor.netty.http.client.HttpClientResponse;
3332

3433
import java.nio.charset.Charset;
3534
import java.util.function.BiFunction;

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public Operator headers(Consumer<HttpHeaders> headersTransformer) {
6868
return new Operator(this.context, this.httpClient.headers(headersTransformer));
6969
}
7070

71+
public Operator headersWhen(Function<HttpHeaders, Mono<? extends HttpHeaders>> headersWhenTransformer) {
72+
return new Operator(this.context, this.httpClient.headersWhen(headersWhenTransformer));
73+
}
74+
7175
public UriConfiguration patch() {
7276
return request(HttpMethod.PATCH);
7377
}
@@ -148,8 +152,9 @@ public ResponseReceiver addChannelHandler(Function<HttpClientResponse, ChannelHa
148152
}
149153

150154
public Mono<HttpClientResponse> get() {
151-
return this.responseReceiver.response((response, body) -> processResponse(response, body)
152-
.map(HttpClientResponseWithBody::getResponse))
155+
return this.responseReceiver.response((resp, body) -> Mono.just(HttpClientResponseWithBody.of(body, resp)))
156+
.transform(this::processResponse)
157+
.map(HttpClientResponseWithBody::getResponse)
153158
.singleOrEmpty();
154159
}
155160

@@ -163,9 +168,10 @@ public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publishe
163168
attachChannelHandlers(response, connection);
164169
ByteBufFlux body = connection.inbound().receive();
165170

166-
return processResponse(response, body).flatMapMany(responseTransformer)
167-
.doFinally(signalType -> connection.dispose());
168-
});
171+
return Mono.just(HttpClientResponseWithBody.of(body, response));
172+
})
173+
.transform(this::processResponse)
174+
.flatMap(responseTransformer);
169175
}
170176

171177
public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
@@ -186,28 +192,37 @@ private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
186192
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
187193
}
188194

189-
private Mono<HttpClientResponseWithBody> invalidateToken(Mono<HttpClientResponseWithBody> inbound) {
195+
private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponseWithBody> inbound) {
190196
return inbound
191-
.flatMap(response -> {
197+
.doOnNext(response -> {
192198
if (isUnauthorized(response)) {
193199
this.context.getTokenProvider().ifPresent(tokenProvider -> tokenProvider.invalidate(this.context.getConnectionContext()));
194-
return inbound
195-
.transform(this::invalidateToken);
196-
} else {
197-
return Mono.just(response);
200+
throw new InvalidTokenException();
198201
}
199202
});
200203
}
201204

202-
private Mono<HttpClientResponseWithBody> processResponse(HttpClientResponse response, ByteBufFlux body) {
203-
HttpClientResponseWithBody responseWithBody = HttpClientResponseWithBody.of(body, response);
204-
205-
return Mono.just(responseWithBody)
205+
private Flux<HttpClientResponseWithBody> processResponse(Flux<HttpClientResponseWithBody> inbound) {
206+
return inbound
206207
.transform(this::invalidateToken)
208+
.retry(t -> t instanceof InvalidTokenException)
207209
.transform(this.context.getErrorPayloadMapper()
208210
.orElse(ErrorPayloadMappers.fallback()));
209211
}
210212

213+
private static final class InvalidTokenException extends RuntimeException {
214+
215+
private static final long serialVersionUID = -3114034909507471614L;
216+
217+
private InvalidTokenException() {
218+
}
219+
220+
@Override
221+
public synchronized Throwable fillInStackTrace() {
222+
return null;
223+
}
224+
}
225+
211226
}
212227

213228
public static class ResponseReceiverConstructor extends OperatorContextAware {

0 commit comments

Comments
 (0)