Skip to content

Commit f994a25

Browse files
committed
Merge branch '3.x'
2 parents 358e225 + 564e3a8 commit f994a25

File tree

7 files changed

+141
-61
lines changed

7 files changed

+141
-61
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor;
18+
19+
import org.immutables.value.Value;
20+
import reactor.netty.Connection;
21+
import reactor.netty.http.client.HttpClientResponse;
22+
23+
@Value.Immutable
24+
public interface _HttpClientResponseWithConnection {
25+
26+
@Value.Parameter
27+
Connection getConnection();
28+
29+
@Value.Parameter
30+
HttpClientResponse getResponse();
31+
32+
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/client/CloudFoundryClientCompatibilityChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ void check() {
4242
.build())
4343
.map(response -> Version.valueOf(response.getApiVersion()))
4444
.zipWith(Mono.just(Version.valueOf(CloudFoundryClient.SUPPORTED_API_VERSION)))
45-
.doOnNext(consumer((server, supported) -> logCompatibility(server, supported, this.logger)))
46-
.subscribe();
45+
.subscribe(consumer((server, supported) -> logCompatibility(server, supported, this.logger)), t -> this.logger.error("An error occurred while checking version compatibility:", t));
4746
}
4847

4948
private static void logCompatibility(Version server, Version supported, Logger logger) {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
19+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2020
import reactor.core.publisher.Flux;
2121

2222
import java.util.function.Function;
2323

24-
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithBody>, Flux<HttpClientResponseWithBody>> {
24+
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithConnection>, Flux<HttpClientResponseWithConnection>> {
2525

2626
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMappers.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.cloudfoundry.client.v2.ClientV2Exception;
2323
import org.cloudfoundry.client.v3.ClientV3Exception;
2424
import org.cloudfoundry.client.v3.Errors;
25-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
25+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2626
import org.cloudfoundry.uaa.UaaException;
2727
import reactor.core.publisher.Mono;
28+
import reactor.netty.ByteBufFlux;
29+
import reactor.netty.Connection;
2830
import reactor.netty.http.client.HttpClientResponse;
2931

3032
import java.util.Map;
@@ -58,15 +60,21 @@ public static ErrorPayloadMapper clientV3(ObjectMapper objectMapper) {
5860

5961
public static ErrorPayloadMapper fallback() {
6062
return inbound -> inbound
61-
.flatMap(responseWithBody -> {
62-
HttpClientResponse response = responseWithBody.getResponse();
63+
.flatMap(responseWithConnection -> {
64+
HttpClientResponse response = responseWithConnection.getResponse();
6365

6466
if (isError(response)) {
65-
return responseWithBody.getBody().aggregate().asString()
66-
.flatMap(payload -> Mono.error(new UnknownCloudFoundryException(response.status().code(), payload)));
67+
Connection connection = responseWithConnection.getConnection();
68+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive());
69+
70+
return body.aggregate().asString()
71+
.doFinally(signalType -> connection.channel().close())
72+
.flatMap(payload -> {
73+
return Mono.error(new UnknownCloudFoundryException(response.status().code(), payload));
74+
});
6775
}
6876

69-
return Mono.just(responseWithBody);
77+
return Mono.just(responseWithConnection);
7078
});
7179
}
7280

@@ -87,13 +95,17 @@ private static boolean isError(HttpClientResponse response) {
8795
return statusClass == CLIENT_ERROR || statusClass == SERVER_ERROR;
8896
}
8997

90-
private static Function<HttpClientResponseWithBody, Mono<HttpClientResponseWithBody>> mapToError(ExceptionGenerator exceptionGenerator) {
98+
private static Function<HttpClientResponseWithConnection, Mono<HttpClientResponseWithConnection>> mapToError(ExceptionGenerator exceptionGenerator) {
9199
return response -> {
92100
if (!isError(response.getResponse())) {
93101
return Mono.just(response);
94102
}
95103

96-
return response.getBody().aggregate().asString()
104+
Connection connection = response.getConnection();
105+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
106+
.doFinally(signalType -> connection.dispose()));
107+
108+
return body.aggregate().asString()
97109
.switchIfEmpty(Mono.error(new UnknownCloudFoundryException(response.getResponse().status().code())))
98110
.flatMap(payload -> {
99111
try {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.cloudfoundry.reactor.util;
1818

1919
import io.netty.channel.ChannelHandler;
20+
import io.netty.handler.codec.http.HttpHeaderNames;
21+
import io.netty.handler.codec.http.HttpHeaderValues;
2022
import io.netty.handler.codec.http.HttpHeaders;
2123
import io.netty.handler.codec.http.HttpMethod;
2224
import io.netty.handler.codec.http.HttpResponseStatus;
2325
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
26+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2427
import org.reactivestreams.Publisher;
2528
import org.springframework.web.util.UriComponentsBuilder;
2629
import reactor.core.publisher.Flux;
@@ -154,48 +157,62 @@ public ResponseReceiver addChannelHandler(Function<HttpClientResponse, ChannelHa
154157
}
155158

156159
public Mono<HttpClientResponse> get() {
157-
return this.responseReceiver.response((resp, body) -> Mono.just(HttpClientResponseWithBody.of(body, resp)))
160+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
158161
.transform(this::processResponse)
159-
.map(HttpClientResponseWithBody::getResponse)
162+
.map(HttpClientResponseWithConnection::getResponse)
160163
.singleOrEmpty();
161164
}
162165

163166
public <T> Mono<T> parseBody(Class<T> bodyType) {
164-
addChannelHandler(ignore -> JsonCodec.createDecoder());
167+
addChannelHandler(response -> {
168+
if (HttpHeaderValues.APPLICATION_JSON.contentEquals(response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE))) {
169+
return JsonCodec.createDecoder();
170+
}
171+
172+
return null;
173+
});
174+
165175
return parseBodyToMono(responseWithBody -> deserialized(responseWithBody.getBody(), bodyType));
166176
}
167177

168178
public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
169-
return this.responseReceiver.responseConnection((response, connection) -> {
170-
attachChannelHandlers(response, connection);
171-
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
172-
.doFinally(signalType -> connection.dispose()));
173-
174-
return Mono.just(HttpClientResponseWithBody.of(body, response));
175-
})
179+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
176180
.transform(this::processResponse)
181+
.flatMap(httpClientResponseWithConnection -> {
182+
Connection connection = httpClientResponseWithConnection.getConnection();
183+
HttpClientResponse response = httpClientResponseWithConnection.getResponse();
184+
185+
attachChannelHandlers(response, connection);
186+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
187+
.doFinally(signalType -> connection.dispose()));
188+
189+
return Mono.just(HttpClientResponseWithBody.of(body, response));
190+
})
177191
.flatMap(responseTransformer);
178192
}
179193

180194
public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
181195
return parseBodyToFlux(responseTransformer).singleOrEmpty();
182196
}
183197

184-
private static boolean isUnauthorized(HttpClientResponseWithBody response) {
198+
private static boolean isUnauthorized(HttpClientResponseWithConnection response) {
185199
return response.getResponse().status() == HttpResponseStatus.UNAUTHORIZED;
186200
}
187201

188202
private void attachChannelHandlers(HttpClientResponse response, Connection connection) {
189203
for (Function<HttpClientResponse, ChannelHandler> handlerBuilder : this.channelHandlerBuilders) {
190-
connection.addHandler(handlerBuilder.apply(response));
204+
ChannelHandler handler = handlerBuilder.apply(response);
205+
if (handler != null) {
206+
connection.addHandler(handler);
207+
}
191208
}
192209
}
193210

194211
private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
195212
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
196213
}
197214

198-
private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponseWithBody> inbound) {
215+
private Flux<HttpClientResponseWithConnection> invalidateToken(Flux<HttpClientResponseWithConnection> inbound) {
199216
return inbound
200217
.doOnNext(response -> {
201218
if (isUnauthorized(response)) {
@@ -205,7 +222,7 @@ private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponse
205222
});
206223
}
207224

208-
private Flux<HttpClientResponseWithBody> processResponse(Flux<HttpClientResponseWithBody> inbound) {
225+
private Flux<HttpClientResponseWithConnection> processResponse(Flux<HttpClientResponseWithConnection> inbound) {
209226
return inbound
210227
.transform(this::invalidateToken)
211228
.retryWhen(Retry.max(this.context.getConnectionContext().getInvalidTokenRetries()).filter(InvalidTokenException.class::isInstance))

0 commit comments

Comments
 (0)